Helius
2021-05-26 3985fbaef190b95d71ec458993fd58a527239045
modify
11 files modified
218 ■■■■ changed files
src/main/java/com/xcong/excoin/netty/bean/RequestBean.java 10 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java 72 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/common/ChannelManager.java 28 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/common/Contans.java 12 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/common/NettyTools.java 7 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java 28 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java 15 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java 8 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java 11 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java 21 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java 6 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/bean/RequestBean.java
@@ -15,6 +15,16 @@
    private Object data;
    private String channelId;
    /**
     * Returns the channel id attached to this request.
     *
     * @return the current value of the {@code channelId} field; may be {@code null} if it was never set
     */
    public String getChannelId() {
        return channelId;
    }
    /**
     * Stores the id of the channel this request arrived on.
     *
     * @param channelId the channel id to attach to this request
     */
    public void setChannelId(String channelId) {
        this.channelId = channelId;
    }
    /**
     * Returns the payload carried by this request.
     *
     * @return the current value of the {@code data} field; may be {@code null}
     */
    public Object getData() {
        return data;
    }
src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java
@@ -10,77 +10,39 @@
    private static final long serialVersionUID = 1L;
    public static final String SUCCESS = "200";
    private Integer type;
    private String status;
    private Integer status;
    private String type;
    private Object data;
    private String info;
    private String channelId;
    private Map<Object, Object> mapInfo = new HashMap<>();
    private List<?> row;
    public static ResponseBean ok(String type, String info) {
        ResponseBean responseBean = new ResponseBean();
        responseBean.status = SUCCESS;
        responseBean.type = type;
        responseBean.info = info;
        return responseBean;
    }
    public String getStatus() {
        return status;
    }
    public void setStatus(String status) {
        this.status = status;
    }
    public String getType() {
    public Integer getType() {
        return type;
    }
    public void setType(String type) {
    public void setType(Integer type) {
        this.type = type;
    }
    public String getInfo() {
        return info;
    public Integer getStatus() {
        return status;
    }
    public void setInfo(String info) {
        this.info = info;
    public void setStatus(Integer status) {
        this.status = status;
    }
    public String getChannelId() {
        return channelId;
    public Object getData() {
        return data;
    }
    public void setChannelId(String channelId) {
        this.channelId = channelId;
    public void setData(Object data) {
        this.data = data;
    }
    public Map<Object, Object> getMapInfo() {
        return mapInfo;
    }
    public void setMapInfo(Map<Object, Object> mapInfo) {
        this.mapInfo = mapInfo;
    }
    public void putInfo(Object key, Object value) {
        this.mapInfo.put(key, value);
    }
    public List<?> getRow() {
        return row;
    }
    public void setRow(List<?> row) {
        this.row = row;
    public static ResponseBean fail(){
        ResponseBean res = new ResponseBean();
        res.setStatus(0);
        return res;
    }
}
src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -23,14 +23,20 @@
    // Channels currently connected to the server (TCP and WebSocket)
    private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>();
    // key - member (user) ID, value - channel ID
    private static final ConcurrentMap<String, ChannelId> MEMBER_CHANNEL = new ConcurrentHashMap<>();
    // key - channel ID, value - member (user) ID
    private static final ConcurrentMap<ChannelId, String> CHANNEL_MEMBER = new ConcurrentHashMap<>();
    public static void addWebSocketChannel(Channel channel) {
        WEBSOCKET_GROUP.add(channel);
        CHANNEL_MAP.put(channel.id().asShortText(), channel.id());
    }
    public static void addWsChannel(Channel channel, Long memberId) {
        WEBSOCKET_GROUP.add(channel);
        CHANNEL_MAP.put(memberId.toString(), channel.id());
        MEMBER_CHANNEL.put(memberId.toString(), channel.id());
        CHANNEL_MEMBER.put(channel.id(), memberId.toString());
    }
    public static void removeWebSocketChannel(Channel channel) {
@@ -39,8 +45,8 @@
    }
    public static void removeWsChannel(Channel channel, Long memberId) {
        WEBSOCKET_GROUP.remove(channel);
        CHANNEL_MAP.remove(memberId.toString());
        MEMBER_CHANNEL.remove(memberId.toString());
        CHANNEL_MEMBER.remove(channel.id());
    }
    public static Channel findWebSocketChannel(String id){
@@ -49,8 +55,12 @@
    }
    public static Channel findWsChannel(Long id){
        ChannelId channelId = CHANNEL_MAP.get(id.toString());
        ChannelId channelId = MEMBER_CHANNEL.get(id.toString());
        return WEBSOCKET_GROUP.find(channelId);
    }
    /**
     * Looks up the member bound to a channel.
     *
     * @param channel the channel to look up; its id is used as the key into {@code CHANNEL_MEMBER}
     * @return the member id stored for the channel, or {@code null} if none is recorded
     */
    public static String findWsMemberId(Channel channel) {
        return CHANNEL_MEMBER.get(channel.id());
    }
    public static ChannelGroup getWebSocketGroup() {
@@ -61,10 +71,10 @@
        if (WEBSOCKET_GROUP.size() == 0) {
            return;
        }
        ResponseBean responseBean = ResponseBean.ok(type, null);
        responseBean.putInfo("data", object);
        String msg = JSONObject.toJSONString(responseBean);
        WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg));
//        ResponseBean responseBean = ResponseBean.ok(type, null);
//        responseBean.putInfo("data", object);
//        String msg = JSONObject.toJSONString(responseBean);
//        WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg));
    }
src/main/java/com/xcong/excoin/netty/common/Contans.java
@@ -8,16 +8,8 @@
    // Text that marks an incoming WebSocket frame as a heartbeat.
    public static final String HEART_BEAT = "ping pong pang";
    // String-typed request codes.
    // NOTE(review): no live code in view reads these string codes — confirm whether they are still needed.
    public static final String WEB_REQ_CONNECTION = "000_000";
    // Request type: authentication check that binds a member to a channel.
    public static final int AUTH_CHECK = 1;
    public static final String HOME_SYMBOLS = "001_001";
    public static final String ORDER_COIN_PRE_ORDER_DATA = "002_001";
    public static final String ORDER_COIN_FIND_TRUST_ORDER = "002_002";
    public static final String ORDER_PRE_ORDER_DATA = "003_001";
    public static final String ORDER_FIND_TRUST_ORDER = "003_002";
    // NOTE(review): presumably the request type for ordinary messages; no usage is visible here — confirm.
    public static final int MESSAGE = 2;
}
src/main/java/com/xcong/excoin/netty/common/NettyTools.java
@@ -1,5 +1,7 @@
package com.xcong.excoin.netty.common;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.netty.bean.ResponseBean;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
@@ -21,6 +23,11 @@
        return Unpooled.copiedBuffer((msg + "_split").getBytes());
    }
    public static TextWebSocketFrame wsSendMsg(ResponseBean responseBean) {
        String res = JSONObject.toJSONString(responseBean);
        return new TextWebSocketFrame(res);
    }
    /**
     * Wraps a string in a WebSocket text frame without any further encoding.
     *
     * @param msg the text to send
     * @return a text frame whose content is {@code msg}
     */
    public static TextWebSocketFrame webSocketBytes(String msg) {
        return new TextWebSocketFrame(msg);
    }
src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
@@ -1,9 +1,14 @@
package com.xcong.excoin.netty.dispatch;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.netty.bean.RequestBean;
import com.xcong.excoin.netty.bean.ResponseBean;
import com.xcong.excoin.netty.common.ChannelManager;
import com.xcong.excoin.netty.common.Contans;
import com.xcong.excoin.netty.common.NettyTools;
import com.xcong.excoin.netty.logic.MsgLogic;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
@@ -19,27 +24,26 @@
 */
@Slf4j
@Component("msgDispatch")
public class MsgDispatch implements ApplicationContextAware {
    private ApplicationContext applicationContext;
public class MsgDispatch {
    @Autowired
    private MsgLogic msgLogic;
    public void webSocketDispatch(ChannelHandlerContext ctx, String msg) {
        log.info("==========={}", msg);
        RequestBean requestBean = null;
        RequestBean requestBean = JSONObject.parseObject(msg, RequestBean.class);
        // 判断当前通道用户是否已经登陆
        if (StrUtil.isEmpty(ChannelManager.findWsMemberId(ctx.channel())) && requestBean.getType() != Contans.AUTH_CHECK) {
            ctx.channel().writeAndFlush(NettyTools.webSocketBytes("123"));
            return;
        }
        try {
            requestBean.setChannelId(ctx.channel().id().asShortText());
            msgLogic.webSocketMsgLogic(requestBean);
        } catch (Exception e) {
            log.info("#websocket json error:{}#", e);
            ctx.channel().writeAndFlush(NettyTools.webSocketBytes("params error"));
            ctx.channel().writeAndFlush(NettyTools.wsSendMsg(ResponseBean.fail()));
        }
    }
    /**
     * Stores the application context passed in by the caller.
     *
     * @param applicationContext the context to keep in the {@code applicationContext} field
     * @throws BeansException declared by the overridden method; this implementation does not throw it
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }
}
src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
@@ -5,6 +5,7 @@
import com.xcong.excoin.netty.common.Contans;
import com.xcong.excoin.netty.common.NettyTools;
import com.xcong.excoin.netty.dispatch.MsgDispatch;
import com.xcong.excoin.utils.SpringContextHolder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
@@ -29,9 +30,11 @@
import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;
/**
 * @author wzy
 * @email wangdoubleone@gmail.com
 * @date 2019-05-06
 * At project startup the console prints:
 * Unable to proxy interface-implementing method [public final void io.netty.channel.ChannelInitializer.channelRegistered(io.netty.channel.ChannelHandlerContext) throws java.lang.Exception] because it is marked as final: Consider using interface-based JDK proxies instead!
 * This indicates that this class is proxied by Spring CGLIB (EnhancerBySpringCGLIB).
 * In that case, obtaining this class yields null (reason unknown), which causes a NullPointerException when the Netty connection is initialized.
 */
@Slf4j
@Component
@@ -44,8 +47,8 @@
    private WebSocketServerHandshaker handshaker;
    @Resource(name = "msgDispatch")
    private MsgDispatch msgDispatch;
//    @Resource(name = "msgDispatch")
//    private MsgDispatch msgDispatch;
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
@@ -76,7 +79,7 @@
                if (content.contains(Contans.HEART_BEAT)) {
                    resetTimes(ctx.channel());
                } else {
                    this.msgDispatch.webSocketDispatch(ctx, content);
                    SpringContextHolder.getBean(MsgDispatch.class).webSocketDispatch(ctx, content);
                }
            } catch (ClassCastException e) {
                content = ((CloseWebSocketFrame) frame).reasonText();
src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java
@@ -19,8 +19,8 @@
@Component
public class WebSocketServerInitializer extends ChannelInitializer<NioSocketChannel> {
    @Autowired
    private WebSocketServerHandler webSocketServerHandler;
//    @Autowired
//    private WebSocketServerHandler webSocketServerHandler;
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
@@ -32,8 +32,8 @@
        cp.addLast(new HttpObjectAggregator(65536));
        cp.addLast(new ChunkedWriteHandler());
        // 心跳
        ch.pipeline().addLast(new IdleStateHandler(0, 10, 0));
//        ch.pipeline().addLast(new IdleStateHandler(0, 10, 0));
        // 自定义业务handler
        cp.addLast(webSocketServerHandler);
        cp.addLast(new WebSocketServerHandler());
    }
}
src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
@@ -17,5 +17,16 @@
@Component
public class WebSocketLogic {
    public void authCheck(RequestBean requestBean) {
        Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId());
        ChannelManager.addWsChannel(channel, Long.parseLong(requestBean.getData().toString()));
        ResponseBean responseBean = new ResponseBean();
        responseBean.setType(requestBean.getType());
        responseBean.setStatus(1);
        channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(responseBean)));
    }
}
src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java
@@ -1,9 +1,13 @@
package com.xcong.excoin.netty.logic.impl;
import cn.hutool.core.util.StrUtil;
import com.xcong.excoin.netty.bean.RequestBean;
import com.xcong.excoin.netty.common.ChannelManager;
import com.xcong.excoin.netty.common.Contans;
import com.xcong.excoin.netty.common.NettyTools;
import com.xcong.excoin.netty.logic.MsgLogic;
import com.xcong.excoin.netty.logic.WebSocketLogic;
import io.netty.channel.Channel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -20,16 +24,15 @@
    /**
     * Routes a parsed request to the handler for its message type.
     *
     * <p>Only {@code Contans.AUTH_CHECK} is handled, by delegating to
     * {@code webSocketLogic.authCheck}; every other type falls through to the default branch and
     * is ignored.
     *
     * @param requestBean the request to route
     */
    @Override
    public void webSocketMsgLogic(RequestBean requestBean) {
        switch (requestBean.getType()) {
            case Contans.AUTH_CHECK:
                webSocketLogic.authCheck(requestBean);
                break;
            default:
                // Unrecognized message types are dropped without a response.
                break;
        }
    }
src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java
@@ -24,8 +24,8 @@
    private ChannelFuture channelFuture;
    @Autowired
    private WebSocketServerInitializer webSocketServerInitializer;
//    @Autowired
//    private WebSocketServerInitializer webSocketServerInitializer;
    @Override
    public void start() throws Exception {
@@ -34,7 +34,7 @@
            ServerBootstrap b = new ServerBootstrap();
            b.group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(webSocketServerInitializer);
                    .childHandler(new WebSocketServerInitializer());
            channelFuture = b.bind(9998).sync();