From 21f93fe6b95e868659bc1af9658a0cd7ba43b203 Mon Sep 17 00:00:00 2001
From: xiaoyong931011 <15274802129@163.com>
Date: Wed, 26 May 2021 20:15:20 +0800
Subject: [PATCH] Merge branch 'otc' of http://120.27.238.55:7000/r/exchange into otc

---
 src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java               |   21 +++--
 src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java                  |   11 ++
 src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java                     |   72 ++++-------------
 src/main/java/com/xcong/excoin/netty/common/ChannelManager.java                 |   28 ++++--
 src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java                |    6 
 src/main/java/com/xcong/excoin/netty/common/NettyTools.java                     |    7 +
 src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java                  |   28 ++++---
 src/main/java/com/xcong/excoin/netty/common/Contans.java                        |   12 --
 src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java        |   15 ++-
 src/main/java/com/xcong/excoin/netty/bean/RequestBean.java                      |   10 ++
 src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java |    8 +-
 11 files changed, 110 insertions(+), 108 deletions(-)

diff --git a/src/main/java/com/xcong/excoin/netty/bean/RequestBean.java b/src/main/java/com/xcong/excoin/netty/bean/RequestBean.java
index 22bd4ab..7ec536a 100644
--- a/src/main/java/com/xcong/excoin/netty/bean/RequestBean.java
+++ b/src/main/java/com/xcong/excoin/netty/bean/RequestBean.java
@@ -15,6 +15,16 @@
 
     private Object data;
 
+    private String channelId;
+
+    public String getChannelId() {
+        return channelId;
+    }
+
+    public void setChannelId(String channelId) {
+        this.channelId = channelId;
+    }
+
     public Object getData() {
         return data;
     }
diff --git a/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java b/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java
index 78a1fbf..a5e63a7 100644
--- a/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java
+++ b/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java
@@ -10,77 +10,39 @@
 
     private static final long serialVersionUID = 1L;
 
-    public static final String SUCCESS = "200";
+    private Integer type;
 
-    private String status;
+    private Integer status;
 
-    private String type;
+    private Object data;
 
