From d3cdbf19b53e24a1417364b098f7b8f71f36a208 Mon Sep 17 00:00:00 2001 From: wzy <wzy19931122ai@163.com> Date: Sun, 28 Feb 2021 19:47:04 +0800 Subject: [PATCH] modify --- src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java | 8 ++-- src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java | 7 +++ src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java | 6 +- src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java | 8 +--- src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java | 8 +-- src/main/java/com/xcong/excoin/netty/common/ServerChannelManager.java | 2 src/main/java/com/xcong/excoin/netty/common/ClientChannelManager.java | 23 +++++++++++ src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java | 6 +- 8 files changed, 45 insertions(+), 23 deletions(-) diff --git a/src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java b/src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java index 33bc9b8..2917b8d 100644 --- a/src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java +++ b/src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java @@ -1,5 +1,6 @@ package com.xcong.excoin.netty.client; +import com.xcong.excoin.netty.common.ClientChannelManager; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; @@ -19,7 +20,7 @@ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { - log.info("消息收到"); + ClientChannelManager.addServerGroup(ctx.channel()); } @Override @@ -27,4 +28,8 @@ log.info("消息收到"); } + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + ClientChannelManager.removeServerGroup(ctx.channel()); + } } \ No newline at end of file diff --git a/src/main/java/com/xcong/excoin/netty/common/ClientChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ClientChannelManager.java new file mode 100644 index 0000000..014c933 --- /dev/null +++ b/src/main/java/com/xcong/excoin/netty/common/ClientChannelManager.java @@ -0,0 +1,23 @@ +package com.xcong.excoin.netty.common; + +import io.netty.channel.Channel; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.util.concurrent.GlobalEventExecutor; + +public class ClientChannelManager { + private static ChannelGroup SERVER_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + + public static ChannelGroup getServerGroup() { + return SERVER_GROUP; + } + + public static void addServerGroup(Channel channel) { + SERVER_GROUP.add(channel); + } + + public static void removeServerGroup(Channel channel) { + SERVER_GROUP.remove(channel); + } + +} diff --git a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ServerChannelManager.java similarity index 99% rename from src/main/java/com/xcong/excoin/netty/common/ChannelManager.java rename to src/main/java/com/xcong/excoin/netty/common/ServerChannelManager.java index 7932c00..c6f262c 100644 --- a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java +++ b/src/main/java/com/xcong/excoin/netty/common/ServerChannelManager.java @@ -18,7 +18,7 @@ * @email wangdoubleone@gmail.com * @date 2019-05-06 */ -public class ChannelManager { +public class ServerChannelManager { private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); diff --git a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java index 5471a7f..3241e3c 100644 --- a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java +++ b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java @@ -1,11 +1,9 @@ package com.xcong.excoin.netty.dispatch; import com.alibaba.fastjson.JSONObject; -import com.xcong.excoin.netty.bean.RequestBean; -import com.xcong.excoin.netty.bean.SubRequest; import com.xcong.excoin.netty.bean.SubResponse; import com.xcong.excoin.netty.bean.UnSubResponse; -import com.xcong.excoin.netty.common.ChannelManager; +import com.xcong.excoin.netty.common.ServerChannelManager; import com.xcong.excoin.netty.common.NettyTools; import com.xcong.excoin.netty.logic.MsgLogic; import io.netty.channel.ChannelHandlerContext; @@ -41,7 +39,7 @@ return; } - ChannelManager.putSymbolSubChannel(split[1], ctx.channel(), split[2]); + ServerChannelManager.putSymbolSubChannel(split[1], ctx.channel(), split[2]); SubResponse subResponse = new SubResponse(); subResponse.setSubbed(sub); subResponse.setId(jsonObject.getString("id")); @@ -56,7 +54,7 @@ return; } - ChannelManager.removeSymbolUnSubChannel(split[1], ctx.channel(), split[2]); + ServerChannelManager.removeSymbolUnSubChannel(split[1], ctx.channel(), split[2]); UnSubResponse resp = new UnSubResponse(); resp.setSubbed(sub); resp.setId(jsonObject.getString("id")); diff --git a/src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java b/src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java index bc93484..11d7a65 100644 --- a/src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java +++ b/src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java @@ -1,6 +1,6 @@ package com.xcong.excoin.netty.handler; -import com.xcong.excoin.netty.common.ChannelManager; +import com.xcong.excoin.netty.common.ServerChannelManager; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -20,13 +20,13 @@ @Override public void channelActive(ChannelHandlerContext ctx) { log.info("[tcp客户端连入服务器]"); - ChannelManager.addTcpChannel(ctx.channel()); + ServerChannelManager.addTcpChannel(ctx.channel()); } @Override public void channelInactive(ChannelHandlerContext ctx) { log.info("[tcp客户端断开服务器]"); - ChannelManager.removeTcpChannel(ctx.channel()); + ServerChannelManager.removeTcpChannel(ctx.channel()); } @Override diff --git a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java index a8b448e..2c9c019 100644 --- a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java +++ b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java @@ -1,7 +1,7 @@ package com.xcong.excoin.netty.handler; -import com.xcong.excoin.netty.common.ChannelManager; +import com.xcong.excoin.netty.common.ServerChannelManager; import com.xcong.excoin.netty.common.Contans; import com.xcong.excoin.netty.common.NettyTools; import com.xcong.excoin.netty.dispatch.MsgDispatch; @@ -50,13 +50,13 @@ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("[websocket客户端连入服务器]-->{}", ctx.channel().id()); - ChannelManager.addWebSocketChannel(ctx.channel()); + ServerChannelManager.addWebSocketChannel(ctx.channel()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("[离开websocket服务器]-->{}", ctx.channel().id()); - ChannelManager.removeWebSocketChannel(ctx.channel()); + ServerChannelManager.removeWebSocketChannel(ctx.channel()); } @Override diff --git a/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java b/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java index d1c9322..8e76216 100644 --- a/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java +++ b/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java @@ -3,7 +3,7 @@ import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.netty.bean.RequestBean; import com.xcong.excoin.netty.bean.ResponseBean; -import com.xcong.excoin.netty.common.ChannelManager; +import com.xcong.excoin.netty.common.ServerChannelManager; import com.xcong.excoin.netty.common.NettyTools; import io.netty.channel.Channel; import org.springframework.stereotype.Component; @@ -19,7 +19,7 @@ public void webReqConnection(RequestBean requestBean) { - Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId()); + Channel channel = ServerChannelManager.findWebSocketChannel(requestBean.getChannelId()); channel.writeAndFlush(NettyTools.webSocketBytes("this is ok")); } @@ -30,12 +30,12 @@ String type = jsonObject.getString("type"); ResponseBean responseBean = ResponseBean.ok(requestBean.getType(), null); - Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId()); + Channel channel = ServerChannelManager.findWebSocketChannel(requestBean.getChannelId()); channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(responseBean))); } public void defaultReq(RequestBean requestBean) { - Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId()); + Channel channel = ServerChannelManager.findWebSocketChannel(requestBean.getChannelId()); channel.writeAndFlush("this is error type"); } } diff --git a/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java b/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java index 603e2c6..1f62b16 100644 --- a/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java +++ b/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java @@ -2,14 +2,10 @@ import com.alibaba.fastjson.JSONObject; import com.huobi.client.SubscriptionClient; -import com.huobi.client.SubscriptionOptions; -import com.huobi.client.model.Candlestick; -import com.huobi.client.model.enums.CandlestickInterval; import com.xcong.excoin.modules.symbols.service.SymbolsService; -import com.xcong.excoin.netty.common.ChannelManager; +import com.xcong.excoin.netty.common.ServerChannelManager; import com.xcong.excoin.netty.common.NettyTools; import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService; -import com.xcong.excoin.utils.CoinTypeConvert; import com.xcong.excoin.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -51,7 +47,7 @@ log.info("#=======价格更新开启=======#"); subscriptionClient.subscribeTradeEvent("btcusdt,ethusdt,xrpusdt,ltcusdt,bchusdt,eosusdt,etcusdt", tradeEvent -> { - ChannelManager.getTcpGroup().writeAndFlush(NettyTools.textBytes(JSONObject.toJSONString(tradeEvent))); + ServerChannelManager.getTcpGroup().writeAndFlush(NettyTools.textBytes(JSONObject.toJSONString(tradeEvent))); // String symbol = tradeEvent.getSymbol(); // // 根据symbol判断做什么操作 // symbol = CoinTypeConvert.convert(symbol); -- Gitblit v1.9.1