From 5285d249e702eeab3ad3847aaa24523ef0f1a544 Mon Sep 17 00:00:00 2001
From: Helius <wangdoubleone@gmail.com>
Date: Wed, 24 Feb 2021 18:11:06 +0800
Subject: [PATCH] add websocket

---
 src/main/java/com/xcong/excoin/netty/common/ChannelManager.java          |   82 ++++++++++++++++
 src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java              |   47 +++++++++
 src/main/java/com/xcong/excoin/netty/bean/SubRequest.java                |   18 +++
 src/main/java/com/xcong/excoin/netty/bean/UnSubRequest.java              |   18 +++
 src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java           |   44 +++++++-
 src/main/java/com/xcong/excoin/netty/bean/UnSubResponse.java             |   21 ++++
 src/main/java/com/xcong/excoin/netty/bean/SubResponse.java               |   21 ++++
 src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java |    5 
 8 files changed, 245 insertions(+), 11 deletions(-)

diff --git a/src/main/java/com/xcong/excoin/netty/bean/SubRequest.java b/src/main/java/com/xcong/excoin/netty/bean/SubRequest.java
new file mode 100644
index 0000000..39699b2
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/bean/SubRequest.java
@@ -0,0 +1,18 @@
+package com.xcong.excoin.netty.bean;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * @author wzy
+ * @date 2021-02-24
+ **/
+@Data
+public class SubRequest implements Serializable {
+    private static final long serialVersionUID = -5661972007013485438L;
+    private String sub;
+
+    private String id;
+
+}
diff --git a/src/main/java/com/xcong/excoin/netty/bean/SubResponse.java b/src/main/java/com/xcong/excoin/netty/bean/SubResponse.java
new file mode 100644
index 0000000..845b9d5
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/bean/SubResponse.java
@@ -0,0 +1,21 @@
+package com.xcong.excoin.netty.bean;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * @author wzy
+ * @date 2021-02-24
+ **/
+@Data
+public class SubResponse implements Serializable {
+
+    private String id;
+
+    private String status;
+
+    private String subbed;
+
+    private long ts;
+}
diff --git a/src/main/java/com/xcong/excoin/netty/bean/UnSubRequest.java b/src/main/java/com/xcong/excoin/netty/bean/UnSubRequest.java
new file mode 100644
index 0000000..2515065
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/bean/UnSubRequest.java
@@ -0,0 +1,18 @@
+package com.xcong.excoin.netty.bean;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * @author wzy
+ * @date 2021-02-24
+ **/
+@Data
+public class UnSubRequest implements Serializable {
+
+    private static final long serialVersionUID = -7082425849051404678L;
+    private String unsub;
+
+    private String id;
+}
diff --git a/src/main/java/com/xcong/excoin/netty/bean/UnSubResponse.java b/src/main/java/com/xcong/excoin/netty/bean/UnSubResponse.java
new file mode 100644
index 0000000..313939d
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/bean/UnSubResponse.java
@@ -0,0 +1,21 @@
+package com.xcong.excoin.netty.bean;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * @author wzy
+ * @date 2021-02-24
+ **/
+@Data
+public class UnSubResponse implements Serializable {
+
+    private String id;
+
+    private String status;
+
+    private String subbed;
+
+    private long ts;
+}
diff --git a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
index 5d158f6..3937137 100644
--- a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
+++ b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -8,6 +8,8 @@
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.util.concurrent.GlobalEventExecutor;
 
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -19,6 +21,12 @@
 public class ChannelManager {
 
     private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+    private static final ConcurrentMap<String, ChannelGroup> DEPTH_MAP = new ConcurrentHashMap<>();
+
+    private static final ConcurrentMap<String, ChannelGroup> TRADE_MAP = new ConcurrentHashMap<>();
+
+    private static final ConcurrentMap<String, ChannelGroup> KLINE_MAP = new ConcurrentHashMap<>();
 
     // 当前连接到服务器的通道(tcp和websocket)
     private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>();
@@ -52,5 +60,79 @@
         WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg));
     }
 
