From 21f93fe6b95e868659bc1af9658a0cd7ba43b203 Mon Sep 17 00:00:00 2001 From: xiaoyong931011 <15274802129@163.com> Date: Wed, 26 May 2021 20:15:20 +0800 Subject: [PATCH] Merge branch 'otc' of http://120.27.238.55:7000/r/exchange into otc --- src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java | 21 +++-- src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java | 11 ++ src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java | 72 ++++------------- src/main/java/com/xcong/excoin/netty/common/ChannelManager.java | 28 ++++-- src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java | 6 src/main/java/com/xcong/excoin/netty/common/NettyTools.java | 7 + src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java | 28 ++++--- src/main/java/com/xcong/excoin/netty/common/Contans.java | 12 -- src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java | 15 ++- src/main/java/com/xcong/excoin/netty/bean/RequestBean.java | 10 ++ src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java | 8 +- 11 files changed, 110 insertions(+), 108 deletions(-) diff --git a/src/main/java/com/xcong/excoin/netty/bean/RequestBean.java b/src/main/java/com/xcong/excoin/netty/bean/RequestBean.java index 22bd4ab..7ec536a 100644 --- a/src/main/java/com/xcong/excoin/netty/bean/RequestBean.java +++ b/src/main/java/com/xcong/excoin/netty/bean/RequestBean.java @@ -15,6 +15,16 @@ private Object data; + private String channelId; + + public String getChannelId() { + return channelId; + } + + public void setChannelId(String channelId) { + this.channelId = channelId; + } + public Object getData() { return data; } diff --git a/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java b/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java index 78a1fbf..a5e63a7 100644 --- a/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java +++ b/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java @@ -10,77 +10,39 @@ private static final long serialVersionUID = 1L; - public static final String SUCCESS = "200"; + private Integer type; - private String status; + private Integer status; - private String type; + private Object data; - private String info; - - private String channelId; - - private Map<Object, Object> mapInfo = new HashMap<>(); - - private List<?> row; - - public static ResponseBean ok(String type, String info) { - ResponseBean responseBean = new ResponseBean(); - responseBean.status = SUCCESS; - responseBean.type = type; - responseBean.info = info; - return responseBean; - } - - public String getStatus() { - return status; - } - - public void setStatus(String status) { - this.status = status; - } - - public String getType() { + public Integer getType() { return type; } - public void setType(String type) { + public void setType(Integer type) { this.type = type; } - public String getInfo() { - return info; + public Integer getStatus() { + return status; } - public void setInfo(String info) { - this.info = info; + public void setStatus(Integer status) { + this.status = status; } - public String getChannelId() { - return channelId; + public Object getData() { + return data; } - public void setChannelId(String channelId) { - this.channelId = channelId; + public void setData(Object data) { + this.data = data; } - public Map<Object, Object> getMapInfo() { - return mapInfo; - } - - public void setMapInfo(Map<Object, Object> mapInfo) { - this.mapInfo = mapInfo; - } - - public void putInfo(Object key, Object value) { - this.mapInfo.put(key, value); - } - - public List<?> getRow() { - return row; - } - - public void setRow(List<?> row) { - this.row = row; + public static ResponseBean fail(){ + ResponseBean res = new ResponseBean(); + res.setStatus(0); + return res; } } diff --git a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java index 0167807..04163d6 100644 --- a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java +++ b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java @@ -23,14 +23,20 @@ // 当前连接到服务器的通道(tcp和websocket) private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>(); + // key - 用户ID value - 通道ID + private static final ConcurrentMap<String, ChannelId> MEMBER_CHANNEL = new ConcurrentHashMap<>(); + + // key - 通道 value - 用户 + private static final ConcurrentMap<ChannelId, String> CHANNEL_MEMBER = new ConcurrentHashMap<>(); + public static void addWebSocketChannel(Channel channel) { WEBSOCKET_GROUP.add(channel); CHANNEL_MAP.put(channel.id().asShortText(), channel.id()); } public static void addWsChannel(Channel channel, Long memberId) { - WEBSOCKET_GROUP.add(channel); - CHANNEL_MAP.put(memberId.toString(), channel.id()); + MEMBER_CHANNEL.put(memberId.toString(), channel.id()); + CHANNEL_MEMBER.put(channel.id(), memberId.toString()); } public static void removeWebSocketChannel(Channel channel) { @@ -39,8 +45,8 @@ } public static void removeWsChannel(Channel channel, Long memberId) { - WEBSOCKET_GROUP.remove(channel); - CHANNEL_MAP.remove(memberId.toString()); + MEMBER_CHANNEL.remove(memberId.toString()); + CHANNEL_MEMBER.remove(channel.id()); } public static Channel findWebSocketChannel(String id){ @@ -49,8 +55,12 @@ } public static Channel findWsChannel(Long id){ - ChannelId channelId = CHANNEL_MAP.get(id.toString()); + ChannelId channelId = MEMBER_CHANNEL.get(id.toString()); return WEBSOCKET_GROUP.find(channelId); + } + + public static String findWsMemberId(Channel channel) { + return CHANNEL_MEMBER.get(channel.id()); } public static ChannelGroup getWebSocketGroup() { @@ -61,10 +71,10 @@ if (WEBSOCKET_GROUP.size() == 0) { return; } - ResponseBean responseBean = ResponseBean.ok(type, null); - responseBean.putInfo("data", object); - String msg = JSONObject.toJSONString(responseBean); - WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg)); +// ResponseBean responseBean = ResponseBean.ok(type, null); +// responseBean.putInfo("data", object); +// String msg = JSONObject.toJSONString(responseBean); +// WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg)); } diff --git a/src/main/java/com/xcong/excoin/netty/common/Contans.java b/src/main/java/com/xcong/excoin/netty/common/Contans.java index 1a64913..17bc2c2 100644 --- a/src/main/java/com/xcong/excoin/netty/common/Contans.java +++ b/src/main/java/com/xcong/excoin/netty/common/Contans.java @@ -8,16 +8,8 @@ public static final String HEART_BEAT = "ping pong pang"; - public static final String WEB_REQ_CONNECTION = "000_000"; + public static final int AUTH_CHECK = 1; - public static final String HOME_SYMBOLS = "001_001"; - - public static final String ORDER_COIN_PRE_ORDER_DATA = "002_001"; - - public static final String ORDER_COIN_FIND_TRUST_ORDER = "002_002"; - - public static final String ORDER_PRE_ORDER_DATA = "003_001"; - - public static final String ORDER_FIND_TRUST_ORDER = "003_002"; + public static final int MESSAGE = 2; } diff --git a/src/main/java/com/xcong/excoin/netty/common/NettyTools.java b/src/main/java/com/xcong/excoin/netty/common/NettyTools.java index ead90db..413888b 100644 --- a/src/main/java/com/xcong/excoin/netty/common/NettyTools.java +++ b/src/main/java/com/xcong/excoin/netty/common/NettyTools.java @@ -1,5 +1,7 @@ package com.xcong.excoin.netty.common; +import com.alibaba.fastjson.JSONObject; +import com.xcong.excoin.netty.bean.ResponseBean; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; @@ -21,6 +23,11 @@ return Unpooled.copiedBuffer((msg + "_split").getBytes()); } + public static TextWebSocketFrame wsSendMsg(ResponseBean responseBean) { + String res = JSONObject.toJSONString(responseBean); + return new TextWebSocketFrame(res); + } + public static TextWebSocketFrame webSocketBytes(String msg) { return new TextWebSocketFrame(msg); } diff --git a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java index 9a296df..9742e1a 100644 --- a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java +++ b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java @@ -1,9 +1,14 @@ package com.xcong.excoin.netty.dispatch; +import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.netty.bean.RequestBean; +import com.xcong.excoin.netty.bean.ResponseBean; +import com.xcong.excoin.netty.common.ChannelManager; +import com.xcong.excoin.netty.common.Contans; import com.xcong.excoin.netty.common.NettyTools; import com.xcong.excoin.netty.logic.MsgLogic; +import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeansException; @@ -19,27 +24,26 @@ */ @Slf4j @Component("msgDispatch") -public class MsgDispatch implements ApplicationContextAware { - - private ApplicationContext applicationContext; +public class MsgDispatch { @Autowired private MsgLogic msgLogic; public void webSocketDispatch(ChannelHandlerContext ctx, String msg) { - log.info("==========={}", msg); - RequestBean requestBean = null; + RequestBean requestBean = JSONObject.parseObject(msg, RequestBean.class); + + // 判断当前通道用户是否已经登陆 + if (StrUtil.isEmpty(ChannelManager.findWsMemberId(ctx.channel())) && requestBean.getType() != Contans.AUTH_CHECK) { + ctx.channel().writeAndFlush(NettyTools.webSocketBytes("123")); + return; + } + try { + requestBean.setChannelId(ctx.channel().id().asShortText()); msgLogic.webSocketMsgLogic(requestBean); } catch (Exception e) { log.info("#websocket json error:{}#", e); - ctx.channel().writeAndFlush(NettyTools.webSocketBytes("params error")); + ctx.channel().writeAndFlush(NettyTools.wsSendMsg(ResponseBean.fail())); } - } - - - @Override - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - this.applicationContext = applicationContext; } } diff --git a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java index 2ceb827..7b5cd3d 100644 --- a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java +++ b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java @@ -5,6 +5,7 @@ import com.xcong.excoin.netty.common.Contans; import com.xcong.excoin.netty.common.NettyTools; import com.xcong.excoin.netty.dispatch.MsgDispatch; +import com.xcong.excoin.utils.SpringContextHolder; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; @@ -29,9 +30,11 @@ import static io.netty.handler.codec.http.HttpUtil.isKeepAlive; /** - * @author wzy - * @email wangdoubleone@gmail.com - * @date 2019-05-06 + * 项目启动时,在控制台有 + * Unable to proxy interface-implementing method [public final void io.netty.channel.ChannelInitializer.channelRegistered(io.netty.channel.ChannelHandlerContext) throws java.lang.Exception] because it is marked as final: Consider using interface-based JDK proxies instead! + * 输出 + * 表明,此类将走代理enhancerbyspringcglib代理 + * 此时,获取到此类将为null(不知道原因),从而导致netty连接在初始化时会有空指针异常 */ @Slf4j @Component @@ -44,8 +47,8 @@ private WebSocketServerHandshaker handshaker; - @Resource(name = "msgDispatch") - private MsgDispatch msgDispatch; +// @Resource(name = "msgDispatch") +// private MsgDispatch msgDispatch; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { @@ -76,7 +79,7 @@ if (content.contains(Contans.HEART_BEAT)) { resetTimes(ctx.channel()); } else { - this.msgDispatch.webSocketDispatch(ctx, content); + SpringContextHolder.getBean(MsgDispatch.class).webSocketDispatch(ctx, content); } } catch (ClassCastException e) { content = ((CloseWebSocketFrame) frame).reasonText(); diff --git a/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java b/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java index 54cb224..c202638 100644 --- a/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java +++ b/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java @@ -19,8 +19,8 @@ @Component public class WebSocketServerInitializer extends ChannelInitializer<NioSocketChannel> { - @Autowired - private WebSocketServerHandler webSocketServerHandler; +// @Autowired +// private WebSocketServerHandler webSocketServerHandler; @Override protected void initChannel(NioSocketChannel ch) throws Exception { @@ -32,8 +32,8 @@ cp.addLast(new HttpObjectAggregator(65536)); cp.addLast(new ChunkedWriteHandler()); // 心跳 - ch.pipeline().addLast(new IdleStateHandler(0, 10, 0)); +// ch.pipeline().addLast(new IdleStateHandler(0, 10, 0)); // 自定义业务handler - cp.addLast(webSocketServerHandler); + cp.addLast(new WebSocketServerHandler()); } } diff --git a/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java b/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java index 2b0b493..16b6f52 100644 --- a/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java +++ b/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java @@ -17,5 +17,16 @@ @Component public class WebSocketLogic { + public void authCheck(RequestBean requestBean) { + Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId()); + + ChannelManager.addWsChannel(channel, Long.parseLong(requestBean.getData().toString())); + + ResponseBean responseBean = new ResponseBean(); + responseBean.setType(requestBean.getType()); + responseBean.setStatus(1); + channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(responseBean))); + } + } diff --git a/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java b/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java index 7fb6fa9..875461a 100644 --- a/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java +++ b/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java @@ -1,9 +1,13 @@ package com.xcong.excoin.netty.logic.impl; +import cn.hutool.core.util.StrUtil; import com.xcong.excoin.netty.bean.RequestBean; +import com.xcong.excoin.netty.common.ChannelManager; import com.xcong.excoin.netty.common.Contans; +import com.xcong.excoin.netty.common.NettyTools; import com.xcong.excoin.netty.logic.MsgLogic; import com.xcong.excoin.netty.logic.WebSocketLogic; +import io.netty.channel.Channel; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -20,16 +24,15 @@ @Override public void webSocketMsgLogic(RequestBean requestBean) { -// switch (requestBean.getType()) { -// case Contans.WEB_REQ_CONNECTION : -// webSocketLogic.webReqConnection(requestBean); -// break; + + switch (requestBean.getType()) { + case Contans.AUTH_CHECK: + webSocketLogic.authCheck(requestBean); + break; // case Contans.HOME_SYMBOLS: -// webSocketLogic.reqHomeSymbols(requestBean); -// default: -// webSocketLogic.defaultReq(requestBean); -// break; -// } + default: + break; + } } diff --git a/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java b/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java index f40c91d..8ef1aef 100644 --- a/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java +++ b/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java @@ -24,8 +24,8 @@ private ChannelFuture channelFuture; - @Autowired - private WebSocketServerInitializer webSocketServerInitializer; +// @Autowired +// private WebSocketServerInitializer webSocketServerInitializer; @Override public void start() throws Exception { @@ -34,7 +34,7 @@ ServerBootstrap b = new ServerBootstrap(); b.group(boss, work) .channel(NioServerSocketChannel.class) - .childHandler(webSocketServerInitializer); + .childHandler(new WebSocketServerInitializer()); channelFuture = b.bind(9998).sync(); -- Gitblit v1.9.1