Helius
2021-02-28 d8d653b40cc6565c72cccd28de831474e5d5c512
src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -8,6 +8,8 @@
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -20,12 +22,33 @@
    private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private static final ChannelGroup TCP_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private static final ConcurrentMap<String, ChannelGroup> DEPTH_MAP = new ConcurrentHashMap<>();
    private static final ConcurrentMap<String, ChannelGroup> TRADE_MAP = new ConcurrentHashMap<>();
    private static final ConcurrentMap<String, ChannelGroup> KLINE_MAP = new ConcurrentHashMap<>();
    // 当前连接到服务器的通道(tcp和websocket)
    private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>();
    public static void addTcpChannel(Channel channel) {
        TCP_GROUP.add(channel);
    }
    public static void removeTcpChannel(Channel channel) {
        TCP_GROUP.remove(channel);
    }
    public static void addWebSocketChannel(Channel channel) {
        WEBSOCKET_GROUP.add(channel);
        CHANNEL_MAP.put(channel.id().asShortText(), channel.id());
    }
    public static ChannelGroup getTcpGroup() {
        return TCP_GROUP;
    }
    public static void removeWebSocketChannel(Channel channel) {
@@ -52,5 +75,93 @@
        WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg));
    }
    public static void putSymbolSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
        String t = "";
        if (type.contains("depth")) {
            String[] s = type.split("_");
            type = s[0];
            t = s[1];
        }
        switch (type) {
            case "kline" :
                ChannelGroup kline = KLINE_MAP.get(symbol);
                if (kline == null) {
                    kline = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
                kline.add(channel);
                KLINE_MAP.put(symbol, kline);
                break;
            case "depth" :
                String key = symbol + "_" + t;
                ChannelGroup depth = DEPTH_MAP.get(key);
                if (depth == null) {
                    depth = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
                depth.add(channel);
                DEPTH_MAP.put(key, depth);
                break;
            case "trade" :
                ChannelGroup trade = TRADE_MAP.get(symbol);
                if (trade == null) {
                    trade = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
                trade.add(channel);
                TRADE_MAP.put(symbol, trade);
                break;
            default:
                break;
        }
    }
    public static void removeSymbolUnSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
        String t = "";
        if (type.contains("depth")) {
            String[] s = type.split("_");
            type = s[0];
            t = s[1];
        }
        switch (type) {
            case "kline" :
                ChannelGroup kline = KLINE_MAP.get(symbol);
                if (kline == null) {
                    return;
                }
                kline.remove(channel);
                KLINE_MAP.put(symbol, kline);
                break;
            case "depth" :
                String key = symbol + "_" + t;
                ChannelGroup depth = DEPTH_MAP.get(key);
                if (depth == null) {
                    return;
                }
                depth.remove(channel);
                DEPTH_MAP.put(key, depth);
                break;
            case "trade" :
                ChannelGroup trade = TRADE_MAP.get(symbol);
                if (trade == null) {
                    return;
                }
                trade.remove(channel);
                TRADE_MAP.put(symbol, trade);
                break;
            default:
                break;
        }
    }
    public static ChannelGroup getChannelGroup(@NotBlank String symbol, @NotBlank String type) {
        switch (type) {
            case "kline" :
                return KLINE_MAP.get(symbol);
            case "depth" :
                return DEPTH_MAP.get(symbol);
            case "trade" :
                return TRADE_MAP.get(symbol);
            default:
                return new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        }
    }
}