Helius
2021-02-24 5285d249e702eeab3ad3847aaa24523ef0f1a544
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package com.xcong.excoin.netty.common;
 
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.netty.bean.ResponseBean;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
 
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
 
/**
 * @author wzy
 * @email wangdoubleone@gmail.com
 * @date 2019-05-06
 */
public class ChannelManager {
 
    private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
 
    private static final ConcurrentMap<String, ChannelGroup> DEPTH_MAP = new ConcurrentHashMap<>();
 
    private static final ConcurrentMap<String, ChannelGroup> TRADE_MAP = new ConcurrentHashMap<>();
 
    private static final ConcurrentMap<String, ChannelGroup> KLINE_MAP = new ConcurrentHashMap<>();
 
    // 当前连接到服务器的通道(tcp和websocket)
    private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>();
 
    public static void addWebSocketChannel(Channel channel) {
        WEBSOCKET_GROUP.add(channel);
        CHANNEL_MAP.put(channel.id().asShortText(), channel.id());
    }
 
    public static void removeWebSocketChannel(Channel channel) {
        WEBSOCKET_GROUP.remove(channel);
        CHANNEL_MAP.remove(channel.id().asShortText());
    }
 
    public static Channel findWebSocketChannel(String id){
        ChannelId channelId = CHANNEL_MAP.get(id);
        return WEBSOCKET_GROUP.find(channelId);
    }
 
    public static ChannelGroup getWebSocketGroup() {
        return WEBSOCKET_GROUP;
    }
 
    public static void send2All(Object object, String type) {
        if (WEBSOCKET_GROUP.size() == 0) {
            return;
        }
        ResponseBean responseBean = ResponseBean.ok(type, null);
        responseBean.putInfo("data", object);
        String msg = JSONObject.toJSONString(responseBean);
        WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg));
    }
 
    public static void putSymbolSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
        switch (type) {
            case "kline" :
                ChannelGroup kline = KLINE_MAP.get(symbol);
                if (kline == null) {
                    kline = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
                kline.add(channel);
                KLINE_MAP.put(symbol, kline);
                break;
            case "depth" :
                ChannelGroup depth = DEPTH_MAP.get(symbol);
                if (depth == null) {
                    depth = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
                depth.add(channel);
                DEPTH_MAP.put(symbol, depth);
                break;
            case "trade" :
                ChannelGroup trade = TRADE_MAP.get(symbol);
                if (trade == null) {
                    trade = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
                trade.add(channel);
                TRADE_MAP.put(symbol, trade);
                break;
            default:
                break;
        }
    }
 
    public static void removeSymbolUnSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
        switch (type) {
            case "kline" :
                ChannelGroup kline = KLINE_MAP.get(symbol);
                if (kline == null) {
                    return;
                }
                kline.remove(channel);
                KLINE_MAP.put(symbol, kline);
                break;
            case "depth" :
                ChannelGroup depth = DEPTH_MAP.get(symbol);
                if (depth == null) {
                    return;
                }
                depth.remove(channel);
                DEPTH_MAP.put(symbol, depth);
                break;
            case "trade" :
                ChannelGroup trade = TRADE_MAP.get(symbol);
                if (trade == null) {
                    return;
                }
                trade.remove(channel);
                TRADE_MAP.put(symbol, trade);
                break;
            default:
                break;
        }
    }
 
    public static ChannelGroup getChannelGroup(@NotBlank String symbol, @NotBlank String type) {
        switch (type) {
            case "kline" :
                return KLINE_MAP.get(symbol);
            case "depth" :
                return DEPTH_MAP.get(symbol);
            case "trade" :
                return TRADE_MAP.get(symbol);
            default:
                return new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        }
    }
 
}