| | |
| | | |
| | | private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); |
| | | |
| | | private static final ChannelGroup TCP_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); |
| | | |
| | | private static final ConcurrentMap<String, ChannelGroup> DEPTH_MAP = new ConcurrentHashMap<>(); |
| | | |
| | | private static final ConcurrentMap<String, ChannelGroup> TRADE_MAP = new ConcurrentHashMap<>(); |
| | |
| | | // 当前连接到服务器的通道(tcp和websocket) |
| | | private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>(); |
| | | |
| | | |
| | | public static void addTcpChannel(Channel channel) { |
| | | TCP_GROUP.add(channel); |
| | | } |
| | | |
| | | public static void removeTcpChannel(Channel channel) { |
| | | TCP_GROUP.remove(channel); |
| | | } |
| | | |
| | | public static void addWebSocketChannel(Channel channel) { |
| | | WEBSOCKET_GROUP.add(channel); |
| | | CHANNEL_MAP.put(channel.id().asShortText(), channel.id()); |
| | | } |
| | | |
| | | public static ChannelGroup getTcpGroup() { |
| | | return TCP_GROUP; |
| | | } |
| | | |
| | | public static void removeWebSocketChannel(Channel channel) { |
| | |
| | | } |
| | | |
| | | public static void putSymbolSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) { |
| | | String t = ""; |
| | | if (type.contains("depth")) { |
| | | String[] s = type.split("_"); |
| | | type = s[0]; |
| | | t = s[1]; |
| | | } |
| | | switch (type) { |
| | | case "kline" : |
| | | ChannelGroup kline = KLINE_MAP.get(symbol); |
| | |
| | | KLINE_MAP.put(symbol, kline); |
| | | break; |
| | | case "depth" : |
| | | ChannelGroup depth = DEPTH_MAP.get(symbol); |
| | | String key = symbol + "_" + t; |
| | | ChannelGroup depth = DEPTH_MAP.get(key); |
| | | if (depth == null) { |
| | | depth = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); |
| | | } |
| | | depth.add(channel); |
| | | DEPTH_MAP.put(symbol, depth); |
| | | DEPTH_MAP.put(key, depth); |
| | | break; |
| | | case "trade" : |
| | | ChannelGroup trade = TRADE_MAP.get(symbol); |
| | |
| | | } |
| | | |
| | | public static void removeSymbolUnSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) { |
| | | String t = ""; |
| | | if (type.contains("depth")) { |
| | | String[] s = type.split("_"); |
| | | type = s[0]; |
| | | t = s[1]; |
| | | } |
| | | switch (type) { |
| | | case "kline" : |
| | | ChannelGroup kline = KLINE_MAP.get(symbol); |
| | |
| | | KLINE_MAP.put(symbol, kline); |
| | | break; |
| | | case "depth" : |
| | | ChannelGroup depth = DEPTH_MAP.get(symbol); |
| | | String key = symbol + "_" + t; |
| | | ChannelGroup depth = DEPTH_MAP.get(key); |
| | | if (depth == null) { |
| | | return; |
| | | } |
| | | depth.remove(channel); |
| | | DEPTH_MAP.put(symbol, depth); |
| | | DEPTH_MAP.put(key, depth); |
| | | break; |
| | | case "trade" : |
| | | ChannelGroup trade = TRADE_MAP.get(symbol); |