package com.xcong.excoin.netty.common;
|
|
import com.alibaba.fastjson.JSONObject;
|
import com.xcong.excoin.netty.bean.ResponseBean;
|
import io.netty.channel.Channel;
|
import io.netty.channel.ChannelId;
|
import io.netty.channel.group.ChannelGroup;
|
import io.netty.channel.group.DefaultChannelGroup;
|
import io.netty.util.concurrent.GlobalEventExecutor;
|
|
import javax.validation.constraints.NotBlank;
|
import javax.validation.constraints.NotNull;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentMap;
|
|
/**
|
* @author wzy
|
* @email wangdoubleone@gmail.com
|
* @date 2019-05-06
|
*/
|
public class ChannelManager {
|
|
private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
|
|
private static final ConcurrentMap<String, ChannelGroup> DEPTH_MAP = new ConcurrentHashMap<>();
|
|
private static final ConcurrentMap<String, ChannelGroup> TRADE_MAP = new ConcurrentHashMap<>();
|
|
private static final ConcurrentMap<String, ChannelGroup> KLINE_MAP = new ConcurrentHashMap<>();
|
|
// 当前连接到服务器的通道(tcp和websocket)
|
private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>();
|
|
public static void addWebSocketChannel(Channel channel) {
|
WEBSOCKET_GROUP.add(channel);
|
CHANNEL_MAP.put(channel.id().asShortText(), channel.id());
|
}
|
|
public static void removeWebSocketChannel(Channel channel) {
|
WEBSOCKET_GROUP.remove(channel);
|
CHANNEL_MAP.remove(channel.id().asShortText());
|
}
|
|
public static Channel findWebSocketChannel(String id){
|
ChannelId channelId = CHANNEL_MAP.get(id);
|
return WEBSOCKET_GROUP.find(channelId);
|
}
|
|
public static ChannelGroup getWebSocketGroup() {
|
return WEBSOCKET_GROUP;
|
}
|
|
public static void send2All(Object object, String type) {
|
if (WEBSOCKET_GROUP.size() == 0) {
|
return;
|
}
|
ResponseBean responseBean = ResponseBean.ok(type, null);
|
responseBean.putInfo("data", object);
|
String msg = JSONObject.toJSONString(responseBean);
|
WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg));
|
}
|
|
public static void putSymbolSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
|
switch (type) {
|
case "kline" :
|
ChannelGroup kline = KLINE_MAP.get(symbol);
|
if (kline == null) {
|
kline = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
|
}
|
kline.add(channel);
|
KLINE_MAP.put(symbol, kline);
|
break;
|
case "depth" :
|
ChannelGroup depth = DEPTH_MAP.get(symbol);
|
if (depth == null) {
|
depth = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
|
}
|
depth.add(channel);
|
DEPTH_MAP.put(symbol, depth);
|
break;
|
case "trade" :
|
ChannelGroup trade = TRADE_MAP.get(symbol);
|
if (trade == null) {
|
trade = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
|
}
|
trade.add(channel);
|
TRADE_MAP.put(symbol, trade);
|
break;
|
default:
|
break;
|
}
|
}
|
|
public static void removeSymbolUnSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
|
switch (type) {
|
case "kline" :
|
ChannelGroup kline = KLINE_MAP.get(symbol);
|
if (kline == null) {
|
return;
|
}
|
kline.remove(channel);
|
KLINE_MAP.put(symbol, kline);
|
break;
|
case "depth" :
|
ChannelGroup depth = DEPTH_MAP.get(symbol);
|
if (depth == null) {
|
return;
|
}
|
depth.remove(channel);
|
DEPTH_MAP.put(symbol, depth);
|
break;
|
case "trade" :
|
ChannelGroup trade = TRADE_MAP.get(symbol);
|
if (trade == null) {
|
return;
|
}
|
trade.remove(channel);
|
TRADE_MAP.put(symbol, trade);
|
break;
|
default:
|
break;
|
}
|
}
|
|
public static ChannelGroup getChannelGroup(@NotBlank String symbol, @NotBlank String type) {
|
switch (type) {
|
case "kline" :
|
return KLINE_MAP.get(symbol);
|
case "depth" :
|
return DEPTH_MAP.get(symbol);
|
case "trade" :
|
return TRADE_MAP.get(symbol);
|
default:
|
return new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
|
}
|
}
|
|
}
|