Helius
2021-02-24 5285d249e702eeab3ad3847aaa24523ef0f1a544
src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -8,6 +8,8 @@
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -19,6 +21,12 @@
public class ChannelManager {
    private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private static final ConcurrentMap<String, ChannelGroup> DEPTH_MAP = new ConcurrentHashMap<>();
    private static final ConcurrentMap<String, ChannelGroup> TRADE_MAP = new ConcurrentHashMap<>();
    private static final ConcurrentMap<String, ChannelGroup> KLINE_MAP = new ConcurrentHashMap<>();
    // 当前连接到服务器的通道(tcp和websocket)
    private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>();
@@ -52,5 +60,79 @@
        WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg));
    }
    public static void putSymbolSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
        switch (type) {
            case "kline" :
                ChannelGroup kline = KLINE_MAP.get(symbol);
                if (kline == null) {
                    kline = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
                kline.add(channel);
                KLINE_MAP.put(symbol, kline);
                break;
            case "depth" :
                ChannelGroup depth = DEPTH_MAP.get(symbol);
                if (depth == null) {
                    depth = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
                depth.add(channel);
                DEPTH_MAP.put(symbol, depth);
                break;
            case "trade" :
                ChannelGroup trade = TRADE_MAP.get(symbol);
                if (trade == null) {
                    trade = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
                trade.add(channel);
                TRADE_MAP.put(symbol, trade);
                break;
            default:
                break;
        }
    }
    public static void removeSymbolUnSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
        switch (type) {
            case "kline" :
                ChannelGroup kline = KLINE_MAP.get(symbol);
                if (kline == null) {
                    return;
                }
                kline.remove(channel);
                KLINE_MAP.put(symbol, kline);
                break;
            case "depth" :
                ChannelGroup depth = DEPTH_MAP.get(symbol);
                if (depth == null) {
                    return;
                }
                depth.remove(channel);
                DEPTH_MAP.put(symbol, depth);
                break;
            case "trade" :
                ChannelGroup trade = TRADE_MAP.get(symbol);
                if (trade == null) {
                    return;
                }
                trade.remove(channel);
                TRADE_MAP.put(symbol, trade);
                break;
            default:
                break;
        }
    }
    public static ChannelGroup getChannelGroup(@NotBlank String symbol, @NotBlank String type) {
        switch (type) {
            case "kline" :
                return KLINE_MAP.get(symbol);
            case "depth" :
                return DEPTH_MAP.get(symbol);
            case "trade" :
                return TRADE_MAP.get(symbol);
            default:
                return new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        }
    }
}