package com.xcong.excoin.rabbit.consumer; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSONObject; import com.huobi.client.model.Candlestick; import com.xcong.excoin.configurations.RabbitMqConfig; import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity; import com.xcong.excoin.modules.coin.service.OrderCoinService; import com.xcong.excoin.modules.exchange.service.HandleKlineService; import com.xcong.excoin.trade.ExchangeTrade; import com.xcong.excoin.utils.CoinTypeConvert; import com.xcong.excoin.utils.RedisUtils; import com.xcong.excoin.websocket.CandlestickModel; import com.xcong.excoin.websocket.NewCandlestick; import com.xcong.excoin.websocket.TradePlateSendWebSocket; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; /** * websocket 只能后台撮合交易那台开启 */ @Slf4j @Component @ConditionalOnProperty(prefix = "app", name = "exchange-trade", havingValue = "true") public class ExchangeConsumer { @Resource private TradePlateSendWebSocket tradePlateSendWebSocket; @Resource private RedisUtils redisUtils; @Resource private HandleKlineService handleKlineService; @Resource private OrderCoinService orderCoinService; /** * 发送盘口信息 * @param content */ @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE) public void tradePlate(String content) { tradePlateSendWebSocket.sendMessagePlate("ROC/USDT",content,null); } /** * 处理订单 * @param content */ @RabbitListener(queues = RabbitMqConfig.QUEUE_HANDLE_TRADE) public void handleTradeExchange(String content) { // log.info("#处理订单---->{}#", content); List exchangeTrades = JSONObject.parseArray(content, ExchangeTrade.class); // 去掉空的 暂时这样 Iterator iterator = exchangeTrades.iterator(); while (iterator.hasNext()){ if(iterator.next()==null){ iterator.remove(); } } if(CollectionUtils.isEmpty(exchangeTrades)){ return; } // 先处理处理用户订单 orderCoinService.handleOrder(exchangeTrades); // 处理K线 并更新最新价 handleKlineService.handleExchangeOrderToKline(exchangeTrades); // 推送最新K线 String symbol = exchangeTrades.get(0).getSymbol(); String symbolUsdt = symbol; if(!symbol.contains("USDT")){ symbolUsdt = symbol+"/USDT"; } String key = "NEW_KINE_{}"; key = StrUtil.format(key, symbolUsdt); Object o = redisUtils.get(key); Map currentKlineMap = (Map )o; Set> entries = currentKlineMap.entrySet(); for(Map.Entry map : entries){ String ch = "market.{}.kline.{}"; Candlestick value = map.getValue(); String key1 = map.getKey(); String chKey = key1; if(key1.equals("1hour")){ chKey = "60min"; } // 转换 NewCandlestick newCandlestick= new NewCandlestick(); String nekkusdt = CoinTypeConvert.convertReverse(symbolUsdt); ch = StrUtil.format(ch, nekkusdt,chKey); newCandlestick.setCh(ch); CandlestickModel model = new CandlestickModel(); model.setVol(value.getVolume()); model.setLow(value.getLow()); model.setOpen(value.getOpen()); model.setHigh(value.getHigh()); model.setCount(value.getCount()); model.setAmount(value.getAmount()); model.setId(value.getTimestamp()/1000); model.setTimestamp(value.getTimestamp()/1000); model.setClose(value.getClose()); newCandlestick.setTick(model); tradePlateSendWebSocket.sendMessageKline(symbolUsdt,key1,JSONObject.toJSONString(newCandlestick),null); } } /** * 撮合交易订单全部完成 * @param content */ @RabbitListener(queues = RabbitMqConfig.QUEUE_ROC_ORDER_COMPLETE) public void doComplete(String content) { log.debug("#完成的订单---->{}#", content); List exchangeTrades = JSONObject.parseArray(content, OrderCoinsEntity.class); if(CollectionUtils.isEmpty(exchangeTrades)){ return; } orderCoinService.completeOrder(exchangeTrades); } }