From bc1350318fc28f23b22a07d2e41c5e00c504ddaa Mon Sep 17 00:00:00 2001 From: Helius <wangdoubleone@gmail.com> Date: Thu, 17 Jun 2021 14:24:08 +0800 Subject: [PATCH] modify --- src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java | 132 +++++++++++++++++++++++--------------------- 1 files changed, 69 insertions(+), 63 deletions(-) diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java index df5900f..76dda01 100644 --- a/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java +++ b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject; import com.huobi.client.model.Candlestick; import com.xcong.excoin.configurations.RabbitMqConfig; +import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity; import com.xcong.excoin.modules.coin.service.OrderCoinService; import com.xcong.excoin.modules.exchange.service.HandleKlineService; import com.xcong.excoin.trade.ExchangeTrade; @@ -15,6 +16,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -24,11 +26,11 @@ import java.util.Set; /** - * @author wzy - * @date 2020-05-25 - **/ + * websocket 只能后台撮合交易那台开启 + */ @Slf4j @Component +@ConditionalOnProperty(prefix = "app", name = "exchange-trade", havingValue = "true") public class ExchangeConsumer { @Resource @@ -49,8 +51,8 @@ */ @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE) public void tradePlate(String content) { - //log.info("#盘口信息消费者---->{}#", content); - tradePlateSendWebSocket.sendMessagePlate("NEKK/USDT",content,null); + //log.info("--发送盘口消息--"); + tradePlateSendWebSocket.sendMessagePlate("BZZ/USDT",content,null); } /** @@ -61,6 +63,9 @@ public void handleTradeExchange(String content) { // log.info("#处理订单---->{}#", content); List<ExchangeTrade> exchangeTrades = JSONObject.parseArray(content, ExchangeTrade.class); + if(CollectionUtils.isEmpty(exchangeTrades)){ + return; + } // 去掉空的 暂时这样 Iterator<ExchangeTrade> iterator = exchangeTrades.iterator(); while (iterator.hasNext()){ @@ -71,68 +76,69 @@ if(CollectionUtils.isEmpty(exchangeTrades)){ return; } - // 处理K线 并更新最新价 - handleKlineService.handleExchangeOrderToKline(exchangeTrades); - // 推送最新K线 - String symbol = exchangeTrades.get(0).getSymbol(); - String symbolUsdt = symbol; - if(!symbol.contains("USDT")){ - symbolUsdt = symbol+"/USDT"; - } - String key = "NEW_KINE_{}"; - key = StrUtil.format(key, symbolUsdt); - Object o = redisUtils.get(key); - Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick> )o; - Set<Map.Entry<String, Candlestick>> entries = currentKlineMap.entrySet(); - - - for(Map.Entry<String, Candlestick> map : entries){ - String ch = "market.{}.kline.{}"; - Candlestick value = map.getValue(); - String key1 = map.getKey(); - String chKey = key1; - if(key1.equals("1hour")){ - chKey = "60min"; - } - // 转换 - NewCandlestick newCandlestick= new NewCandlestick(); - String nekkusdt = CoinTypeConvert.convertReverse(symbolUsdt); - ch = StrUtil.format(ch, nekkusdt,chKey); - newCandlestick.setCh(ch); - CandlestickModel model = new CandlestickModel(); - model.setVol(value.getVolume()); - model.setLow(value.getLow()); - model.setOpen(value.getOpen()); - model.setHigh(value.getHigh()); - model.setCount(value.getCount()); - model.setAmount(value.getAmount()); - model.setId(value.getTimestamp()/1000); - model.setTimestamp(value.getTimestamp()/1000); - model.setClose(value.getClose()); - newCandlestick.setTick(model); - tradePlateSendWebSocket.sendMessageKline(symbolUsdt,key1,JSONObject.toJSONString(newCandlestick),null); - } - // 处理用户订单 + // 先处理处理用户订单 orderCoinService.handleOrder(exchangeTrades); + try{ + // 处理K线 并更新最新价 + handleKlineService.handleExchangeOrderToKline(exchangeTrades); + // 推送最新K线 + String symbol = exchangeTrades.get(0).getSymbol(); + String symbolUsdt = symbol; + if(!symbol.contains("USDT")){ + symbolUsdt = symbol+"/USDT"; + } + String key = "NEW_KINE_{}"; + key = StrUtil.format(key, symbolUsdt); + Object o = redisUtils.get(key); + Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick> )o; + Set<Map.Entry<String, Candlestick>> entries = currentKlineMap.entrySet(); + + + for(Map.Entry<String, Candlestick> map : entries){ + String ch = "market.{}.kline.{}"; + Candlestick value = map.getValue(); + String key1 = map.getKey(); + String chKey = key1; + if(key1.equals("1hour")){ + chKey = "60min"; + } + // 转换 + NewCandlestick newCandlestick= new NewCandlestick(); + String nekkusdt = CoinTypeConvert.convertReverse(symbolUsdt); + ch = StrUtil.format(ch, nekkusdt,chKey); + newCandlestick.setCh(ch); + CandlestickModel model = new CandlestickModel(); + model.setVol(value.getVolume()); + model.setLow(value.getLow()); + model.setOpen(value.getOpen()); + model.setHigh(value.getHigh()); + model.setCount(value.getCount()); + model.setAmount(value.getAmount()); + model.setId(value.getTimestamp()/1000); + model.setTimestamp(value.getTimestamp()/1000); + model.setClose(value.getClose()); + newCandlestick.setTick(model); + tradePlateSendWebSocket.sendMessageKline(symbolUsdt,key1,JSONObject.toJSONString(newCandlestick),null); + } + + }catch (Exception e){ + e.printStackTrace(); + } + } /** - * 更新最新K线 + * 撮合交易订单全部完成 * @param content */ -// @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE) -// public void newKling(String content) { -// log.info("#---->{}#", content); -// // 最新K线的币种 -// String key = "NEW_KINE_{}"; -// key = StrUtil.format(key, content); -// Object o = redisUtils.get(key); -// Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick>)o; -// // 推送最新K线 -// Set<Map.Entry<String, Candlestick>> entries = currentKlineMap.entrySet(); -// for(Map.Entry<String, Candlestick> map : entries){ -// tradePlateSendWebSocket.sendMessageKline(content,map.getKey(),JSONObject.toJSONString(map.getValue()),null); -// } -// -// } + @RabbitListener(queues = RabbitMqConfig.QUEUE_ROC_ORDER_COMPLETE) + public void doComplete(String content) { + log.debug("#完成的订单---->{}#", content); + List<OrderCoinsEntity> exchangeTrades = JSONObject.parseArray(content, OrderCoinsEntity.class); + if(CollectionUtils.isEmpty(exchangeTrades)){ + return; + } + orderCoinService.completeOrder(exchangeTrades); + } + } -- Gitblit v1.9.1