| package com.xcong.excoin.rabbit.consumer; | 
|   | 
| import cn.hutool.core.util.StrUtil; | 
| import com.alibaba.fastjson.JSONObject; | 
| import com.huobi.client.model.Candlestick; | 
| import com.xcong.excoin.configurations.RabbitMqConfig; | 
| import com.xcong.excoin.modules.coin.service.OrderCoinService; | 
| import com.xcong.excoin.modules.exchange.service.HandleKlineService; | 
| import com.xcong.excoin.trade.ExchangeTrade; | 
| import com.xcong.excoin.utils.CoinTypeConvert; | 
| import com.xcong.excoin.utils.RedisUtils; | 
| import com.xcong.excoin.websocket.CandlestickModel; | 
| import com.xcong.excoin.websocket.NewCandlestick; | 
| import com.xcong.excoin.websocket.TradePlateSendWebSocket; | 
| import lombok.extern.slf4j.Slf4j; | 
| import org.apache.commons.collections.CollectionUtils; | 
| import org.springframework.amqp.rabbit.annotation.RabbitListener; | 
| import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; | 
| import org.springframework.stereotype.Component; | 
|   | 
| import javax.annotation.Resource; | 
| import java.util.Iterator; | 
| import java.util.List; | 
| import java.util.Map; | 
| import java.util.Set; | 
|   | 
| /** | 
|  *  websocket 只能后台撮合交易那台开启 | 
|  */ | 
| @Slf4j | 
| @Component | 
| @ConditionalOnProperty(prefix = "app", name = "exchange-trade", havingValue = "true") | 
| public class ExchangeConsumer { | 
|   | 
|     @Resource | 
|     private TradePlateSendWebSocket tradePlateSendWebSocket; | 
|   | 
|     @Resource | 
|     private RedisUtils redisUtils; | 
|   | 
|     @Resource | 
|     private HandleKlineService handleKlineService; | 
|   | 
|     @Resource | 
|     private OrderCoinService orderCoinService; | 
|   | 
|     /** | 
|      *  发送盘口信息 | 
|      * @param content | 
|      */ | 
|     @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE) | 
|     public void tradePlate(String content) { | 
|         tradePlateSendWebSocket.sendMessagePlate("ROC/USDT",content,null); | 
|     } | 
|   | 
|     /** | 
|      *  处理订单 | 
|      * @param content | 
|      */ | 
|     @RabbitListener(queues = RabbitMqConfig.QUEUE_HANDLE_TRADE) | 
|     public void handleTradeExchange(String content) { | 
|        // log.info("#处理订单---->{}#", content); | 
|         List<ExchangeTrade> exchangeTrades = JSONObject.parseArray(content, ExchangeTrade.class); | 
|         // 去掉空的  暂时这样 | 
|         Iterator<ExchangeTrade> iterator = exchangeTrades.iterator(); | 
|         while (iterator.hasNext()){ | 
|             if(iterator.next()==null){ | 
|                 iterator.remove(); | 
|             } | 
|         } | 
|         if(CollectionUtils.isEmpty(exchangeTrades)){ | 
|             return; | 
|         } | 
|         // 处理K线 并更新最新价 | 
|         handleKlineService.handleExchangeOrderToKline(exchangeTrades); | 
|         // 推送最新K线 | 
|         String symbol = exchangeTrades.get(0).getSymbol(); | 
|         String symbolUsdt = symbol; | 
|         if(!symbol.contains("USDT")){ | 
|             symbolUsdt = symbol+"/USDT"; | 
|         } | 
|         String key = "NEW_KINE_{}"; | 
|         key = StrUtil.format(key, symbolUsdt); | 
|         Object o = redisUtils.get(key); | 
|         Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick> )o; | 
|         Set<Map.Entry<String, Candlestick>> entries = currentKlineMap.entrySet(); | 
|   | 
|   | 
|         for(Map.Entry<String, Candlestick> map : entries){ | 
|             String ch = "market.{}.kline.{}"; | 
|             Candlestick value = map.getValue(); | 
|             String key1 = map.getKey(); | 
|             String chKey = key1; | 
|             if(key1.equals("1hour")){ | 
|                 chKey = "60min"; | 
|             } | 
|             // 转换 | 
|             NewCandlestick newCandlestick= new NewCandlestick(); | 
|             String nekkusdt = CoinTypeConvert.convertReverse(symbolUsdt); | 
|             ch = StrUtil.format(ch, nekkusdt,chKey); | 
|             newCandlestick.setCh(ch); | 
|             CandlestickModel model = new CandlestickModel(); | 
|             model.setVol(value.getVolume()); | 
|             model.setLow(value.getLow()); | 
|             model.setOpen(value.getOpen()); | 
|             model.setHigh(value.getHigh()); | 
|             model.setCount(value.getCount()); | 
|             model.setAmount(value.getAmount()); | 
|             model.setId(value.getTimestamp()/1000); | 
|             model.setTimestamp(value.getTimestamp()/1000); | 
|             model.setClose(value.getClose()); | 
|             newCandlestick.setTick(model); | 
|             tradePlateSendWebSocket.sendMessageKline(symbolUsdt,key1,JSONObject.toJSONString(newCandlestick),null); | 
|         } | 
|         // 处理用户订单 | 
|         orderCoinService.handleOrder(exchangeTrades); | 
|     } | 
|   | 
| } |