package com.xcong.excoin.netty.handler; import com.xcong.excoin.netty.common.ChannelManager; import com.xcong.excoin.netty.common.Contans; import com.xcong.excoin.netty.common.NettyTools; import com.xcong.excoin.netty.dispatch.MsgDispatch; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.websocketx.*; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.net.InetAddress; import java.net.InetSocketAddress; import java.util.Date; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import static io.netty.handler.codec.http.HttpUtil.isKeepAlive; /** * @author wzy * @email wangdoubleone@gmail.com * @date 2019-05-06 */ @Slf4j @Component @ChannelHandler.Sharable public class WebSocketServerHandler extends ChannelInboundHandlerAdapter { private final ConcurrentMap pingTimes = new ConcurrentHashMap<>(); private static final int MAX_UN_REC_PING_TIMES = 3; private WebSocketServerHandshaker handshaker; @Resource(name = "msgDispatch") private MsgDispatch msgDispatch; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("[websocket客户端连入服务器]-->{}", ctx.channel().id()); ChannelManager.addWebSocketChannel(ctx.channel()); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.info("[离开websocket服务器]-->{}", ctx.channel().id()); ChannelManager.removeWebSocketChannel(ctx.channel()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // LogUtil.info("[websocket服务器收到消息]-->{}, {}", ctx.channel().id(), msg); if (msg instanceof FullHttpRequest) { // 以http请求形式接入,但是走的是websocket handleHttpRequest(ctx, (FullHttpRequest) msg); } else if (msg instanceof WebSocketFrame) { // 处理websocket客户端的消息 handlerWebSocketFrame(ctx, (WebSocketFrame) msg); WebSocketFrame frame = (WebSocketFrame) msg; String content; try { content = ((TextWebSocketFrame) frame).text(); if (content.contains(Contans.HEART_BEAT)) { resetTimes(ctx.channel()); } else { this.msgDispatch.webSocketDispatch(ctx, content); } } catch (ClassCastException e) { content = ((CloseWebSocketFrame) frame).reasonText(); ctx.writeAndFlush(new TextWebSocketFrame(content)); } } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { log.info("[触发器触发]"); // super.userEventTriggered(ctx, evt); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { } else if (event.state() == IdleState.WRITER_IDLE) { /*写超时*/ ctx.channel().writeAndFlush(NettyTools.webSocketBytes(Contans.HEART_BEAT)); Integer times = pingTimes.get(ctx.channel().id().asShortText()); if (times == null) { times = 0; } /*读超时*/ log.info("===服务端===({}写超时, {})", ctx.channel().id().asShortText(), times); // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连 if (times >= MAX_UN_REC_PING_TIMES) { log.info("===服务端===(写超时,关闭chanel)"); // 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连 ctx.channel().close(); } else { // 失败计数器加1 times++; pingTimes.remove(ctx.channel().id().asShortText()); pingTimes.put(ctx.channel().id().asShortText(), times); } } else if (event.state() == IdleState.ALL_IDLE) { /*总超时*/ System.out.println("===服务端===(ALL_IDLE 总超时)"); } } } private void resetTimes(Channel channel) { pingTimes.remove(channel.id().asShortText()); pingTimes.put(channel.id().asShortText(), 0); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("[websocket服务器发生异常]-->{},{}#", ctx.channel().id(), cause); super.exceptionCaught(ctx, cause); } private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { // 判断是否关闭链路的指令 if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } // 判断是否ping消息 if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } // 本例程仅支持文本消息,不支持二进制消息 if (!(frame instanceof TextWebSocketFrame)) { System.out.println("本例程仅支持文本消息,不支持二进制消息"); throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName())); } // 返回应答消息 String request = ((TextWebSocketFrame) frame).text(); TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + ":" + request); // 群发 // ChannelSupervise.send2All(tws); // 返回【谁发的发给谁】 // ctx.channel().writeAndFlush(tws); } /** * 唯一的一次http请求,用于创建websocket */ private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); InetAddress inetAddress = inetSocketAddress.getAddress(); String ip = inetAddress.getHostAddress(); int port = inetSocketAddress.getPort(); //要求Upgrade为websocket,过滤掉get/Post if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) { //若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端 sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://" + ip + ":" + port + "/websocket", null, false); handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { handshaker.handshake(ctx.channel(), req); } } /** * 拒绝不合法的请求,并返回错误信息 */ private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) { // 返回应答给客户端 if (res.status().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); } //服务端向客户端发送数据 ChannelFuture f = ctx.channel().writeAndFlush(res); // 如果是非Keep-Alive,关闭连接 if (!isKeepAlive(req) || res.status().code() != 200) { f.addListener(ChannelFutureListener.CLOSE); } } }