package com.xcong.excoin.netty.common; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.netty.bean.ResponseBean; import io.netty.channel.Channel; import io.netty.channel.ChannelId; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import javax.validation.constraints.NotBlank; import javax.validation.constraints.NotNull; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** * @author wzy * @email wangdoubleone@gmail.com * @date 2019-05-06 */ public class ChannelManager { private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private static final ChannelGroup TCP_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private static final ConcurrentMap DEPTH_MAP = new ConcurrentHashMap<>(); private static final ConcurrentMap TRADE_MAP = new ConcurrentHashMap<>(); private static final ConcurrentMap KLINE_MAP = new ConcurrentHashMap<>(); // 当前连接到服务器的通道(tcp和websocket) private static final ConcurrentMap CHANNEL_MAP = new ConcurrentHashMap<>(); public static void addTcpChannel(Channel channel) { TCP_GROUP.add(channel); } public static void removeTcpChannel(Channel channel) { TCP_GROUP.remove(channel); } public static void addWebSocketChannel(Channel channel) { WEBSOCKET_GROUP.add(channel); CHANNEL_MAP.put(channel.id().asShortText(), channel.id()); } public static ChannelGroup getTcpGroup() { return TCP_GROUP; } public static void removeWebSocketChannel(Channel channel) { WEBSOCKET_GROUP.remove(channel); CHANNEL_MAP.remove(channel.id().asShortText()); } public static Channel findWebSocketChannel(String id){ ChannelId channelId = CHANNEL_MAP.get(id); return WEBSOCKET_GROUP.find(channelId); } public static ChannelGroup getWebSocketGroup() { return WEBSOCKET_GROUP; } public static void send2All(Object object, String type) { if (WEBSOCKET_GROUP.size() == 0) { return; } ResponseBean responseBean = ResponseBean.ok(type, null); responseBean.putInfo("data", object); String msg = JSONObject.toJSONString(responseBean); WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg)); } public static void putSymbolSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) { String t = ""; if (type.contains("depth")) { String[] s = type.split("_"); type = s[0]; t = s[1]; } switch (type) { case "kline" : ChannelGroup kline = KLINE_MAP.get(symbol); if (kline == null) { kline = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); } kline.add(channel); KLINE_MAP.put(symbol, kline); break; case "depth" : String key = symbol + "_" + t; ChannelGroup depth = DEPTH_MAP.get(key); if (depth == null) { depth = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); } depth.add(channel); DEPTH_MAP.put(key, depth); break; case "trade" : ChannelGroup trade = TRADE_MAP.get(symbol); if (trade == null) { trade = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); } trade.add(channel); TRADE_MAP.put(symbol, trade); break; default: break; } } public static void removeSymbolUnSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) { String t = ""; if (type.contains("depth")) { String[] s = type.split("_"); type = s[0]; t = s[1]; } switch (type) { case "kline" : ChannelGroup kline = KLINE_MAP.get(symbol); if (kline == null) { return; } kline.remove(channel); KLINE_MAP.put(symbol, kline); break; case "depth" : String key = symbol + "_" + t; ChannelGroup depth = DEPTH_MAP.get(key); if (depth == null) { return; } depth.remove(channel); DEPTH_MAP.put(key, depth); break; case "trade" : ChannelGroup trade = TRADE_MAP.get(symbol); if (trade == null) { return; } trade.remove(channel); TRADE_MAP.put(symbol, trade); break; default: break; } } public static ChannelGroup getChannelGroup(@NotBlank String symbol, @NotBlank String type) { switch (type) { case "kline" : return KLINE_MAP.get(symbol); case "depth" : return DEPTH_MAP.get(symbol); case "trade" : return TRADE_MAP.get(symbol); default: return new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); } } }