-    private String info;
-
-    private String channelId;
-
-    private Map<Object, Object> mapInfo = new HashMap<>();
-
-    private List<?> row;
-
-    public static ResponseBean ok(String type, String info) {
-        ResponseBean responseBean = new ResponseBean();
-        responseBean.status = SUCCESS;
-        responseBean.type = type;
-        responseBean.info = info;
-        return responseBean;
-    }
-
-    public String getStatus() {
-        return status;
-    }
-
-    public void setStatus(String status) {
-        this.status = status;
-    }
-
-    public String getType() {
+    public Integer getType() {
         return type;
     }
 
-    public void setType(String type) {
+    public void setType(Integer type) {
         this.type = type;
     }
 
-    public String getInfo() {
-        return info;
+    public Integer getStatus() {
+        return status;
     }
 
-    public void setInfo(String info) {
-        this.info = info;
+    public void setStatus(Integer status) {
+        this.status = status;
     }
 
-    public String getChannelId() {
-        return channelId;
+    public Object getData() {
+        return data;
     }
 
-    public void setChannelId(String channelId) {
-        this.channelId = channelId;
+    public void setData(Object data) {
+        this.data = data;
     }
 
-    public Map<Object, Object> getMapInfo() {
-        return mapInfo;
-    }
-
-    public void setMapInfo(Map<Object, Object> mapInfo) {
-        this.mapInfo = mapInfo;
-    }
-
-    public void putInfo(Object key, Object value) {
-        this.mapInfo.put(key, value);
-    }
-
-    public List<?> getRow() {
-        return row;
-    }
-
-    public void setRow(List<?> row) {
-        this.row = row;
+    public static ResponseBean fail(){
+        ResponseBean res = new ResponseBean();
+        res.setStatus(0);
+        return res;
     }
 }
diff --git a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
index 0167807..04163d6 100644
--- a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
+++ b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -23,14 +23,20 @@
     // 当前连接到服务器的通道(tcp和websocket)
     private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>();
 
+    // key - 用户ID value - 通道ID
+    private static final ConcurrentMap<String, ChannelId> MEMBER_CHANNEL = new ConcurrentHashMap<>();
+
+    // key - 通道 value - 用户
+    private static final ConcurrentMap<ChannelId, String> CHANNEL_MEMBER = new ConcurrentHashMap<>();
+
     public static void addWebSocketChannel(Channel channel) {
         WEBSOCKET_GROUP.add(channel);
         CHANNEL_MAP.put(channel.id().asShortText(), channel.id());
     }
 
     public static void addWsChannel(Channel channel, Long memberId) {
-        WEBSOCKET_GROUP.add(channel);
-        CHANNEL_MAP.put(memberId.toString(), channel.id());
+        MEMBER_CHANNEL.put(memberId.toString(), channel.id());
+        CHANNEL_MEMBER.put(channel.id(), memberId.toString());
     }
 
     public static void removeWebSocketChannel(Channel channel) {
@@ -39,8 +45,8 @@
     }
 
     public static void removeWsChannel(Channel channel, Long memberId) {
-        WEBSOCKET_GROUP.remove(channel);
-        CHANNEL_MAP.remove(memberId.toString());
+        MEMBER_CHANNEL.remove(memberId.toString());
+        CHANNEL_MEMBER.remove(channel.id());
     }
 
     public static Channel findWebSocketChannel(String id){
@@ -49,8 +55,12 @@
     }
 
     public static Channel findWsChannel(Long id){
-        ChannelId channelId = CHANNEL_MAP.get(id.toString());
+        ChannelId channelId = MEMBER_CHANNEL.get(id.toString());
         return WEBSOCKET_GROUP.find(channelId);
+    }
+
+    public static String findWsMemberId(Channel channel) {
+        return CHANNEL_MEMBER.get(channel.id());
     }
 
     public static ChannelGroup getWebSocketGroup() {
@@ -61,10 +71,10 @@
         if (WEBSOCKET_GROUP.size() == 0) {
             return;
         }
-        ResponseBean responseBean = ResponseBean.ok(type, null);
-        responseBean.putInfo("data", object);
-        String msg = JSONObject.toJSONString(responseBean);
-        WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg));
+//        ResponseBean responseBean = ResponseBean.ok(type, null);
+//        responseBean.putInfo("data", object);
+//        String msg = JSONObject.toJSONString(responseBean);
+//        WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg));
     }
 
 
diff --git a/src/main/java/com/xcong/excoin/netty/common/Contans.java b/src/main/java/com/xcong/excoin/netty/common/Contans.java
index 1a64913..17bc2c2 100644
--- a/src/main/java/com/xcong/excoin/netty/common/Contans.java
+++ b/src/main/java/com/xcong/excoin/netty/common/Contans.java
@@ -8,16 +8,8 @@
 
     public static final String HEART_BEAT = "ping pong pang";
 
-    public static final String WEB_REQ_CONNECTION = "000_000";
+    public static final int AUTH_CHECK = 1;
 
-    public static final String HOME_SYMBOLS = "001_001";
-
-    public static final String ORDER_COIN_PRE_ORDER_DATA = "002_001";
-
-    public static final String ORDER_COIN_FIND_TRUST_ORDER = "002_002";
-
-    public static final String ORDER_PRE_ORDER_DATA = "003_001";
-
-    public static final String ORDER_FIND_TRUST_ORDER = "003_002";
+    public static final int MESSAGE = 2;
 
 }
diff --git a/src/main/java/com/xcong/excoin/netty/common/NettyTools.java b/src/main/java/com/xcong/excoin/netty/common/NettyTools.java
index ead90db..413888b 100644
--- a/src/main/java/com/xcong/excoin/netty/common/NettyTools.java
+++ b/src/main/java/com/xcong/excoin/netty/common/NettyTools.java
@@ -1,5 +1,7 @@
 package com.xcong.excoin.netty.common;
 
+import com.alibaba.fastjson.JSONObject;
+import com.xcong.excoin.netty.bean.ResponseBean;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
@@ -21,6 +23,11 @@
         return Unpooled.copiedBuffer((msg + "_split").getBytes());
     }
 
+    public static TextWebSocketFrame wsSendMsg(ResponseBean responseBean) {
+        String res = JSONObject.toJSONString(responseBean);
+        return new TextWebSocketFrame(res);
+    }
+
     public static TextWebSocketFrame webSocketBytes(String msg) {
         return new TextWebSocketFrame(msg);
     }
diff --git a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
index 9a296df..9742e1a 100644
--- a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
+++ b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
@@ -1,9 +1,14 @@
 package com.xcong.excoin.netty.dispatch;
 
+import cn.hutool.core.util.StrUtil;
 import com.alibaba.fastjson.JSONObject;
 import com.xcong.excoin.netty.bean.RequestBean;
+import com.xcong.excoin.netty.bean.ResponseBean;
+import com.xcong.excoin.netty.common.ChannelManager;
+import com.xcong.excoin.netty.common.Contans;
 import com.xcong.excoin.netty.common.NettyTools;
 import com.xcong.excoin.netty.logic.MsgLogic;
+import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.BeansException;
@@ -19,27 +24,26 @@
  */
 @Slf4j
 @Component("msgDispatch")
-public class MsgDispatch implements ApplicationContextAware {
-
-    private ApplicationContext applicationContext;
+public class MsgDispatch {
 
     @Autowired
     private MsgLogic msgLogic;
 
     public void webSocketDispatch(ChannelHandlerContext ctx, String msg) {
-        log.info("==========={}", msg);
-        RequestBean requestBean = null;
+        RequestBean requestBean = JSONObject.parseObject(msg, RequestBean.class);
+
+        // 判断当前通道用户是否已经登陆
+        if (StrUtil.isEmpty(ChannelManager.findWsMemberId(ctx.channel())) && requestBean.getType() != Contans.AUTH_CHECK) {
+            ctx.channel().writeAndFlush(NettyTools.webSocketBytes("123"));
+            return;
+        }
+
         try {
+            requestBean.setChannelId(ctx.channel().id().asShortText());
             msgLogic.webSocketMsgLogic(requestBean);
         } catch (Exception e) {
             log.info("#websocket json error:{}#", e);
-            ctx.channel().writeAndFlush(NettyTools.webSocketBytes("params error"));
+            ctx.channel().writeAndFlush(NettyTools.wsSendMsg(ResponseBean.fail()));
         }
-    }
-
-
-    @Override
-    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
-        this.applicationContext = applicationContext;
     }
 }
diff --git a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
index 2ceb827..7b5cd3d 100644
--- a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
+++ b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
@@ -5,6 +5,7 @@
 import com.xcong.excoin.netty.common.Contans;
 import com.xcong.excoin.netty.common.NettyTools;
 import com.xcong.excoin.netty.dispatch.MsgDispatch;
+import com.xcong.excoin.utils.SpringContextHolder;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.*;
@@ -29,9 +30,11 @@
 import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;
 
 /**
- * @author wzy
- * @email wangdoubleone@gmail.com
- * @date 2019-05-06
+ * 项目启动时,在控制台有
+ * Unable to proxy interface-implementing method [public final void io.netty.channel.ChannelInitializer.channelRegistered(io.netty.channel.ChannelHandlerContext) throws java.lang.Exception] because it is marked as final: Consider using interface-based JDK proxies instead!
+ * 输出
+ * 表明,此类将走代理enhancerbyspringcglib代理
+ * 此时,获取到此类将为null(不知道原因),从而导致netty连接在初始化时会有空指针异常
  */
 @Slf4j
 @Component
@@ -44,8 +47,8 @@
 
     private WebSocketServerHandshaker handshaker;
 
-    @Resource(name = "msgDispatch")
-    private MsgDispatch msgDispatch;
+//    @Resource(name = "msgDispatch")
+//    private MsgDispatch msgDispatch;
 
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
@@ -76,7 +79,7 @@
                 if (content.contains(Contans.HEART_BEAT)) {
                     resetTimes(ctx.channel());
                 } else {
-                    this.msgDispatch.webSocketDispatch(ctx, content);
+                    SpringContextHolder.getBean(MsgDispatch.class).webSocketDispatch(ctx, content);
                 }
             } catch (ClassCastException e) {
                 content = ((CloseWebSocketFrame) frame).reasonText();
diff --git a/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java b/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java
index 54cb224..c202638 100644
--- a/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java
+++ b/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java
@@ -19,8 +19,8 @@
 @Component
 public class WebSocketServerInitializer extends ChannelInitializer<NioSocketChannel> {
 
-    @Autowired
-    private WebSocketServerHandler webSocketServerHandler;
+//    @Autowired
+//    private WebSocketServerHandler webSocketServerHandler;
 
     @Override
     protected void initChannel(NioSocketChannel ch) throws Exception {
@@ -32,8 +32,8 @@
         cp.addLast(new HttpObjectAggregator(65536));
         cp.addLast(new ChunkedWriteHandler());
         // 心跳
-        ch.pipeline().addLast(new IdleStateHandler(0, 10, 0));
+//        ch.pipeline().addLast(new IdleStateHandler(0, 10, 0));
         // 自定义业务handler
-        cp.addLast(webSocketServerHandler);
+        cp.addLast(new WebSocketServerHandler());
     }
 }
diff --git a/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java b/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
index 2b0b493..16b6f52 100644
--- a/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
+++ b/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
@@ -17,5 +17,16 @@
 @Component
 public class WebSocketLogic {
 
+    public void authCheck(RequestBean requestBean) {
+        Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId());
+
+        ChannelManager.addWsChannel(channel, Long.parseLong(requestBean.getData().toString()));
+
+        ResponseBean responseBean = new ResponseBean();
+        responseBean.setType(requestBean.getType());
+        responseBean.setStatus(1);
+        channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(responseBean)));
+    }
+
 
 }
