From 21f93fe6b95e868659bc1af9658a0cd7ba43b203 Mon Sep 17 00:00:00 2001
From: xiaoyong931011 <15274802129@163.com>
Date: Wed, 26 May 2021 20:15:20 +0800
Subject: [PATCH] Merge branch 'otc' of http://120.27.238.55:7000/r/exchange into otc
---
src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java | 21 +++--
src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java | 11 ++
src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java | 72 ++++-------------
src/main/java/com/xcong/excoin/netty/common/ChannelManager.java | 28 ++++--
src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java | 6
src/main/java/com/xcong/excoin/netty/common/NettyTools.java | 7 +
src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java | 28 ++++---
src/main/java/com/xcong/excoin/netty/common/Contans.java | 12 --
src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java | 15 ++-
src/main/java/com/xcong/excoin/netty/bean/RequestBean.java | 10 ++
src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java | 8 +-
11 files changed, 110 insertions(+), 108 deletions(-)
diff --git a/src/main/java/com/xcong/excoin/netty/bean/RequestBean.java b/src/main/java/com/xcong/excoin/netty/bean/RequestBean.java
index 22bd4ab..7ec536a 100644
--- a/src/main/java/com/xcong/excoin/netty/bean/RequestBean.java
+++ b/src/main/java/com/xcong/excoin/netty/bean/RequestBean.java
@@ -15,6 +15,16 @@
private Object data;
+ private String channelId;
+
+ public String getChannelId() {
+ return channelId;
+ }
+
+ public void setChannelId(String channelId) {
+ this.channelId = channelId;
+ }
+
public Object getData() {
return data;
}
diff --git a/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java b/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java
index 78a1fbf..a5e63a7 100644
--- a/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java
+++ b/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java
@@ -10,77 +10,39 @@
private static final long serialVersionUID = 1L;
- public static final String SUCCESS = "200";
+ private Integer type;
- private String status;
+ private Integer status;
- private String type;
+ private Object data;
- private String info;
-
- private String channelId;
-
- private Map<Object, Object> mapInfo = new HashMap<>();
-
- private List<?> row;
-
- public static ResponseBean ok(String type, String info) {
- ResponseBean responseBean = new ResponseBean();
- responseBean.status = SUCCESS;
- responseBean.type = type;
- responseBean.info = info;
- return responseBean;
- }
-
- public String getStatus() {
- return status;
- }
-
- public void setStatus(String status) {
- this.status = status;
- }
-
- public String getType() {
+ public Integer getType() {
return type;
}
- public void setType(String type) {
+ public void setType(Integer type) {
this.type = type;
}
- public String getInfo() {
- return info;
+ public Integer getStatus() {
+ return status;
}
- public void setInfo(String info) {
- this.info = info;
+ public void setStatus(Integer status) {
+ this.status = status;
}
- public String getChannelId() {
- return channelId;
+ public Object getData() {
+ return data;
}
- public void setChannelId(String channelId) {
- this.channelId = channelId;
+ public void setData(Object data) {
+ this.data = data;
}
- public Map<Object, Object> getMapInfo() {
- return mapInfo;
- }
-
- public void setMapInfo(Map<Object, Object> mapInfo) {
- this.mapInfo = mapInfo;
- }
-
- public void putInfo(Object key, Object value) {
- this.mapInfo.put(key, value);
- }
-
- public List<?> getRow() {
- return row;
- }
-
- public void setRow(List<?> row) {
- this.row = row;
+ public static ResponseBean fail(){
+ ResponseBean res = new ResponseBean();
+ res.setStatus(0);
+ return res;
}
}
diff --git a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
index 0167807..04163d6 100644
--- a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
+++ b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -23,14 +23,20 @@
// 当前连接到服务器的通道(tcp和websocket)
private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>();
+ // key - 用户ID value - 通道ID
+ private static final ConcurrentMap<String, ChannelId> MEMBER_CHANNEL = new ConcurrentHashMap<>();
+
+ // key - 通道 value - 用户
+ private static final ConcurrentMap<ChannelId, String> CHANNEL_MEMBER = new ConcurrentHashMap<>();
+
public static void addWebSocketChannel(Channel channel) {
WEBSOCKET_GROUP.add(channel);
CHANNEL_MAP.put(channel.id().asShortText(), channel.id());
}
public static void addWsChannel(Channel channel, Long memberId) {
- WEBSOCKET_GROUP.add(channel);
- CHANNEL_MAP.put(memberId.toString(), channel.id());
+ MEMBER_CHANNEL.put(memberId.toString(), channel.id());
+ CHANNEL_MEMBER.put(channel.id(), memberId.toString());
}
public static void removeWebSocketChannel(Channel channel) {
@@ -39,8 +45,8 @@
}
public static void removeWsChannel(Channel channel, Long memberId) {
- WEBSOCKET_GROUP.remove(channel);
- CHANNEL_MAP.remove(memberId.toString());
+ MEMBER_CHANNEL.remove(memberId.toString());
+ CHANNEL_MEMBER.remove(channel.id());
}
public static Channel findWebSocketChannel(String id){
@@ -49,8 +55,12 @@
}
public static Channel findWsChannel(Long id){
- ChannelId channelId = CHANNEL_MAP.get(id.toString());
+ ChannelId channelId = MEMBER_CHANNEL.get(id.toString());
return WEBSOCKET_GROUP.find(channelId);
+ }
+
+ public static String findWsMemberId(Channel channel) {
+ return CHANNEL_MEMBER.get(channel.id());
}
public static ChannelGroup getWebSocketGroup() {
@@ -61,10 +71,10 @@
if (WEBSOCKET_GROUP.size() == 0) {
return;
}
- ResponseBean responseBean = ResponseBean.ok(type, null);
- responseBean.putInfo("data", object);
- String msg = JSONObject.toJSONString(responseBean);
- WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg));
+// ResponseBean responseBean = ResponseBean.ok(type, null);
+// responseBean.putInfo("data", object);
+// String msg = JSONObject.toJSONString(responseBean);
+// WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg));
}
diff --git a/src/main/java/com/xcong/excoin/netty/common/Contans.java b/src/main/java/com/xcong/excoin/netty/common/Contans.java
index 1a64913..17bc2c2 100644
--- a/src/main/java/com/xcong/excoin/netty/common/Contans.java
+++ b/src/main/java/com/xcong/excoin/netty/common/Contans.java
@@ -8,16 +8,8 @@
public static final String HEART_BEAT = "ping pong pang";
- public static final String WEB_REQ_CONNECTION = "000_000";
+ public static final int AUTH_CHECK = 1;
- public static final String HOME_SYMBOLS = "001_001";
-
- public static final String ORDER_COIN_PRE_ORDER_DATA = "002_001";
-
- public static final String ORDER_COIN_FIND_TRUST_ORDER = "002_002";
-
- public static final String ORDER_PRE_ORDER_DATA = "003_001";
-
- public static final String ORDER_FIND_TRUST_ORDER = "003_002";
+ public static final int MESSAGE = 2;
}
diff --git a/src/main/java/com/xcong/excoin/netty/common/NettyTools.java b/src/main/java/com/xcong/excoin/netty/common/NettyTools.java
index ead90db..413888b 100644
--- a/src/main/java/com/xcong/excoin/netty/common/NettyTools.java
+++ b/src/main/java/com/xcong/excoin/netty/common/NettyTools.java
@@ -1,5 +1,7 @@
package com.xcong.excoin.netty.common;
+import com.alibaba.fastjson.JSONObject;
+import com.xcong.excoin.netty.bean.ResponseBean;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
@@ -21,6 +23,11 @@
return Unpooled.copiedBuffer((msg + "_split").getBytes());
}
+ public static TextWebSocketFrame wsSendMsg(ResponseBean responseBean) {
+ String res = JSONObject.toJSONString(responseBean);
+ return new TextWebSocketFrame(res);
+ }
+
public static TextWebSocketFrame webSocketBytes(String msg) {
return new TextWebSocketFrame(msg);
}
diff --git a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
index 9a296df..9742e1a 100644
--- a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
+++ b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
@@ -1,9 +1,14 @@
package com.xcong.excoin.netty.dispatch;
+import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.netty.bean.RequestBean;
+import com.xcong.excoin.netty.bean.ResponseBean;
+import com.xcong.excoin.netty.common.ChannelManager;
+import com.xcong.excoin.netty.common.Contans;
import com.xcong.excoin.netty.common.NettyTools;
import com.xcong.excoin.netty.logic.MsgLogic;
+import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeansException;
@@ -19,27 +24,26 @@
*/
@Slf4j
@Component("msgDispatch")
-public class MsgDispatch implements ApplicationContextAware {
-
- private ApplicationContext applicationContext;
+public class MsgDispatch {
@Autowired
private MsgLogic msgLogic;
public void webSocketDispatch(ChannelHandlerContext ctx, String msg) {
- log.info("==========={}", msg);
- RequestBean requestBean = null;
+ RequestBean requestBean = JSONObject.parseObject(msg, RequestBean.class);
+
+ // 判断当前通道用户是否已经登陆
+ if (StrUtil.isEmpty(ChannelManager.findWsMemberId(ctx.channel())) && requestBean.getType() != Contans.AUTH_CHECK) {
+ ctx.channel().writeAndFlush(NettyTools.webSocketBytes("123"));
+ return;
+ }
+
try {
+ requestBean.setChannelId(ctx.channel().id().asShortText());
msgLogic.webSocketMsgLogic(requestBean);
} catch (Exception e) {
log.info("#websocket json error:{}#", e);
- ctx.channel().writeAndFlush(NettyTools.webSocketBytes("params error"));
+ ctx.channel().writeAndFlush(NettyTools.wsSendMsg(ResponseBean.fail()));
}
- }
-
-
- @Override
- public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
- this.applicationContext = applicationContext;
}
}
diff --git a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
index 2ceb827..7b5cd3d 100644
--- a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
+++ b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
@@ -5,6 +5,7 @@
import com.xcong.excoin.netty.common.Contans;
import com.xcong.excoin.netty.common.NettyTools;
import com.xcong.excoin.netty.dispatch.MsgDispatch;
+import com.xcong.excoin.utils.SpringContextHolder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
@@ -29,9 +30,11 @@
import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;
/**
- * @author wzy
- * @email wangdoubleone@gmail.com
- * @date 2019-05-06
+ * 项目启动时,在控制台有
+ * Unable to proxy interface-implementing method [public final void io.netty.channel.ChannelInitializer.channelRegistered(io.netty.channel.ChannelHandlerContext) throws java.lang.Exception] because it is marked as final: Consider using interface-based JDK proxies instead!
+ * 输出
+ * 表明,此类将走代理enhancerbyspringcglib代理
+ * 此时,获取到此类将为null(不知道原因),从而导致netty连接在初始化时会有空指针异常
*/
@Slf4j
@Component
@@ -44,8 +47,8 @@
private WebSocketServerHandshaker handshaker;
- @Resource(name = "msgDispatch")
- private MsgDispatch msgDispatch;
+// @Resource(name = "msgDispatch")
+// private MsgDispatch msgDispatch;
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
@@ -76,7 +79,7 @@
if (content.contains(Contans.HEART_BEAT)) {
resetTimes(ctx.channel());
} else {
- this.msgDispatch.webSocketDispatch(ctx, content);
+ SpringContextHolder.getBean(MsgDispatch.class).webSocketDispatch(ctx, content);
}
} catch (ClassCastException e) {
content = ((CloseWebSocketFrame) frame).reasonText();
diff --git a/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java b/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java
index 54cb224..c202638 100644
--- a/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java
+++ b/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java
@@ -19,8 +19,8 @@
@Component
public class WebSocketServerInitializer extends ChannelInitializer<NioSocketChannel> {
- @Autowired
- private WebSocketServerHandler webSocketServerHandler;
+// @Autowired
+// private WebSocketServerHandler webSocketServerHandler;
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
@@ -32,8 +32,8 @@
cp.addLast(new HttpObjectAggregator(65536));
cp.addLast(new ChunkedWriteHandler());
// 心跳
- ch.pipeline().addLast(new IdleStateHandler(0, 10, 0));
+// ch.pipeline().addLast(new IdleStateHandler(0, 10, 0));
// 自定义业务handler
- cp.addLast(webSocketServerHandler);
+ cp.addLast(new WebSocketServerHandler());
}
}
diff --git a/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java b/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
index 2b0b493..16b6f52 100644
--- a/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
+++ b/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
@@ -17,5 +17,16 @@
@Component
public class WebSocketLogic {
+ public void authCheck(RequestBean requestBean) {
+ Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId());
+
+ ChannelManager.addWsChannel(channel, Long.parseLong(requestBean.getData().toString()));
+
+ ResponseBean responseBean = new ResponseBean();
+ responseBean.setType(requestBean.getType());
+ responseBean.setStatus(1);
+ channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(responseBean)));
+ }
+
}
diff --git a/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java b/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java
index 7fb6fa9..875461a 100644
--- a/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java
+++ b/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java
@@ -1,9 +1,13 @@
package com.xcong.excoin.netty.logic.impl;
+import cn.hutool.core.util.StrUtil;
import com.xcong.excoin.netty.bean.RequestBean;
+import com.xcong.excoin.netty.common.ChannelManager;
import com.xcong.excoin.netty.common.Contans;
+import com.xcong.excoin.netty.common.NettyTools;
import com.xcong.excoin.netty.logic.MsgLogic;
import com.xcong.excoin.netty.logic.WebSocketLogic;
+import io.netty.channel.Channel;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -20,16 +24,15 @@
@Override
public void webSocketMsgLogic(RequestBean requestBean) {
-// switch (requestBean.getType()) {
-// case Contans.WEB_REQ_CONNECTION :
-// webSocketLogic.webReqConnection(requestBean);
-// break;
+
+ switch (requestBean.getType()) {
+ case Contans.AUTH_CHECK:
+ webSocketLogic.authCheck(requestBean);
+ break;
// case Contans.HOME_SYMBOLS:
-// webSocketLogic.reqHomeSymbols(requestBean);
-// default:
-// webSocketLogic.defaultReq(requestBean);
-// break;
-// }
+ default:
+ break;
+ }
}
diff --git a/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java b/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java
index f40c91d..8ef1aef 100644
--- a/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java
+++ b/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java
@@ -24,8 +24,8 @@
private ChannelFuture channelFuture;
- @Autowired
- private WebSocketServerInitializer webSocketServerInitializer;
+// @Autowired
+// private WebSocketServerInitializer webSocketServerInitializer;
@Override
public void start() throws Exception {
@@ -34,7 +34,7 @@
ServerBootstrap b = new ServerBootstrap();
b.group(boss, work)
.channel(NioServerSocketChannel.class)
- .childHandler(webSocketServerInitializer);
+ .childHandler(new WebSocketServerInitializer());
channelFuture = b.bind(9998).sync();
--
Gitblit v1.9.1