pom.xml
@@ -147,12 +147,6 @@ <version>${mapstruct.version}</version> </dependency> <!-- <dependency>--> <!-- <groupId>org.mapstruct</groupId>--> <!-- <artifactId>mapstruct-processor</artifactId>--> <!-- <version>${mapstruct.version}</version>--> <!-- </dependency>--> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> @@ -175,6 +169,12 @@ <version>1.6.1</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.33.Final</version> </dependency> </dependencies> <build> src/main/java/com/xcong/excoin/netty/ChatServer.java
New file @@ -0,0 +1,11 @@ package com.xcong.excoin.netty; /** * @author wzy * @date 2019-05-06 */ public interface ChatServer { void start() throws Exception; void shutdown(); } src/main/java/com/xcong/excoin/netty/bean/RequestBean.java
New file @@ -0,0 +1,90 @@ package com.xcong.excoin.netty.bean; import java.io.Serializable; /** * @author wzy * @email wangdoubleone@gmail.com * @date 2019-05-09 */ public class RequestBean implements Serializable { private static final long serialVersionUID = 1L; /** * 请求类型 */ private String type; /** * 当前通道id */ private String channelId; /** * web端通道ID */ private String reqId; /** * 请求参数 */ private String params; /** * 手持端是否同意连接 0-否 1-是 */ private String tag; public String getType() { return type; } public void setType(String type) { this.type = type; } public String getChannelId() { return channelId; } public void setChannelId(String channelId) { this.channelId = channelId; } public String getParams() { return params; } public void setParams(String params) { this.params = params; } public String getReqId() { return reqId; } public void setReqId(String reqId) { this.reqId = reqId; } public String getTag() { return tag; } public void setTag(String tag) { this.tag = tag; } @Override public String toString() { return "RequestBean{" + "type='" + type + '\'' + ", channelId='" + channelId + '\'' + ", reqId='" + reqId + '\'' + ", params='" + params + '\'' + ", tag='" + tag + '\'' + '}'; } } src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java
New file @@ -0,0 +1,89 @@ package com.xcong.excoin.netty.bean; import java.io.Serializable; import java.util.HashMap; import java.util.List; import java.util.Map; /** * @author wzy * @date 2020-04-18 16:17 **/ public class ResponseBean implements Serializable { private static final long serialVersionUID = 1L; public static final String SUCCESS = "200"; private String status; private String type; private String info; private String channelId; private Map<Object, Object> mapInfo = new HashMap<>(); private List<?> row; public static ResponseBean ok(String type, String info) { ResponseBean responseBean = new ResponseBean(); responseBean.status = SUCCESS; responseBean.type = type; responseBean.info = info; return responseBean; } public String getStatus() { return status; } public void setStatus(String status) { this.status = status; } public String getType() { return type; } public void setType(String type) { this.type = type; } public String getInfo() { return info; } public void setInfo(String info) { this.info = info; } public String getChannelId() { return channelId; } public void setChannelId(String channelId) { this.channelId = channelId; } public Map<Object, Object> getMapInfo() { return mapInfo; } public void setMapInfo(Map<Object, Object> mapInfo) { this.mapInfo = mapInfo; } public void putInfo(Object key, Object value) { this.mapInfo.put(key, value); } public List<?> getRow() { return row; } public void setRow(List<?> row) { this.row = row; } } src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
New file @@ -0,0 +1,56 @@ package com.xcong.excoin.netty.common; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.netty.bean.ResponseBean; import io.netty.channel.Channel; import io.netty.channel.ChannelId; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** * @author wzy * @email wangdoubleone@gmail.com * @date 2019-05-06 */ public class ChannelManager { private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); // 当前连接到服务器的通道(tcp和websocket) private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>(); public static void addWebSocketChannel(Channel channel) { WEBSOCKET_GROUP.add(channel); CHANNEL_MAP.put(channel.id().asShortText(), channel.id()); } public static void removeWebSocketChannel(Channel channel) { WEBSOCKET_GROUP.remove(channel); CHANNEL_MAP.remove(channel.id().asShortText()); } public static Channel findWebSocketChannel(String id){ ChannelId channelId = CHANNEL_MAP.get(id); return WEBSOCKET_GROUP.find(channelId); } public static ChannelGroup getWebSocketGroup() { return WEBSOCKET_GROUP; } public static void send2All(Object object, String type) { if (WEBSOCKET_GROUP.size() == 0) { return; } ResponseBean responseBean = ResponseBean.ok(type, null); responseBean.putInfo("data", object); String msg = JSONObject.toJSONString(responseBean); WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg)); } } src/main/java/com/xcong/excoin/netty/common/Contans.java
New file @@ -0,0 +1,23 @@ package com.xcong.excoin.netty.common; /** * @author wzy * @date 2019-05-06 */ public class Contans { public static final String HEART_BEAT = "ping pong pang"; public static final String WEB_REQ_CONNECTION = "000_000"; public static final String HOME_SYMBOLS = "001_001"; public static final String ORDER_COIN_PRE_ORDER_DATA = "002_001"; public static final String ORDER_COIN_FIND_TRUST_ORDER = "002_002"; public static final String ORDER_PRE_ORDER_DATA = "003_001"; public static final String ORDER_FIND_TRUST_ORDER = "003_002"; } src/main/java/com/xcong/excoin/netty/common/NettyTools.java
New file @@ -0,0 +1,27 @@ package com.xcong.excoin.netty.common; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; /** * @author wzy * @date 2019-05-14 */ public class NettyTools { /** * socket字符串传输转码 * * @param msg * @return */ public static ByteBuf textBytes(String msg) { return Unpooled.copiedBuffer((msg + "_split").getBytes()); } public static TextWebSocketFrame webSocketBytes(String msg) { return new TextWebSocketFrame(msg); } } src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
New file @@ -0,0 +1,46 @@ package com.xcong.excoin.netty.dispatch; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.netty.bean.RequestBean; import com.xcong.excoin.netty.common.NettyTools; import com.xcong.excoin.netty.logic.MsgLogic; import io.netty.channel.ChannelHandlerContext; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeansException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; /** * @author wzy * @date 2019-05-08 */ @Slf4j @Component("msgDispatch") public class MsgDispatch implements ApplicationContextAware { private ApplicationContext applicationContext; @Autowired private MsgLogic msgLogic; public void webSocketDispatch(ChannelHandlerContext ctx, String msg) { RequestBean requestBean = null; try { requestBean = JSONObject.parseObject(msg, RequestBean.class); requestBean.setChannelId(ctx.channel().id().asShortText()); msgLogic.webSocketMsgLogic(requestBean); } catch (Exception e) { log.info("#websocket json error:{}#", e); ctx.channel().writeAndFlush(NettyTools.webSocketBytes("params error")); } } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } } src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
New file @@ -0,0 +1,202 @@ package com.xcong.excoin.netty.handler; import com.xcong.excoin.netty.common.ChannelManager; import com.xcong.excoin.netty.common.Contans; import com.xcong.excoin.netty.common.NettyTools; import com.xcong.excoin.netty.dispatch.MsgDispatch; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.websocketx.*; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Date; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import static io.netty.handler.codec.http.HttpUtil.isKeepAlive; /** * @author wzy * @email wangdoubleone@gmail.com * @date 2019-05-06 */ @Slf4j @Component @ChannelHandler.Sharable public class WebSocketServerHandler extends ChannelInboundHandlerAdapter { private final ConcurrentMap<String, Integer> pingTimes = new ConcurrentHashMap<>(); private static final int MAX_UN_REC_PING_TIMES = 3; private WebSocketServerHandshaker handshaker; @Resource(name = "msgDispatch") private MsgDispatch msgDispatch; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("[websocket客户端连入服务器]-->{}", ctx.channel().id()); ChannelManager.addWebSocketChannel(ctx.channel()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("[离开websocket服务器]-->{}", ctx.channel().id()); ChannelManager.removeWebSocketChannel(ctx.channel()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // LogUtil.info("[websocket服务器收到消息]-->{}, {}", ctx.channel().id(), msg); if (msg instanceof FullHttpRequest) { // 以http请求形式接入,但是走的是websocket handleHttpRequest(ctx, (FullHttpRequest) msg); } else if (msg instanceof WebSocketFrame) { // 处理websocket客户端的消息 handlerWebSocketFrame(ctx, (WebSocketFrame) msg); WebSocketFrame frame = (WebSocketFrame) msg; String content; try { content = ((TextWebSocketFrame) frame).text(); if (content.contains(Contans.HEART_BEAT)) { resetTimes(ctx.channel()); } else { this.msgDispatch.webSocketDispatch(ctx, content); } } catch (ClassCastException e) { content = ((CloseWebSocketFrame) frame).reasonText(); ctx.writeAndFlush(new TextWebSocketFrame(content)); } } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { log.info("[触发器触发]"); // super.userEventTriggered(ctx, evt); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { } else if (event.state() == IdleState.WRITER_IDLE) { /*写超时*/ ctx.channel().writeAndFlush(NettyTools.webSocketBytes(Contans.HEART_BEAT)); Integer times = pingTimes.get(ctx.channel().id().asShortText()); if (times == null) { times = 0; } /*读超时*/ log.info("===服务端===({}写超时, {})", ctx.channel().id().asShortText(), times); // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连 if (times >= MAX_UN_REC_PING_TIMES) { log.info("===服务端===(写超时,关闭chanel)"); // 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连 ctx.channel().close(); } else { // 失败计数器加1 times++; pingTimes.remove(ctx.channel().id().asShortText()); pingTimes.put(ctx.channel().id().asShortText(), times); } } else if (event.state() == IdleState.ALL_IDLE) { /*总超时*/ System.out.println("===服务端===(ALL_IDLE 总超时)"); } } } private void resetTimes(Channel channel) { pingTimes.remove(channel.id().asShortText()); pingTimes.put(channel.id().asShortText(), 0); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("[websocket服务器发生异常]-->{},{}#", ctx.channel().id(), cause); super.exceptionCaught(ctx, cause); } private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // 判断是否关闭链路的指令 if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } // 判断是否ping消息 if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } // 本例程仅支持文本消息,不支持二进制消息 if (!(frame instanceof TextWebSocketFrame)) { System.out.println("本例程仅支持文本消息,不支持二进制消息"); throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName())); } // 返回应答消息 String request = ((TextWebSocketFrame) frame).text(); TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + ":" + request); // 群发 // ChannelSupervise.send2All(tws); // 返回【谁发的发给谁】 // ctx.channel().writeAndFlush(tws); } /** * 唯一的一次http请求,用于创建websocket */ private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); InetAddress inetAddress = inetSocketAddress.getAddress(); String ip = inetAddress.getHostAddress(); int port = inetSocketAddress.getPort(); //要求Upgrade为websocket,过滤掉get/Post if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) { //若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端 sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://" + ip + ":" + port + "/websocket", null, false); handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { handshaker.handshake(ctx.channel(), req); } } /** * 拒绝不合法的请求,并返回错误信息 */ private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) { // 返回应答给客户端 if (res.status().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); } //服务端向客户端发送数据 ChannelFuture f = ctx.channel().writeAndFlush(res); // 如果是非Keep-Alive,关闭连接 if (!isKeepAlive(req) || res.status().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } } } src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java
New file @@ -0,0 +1,39 @@ package com.xcong.excoin.netty.initalizer; import com.xcong.excoin.netty.handler.WebSocketServerHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateHandler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author wzy * @email wangdoubleone@gmail.com * @date 2019-05-06 */ @Component public class WebSocketServerInitializer extends ChannelInitializer<NioSocketChannel> { @Autowired private WebSocketServerHandler webSocketServerHandler; @Override protected void initChannel(NioSocketChannel ch) throws Exception { ChannelPipeline cp = ch.pipeline(); // http编码器 cp.addLast(new HttpServerCodec()); // 聚合器,使用websocket会用到 cp.addLast(new HttpObjectAggregator(65536)); cp.addLast(new ChunkedWriteHandler()); // 心跳 ch.pipeline().addLast(new IdleStateHandler(0, 10, 0)); // 自定义业务handler cp.addLast(webSocketServerHandler); } } src/main/java/com/xcong/excoin/netty/logic/MsgLogic.java
New file @@ -0,0 +1,13 @@ package com.xcong.excoin.netty.logic; import com.xcong.excoin.netty.bean.RequestBean; /** * @author wzy * @email wangdoubleone@gmail.com * @date 2019-05-09 */ public interface MsgLogic { void webSocketMsgLogic(RequestBean requestBean); } src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
New file @@ -0,0 +1,41 @@ package com.xcong.excoin.netty.logic; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.netty.bean.RequestBean; import com.xcong.excoin.netty.bean.ResponseBean; import com.xcong.excoin.netty.common.ChannelManager; import com.xcong.excoin.netty.common.NettyTools; import io.netty.channel.Channel; import org.springframework.stereotype.Component; /** * @author wzy * @email wangdoubleone@gmail.com * @date 2019-05-09 */ @Component public class WebSocketLogic { public void webReqConnection(RequestBean requestBean) { Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId()); channel.writeAndFlush(NettyTools.webSocketBytes("this is ok")); } public void reqHomeSymbols(RequestBean requestBean) { String params = requestBean.getParams(); JSONObject jsonObject = JSONObject.parseObject(params); String token = jsonObject.getString("token"); String type = jsonObject.getString("type"); ResponseBean responseBean = ResponseBean.ok(requestBean.getType(), null); Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId()); channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(responseBean))); } public void defaultReq(RequestBean requestBean) { Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId()); channel.writeAndFlush("this is error type"); } } src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java
New file @@ -0,0 +1,36 @@ package com.xcong.excoin.netty.logic.impl; import com.xcong.excoin.netty.bean.RequestBean; import com.xcong.excoin.netty.common.Contans; import com.xcong.excoin.netty.logic.MsgLogic; import com.xcong.excoin.netty.logic.WebSocketLogic; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author wzy * @date 2019-05-09 */ @Component public class MsgLogicImpl implements MsgLogic { @Autowired private WebSocketLogic webSocketLogic; @Override public void webSocketMsgLogic(RequestBean requestBean) { switch (requestBean.getType()) { case Contans.WEB_REQ_CONNECTION : webSocketLogic.webReqConnection(requestBean); break; case Contans.HOME_SYMBOLS: webSocketLogic.reqHomeSymbols(requestBean); default: webSocketLogic.defaultReq(requestBean); break; } } } src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java
New file @@ -0,0 +1,68 @@ package com.xcong.excoin.netty.server; import com.xcong.excoin.netty.ChatServer; import com.xcong.excoin.netty.initalizer.WebSocketServerInitializer; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author wzy * @date 2019-05-06 */ @Slf4j @Component("webSocketServer") public class WebSocketServer implements ChatServer { private EventLoopGroup boss = new NioEventLoopGroup(); private EventLoopGroup work = new NioEventLoopGroup(); private ChannelFuture channelFuture; @Autowired private WebSocketServerInitializer webSocketServerInitializer; @Override public void start() throws Exception { log.info("[websocket服务器启动]"); try { ServerBootstrap b = new ServerBootstrap(); b.group(boss, work) .channel(NioServerSocketChannel.class) .childHandler(webSocketServerInitializer); channelFuture = b.bind(9999).sync(); log.info("[websocket服务器启动完成]-->{}", channelFuture.channel().localAddress()); } finally { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { shutdown(); } }); } } @Override public void shutdown() { if (channelFuture != null) { channelFuture.channel().close().syncUninterruptibly(); } if (boss != null) { boss.shutdownGracefully(); } if (work != null) { work.shutdownGracefully(); } } }