From d3cdbf19b53e24a1417364b098f7b8f71f36a208 Mon Sep 17 00:00:00 2001
From: wzy <wzy19931122ai@163.com>
Date: Sun, 28 Feb 2021 19:47:04 +0800
Subject: [PATCH] modify
---
src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java | 8 ++--
src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java | 7 +++
src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java | 6 +-
src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java | 8 +---
src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java | 8 +--
src/main/java/com/xcong/excoin/netty/common/ServerChannelManager.java | 2
src/main/java/com/xcong/excoin/netty/common/ClientChannelManager.java | 23 +++++++++++
src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java | 6 +-
8 files changed, 45 insertions(+), 23 deletions(-)
diff --git a/src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java b/src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java
index 33bc9b8..2917b8d 100644
--- a/src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java
+++ b/src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java
@@ -1,5 +1,6 @@
package com.xcong.excoin.netty.client;
+import com.xcong.excoin.netty.common.ClientChannelManager;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
@@ -19,7 +20,7 @@
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
- log.info("消息收到");
+ ClientChannelManager.addServerGroup(ctx.channel());
}
@Override
@@ -27,4 +28,8 @@
log.info("消息收到");
}
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ ClientChannelManager.removeServerGroup(ctx.channel());
+ }
}
\ No newline at end of file
diff --git a/src/main/java/com/xcong/excoin/netty/common/ClientChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ClientChannelManager.java
new file mode 100644
index 0000000..014c933
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/common/ClientChannelManager.java
@@ -0,0 +1,23 @@
+package com.xcong.excoin.netty.common;
+
+import io.netty.channel.Channel;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+public class ClientChannelManager {
+ private static ChannelGroup SERVER_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+ public static ChannelGroup getServerGroup() {
+ return SERVER_GROUP;
+ }
+
+ public static void addServerGroup(Channel channel) {
+ SERVER_GROUP.add(channel);
+ }
+
+ public static void removeServerGroup(Channel channel) {
+ SERVER_GROUP.remove(channel);
+ }
+
+}
diff --git a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ServerChannelManager.java
similarity index 99%
rename from src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
rename to src/main/java/com/xcong/excoin/netty/common/ServerChannelManager.java
index 7932c00..c6f262c 100644
--- a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
+++ b/src/main/java/com/xcong/excoin/netty/common/ServerChannelManager.java
@@ -18,7 +18,7 @@
* @email wangdoubleone@gmail.com
* @date 2019-05-06
*/
-public class ChannelManager {
+public class ServerChannelManager {
private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
diff --git a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
index 5471a7f..3241e3c 100644
--- a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
+++ b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
@@ -1,11 +1,9 @@
package com.xcong.excoin.netty.dispatch;
import com.alibaba.fastjson.JSONObject;
-import com.xcong.excoin.netty.bean.RequestBean;
-import com.xcong.excoin.netty.bean.SubRequest;
import com.xcong.excoin.netty.bean.SubResponse;
import com.xcong.excoin.netty.bean.UnSubResponse;
-import com.xcong.excoin.netty.common.ChannelManager;
+import com.xcong.excoin.netty.common.ServerChannelManager;
import com.xcong.excoin.netty.common.NettyTools;
import com.xcong.excoin.netty.logic.MsgLogic;
import io.netty.channel.ChannelHandlerContext;
@@ -41,7 +39,7 @@
return;
}
- ChannelManager.putSymbolSubChannel(split[1], ctx.channel(), split[2]);
+ ServerChannelManager.putSymbolSubChannel(split[1], ctx.channel(), split[2]);
SubResponse subResponse = new SubResponse();
subResponse.setSubbed(sub);
subResponse.setId(jsonObject.getString("id"));
@@ -56,7 +54,7 @@
return;
}
- ChannelManager.removeSymbolUnSubChannel(split[1], ctx.channel(), split[2]);
+ ServerChannelManager.removeSymbolUnSubChannel(split[1], ctx.channel(), split[2]);
UnSubResponse resp = new UnSubResponse();
resp.setSubbed(sub);
resp.setId(jsonObject.getString("id"));
diff --git a/src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java b/src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java
index bc93484..11d7a65 100644
--- a/src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java
+++ b/src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java
@@ -1,6 +1,6 @@
package com.xcong.excoin.netty.handler;
-import com.xcong.excoin.netty.common.ChannelManager;
+import com.xcong.excoin.netty.common.ServerChannelManager;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -20,13 +20,13 @@
@Override
public void channelActive(ChannelHandlerContext ctx) {
log.info("[tcp客户端连入服务器]");
- ChannelManager.addTcpChannel(ctx.channel());
+ ServerChannelManager.addTcpChannel(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
log.info("[tcp客户端断开服务器]");
- ChannelManager.removeTcpChannel(ctx.channel());
+ ServerChannelManager.removeTcpChannel(ctx.channel());
}
@Override
diff --git a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
index a8b448e..2c9c019 100644
--- a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
+++ b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
@@ -1,7 +1,7 @@
package com.xcong.excoin.netty.handler;
-import com.xcong.excoin.netty.common.ChannelManager;
+import com.xcong.excoin.netty.common.ServerChannelManager;
import com.xcong.excoin.netty.common.Contans;
import com.xcong.excoin.netty.common.NettyTools;
import com.xcong.excoin.netty.dispatch.MsgDispatch;
@@ -50,13 +50,13 @@
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("[websocket客户端连入服务器]-->{}", ctx.channel().id());
- ChannelManager.addWebSocketChannel(ctx.channel());
+ ServerChannelManager.addWebSocketChannel(ctx.channel());
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("[离开websocket服务器]-->{}", ctx.channel().id());
- ChannelManager.removeWebSocketChannel(ctx.channel());
+ ServerChannelManager.removeWebSocketChannel(ctx.channel());
}
@Override
diff --git a/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java b/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
index d1c9322..8e76216 100644
--- a/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
+++ b/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
@@ -3,7 +3,7 @@
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.netty.bean.RequestBean;
import com.xcong.excoin.netty.bean.ResponseBean;
-import com.xcong.excoin.netty.common.ChannelManager;
+import com.xcong.excoin.netty.common.ServerChannelManager;
import com.xcong.excoin.netty.common.NettyTools;
import io.netty.channel.Channel;
import org.springframework.stereotype.Component;
@@ -19,7 +19,7 @@
public void webReqConnection(RequestBean requestBean) {
- Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId());
+ Channel channel = ServerChannelManager.findWebSocketChannel(requestBean.getChannelId());
channel.writeAndFlush(NettyTools.webSocketBytes("this is ok"));
}
@@ -30,12 +30,12 @@
String type = jsonObject.getString("type");
ResponseBean responseBean = ResponseBean.ok(requestBean.getType(), null);
- Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId());
+ Channel channel = ServerChannelManager.findWebSocketChannel(requestBean.getChannelId());
channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(responseBean)));
}
public void defaultReq(RequestBean requestBean) {
- Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId());
+ Channel channel = ServerChannelManager.findWebSocketChannel(requestBean.getChannelId());
channel.writeAndFlush("this is error type");
}
}
diff --git a/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java b/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java
index 603e2c6..1f62b16 100644
--- a/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java
+++ b/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java
@@ -2,14 +2,10 @@
import com.alibaba.fastjson.JSONObject;
import com.huobi.client.SubscriptionClient;
-import com.huobi.client.SubscriptionOptions;
-import com.huobi.client.model.Candlestick;
-import com.huobi.client.model.enums.CandlestickInterval;
import com.xcong.excoin.modules.symbols.service.SymbolsService;
-import com.xcong.excoin.netty.common.ChannelManager;
+import com.xcong.excoin.netty.common.ServerChannelManager;
import com.xcong.excoin.netty.common.NettyTools;
import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService;
-import com.xcong.excoin.utils.CoinTypeConvert;
import com.xcong.excoin.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -51,7 +47,7 @@
log.info("#=======价格更新开启=======#");
subscriptionClient.subscribeTradeEvent("btcusdt,ethusdt,xrpusdt,ltcusdt,bchusdt,eosusdt,etcusdt", tradeEvent -> {
- ChannelManager.getTcpGroup().writeAndFlush(NettyTools.textBytes(JSONObject.toJSONString(tradeEvent)));
+ ServerChannelManager.getTcpGroup().writeAndFlush(NettyTools.textBytes(JSONObject.toJSONString(tradeEvent)));
// String symbol = tradeEvent.getSymbol();
// // 根据symbol判断做什么操作
// symbol = CoinTypeConvert.convert(symbol);
--
Gitblit v1.9.1