From 094de2df61a1585f475b1afa4a68cf9fcc371629 Mon Sep 17 00:00:00 2001
From: Helius <wangdoubleone@gmail.com>
Date: Wed, 24 Feb 2021 18:12:15 +0800
Subject: [PATCH] Merge branch 'whole_new_trc20' into data_reform
---
src/main/java/com/xcong/excoin/netty/common/ChannelManager.java | 82 +++++++++++++++++++++++++++++++++++++++++
1 files changed, 82 insertions(+), 0 deletions(-)
diff --git a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
index 5d158f6..3937137 100644
--- a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
+++ b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -8,6 +8,8 @@
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -19,6 +21,12 @@
public class ChannelManager {
private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+ private static final ConcurrentMap<String, ChannelGroup> DEPTH_MAP = new ConcurrentHashMap<>();
+
+ private static final ConcurrentMap<String, ChannelGroup> TRADE_MAP = new ConcurrentHashMap<>();
+
+ private static final ConcurrentMap<String, ChannelGroup> KLINE_MAP = new ConcurrentHashMap<>();
// 当前连接到服务器的通道(tcp和websocket)
private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>();
@@ -52,5 +60,79 @@
WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg));
}
+ public static void putSymbolSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
+ switch (type) {
+ case "kline" :
+ ChannelGroup kline = KLINE_MAP.get(symbol);
+ if (kline == null) {
+ kline = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+ }
+ kline.add(channel);
+ KLINE_MAP.put(symbol, kline);
+ break;
+ case "depth" :
+ ChannelGroup depth = DEPTH_MAP.get(symbol);
+ if (depth == null) {
+ depth = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+ }
+ depth.add(channel);
+ DEPTH_MAP.put(symbol, depth);
+ break;
+ case "trade" :
+ ChannelGroup trade = TRADE_MAP.get(symbol);
+ if (trade == null) {
+ trade = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+ }
+ trade.add(channel);
+ TRADE_MAP.put(symbol, trade);
+ break;
+ default:
+ break;
+ }
+ }
+
+ public static void removeSymbolUnSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
+ switch (type) {
+ case "kline" :
+ ChannelGroup kline = KLINE_MAP.get(symbol);
+ if (kline == null) {
+ return;
+ }
+ kline.remove(channel);
+ KLINE_MAP.put(symbol, kline);
+ break;
+ case "depth" :
+ ChannelGroup depth = DEPTH_MAP.get(symbol);
+ if (depth == null) {
+ return;
+ }
+ depth.remove(channel);
+ DEPTH_MAP.put(symbol, depth);
+ break;
+ case "trade" :
+ ChannelGroup trade = TRADE_MAP.get(symbol);
+ if (trade == null) {
+ return;
+ }
+ trade.remove(channel);
+ TRADE_MAP.put(symbol, trade);
+ break;
+ default:
+ break;
+ }
+ }
+
+ public static ChannelGroup getChannelGroup(@NotBlank String symbol, @NotBlank String type) {
+ switch (type) {
+ case "kline" :
+ return KLINE_MAP.get(symbol);
+ case "depth" :
+ return DEPTH_MAP.get(symbol);
+ case "trade" :
+ return TRADE_MAP.get(symbol);
+ default:
+ return new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+ }
+ }
}
--
Gitblit v1.9.1