wzy
2021-02-28 d3cdbf19b53e24a1417364b098f7b8f71f36a208
modify
1 files added
1 files renamed
6 files modified
68 ■■■■■ changed files
src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java 7 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/common/ClientChannelManager.java 23 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/common/ServerChannelManager.java 2 ●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java 8 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java 6 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java 6 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java 8 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java 8 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java
@@ -1,5 +1,6 @@
package com.xcong.excoin.netty.client;
import com.xcong.excoin.netty.common.ClientChannelManager;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
@@ -19,7 +20,7 @@
    /**
     * Channel-activation callback: writes an info log entry and registers the
     * context's channel in the server group kept by {@code ClientChannelManager}.
     *
     * @param ctx handler context whose channel is added to the group
     * @throws Exception declared by the overridden signature; no statement here throws explicitly
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // NOTE(review): the log text means "message received", but this callback is the
        // activation hook — confirm the wording is intended.
        log.info("消息收到");
        // NOTE(review): super.channelActive(ctx) is not called; presumably this is the last
        // inbound handler, otherwise later handlers will not see the event — verify.
        ClientChannelManager.addServerGroup(ctx.channel());
    }
    @Override
@@ -27,4 +28,8 @@
        log.info("消息收到");
    }
    /**
     * Channel-deactivation callback: removes the context's channel from the server
     * group kept by {@code ClientChannelManager}.
     *
     * @param ctx handler context whose channel is removed from the group
     * @throws Exception declared by the overridden signature; no statement here throws explicitly
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ClientChannelManager.removeServerGroup(ctx.channel());
    }
}
src/main/java/com/xcong/excoin/netty/common/ClientChannelManager.java
New file
@@ -0,0 +1,23 @@
package com.xcong.excoin.netty.common;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
public class ClientChannelManager {
    private static ChannelGroup SERVER_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    public static ChannelGroup getServerGroup() {
        return SERVER_GROUP;
    }
    public static void addServerGroup(Channel channel) {
        SERVER_GROUP.add(channel);
    }
    public static void removeServerGroup(Channel channel) {
        SERVER_GROUP.remove(channel);
    }
}
src/main/java/com/xcong/excoin/netty/common/ServerChannelManager.java
File was renamed from src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -18,7 +18,7 @@
 * @email wangdoubleone@gmail.com
 * @date 2019-05-06
 */
public class ChannelManager {
public class ServerChannelManager {
    private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
@@ -1,11 +1,9 @@
package com.xcong.excoin.netty.dispatch;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.netty.bean.RequestBean;
import com.xcong.excoin.netty.bean.SubRequest;
import com.xcong.excoin.netty.bean.SubResponse;
import com.xcong.excoin.netty.bean.UnSubResponse;
import com.xcong.excoin.netty.common.ChannelManager;
import com.xcong.excoin.netty.common.ServerChannelManager;
import com.xcong.excoin.netty.common.NettyTools;
import com.xcong.excoin.netty.logic.MsgLogic;
import io.netty.channel.ChannelHandlerContext;
@@ -41,7 +39,7 @@
                return;
            }
            ChannelManager.putSymbolSubChannel(split[1], ctx.channel(), split[2]);
            ServerChannelManager.putSymbolSubChannel(split[1], ctx.channel(), split[2]);
            SubResponse subResponse = new SubResponse();
            subResponse.setSubbed(sub);
            subResponse.setId(jsonObject.getString("id"));
@@ -56,7 +54,7 @@
                return;
            }
            ChannelManager.removeSymbolUnSubChannel(split[1], ctx.channel(), split[2]);
            ServerChannelManager.removeSymbolUnSubChannel(split[1], ctx.channel(), split[2]);
            UnSubResponse resp = new UnSubResponse();
            resp.setSubbed(sub);
            resp.setId(jsonObject.getString("id"));
