From 74ca5bc0f40e3b91464c8972392271d24dd5f066 Mon Sep 17 00:00:00 2001
From: zainali5120 <512061637@qq.com>
Date: Mon, 14 Sep 2020 11:05:48 +0800
Subject: [PATCH] 撮合交易代码提交

---
 src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java                       |    4 +
 src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java       |    8 +++-
 src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java |   11 ++++-
 src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java                    |   12 ++++--
 src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml                              |   18 +++++++++
 src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java                          |   10 ++++-
 src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java                         |   17 ++++----
 src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java                     |   12 +++++-
 src/main/java/com/xcong/excoin/trade/CoinTrader.java                                     |   10 ++--
 9 files changed, 76 insertions(+), 26 deletions(-)

diff --git a/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java b/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java
index beed401..12f59a0 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java
@@ -15,5 +15,7 @@
 
 	OrderCoinsEntity findWalletCoinOrderByOrderNo(@Param("orderNo")String orderNo);
 
-	List<OrderCoinsEntity> selectAllEntrustingCoinOrderList();
+	List<OrderCoinsEntity> selectAllEntrustingCoinOrderList(List<String> list);
+
+	List<OrderCoinsEntity> selectCoinOrderOnTrade(List<String> list);
 }
diff --git a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
index 200f4a7..07a9886 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
@@ -744,7 +744,9 @@
     @Override
     @Transactional(rollbackFor = Exception.class)
     public void dealEntrustCoinOrder() {
-        List<OrderCoinsEntity> list = orderCoinsDao.selectAllEntrustingCoinOrderList();
+        List<String> ignoreTypes = new ArrayList<>();
+        ignoreTypes.add("NEKK");
+        List<OrderCoinsEntity> list = orderCoinsDao.selectAllEntrustingCoinOrderList(ignoreTypes);
         if (CollUtil.isNotEmpty(list)) {
             for (OrderCoinsEntity orderCoinsEntity : list) {
                 BigDecimal nowPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(orderCoinsEntity.getSymbol() + "/USDT")));
@@ -813,7 +815,9 @@
     public void handleOrder(List<ExchangeTrade> trades){
         // 处理撮合交易的订单
         for(ExchangeTrade exchangeTrade : trades){
-
+            if(exchangeTrade==null){
+                continue;
+            }
             BigDecimal amount = exchangeTrade.getAmount();
             Long buyOrderId = exchangeTrade.getBuyOrderId();
             BigDecimal buyTurnover = exchangeTrade.getBuyTurnover();
diff --git a/src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java b/src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java
index 7add4f8..e61a588 100644
--- a/src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java
+++ b/src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java
@@ -39,10 +39,14 @@
         CoinProcessor processor = processorFactory.getProcessor(symbol);
         Map<String, Candlestick> currentKlineMap = processor.getCurrentKlineMap();
         Collection<Candlestick> values = currentKlineMap.values();
-        BigDecimal newPrice = trades.get(trades.size()-1).getPrice();
+        BigDecimal newPrice =null;
         for (Candlestick candlestick : values) {
             for (ExchangeTrade exchangeTrade : trades) {
+                if(exchangeTrade==null){
+                    continue;
+                }
                 processor.processTrade(candlestick, exchangeTrade);
+                newPrice=exchangeTrade.getPrice();
             }
         }
 
@@ -51,6 +55,9 @@
         key = StrUtil.format(key, symbolUsdt);
         redisUtils.set(key,currentKlineMap);
         // 更新最新价
-        redisUtils.set(CoinTypeConvert.convertToKey(symbolUsdt), newPrice);
+        if(newPrice!=null){
+            redisUtils.set(CoinTypeConvert.convertToKey(symbolUsdt), newPrice);
+        }
+
     }
 }
diff --git a/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java b/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java
index 90a3e9c..91a81fd 100644
--- a/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java
+++ b/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java
@@ -14,6 +14,7 @@
 import com.xcong.excoin.processor.DefaultCoinProcessor;
 import com.xcong.excoin.processor.MarketService;
 import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService;
+import com.xcong.excoin.rabbit.producer.ExchangeProducer;
 import com.xcong.excoin.trade.CoinTrader;
 import com.xcong.excoin.trade.CoinTraderFactory;
 import com.xcong.excoin.utils.CoinTypeConvert;
@@ -56,12 +57,15 @@
     @Resource
     private CoinProcessorFactory processorFactory;
 
+    @Resource
+    ExchangeProducer exchangeProducer;
+
     @PostConstruct
     public void initCoinTrade() {
         log.info("#=======撮合交易器开启=======#");
         String symbol = "NEKK";
         CoinTrader newTrader = new CoinTrader(symbol);
-        newTrader.setOrderCoinService(coinService);
+        newTrader.setExchangeProducer(exchangeProducer);
         //newTrader.setKafkaTemplate(kafkaTemplate);
         //newTrader.setBaseCoinScale(coin.getBaseCoinScale());
         //newTrader.setCoinScale(coin.getCoinScale());
@@ -70,7 +74,9 @@
 
         // 创建成功以后需要对未处理订单预处理
         log.info("======CoinTrader Process: " + symbol + "======");
-        List<OrderCoinsEntity> orders = orderCoinsDao.selectAllEntrustingCoinOrderList();
+        List<String> symbolList = new ArrayList<>();
+        symbolList.add(symbol);
+        List<OrderCoinsEntity> orders = orderCoinsDao.selectCoinOrderOnTrade(symbolList);
         List<OrderCoinsEntity> tradingOrders = new ArrayList<>();
         List<OrderCoinsEntity> completedOrders = new ArrayList<>();
         orders.forEach(order -> {
diff --git a/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java b/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
index 110c1fc..946af01 100644
--- a/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
+++ b/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
@@ -33,9 +33,9 @@
 	@Resource
 	private RedisUtils redisUtils;
 
-	@Scheduled(cron = "0/2 * * * * *")
+	@Scheduled(cron = "0/10 * * * * *")
     public void tradePlate(){
-		redisUtils.set("NEKK_NEW_PRICE",new BigDecimal(Math.random()));
+		redisUtils.set("NEKK_NEW_PRICE",new BigDecimal(Math.random()*20));
 		Candlestick candlestick = new Candlestick();
 		candlestick.setOpen(new BigDecimal("10.33"));
 		candlestick.setHigh(new BigDecimal("15.23"));
@@ -46,7 +46,7 @@
 		candlestick.setId(1599840000L);
 		candlestick.setCount(100002);
 		candlestick.setClose(new BigDecimal("12.2323"));
-		redisUtils.set("NEKK/USDT",candlestick);
+		//redisUtils.set("NEKK/USDT",candlestick);
 		// [[10244.21, 0.000855], [10243.7, 0.008777], [10243.59, 0.14], [10243.37, 0.467663]]
 		TradePlateModel tradePlateModel = new TradePlateModel();
 		List<BigDecimal> buy;
@@ -61,11 +61,11 @@
 			sell.add(new BigDecimal(Math.random()*i*2));
 			tradePlateModel.getSell().add(sell);
 		}
-
-		plateSendWebSocket.sendMessagePlate(JSON.toJSONString(tradePlateModel),null);
-		plateSendWebSocket.sendMessageKline("nekkusdt","1min","{amount: 114419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 311958.06091543}",null);
-		plateSendWebSocket.sendMessageKline("nekkusdt","5min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null);
-		plateSendWebSocket.sendMessageKline("nekkusdt","15min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null);
+        log.info("准备发送消息");
+		plateSendWebSocket.sendMessagePlate("NEKK/USDT",JSON.toJSONString(tradePlateModel),null);
+		plateSendWebSocket.sendMessageKline("NEKK/USDT","1min","{amount: 114419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 311958.06091543}",null);
+		plateSendWebSocket.sendMessageKline("NEKK/USDT","5min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null);
+		plateSendWebSocket.sendMessageKline("NEKK/USDT","15min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null);
 	}
 
     /**
@@ -156,4 +156,5 @@
         	}
         });
     }
+
 }
diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
index 9b03546..9c9793a 100644
--- a/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
+++ b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
@@ -14,6 +14,7 @@
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -44,8 +45,8 @@
      */
     @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE)
     public void tradePlate(String content) {
-        log.info("#---->{}#", content);
-        tradePlateSendWebSocket.sendMessagePlate(content,null);
+        log.info("#盘口信息消费者---->{}#", content);
+        tradePlateSendWebSocket.sendMessagePlate("NEKK/USDT",content,null);
     }
 
     /**
@@ -56,6 +57,13 @@
     public void handleTradeExchange(String content) {
         log.info("#---->{}#", content);
         List<ExchangeTrade> exchangeTrades = JSONObject.parseArray(content, ExchangeTrade.class);
+        // 去掉空的  暂时这样
+        Iterator<ExchangeTrade> iterator = exchangeTrades.iterator();
+        while (iterator.hasNext()){
+            if(iterator.next()==null){
+                iterator.remove();
+            }
+        }
         // 处理K线 并更新最新价
         handleKlineService.handleExchangeOrderToKline(exchangeTrades);
         // 推送最新K线
diff --git a/src/main/java/com/xcong/excoin/trade/CoinTrader.java b/src/main/java/com/xcong/excoin/trade/CoinTrader.java
index 46289af..ca79fe9 100644
--- a/src/main/java/com/xcong/excoin/trade/CoinTrader.java
+++ b/src/main/java/com/xcong/excoin/trade/CoinTrader.java
@@ -17,7 +17,6 @@
 public class CoinTrader {
     private String symbol;
     private ExchangeProducer exchangeProducer;
-    private OrderCoinService orderCoinService;
     //交易币种的精度
     private int coinScale = 4;
     //基币的精度
@@ -752,11 +751,12 @@
         return count;
     }
 
-    public OrderCoinService getOrderCoinService() {
-        return orderCoinService;
+
+    public ExchangeProducer getExchangeProducer() {
+        return exchangeProducer;
     }
 
-    public void setOrderCoinService(OrderCoinService orderCoinService) {
-        this.orderCoinService = orderCoinService;
+    public void setExchangeProducer(ExchangeProducer exchangeProducer) {
+        this.exchangeProducer = exchangeProducer;
     }
 }
diff --git a/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
index b955f11..bb8bcb3 100644
--- a/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
+++ b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
@@ -75,6 +75,7 @@
         JSONObject jsonObject = JSON.parseObject(message);
         // 盘口的判断
         if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) {
+            log.info("订阅盘口消息:{}", session.getId());
             String sub = jsonObject.get("sub").toString();
             String symbol = sub.split("\\.")[1];
             symbol = CoinTypeConvert.convert(symbol);
@@ -89,6 +90,7 @@
         // 取消盘口订阅
         if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) {
             // `market.${symbol}.kline.${strPeriod}
+            log.info("取消订阅盘口消息:{}", session.getId());
             String unsub = jsonObject.get("unsub").toString();
             String[] split = unsub.split("\\.");
             String symbol = split[1];
@@ -110,6 +112,7 @@
         // 取消订阅 {unsub: xxx(标识)}
         if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) {
             // 订阅
+            log.info("订阅最新K线消息:{}", session.getId());
             String sub = jsonObject.get("sub").toString();
             String[] split = sub.split("\\.");
             String symbol = split[1];
@@ -132,6 +135,7 @@
         // 取消订阅
         if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) {
             // `market.${symbol}.kline.${strPeriod}
+            log.info("取消订阅最新K消息:{}", session.getId());
             String unsub = jsonObject.get("unsub").toString();
             String[] split = unsub.split("\\.");
             String strPeriod = split[3];
@@ -173,14 +177,14 @@
      *
      * @param message 消息内容
      */
-    public void sendMessagePlate(String message, Session fromSession) {
-        if (tradeplateClients.containsKey("nekkusdt")) {
-            Map<String, Session> nekk = tradeplateClients.get("nekkusdt");
+    public void sendMessagePlate(String symbol,String message, Session fromSession) {
+        if (tradeplateClients.containsKey(symbol)) {
+            Map<String, Session> nekk = tradeplateClients.get(symbol);
             for (Map.Entry<String, Session> sessionEntry : nekk.entrySet()) {
                 Session toSession = sessionEntry.getValue();
                 // 排除掉自己
                 //if (!fromSession.getId().equals(toSession.getId())) {
-                log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
+                log.info("服务端给客户端[{}]发送盘口消息{}", toSession.getId(), message);
                 boolean open = toSession.isOpen();
                 if (open) {
                     toSession.getAsyncRemote().sendText(message);
diff --git a/src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml b/src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml
index b844a93..aa02fee 100644
--- a/src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml
+++ b/src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml
@@ -25,5 +25,23 @@
 		select *
 		from coins_order
 		where order_status=1
+		<if test="list != null">
+			and symbol not in
+			<foreach collection="list" separator="," item="item" open="(" close=")">
+				#{item}
+			</foreach>
+		</if>
+	</select>
+
+	<select id="selectCoinOrderOnTrade" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsEntity">
+		select *
+		from coins_order
+		where order_status=1
+		<if test="list != null">
+			and symbol in
+			<foreach collection="list" separator="," item="item" open="(" close=")">
+				#{item}
+			</foreach>
+		</if>
 	</select>
 </mapper>

--
Gitblit v1.9.1