From d174d6963d62b3bd176f9e7ba3cf0d7f75a91b69 Mon Sep 17 00:00:00 2001
From: zainali5120 <512061637@qq.com>
Date: Wed, 16 Sep 2020 16:03:22 +0800
Subject: [PATCH] 撮合交易代码提交

---
 src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java |  109 +++++++++++++++++++++++++++++++++++++++++++-----------
 1 files changed, 87 insertions(+), 22 deletions(-)

diff --git a/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
index bb8bcb3..15d4168 100644
--- a/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
+++ b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
@@ -2,8 +2,11 @@
 
 import cn.hutool.core.util.StrUtil;
 import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
+import com.huobi.client.model.Candlestick;
 import com.xcong.excoin.common.contants.AppContants;
+import com.xcong.excoin.trade.CoinTraderFactory;
 import com.xcong.excoin.utils.CoinTypeConvert;
 import com.xcong.excoin.utils.RedisUtils;
 import com.xcong.excoin.utils.SpringContextHolder;
@@ -15,9 +18,8 @@
 import javax.websocket.*;
 import javax.websocket.server.PathParam;
 import javax.websocket.server.ServerEndpoint;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.io.IOException;
+import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -53,12 +55,19 @@
     @OnClose
     public void onClose(Session session) {
         onlineCount.decrementAndGet(); // 在线数减1
-//        Collection<Map<String, Session>> values = tradeplateClients.values();
-//        if(CollectionUtils.isNotEmpty(values)){
-//            for(Map<String,Session> map : values){
-//                map.remove(session.getId());
-//            }
-//        }
+        Collection<Map<String, Session>> values = tradeplateClients.values();
+        if(CollectionUtils.isNotEmpty(values)){
+            for(Map<String,Session> map : values){
+                map.remove(session.getId());
+            }
+        }
+
+        Collection<Map<String, Session>> klineClientsValues = klineClients.values();
+        if(CollectionUtils.isNotEmpty(klineClientsValues)){
+            for(Map<String,Session> map : klineClientsValues){
+                map.remove(session.getId());
+            }
+        }
         log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get());
     }
 
@@ -69,13 +78,13 @@
      */
     @OnMessage
     public void onMessage(String message, Session session) {
-        log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
+        //log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
         // 订阅方法 {sub: 'market.' + symbol + '.depth.' + this._caculateType(),id: symbol
         //}
         JSONObject jsonObject = JSON.parseObject(message);
         // 盘口的判断
         if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) {
-            log.info("订阅盘口消息:{}", session.getId());
+            //log.info("订阅盘口消息:{}", session.getId());
             String sub = jsonObject.get("sub").toString();
             String symbol = sub.split("\\.")[1];
             symbol = CoinTypeConvert.convert(symbol);
@@ -86,11 +95,34 @@
                 map.put(session.getId(), session);
                 tradeplateClients.put(symbol, map);
             }
+            // 发送一次盘口
+            CoinTraderFactory factory = SpringContextHolder.getBean(CoinTraderFactory.class);
+            // 发送订阅消息
+            String nekk = factory.getTrader("NEKK").sendTradePlateMessage();
+            SubResultModel subResultModel = new SubResultModel();
+            subResultModel.setId("nekkusdt");
+            subResultModel.setSubbed(sub);
+            synchronized (session){
+                try {
+                    session.getBasicRemote().sendText(JSONObject.toJSONString(subResultModel));
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+            }
+           synchronized (session){
+               try {
+                   session.getBasicRemote().sendText(nekk);
+               } catch (IOException e) {
+                   e.printStackTrace();
+               }
+           }
+
+
         }
         // 取消盘口订阅
         if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) {
             // `market.${symbol}.kline.${strPeriod}
-            log.info("取消订阅盘口消息:{}", session.getId());
+            //log.info("取消订阅盘口消息:{}", session.getId());
             String unsub = jsonObject.get("unsub").toString();
             String[] split = unsub.split("\\.");
             String symbol = split[1];
@@ -112,38 +144,48 @@
         // 取消订阅 {unsub: xxx(标识)}
         if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) {
             // 订阅
-            log.info("订阅最新K线消息:{}", session.getId());
             String sub = jsonObject.get("sub").toString();
+            log.info("订阅最新K线消息:{}", sub);
             String[] split = sub.split("\\.");
             String symbol = split[1];
             symbol = CoinTypeConvert.convert(symbol);
             String period = split[3];
+            if("60min".equals(period)){
+                period = "1hour";
+            }
             String key = symbol + "-" + period;
+            log.info("最新K线key:{}", key);
             if (klineClients.containsKey(key)) {
                 // 有这个币种K线
                 Map<String, Session> stringSessionMap = klineClients.get(key);
                 if (!stringSessionMap.containsKey(session.getId())) {
                     stringSessionMap.put(session.getId(), session);
+                    log.info("放入最新K线Map:{}", key);
                 }
             } else {
                 Map<String, Session> stringSessionMap = new HashMap<>();
                 stringSessionMap.put(session.getId(), session);
                 klineClients.put(key, stringSessionMap);
+                log.info("放入最新K线Map:{}", key);
             }
         }
 
         // 取消订阅
         if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) {
             // `market.${symbol}.kline.${strPeriod}
-            log.info("取消订阅最新K消息:{}", session.getId());
+            //log.info("取消订阅最新K消息:{}", session.getId());
             String unsub = jsonObject.get("unsub").toString();
             String[] split = unsub.split("\\.");
             String strPeriod = split[3];
+            if("60min".equals(strPeriod)){
+                strPeriod = "1hour";
+            }
             String symbol = split[1];
             symbol = CoinTypeConvert.convert(symbol);
             String key = symbol + "-" + strPeriod;
             if (klineClients.containsKey(key)) {
                 klineClients.get(key).remove(session.getId());
+                //session.getAsyncRemote().sendText(message);
             }
         }
 