src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java
@@ -1,6 +1,6 @@
package com.xcong.excoin.netty.handler;
import com.xcong.excoin.netty.common.ChannelManager;
import com.xcong.excoin.netty.common.ServerChannelManager;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -20,13 +20,13 @@
    /**
     * Channel-activation callback for the TCP server: logs the connection and
     * registers the channel through {@code addTcpChannel}.
     *
     * <p>This hunk shows two registration calls: one on {@code ChannelManager} and
     * one on {@code ServerChannelManager}, the name that class was renamed to.
     *
     * @param ctx handler context whose channel is registered
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("[tcp客户端连入服务器]");
        ChannelManager.addTcpChannel(ctx.channel());
        ServerChannelManager.addTcpChannel(ctx.channel());
    }
    /**
     * Channel-deactivation callback for the TCP server: logs the disconnect and
     * unregisters the channel through {@code removeTcpChannel}.
     *
     * <p>This hunk shows two removal calls: one on {@code ChannelManager} and one
     * on {@code ServerChannelManager}, the name that class was renamed to.
     *
     * @param ctx handler context whose channel is unregistered
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.info("[tcp客户端断开服务器]");
        ChannelManager.removeTcpChannel(ctx.channel());
        ServerChannelManager.removeTcpChannel(ctx.channel());
    }
    @Override
src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
@@ -1,7 +1,7 @@
package com.xcong.excoin.netty.handler;
import com.xcong.excoin.netty.common.ChannelManager;
import com.xcong.excoin.netty.common.ServerChannelManager;
import com.xcong.excoin.netty.common.Contans;
import com.xcong.excoin.netty.common.NettyTools;
import com.xcong.excoin.netty.dispatch.MsgDispatch;
@@ -50,13 +50,13 @@
    /**
     * Channel-activation callback for the WebSocket server: logs the channel id
     * and registers the channel through {@code addWebSocketChannel}.
     *
     * <p>This hunk shows two registration calls: one on {@code ChannelManager} and
     * one on {@code ServerChannelManager}, the name that class was renamed to.
     *
     * @param ctx handler context whose channel is registered
     * @throws Exception declared by the overridden signature; no statement here throws explicitly
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("[websocket客户端连入服务器]-->{}", ctx.channel().id());
        ChannelManager.addWebSocketChannel(ctx.channel());
        ServerChannelManager.addWebSocketChannel(ctx.channel());
    }
    /**
     * Channel-deactivation callback for the WebSocket server: logs the channel id
     * and unregisters the channel through {@code removeWebSocketChannel}.
     *
     * <p>This hunk shows two removal calls: one on {@code ChannelManager} and one
     * on {@code ServerChannelManager}, the name that class was renamed to.
     *
     * @param ctx handler context whose channel is unregistered
     * @throws Exception declared by the overridden signature; no statement here throws explicitly
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("[离开websocket服务器]-->{}", ctx.channel().id());
        ChannelManager.removeWebSocketChannel(ctx.channel());
        ServerChannelManager.removeWebSocketChannel(ctx.channel());
    }
    @Override
src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
@@ -3,7 +3,7 @@
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.netty.bean.RequestBean;
import com.xcong.excoin.netty.bean.ResponseBean;
import com.xcong.excoin.netty.common.ChannelManager;
import com.xcong.excoin.netty.common.ServerChannelManager;
import com.xcong.excoin.netty.common.NettyTools;
import io.netty.channel.Channel;
import org.springframework.stereotype.Component;
@@ -19,7 +19,7 @@
    /**
     * Looks up the WebSocket channel named by the request's channel id and writes
     * the fixed text {@code "this is ok"} to it, wrapped by
     * {@code NettyTools.webSocketBytes}.
     *
     * <p>This hunk shows two lookup lines: one on {@code ChannelManager} and one on
     * {@code ServerChannelManager}, the name that class was renamed to.
     *
     * @param requestBean request carrying the target channel id
     */
    public void webReqConnection(RequestBean requestBean) {
        Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId());
        Channel channel = ServerChannelManager.findWebSocketChannel(requestBean.getChannelId());
        // NOTE(review): no null check on the lookup result; presumably an unknown id could
        // yield null — verify against findWebSocketChannel.
        channel.writeAndFlush(NettyTools.webSocketBytes("this is ok"));
    }
@@ -30,12 +30,12 @@
        String type = jsonObject.getString("type");
        ResponseBean responseBean = ResponseBean.ok(requestBean.getType(), null);
        Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId());
        Channel channel = ServerChannelManager.findWebSocketChannel(requestBean.getChannelId());
        channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(responseBean)));
    }
    /**
     * Fallback handler: looks up the WebSocket channel named by the request's
     * channel id and writes the fixed text {@code "this is error type"} to it.
     *
     * <p>This hunk shows two lookup lines: one on {@code ChannelManager} and one on
     * {@code ServerChannelManager}, the name that class was renamed to.
     *
     * @param requestBean request carrying the target channel id
     */
    public void defaultReq(RequestBean requestBean) {
        Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId());
        Channel channel = ServerChannelManager.findWebSocketChannel(requestBean.getChannelId());
        // NOTE(review): this writes a bare String, whereas the sibling methods wrap their
        // payload with NettyTools.webSocketBytes(...) — confirm the pipeline can encode it.
        channel.writeAndFlush("this is error type");
    }
}
src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java
@@ -2,14 +2,10 @@
import com.alibaba.fastjson.JSONObject;
import com.huobi.client.SubscriptionClient;
import com.huobi.client.SubscriptionOptions;
import com.huobi.client.model.Candlestick;
import com.huobi.client.model.enums.CandlestickInterval;
import com.xcong.excoin.modules.symbols.service.SymbolsService;
import com.xcong.excoin.netty.common.ChannelManager;
import com.xcong.excoin.netty.common.ServerChannelManager;
import com.xcong.excoin.netty.common.NettyTools;
import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService;
import com.xcong.excoin.utils.CoinTypeConvert;
import com.xcong.excoin.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -51,7 +47,7 @@
        log.info("#=======价格更新开启=======#");
        subscriptionClient.subscribeTradeEvent("btcusdt,ethusdt,xrpusdt,ltcusdt,bchusdt,eosusdt,etcusdt", tradeEvent -> {
            ChannelManager.getTcpGroup().writeAndFlush(NettyTools.textBytes(JSONObject.toJSONString(tradeEvent)));
            ServerChannelManager.getTcpGroup().writeAndFlush(NettyTools.textBytes(JSONObject.toJSONString(tradeEvent)));
//            String symbol = tradeEvent.getSymbol();
//            // 根据symbol判断做什么操作
//            symbol = CoinTypeConvert.convert(symbol);