From f6b431207b7d5d0f87e49f3b379e465734897ccf Mon Sep 17 00:00:00 2001 From: zainali5120 <512061637@qq.com> Date: Tue, 29 Sep 2020 17:25:16 +0800 Subject: [PATCH] 配置修改 --- src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java | 47 +++++++++++++++++++++++++++++++++++++++++++---- 1 files changed, 43 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java index 9b03546..384f1c2 100644 --- a/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java +++ b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java @@ -7,13 +7,18 @@ import com.xcong.excoin.modules.coin.service.OrderCoinService; import com.xcong.excoin.modules.exchange.service.HandleKlineService; import com.xcong.excoin.trade.ExchangeTrade; +import com.xcong.excoin.utils.CoinTypeConvert; import com.xcong.excoin.utils.RedisUtils; +import com.xcong.excoin.websocket.CandlestickModel; +import com.xcong.excoin.websocket.NewCandlestick; import com.xcong.excoin.websocket.TradePlateSendWebSocket; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -44,8 +49,7 @@ */ @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE) public void tradePlate(String content) { - log.info("#---->{}#", content); - tradePlateSendWebSocket.sendMessagePlate(content,null); + tradePlateSendWebSocket.sendMessagePlate("CPV/USDT",content,null); } /** @@ -54,8 +58,18 @@ */ @RabbitListener(queues = RabbitMqConfig.QUEUE_HANDLE_TRADE) public void handleTradeExchange(String content) { - log.info("#---->{}#", content); + // log.info("#处理订单---->{}#", content); List<ExchangeTrade> exchangeTrades = JSONObject.parseArray(content, ExchangeTrade.class); + // 去掉空的 暂时这样 + Iterator<ExchangeTrade> iterator = exchangeTrades.iterator(); + while (iterator.hasNext()){ + if(iterator.next()==null){ + iterator.remove(); + } + } + if(CollectionUtils.isEmpty(exchangeTrades)){ + return; + } // 处理K线 并更新最新价 handleKlineService.handleExchangeOrderToKline(exchangeTrades); // 推送最新K线 @@ -69,8 +83,33 @@ Object o = redisUtils.get(key); Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick> )o; Set<Map.Entry<String, Candlestick>> entries = currentKlineMap.entrySet(); + + for(Map.Entry<String, Candlestick> map : entries){ - tradePlateSendWebSocket.sendMessageKline(symbolUsdt,map.getKey(),JSONObject.toJSONString(map.getValue()),null); + String ch = "market.{}.kline.{}"; + Candlestick value = map.getValue(); + String key1 = map.getKey(); + String chKey = key1; + if(key1.equals("1hour")){ + chKey = "60min"; + } + // 转换 + NewCandlestick newCandlestick= new NewCandlestick(); + String nekkusdt = CoinTypeConvert.convertReverse(symbolUsdt); + ch = StrUtil.format(ch, nekkusdt,chKey); + newCandlestick.setCh(ch); + CandlestickModel model = new CandlestickModel(); + model.setVol(value.getVolume()); + model.setLow(value.getLow()); + model.setOpen(value.getOpen()); + model.setHigh(value.getHigh()); + model.setCount(value.getCount()); + model.setAmount(value.getAmount()); + model.setId(value.getTimestamp()/1000); + model.setTimestamp(value.getTimestamp()/1000); + model.setClose(value.getClose()); + newCandlestick.setTick(model); + tradePlateSendWebSocket.sendMessageKline(symbolUsdt,key1,JSONObject.toJSONString(newCandlestick),null); } // 处理用户订单 orderCoinService.handleOrder(exchangeTrades); -- Gitblit v1.9.1