+    public static void putSymbolSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
+        switch (type) {
+            case "kline" :
+                ChannelGroup kline = KLINE_MAP.get(symbol);
+                if (kline == null) {
+                    kline = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+                }
+                kline.add(channel);
+                KLINE_MAP.put(symbol, kline);
+                break;
+            case "depth" :
+                ChannelGroup depth = DEPTH_MAP.get(symbol);
+                if (depth == null) {
+                    depth = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+                }
+                depth.add(channel);
+                DEPTH_MAP.put(symbol, depth);
+                break;
+            case "trade" :
+                ChannelGroup trade = TRADE_MAP.get(symbol);
+                if (trade == null) {
+                    trade = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+                }
+                trade.add(channel);
+                TRADE_MAP.put(symbol, trade);
+                break;
+            default:
+                break;
+        }
+    }
+
+    public static void removeSymbolUnSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
+        switch (type) {
+            case "kline" :
+                ChannelGroup kline = KLINE_MAP.get(symbol);
+                if (kline == null) {
+                    return;
+                }
+                kline.remove(channel);
+                KLINE_MAP.put(symbol, kline);
+                break;
+            case "depth" :
+                ChannelGroup depth = DEPTH_MAP.get(symbol);
+                if (depth == null) {
+                    return;
+                }
+                depth.remove(channel);
+                DEPTH_MAP.put(symbol, depth);
+                break;
+            case "trade" :
+                ChannelGroup trade = TRADE_MAP.get(symbol);
+                if (trade == null) {
+                    return;
+                }
+                trade.remove(channel);
+                TRADE_MAP.put(symbol, trade);
+                break;
+            default:
+                break;
+        }
+    }
+
+    public static ChannelGroup getChannelGroup(@NotBlank String symbol, @NotBlank String type) {
+        switch (type) {
+            case "kline" :
+                return KLINE_MAP.get(symbol);
+            case "depth" :
+                return DEPTH_MAP.get(symbol);
+            case "trade" :
+                return TRADE_MAP.get(symbol);
+            default:
+                return new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+        }
+    }
 
 }
diff --git a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
index 540fd36..5471a7f 100644
--- a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
+++ b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
@@ -2,6 +2,10 @@
 
 import com.alibaba.fastjson.JSONObject;
 import com.xcong.excoin.netty.bean.RequestBean;
+import com.xcong.excoin.netty.bean.SubRequest;
+import com.xcong.excoin.netty.bean.SubResponse;
+import com.xcong.excoin.netty.bean.UnSubResponse;
+import com.xcong.excoin.netty.common.ChannelManager;
 import com.xcong.excoin.netty.common.NettyTools;
 import com.xcong.excoin.netty.logic.MsgLogic;
 import io.netty.channel.ChannelHandlerContext;
@@ -27,14 +31,38 @@
     private MsgLogic msgLogic;
 
     public void webSocketDispatch(ChannelHandlerContext ctx, String msg) {
-        RequestBean requestBean = null;
-        try {
-            requestBean = JSONObject.parseObject(msg, RequestBean.class);
-            requestBean.setChannelId(ctx.channel().id().asShortText());
-            msgLogic.webSocketMsgLogic(requestBean);
-        } catch (Exception e) {
-            log.info("#websocket json error:{}#", e);
-            ctx.channel().writeAndFlush(NettyTools.webSocketBytes("params error"));
+        JSONObject jsonObject = JSONObject.parseObject(msg);
+        if (jsonObject.containsKey("sub")) {
+            String sub = jsonObject.getString("sub");
+            log.info("{}", sub);
+            String[] split = sub.split("\\.");
+            if (split.length != 4) {
+                ctx.writeAndFlush("error");
+                return;
+            }
+
+            ChannelManager.putSymbolSubChannel(split[1], ctx.channel(), split[2]);
+            SubResponse subResponse = new SubResponse();
+            subResponse.setSubbed(sub);
+            subResponse.setId(jsonObject.getString("id"));
+            subResponse.setTs(System.currentTimeMillis());
+            subResponse.setStatus("ok");
+            ctx.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(subResponse)));
+        } else if (jsonObject.containsKey("unsub")) {
+            String sub = jsonObject.getString("unsub");
+            String[] split = sub.split("\\.");
+            if (split.length != 4) {
+                ctx.writeAndFlush("error");
+                return;
+            }
+
+            ChannelManager.removeSymbolUnSubChannel(split[1], ctx.channel(), split[2]);
+            UnSubResponse resp = new UnSubResponse();
+            resp.setSubbed(sub);
+            resp.setId(jsonObject.getString("id"));
+            resp.setTs(System.currentTimeMillis());
+            resp.setStatus("ok");
+            ctx.writeAndFlush(resp);
         }
     }
 