@@ -155,14 +197,38 @@
             String symbol = split[1];
             symbol = CoinTypeConvert.convert(symbol);
             String period = split[3];
+            if("60min".equals(period)){
+                period = "1hour";
+            }
             //String key = symbol+"-"+period;
             // String key = "KINE_BCH/USDT_1week";
             String key = "KINE_{}_{}";
             // 币币k线数据
-            key = StrUtil.format(key, symbol, period);
+            //key = StrUtil.format(key, symbol, period);
+            key = StrUtil.format(key, "NEKK/USDT", period);
             RedisUtils bean = SpringContextHolder.getBean(RedisUtils.class);
             Object o = bean.get(key);
-            sendMessageHistory(JSON.toJSONString(o), session);
+            List<CandlestickModel> candlestickModels = new ArrayList<>();
+            CandlestickResult result = new CandlestickResult();
+            result.setRep(sub);
+            if(o!=null){
+                List<Candlestick> list = (List<Candlestick>)o;
+                for(Candlestick candlestick : list){
+                    CandlestickModel model = new CandlestickModel();
+                    model.setAmount(candlestick.getAmount());
+                    model.setClose(candlestick.getClose());
+                    model.setCount(candlestick.getCount());
+                    model.setHigh(candlestick.getHigh());
+                    model.setId(candlestick.getTimestamp()/1000);
+                    model.setOpen(candlestick.getOpen());
+                    model.setLow(candlestick.getLow());
+                    model.setVol(candlestick.getVolume());
+                    candlestickModels.add(model);
+                }
+                result.setData(candlestickModels);
+            }
+
+            sendMessageHistory(JSON.toJSONString(result), session);
         }
     }
 
@@ -184,7 +250,7 @@
                 Session toSession = sessionEntry.getValue();
                 // 排除掉自己
                 //if (!fromSession.getId().equals(toSession.getId())) {
-                log.info("服务端给客户端[{}]发送盘口消息{}", toSession.getId(), message);
+               // log.info("服务端给客户端[{}]发送盘口消息{}", toSession.getId(), message);
                 boolean open = toSession.isOpen();
                 if (open) {
                     toSession.getAsyncRemote().sendText(message);
@@ -197,7 +263,7 @@
     }
 
     public void sendMessageHistory(String message, Session toSession) {
-        log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
+      //  log.info("服务端给客户端[{}]发送历史K线", toSession.getId());
         boolean open = toSession.isOpen();
         if (open) {
             toSession.getAsyncRemote().sendText(message);
@@ -205,19 +271,18 @@
     }
 
     public void sendMessageKline(String symbol, String period, String message, Session fromSession) {
+
         String key = symbol + "-" + period;
+        //log.info("发送最新K线[{}],数据[{}]",key,message);
         if (klineClients.containsKey(key)) {
             Map<String, Session> stringSessionMap = klineClients.get(key);
             for (Map.Entry<String, Session> sessionEntry : stringSessionMap.entrySet()) {
                 Session toSession = sessionEntry.getValue();
-                // 排除掉自己
-                //if (!fromSession.getId().equals(toSession.getId())) {
-                log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
                 boolean open = toSession.isOpen();
                 if (open) {
+                    log.info("服务端给客户端[{}]发送最新K线消息{}", toSession.getId(), message);
                     toSession.getAsyncRemote().sendText(message);
                 }
-                //  }
             }
         }
     }

--
Gitblit v1.9.1