src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java
@@ -1,5 +1,6 @@ package com.xcong.excoin.netty.client; import com.xcong.excoin.netty.common.ClientChannelManager; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; @@ -19,7 +20,7 @@ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("消息收到"); ClientChannelManager.addServerGroup(ctx.channel()); } @Override @@ -27,4 +28,8 @@ log.info("消息收到"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { ClientChannelManager.removeServerGroup(ctx.channel()); } } src/main/java/com/xcong/excoin/netty/common/ClientChannelManager.java
New file @@ -0,0 +1,23 @@ package com.xcong.excoin.netty.common; import io.netty.channel.Channel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; public class ClientChannelManager { private static ChannelGroup SERVER_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); public static ChannelGroup getServerGroup() { return SERVER_GROUP; } public static void addServerGroup(Channel channel) { SERVER_GROUP.add(channel); } public static void removeServerGroup(Channel channel) { SERVER_GROUP.remove(channel); } } src/main/java/com/xcong/excoin/netty/common/ServerChannelManager.java
File was renamed from src/main/java/com/xcong/excoin/netty/common/ChannelManager.java @@ -18,7 +18,7 @@ * @email wangdoubleone@gmail.com * @date 2019-05-06 */ public class ChannelManager { public class ServerChannelManager { private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
@@ -1,11 +1,9 @@ package com.xcong.excoin.netty.dispatch; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.netty.bean.RequestBean; import com.xcong.excoin.netty.bean.SubRequest; import com.xcong.excoin.netty.bean.SubResponse; import com.xcong.excoin.netty.bean.UnSubResponse; import com.xcong.excoin.netty.common.ChannelManager; import com.xcong.excoin.netty.common.ServerChannelManager; import com.xcong.excoin.netty.common.NettyTools; import com.xcong.excoin.netty.logic.MsgLogic; import io.netty.channel.ChannelHandlerContext; @@ -41,7 +39,7 @@ return; } ChannelManager.putSymbolSubChannel(split[1], ctx.channel(), split[2]); ServerChannelManager.putSymbolSubChannel(split[1], ctx.channel(), split[2]); SubResponse subResponse = new SubResponse(); subResponse.setSubbed(sub); subResponse.setId(jsonObject.getString("id")); @@ -56,7 +54,7 @@ return; } ChannelManager.removeSymbolUnSubChannel(split[1], ctx.channel(), split[2]); ServerChannelManager.removeSymbolUnSubChannel(split[1], ctx.channel(), split[2]); UnSubResponse resp = new UnSubResponse(); resp.setSubbed(sub); resp.setId(jsonObject.getString("id")); src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java
@@ -1,6 +1,6 @@ package com.xcong.excoin.netty.handler; import com.xcong.excoin.netty.common.ChannelManager; import com.xcong.excoin.netty.common.ServerChannelManager; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; @@ -20,13 +20,13 @@ @Override public void channelActive(ChannelHandlerContext ctx) { log.info("[tcp客户端连入服务器]"); ChannelManager.addTcpChannel(ctx.channel()); ServerChannelManager.addTcpChannel(ctx.channel()); } @Override public void channelInactive(ChannelHandlerContext ctx) { log.info("[tcp客户端断开服务器]"); ChannelManager.removeTcpChannel(ctx.channel()); ServerChannelManager.removeTcpChannel(ctx.channel()); } @Override src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
@@ -1,7 +1,7 @@ package com.xcong.excoin.netty.handler; import com.xcong.excoin.netty.common.ChannelManager; import com.xcong.excoin.netty.common.ServerChannelManager; import com.xcong.excoin.netty.common.Contans; import com.xcong.excoin.netty.common.NettyTools; import com.xcong.excoin.netty.dispatch.MsgDispatch; @@ -50,13 +50,13 @@ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("[websocket客户端连入服务器]-->{}", ctx.channel().id()); ChannelManager.addWebSocketChannel(ctx.channel()); ServerChannelManager.addWebSocketChannel(ctx.channel()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("[离开websocket服务器]-->{}", ctx.channel().id()); ChannelManager.removeWebSocketChannel(ctx.channel()); ServerChannelManager.removeWebSocketChannel(ctx.channel()); } @Override src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
@@ -3,7 +3,7 @@ import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.netty.bean.RequestBean; import com.xcong.excoin.netty.bean.ResponseBean; import com.xcong.excoin.netty.common.ChannelManager; import com.xcong.excoin.netty.common.ServerChannelManager; import com.xcong.excoin.netty.common.NettyTools; import io.netty.channel.Channel; import org.springframework.stereotype.Component; @@ -19,7 +19,7 @@ public void webReqConnection(RequestBean requestBean) { Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId()); Channel channel = ServerChannelManager.findWebSocketChannel(requestBean.getChannelId()); channel.writeAndFlush(NettyTools.webSocketBytes("this is ok")); } @@ -30,12 +30,12 @@ String type = jsonObject.getString("type"); ResponseBean responseBean = ResponseBean.ok(requestBean.getType(), null); Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId()); Channel channel = ServerChannelManager.findWebSocketChannel(requestBean.getChannelId()); channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(responseBean))); } public void defaultReq(RequestBean requestBean) { Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId()); Channel channel = ServerChannelManager.findWebSocketChannel(requestBean.getChannelId()); channel.writeAndFlush("this is error type"); } } src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java
@@ -2,14 +2,10 @@ import com.alibaba.fastjson.JSONObject; import com.huobi.client.SubscriptionClient; import com.huobi.client.SubscriptionOptions; import com.huobi.client.model.Candlestick; import com.huobi.client.model.enums.CandlestickInterval; import com.xcong.excoin.modules.symbols.service.SymbolsService; import com.xcong.excoin.netty.common.ChannelManager; import com.xcong.excoin.netty.common.ServerChannelManager; import com.xcong.excoin.netty.common.NettyTools; import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService; import com.xcong.excoin.utils.CoinTypeConvert; import com.xcong.excoin.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -51,7 +47,7 @@ log.info("#=======价格更新开启=======#"); subscriptionClient.subscribeTradeEvent("btcusdt,ethusdt,xrpusdt,ltcusdt,bchusdt,eosusdt,etcusdt", tradeEvent -> { ChannelManager.getTcpGroup().writeAndFlush(NettyTools.textBytes(JSONObject.toJSONString(tradeEvent))); ServerChannelManager.getTcpGroup().writeAndFlush(NettyTools.textBytes(JSONObject.toJSONString(tradeEvent))); // String symbol = tradeEvent.getSymbol(); // // 根据symbol判断做什么操作 // symbol = CoinTypeConvert.convert(symbol);