Helius
2021-02-28 d8d653b40cc6565c72cccd28de831474e5d5c512
src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -22,6 +22,8 @@
    private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private static final ChannelGroup TCP_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private static final ConcurrentMap<String, ChannelGroup> DEPTH_MAP = new ConcurrentHashMap<>();
    private static final ConcurrentMap<String, ChannelGroup> TRADE_MAP = new ConcurrentHashMap<>();
@@ -31,9 +33,22 @@
    // 当前连接到服务器的通道(tcp和websocket)
    private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>();
    public static void addTcpChannel(Channel channel) {
        TCP_GROUP.add(channel);
    }
    public static void removeTcpChannel(Channel channel) {
        TCP_GROUP.remove(channel);
    }
    public static void addWebSocketChannel(Channel channel) {
        WEBSOCKET_GROUP.add(channel);
        CHANNEL_MAP.put(channel.id().asShortText(), channel.id());
    }
    public static ChannelGroup getTcpGroup() {
        return TCP_GROUP;
    }
    public static void removeWebSocketChannel(Channel channel) {
@@ -61,6 +76,12 @@
    }
    public static void putSymbolSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
        String t = "";
        if (type.contains("depth")) {
            String[] s = type.split("_");
            type = s[0];
            t = s[1];
        }
        switch (type) {
            case "kline" :
                ChannelGroup kline = KLINE_MAP.get(symbol);
@@ -71,12 +92,13 @@
                KLINE_MAP.put(symbol, kline);
                break;
            case "depth" :
                ChannelGroup depth = DEPTH_MAP.get(symbol);
                String key = symbol + "_" + t;
                ChannelGroup depth = DEPTH_MAP.get(key);
                if (depth == null) {
                    depth = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
                depth.add(channel);
                DEPTH_MAP.put(symbol, depth);
                DEPTH_MAP.put(key, depth);
                break;
            case "trade" :
                ChannelGroup trade = TRADE_MAP.get(symbol);
@@ -92,6 +114,12 @@
    }
    public static void removeSymbolUnSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
        String t = "";
        if (type.contains("depth")) {
            String[] s = type.split("_");
            type = s[0];
            t = s[1];
        }
        switch (type) {
            case "kline" :
                ChannelGroup kline = KLINE_MAP.get(symbol);
@@ -102,12 +130,13 @@
                KLINE_MAP.put(symbol, kline);
                break;
            case "depth" :
                ChannelGroup depth = DEPTH_MAP.get(symbol);
                String key = symbol + "_" + t;
                ChannelGroup depth = DEPTH_MAP.get(key);
                if (depth == null) {
                    return;
                }
                depth.remove(channel);
                DEPTH_MAP.put(symbol, depth);
                DEPTH_MAP.put(key, depth);
                break;
            case "trade" :
                ChannelGroup trade = TRADE_MAP.get(symbol);