| | |
| | | import io.netty.channel.group.DefaultChannelGroup; |
| | | import io.netty.util.concurrent.GlobalEventExecutor; |
| | | |
| | | import javax.validation.constraints.NotBlank; |
| | | import javax.validation.constraints.NotNull; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | |
| | |
| | | public class ChannelManager { |
| | | |
| | | private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); |
| | | |
| | | private static final ConcurrentMap<String, ChannelGroup> DEPTH_MAP = new ConcurrentHashMap<>(); |
| | | |
| | | private static final ConcurrentMap<String, ChannelGroup> TRADE_MAP = new ConcurrentHashMap<>(); |
| | | |
| | | private static final ConcurrentMap<String, ChannelGroup> KLINE_MAP = new ConcurrentHashMap<>(); |
| | | |
| | | // 当前连接到服务器的通道(tcp和websocket) |
| | | private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>(); |
| | |
| | | WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg)); |
| | | } |
| | | |
| | | public static void putSymbolSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) { |
| | | switch (type) { |
| | | case "kline" : |
| | | ChannelGroup kline = KLINE_MAP.get(symbol); |
| | | if (kline == null) { |
| | | kline = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); |
| | | } |
| | | kline.add(channel); |
| | | KLINE_MAP.put(symbol, kline); |
| | | break; |
| | | case "depth" : |
| | | ChannelGroup depth = DEPTH_MAP.get(symbol); |
| | | if (depth == null) { |
| | | depth = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); |
| | | } |
| | | depth.add(channel); |
| | | DEPTH_MAP.put(symbol, depth); |
| | | break; |
| | | case "trade" : |
| | | ChannelGroup trade = TRADE_MAP.get(symbol); |
| | | if (trade == null) { |
| | | trade = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); |
| | | } |
| | | trade.add(channel); |
| | | TRADE_MAP.put(symbol, trade); |
| | | break; |
| | | default: |
| | | break; |
| | | } |
| | | } |
| | | |
| | | public static void removeSymbolUnSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) { |
| | | switch (type) { |
| | | case "kline" : |
| | | ChannelGroup kline = KLINE_MAP.get(symbol); |
| | | if (kline == null) { |
| | | return; |
| | | } |
| | | kline.remove(channel); |
| | | KLINE_MAP.put(symbol, kline); |
| | | break; |
| | | case "depth" : |
| | | ChannelGroup depth = DEPTH_MAP.get(symbol); |
| | | if (depth == null) { |
| | | return; |
| | | } |
| | | depth.remove(channel); |
| | | DEPTH_MAP.put(symbol, depth); |
| | | break; |
| | | case "trade" : |
| | | ChannelGroup trade = TRADE_MAP.get(symbol); |
| | | if (trade == null) { |
| | | return; |
| | | } |
| | | trade.remove(channel); |
| | | TRADE_MAP.put(symbol, trade); |
| | | break; |
| | | default: |
| | | break; |
| | | } |
| | | } |
| | | |
| | | public static ChannelGroup getChannelGroup(@NotBlank String symbol, @NotBlank String type) { |
| | | switch (type) { |
| | | case "kline" : |
| | | return KLINE_MAP.get(symbol); |
| | | case "depth" : |
| | | return DEPTH_MAP.get(symbol); |
| | | case "trade" : |
| | | return TRADE_MAP.get(symbol); |
| | | default: |
| | | return new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); |
| | | } |
| | | } |
| | | |
| | | } |