diff --git a/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java b/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java
index 7fb6fa9..875461a 100644
--- a/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java
+++ b/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java
@@ -1,9 +1,13 @@
 package com.xcong.excoin.netty.logic.impl;
 
+import cn.hutool.core.util.StrUtil;
 import com.xcong.excoin.netty.bean.RequestBean;
+import com.xcong.excoin.netty.common.ChannelManager;
 import com.xcong.excoin.netty.common.Contans;
+import com.xcong.excoin.netty.common.NettyTools;
 import com.xcong.excoin.netty.logic.MsgLogic;
 import com.xcong.excoin.netty.logic.WebSocketLogic;
+import io.netty.channel.Channel;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
@@ -20,16 +24,15 @@
 
     @Override
     public void webSocketMsgLogic(RequestBean requestBean) {
-//        switch (requestBean.getType()) {
-//            case Contans.WEB_REQ_CONNECTION :
-//                webSocketLogic.webReqConnection(requestBean);
-//                break;
+
+        switch (requestBean.getType()) {
+            case Contans.AUTH_CHECK:
+                webSocketLogic.authCheck(requestBean);
+                break;
 //            case Contans.HOME_SYMBOLS:
-//                webSocketLogic.reqHomeSymbols(requestBean);
-//            default:
-//                webSocketLogic.defaultReq(requestBean);
-//                break;
-//        }
+            default:
+                break;
+        }
     }
 
 
diff --git a/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java b/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java
index f40c91d..8ef1aef 100644
--- a/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java
+++ b/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java
@@ -24,8 +24,8 @@
 
     private ChannelFuture channelFuture;
 
-    @Autowired
-    private WebSocketServerInitializer webSocketServerInitializer;
+//    @Autowired
+//    private WebSocketServerInitializer webSocketServerInitializer;
 
     @Override
     public void start() throws Exception {
@@ -34,7 +34,7 @@
             ServerBootstrap b = new ServerBootstrap();
             b.group(boss, work)
                     .channel(NioServerSocketChannel.class)
-                    .childHandler(webSocketServerInitializer);
+                    .childHandler(new WebSocketServerInitializer());
 
             channelFuture = b.bind(9998).sync();
 

--
Gitblit v1.9.1