src/main/java/com/xcong/excoin/netty/bean/RequestBean.java
@@ -15,6 +15,16 @@ private Object data; private String channelId; public String getChannelId() { return channelId; } public void setChannelId(String channelId) { this.channelId = channelId; } public Object getData() { return data; } src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java
@@ -10,77 +10,39 @@ private static final long serialVersionUID = 1L; public static final String SUCCESS = "200"; private Integer type; private String status; private Integer status; private String type; private Object data; private String info; private String channelId; private Map<Object, Object> mapInfo = new HashMap<>(); private List<?> row; public static ResponseBean ok(String type, String info) { ResponseBean responseBean = new ResponseBean(); responseBean.status = SUCCESS; responseBean.type = type; responseBean.info = info; return responseBean; } public String getStatus() { return status; } public void setStatus(String status) { this.status = status; } public String getType() { public Integer getType() { return type; } public void setType(String type) { public void setType(Integer type) { this.type = type; } public String getInfo() { return info; public Integer getStatus() { return status; } public void setInfo(String info) { this.info = info; public void setStatus(Integer status) { this.status = status; } public String getChannelId() { return channelId; public Object getData() { return data; } public void setChannelId(String channelId) { this.channelId = channelId; public void setData(Object data) { this.data = data; } public Map<Object, Object> getMapInfo() { return mapInfo; } public void setMapInfo(Map<Object, Object> mapInfo) { this.mapInfo = mapInfo; } public void putInfo(Object key, Object value) { this.mapInfo.put(key, value); } public List<?> getRow() { return row; } public void setRow(List<?> row) { this.row = row; public static ResponseBean fail(){ ResponseBean res = new ResponseBean(); res.setStatus(0); return res; } } src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -23,14 +23,20 @@ // 当前连接到服务器的通道(tcp和websocket) private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>(); // key - 用户ID value - 通道ID private static final ConcurrentMap<String, ChannelId> MEMBER_CHANNEL = new ConcurrentHashMap<>(); // key - 通道 value - 用户 private static final ConcurrentMap<ChannelId, String> CHANNEL_MEMBER = new ConcurrentHashMap<>(); public static void addWebSocketChannel(Channel channel) { WEBSOCKET_GROUP.add(channel); CHANNEL_MAP.put(channel.id().asShortText(), channel.id()); } public static void addWsChannel(Channel channel, Long memberId) { WEBSOCKET_GROUP.add(channel); CHANNEL_MAP.put(memberId.toString(), channel.id()); MEMBER_CHANNEL.put(memberId.toString(), channel.id()); CHANNEL_MEMBER.put(channel.id(), memberId.toString()); } public static void removeWebSocketChannel(Channel channel) { @@ -39,8 +45,8 @@ } public static void removeWsChannel(Channel channel, Long memberId) { WEBSOCKET_GROUP.remove(channel); CHANNEL_MAP.remove(memberId.toString()); MEMBER_CHANNEL.remove(memberId.toString()); CHANNEL_MEMBER.remove(channel.id()); } public static Channel findWebSocketChannel(String id){ @@ -49,8 +55,12 @@ } public static Channel findWsChannel(Long id){ ChannelId channelId = CHANNEL_MAP.get(id.toString()); ChannelId channelId = MEMBER_CHANNEL.get(id.toString()); return WEBSOCKET_GROUP.find(channelId); } public static String findWsMemberId(Channel channel) { return CHANNEL_MEMBER.get(channel.id()); } public static ChannelGroup getWebSocketGroup() { @@ -61,10 +71,10 @@ if (WEBSOCKET_GROUP.size() == 0) { return; } ResponseBean responseBean = ResponseBean.ok(type, null); responseBean.putInfo("data", object); String msg = JSONObject.toJSONString(responseBean); WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg)); // ResponseBean responseBean = ResponseBean.ok(type, null); // responseBean.putInfo("data", object); // String msg = JSONObject.toJSONString(responseBean); // WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg)); } src/main/java/com/xcong/excoin/netty/common/Contans.java
@@ -8,16 +8,8 @@ public static final String HEART_BEAT = "ping pong pang"; public static final String WEB_REQ_CONNECTION = "000_000"; public static final int AUTH_CHECK = 1; public static final String HOME_SYMBOLS = "001_001"; public static final String ORDER_COIN_PRE_ORDER_DATA = "002_001"; public static final String ORDER_COIN_FIND_TRUST_ORDER = "002_002"; public static final String ORDER_PRE_ORDER_DATA = "003_001"; public static final String ORDER_FIND_TRUST_ORDER = "003_002"; public static final int MESSAGE = 2; } src/main/java/com/xcong/excoin/netty/common/NettyTools.java
@@ -1,5 +1,7 @@ package com.xcong.excoin.netty.common; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.netty.bean.ResponseBean; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; @@ -21,6 +23,11 @@ return Unpooled.copiedBuffer((msg + "_split").getBytes()); } public static TextWebSocketFrame wsSendMsg(ResponseBean responseBean) { String res = JSONObject.toJSONString(responseBean); return new TextWebSocketFrame(res); } public static TextWebSocketFrame webSocketBytes(String msg) { return new TextWebSocketFrame(msg); } src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
@@ -1,9 +1,14 @@ package com.xcong.excoin.netty.dispatch; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.netty.bean.RequestBean; import com.xcong.excoin.netty.bean.ResponseBean; import com.xcong.excoin.netty.common.ChannelManager; import com.xcong.excoin.netty.common.Contans; import com.xcong.excoin.netty.common.NettyTools; import com.xcong.excoin.netty.logic.MsgLogic; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeansException; @@ -19,27 +24,26 @@ */ @Slf4j @Component("msgDispatch") public class MsgDispatch implements ApplicationContextAware { private ApplicationContext applicationContext; public class MsgDispatch { @Autowired private MsgLogic msgLogic; public void webSocketDispatch(ChannelHandlerContext ctx, String msg) { log.info("==========={}", msg); RequestBean requestBean = null; RequestBean requestBean = JSONObject.parseObject(msg, RequestBean.class); // 判断当前通道用户是否已经登陆 if (StrUtil.isEmpty(ChannelManager.findWsMemberId(ctx.channel())) && requestBean.getType() != Contans.AUTH_CHECK) { ctx.channel().writeAndFlush(NettyTools.webSocketBytes("123")); return; } try { requestBean.setChannelId(ctx.channel().id().asShortText()); msgLogic.webSocketMsgLogic(requestBean); } catch (Exception e) { log.info("#websocket json error:{}#", e); ctx.channel().writeAndFlush(NettyTools.webSocketBytes("params error")); ctx.channel().writeAndFlush(NettyTools.wsSendMsg(ResponseBean.fail())); } } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } } src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
@@ -5,6 +5,7 @@ import com.xcong.excoin.netty.common.Contans; import com.xcong.excoin.netty.common.NettyTools; import com.xcong.excoin.netty.dispatch.MsgDispatch; import com.xcong.excoin.utils.SpringContextHolder; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; @@ -29,9 +30,11 @@ import static io.netty.handler.codec.http.HttpUtil.isKeepAlive; /** * @author wzy * @email wangdoubleone@gmail.com * @date 2019-05-06 * 项目启动时,在控制台有 * Unable to proxy interface-implementing method [public final void io.netty.channel.ChannelInitializer.channelRegistered(io.netty.channel.ChannelHandlerContext) throws java.lang.Exception] because it is marked as final: Consider using interface-based JDK proxies instead! * 输出 * 表明,此类将走代理enhancerbyspringcglib代理 * 此时,获取到此类将为null(不知道原因),从而导致netty连接在初始化时会有空指针异常 */ @Slf4j @Component @@ -44,8 +47,8 @@ private WebSocketServerHandshaker handshaker; @Resource(name = "msgDispatch") private MsgDispatch msgDispatch; // @Resource(name = "msgDispatch") // private MsgDispatch msgDispatch; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { @@ -76,7 +79,7 @@ if (content.contains(Contans.HEART_BEAT)) { resetTimes(ctx.channel()); } else { this.msgDispatch.webSocketDispatch(ctx, content); SpringContextHolder.getBean(MsgDispatch.class).webSocketDispatch(ctx, content); } } catch (ClassCastException e) { content = ((CloseWebSocketFrame) frame).reasonText(); src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java
@@ -19,8 +19,8 @@ @Component public class WebSocketServerInitializer extends ChannelInitializer<NioSocketChannel> { @Autowired private WebSocketServerHandler webSocketServerHandler; // @Autowired // private WebSocketServerHandler webSocketServerHandler; @Override protected void initChannel(NioSocketChannel ch) throws Exception { @@ -32,8 +32,8 @@ cp.addLast(new HttpObjectAggregator(65536)); cp.addLast(new ChunkedWriteHandler()); // 心跳 ch.pipeline().addLast(new IdleStateHandler(0, 10, 0)); // ch.pipeline().addLast(new IdleStateHandler(0, 10, 0)); // 自定义业务handler cp.addLast(webSocketServerHandler); cp.addLast(new WebSocketServerHandler()); } } src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
@@ -17,5 +17,16 @@ @Component public class WebSocketLogic { public void authCheck(RequestBean requestBean) { Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId()); ChannelManager.addWsChannel(channel, Long.parseLong(requestBean.getData().toString())); ResponseBean responseBean = new ResponseBean(); responseBean.setType(requestBean.getType()); responseBean.setStatus(1); channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(responseBean))); } } src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java
@@ -1,9 +1,13 @@ package com.xcong.excoin.netty.logic.impl; import cn.hutool.core.util.StrUtil; import com.xcong.excoin.netty.bean.RequestBean; import com.xcong.excoin.netty.common.ChannelManager; import com.xcong.excoin.netty.common.Contans; import com.xcong.excoin.netty.common.NettyTools; import com.xcong.excoin.netty.logic.MsgLogic; import com.xcong.excoin.netty.logic.WebSocketLogic; import io.netty.channel.Channel; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -20,16 +24,15 @@ @Override public void webSocketMsgLogic(RequestBean requestBean) { // switch (requestBean.getType()) { // case Contans.WEB_REQ_CONNECTION : // webSocketLogic.webReqConnection(requestBean); // break; switch (requestBean.getType()) { case Contans.AUTH_CHECK: webSocketLogic.authCheck(requestBean); break; // case Contans.HOME_SYMBOLS: // webSocketLogic.reqHomeSymbols(requestBean); // default: // webSocketLogic.defaultReq(requestBean); // break; // } default: break; } } src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java
@@ -24,8 +24,8 @@ private ChannelFuture channelFuture; @Autowired private WebSocketServerInitializer webSocketServerInitializer; // @Autowired // private WebSocketServerInitializer webSocketServerInitializer; @Override public void start() throws Exception { @@ -34,7 +34,7 @@ ServerBootstrap b = new ServerBootstrap(); b.group(boss, work) .channel(NioServerSocketChannel.class) .childHandler(webSocketServerInitializer); .childHandler(new WebSocketServerInitializer()); channelFuture = b.bind(9998).sync();