From d8d653b40cc6565c72cccd28de831474e5d5c512 Mon Sep 17 00:00:00 2001
From: Helius <wangdoubleone@gmail.com>
Date: Sun, 28 Feb 2021 15:16:18 +0800
Subject: [PATCH] add netty

---
 src/main/java/com/xcong/excoin/netty/common/ChannelManager.java |   37 +++++++++++++++++++++++++++++++++----
 1 files changed, 33 insertions(+), 4 deletions(-)

diff --git a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
index 3937137..7932c00 100644
--- a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
+++ b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -22,6 +22,8 @@
 
     private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
 
+    private static final ChannelGroup TCP_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
     private static final ConcurrentMap<String, ChannelGroup> DEPTH_MAP = new ConcurrentHashMap<>();
 
     private static final ConcurrentMap<String, ChannelGroup> TRADE_MAP = new ConcurrentHashMap<>();
@@ -31,9 +33,22 @@
     // 当前连接到服务器的通道(tcp和websocket)
     private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>();
 
+
+    public static void addTcpChannel(Channel channel) {
+        TCP_GROUP.add(channel);
+    }
+
+    public static void removeTcpChannel(Channel channel) {
+        TCP_GROUP.remove(channel);
+    }
+
     public static void addWebSocketChannel(Channel channel) {
         WEBSOCKET_GROUP.add(channel);
         CHANNEL_MAP.put(channel.id().asShortText(), channel.id());
+    }
+
+    public static ChannelGroup getTcpGroup() {
+        return TCP_GROUP;
     }
 
     public static void removeWebSocketChannel(Channel channel) {
@@ -61,6 +76,12 @@
     }
 
     public static void putSymbolSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
+        String t = "";
+        if (type.contains("depth")) {
+            String[] s = type.split("_");
+            type = s[0];
+            t = s[1];
+        }
         switch (type) {
             case "kline" :
                 ChannelGroup kline = KLINE_MAP.get(symbol);
@@ -71,12 +92,13 @@
                 KLINE_MAP.put(symbol, kline);
                 break;
             case "depth" :
-                ChannelGroup depth = DEPTH_MAP.get(symbol);
+                String key = symbol + "_" + t;
+                ChannelGroup depth = DEPTH_MAP.get(key);
                 if (depth == null) {
                     depth = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                 }
                 depth.add(channel);
-                DEPTH_MAP.put(symbol, depth);
+                DEPTH_MAP.put(key, depth);
                 break;
             case "trade" :
                 ChannelGroup trade = TRADE_MAP.get(symbol);
@@ -92,6 +114,12 @@
     }
 
     public static void removeSymbolUnSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
+        String t = "";
+        if (type.contains("depth")) {
+            String[] s = type.split("_");
+            type = s[0];
+            t = s[1];
+        }
         switch (type) {
             case "kline" :
                 ChannelGroup kline = KLINE_MAP.get(symbol);
@@ -102,12 +130,13 @@
                 KLINE_MAP.put(symbol, kline);
                 break;
             case "depth" :
-                ChannelGroup depth = DEPTH_MAP.get(symbol);
+                String key = symbol + "_" + t;
+                ChannelGroup depth = DEPTH_MAP.get(key);
                 if (depth == null) {
                     return;
                 }
                 depth.remove(channel);
-                DEPTH_MAP.put(symbol, depth);
+                DEPTH_MAP.put(key, depth);
                 break;
             case "trade" :
                 ChannelGroup trade = TRADE_MAP.get(symbol);

--
Gitblit v1.9.1