package com.xcong.excoin.rabbit.consumer; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSONObject; import com.huobi.client.model.Candlestick; import com.xcong.excoin.configurations.RabbitMqConfig; import com.xcong.excoin.modules.coin.service.OrderCoinService; import com.xcong.excoin.modules.exchange.service.HandleKlineService; import com.xcong.excoin.trade.ExchangeTrade; import com.xcong.excoin.utils.CoinTypeConvert; import com.xcong.excoin.utils.RedisUtils; import com.xcong.excoin.websocket.CandlestickModel; import com.xcong.excoin.websocket.NewCandlestick; import com.xcong.excoin.websocket.TradePlateSendWebSocket; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; /** * @author wzy * @date 2020-05-25 **/ @Slf4j @Component public class ExchangeConsumer { @Resource private TradePlateSendWebSocket tradePlateSendWebSocket; @Resource private RedisUtils redisUtils; @Resource private HandleKlineService handleKlineService; @Resource private OrderCoinService orderCoinService; /** * 发送盘口信息 * @param content */ @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE) public void tradePlate(String content) { tradePlateSendWebSocket.sendMessagePlate("CPV/USDT",content,null); } /** * 处理订单 * @param content */ @RabbitListener(queues = RabbitMqConfig.QUEUE_HANDLE_TRADE) public void handleTradeExchange(String content) { // log.info("#处理订单---->{}#", content); List exchangeTrades = JSONObject.parseArray(content, ExchangeTrade.class); // 去掉空的 暂时这样 Iterator iterator = exchangeTrades.iterator(); while (iterator.hasNext()){ if(iterator.next()==null){ iterator.remove(); } } if(CollectionUtils.isEmpty(exchangeTrades)){ return; } // 处理K线 并更新最新价 handleKlineService.handleExchangeOrderToKline(exchangeTrades); // 推送最新K线 String symbol = exchangeTrades.get(0).getSymbol(); String symbolUsdt = symbol; if(!symbol.contains("USDT")){ symbolUsdt = symbol+"/USDT"; } String key = "NEW_KINE_{}"; key = StrUtil.format(key, symbolUsdt); Object o = redisUtils.get(key); Map currentKlineMap = (Map )o; Set> entries = currentKlineMap.entrySet(); for(Map.Entry map : entries){ String ch = "market.{}.kline.{}"; Candlestick value = map.getValue(); String key1 = map.getKey(); String chKey = key1; if(key1.equals("1hour")){ chKey = "60min"; } // 转换 NewCandlestick newCandlestick= new NewCandlestick(); String nekkusdt = CoinTypeConvert.convertReverse(symbolUsdt); ch = StrUtil.format(ch, nekkusdt,chKey); newCandlestick.setCh(ch); CandlestickModel model = new CandlestickModel(); model.setVol(value.getVolume()); model.setLow(value.getLow()); model.setOpen(value.getOpen()); model.setHigh(value.getHigh()); model.setCount(value.getCount()); model.setAmount(value.getAmount()); model.setId(value.getTimestamp()/1000); model.setTimestamp(value.getTimestamp()/1000); model.setClose(value.getClose()); newCandlestick.setTick(model); tradePlateSendWebSocket.sendMessageKline(symbolUsdt,key1,JSONObject.toJSONString(newCandlestick),null); } // 处理用户订单 orderCoinService.handleOrder(exchangeTrades); } /** * 更新最新K线 * @param content */ // @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE) // public void newKling(String content) { // log.info("#---->{}#", content); // // 最新K线的币种 // String key = "NEW_KINE_{}"; // key = StrUtil.format(key, content); // Object o = redisUtils.get(key); // Map currentKlineMap = (Map)o; // // 推送最新K线 // Set> entries = currentKlineMap.entrySet(); // for(Map.Entry map : entries){ // tradePlateSendWebSocket.sendMessageKline(content,map.getKey(),JSONObject.toJSONString(map.getValue()),null); // } // // } }