diff --git a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
index 3e57246..a8b448e 100644
--- a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
+++ b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
@@ -61,7 +61,7 @@
 
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-//        LogUtil.info("[websocket服务器收到消息]-->{}, {}", ctx.channel().id(), msg);
+//        log.info("[websocket服务器收到消息]-->{}, {}", ctx.channel().id(), msg);
         if (msg instanceof FullHttpRequest) {
             // 以http请求形式接入,但是走的是websocket
             handleHttpRequest(ctx, (FullHttpRequest) msg);
@@ -88,7 +88,6 @@
     @Override
     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
         log.info("[触发器触发]");
-//        super.userEventTriggered(ctx, evt);
         if (evt instanceof IdleStateEvent) {
             IdleStateEvent event = (IdleStateEvent) evt;
             if (event.state() == IdleState.READER_IDLE) {
@@ -101,7 +100,7 @@
                     times = 0;
                 }
                 /*读超时*/
-                log.info("===服务端===({}写超时, {})", ctx.channel().id().asShortText(), times);
+//                log.info("===服务端===({}写超时, {})", ctx.channel().id().asShortText(), times);
                 // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连
                 if (times >= MAX_UN_REC_PING_TIMES) {
                     log.info("===服务端===(写超时,关闭chanel)");
diff --git a/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java b/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java
new file mode 100644
index 0000000..cd6cae0
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java
@@ -0,0 +1,47 @@
+package com.xcong.excoin.quartz.job;
+
+import com.alibaba.fastjson.JSONObject;
+import com.huobi.client.SubscriptionClient;
+import com.huobi.client.SubscriptionOptions;
+import com.huobi.client.model.Candlestick;
+import com.huobi.client.model.enums.CandlestickInterval;
+import com.huobi.client.model.enums.MBPLevelEnums;
+import com.huobi.client.model.event.PriceDepthEvent;
+import com.xcong.excoin.netty.server.WebSocketServer;
+import com.xcong.excoin.utils.CoinTypeConvert;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+
+/**
+ * @author wzy
+ * @date 2021-02-23
+ **/
+@Slf4j
+@Component
+public class KLineDataJob {
+
+    @Autowired
+    WebSocketServer webSocketServer;
+
+    @PostConstruct
+    public void data() throws Exception {
+        webSocketServer.start();
+        log.info("==================");
+        SubscriptionOptions subscriptionOptions = new SubscriptionOptions();
+        subscriptionOptions.setConnectionDelayOnFailure(5);
+        subscriptionOptions.setUri("wss://api.hadax.com/ws");
+        SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions);
+
+        subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> {
+            Candlestick data = candlestickEvent.getData();
+        });
+
+        subscriptionClient.subscribePriceDepthEvent("btcusdt", priceDepthEvent -> {
+//            log.info("bids:{}", JSONObject.toJSONString(priceDepthEvent.getData().getBids()));
+//            log.info("asks:{}", JSONObject.toJSONString(priceDepthEvent.getData().getAsks()));
+        });
+    }
+}

--
Gitblit v1.9.1