| package com.xcong.excoin.netty.handler; | 
|   | 
|   | 
| import com.xcong.excoin.netty.common.ChannelManager; | 
| import com.xcong.excoin.netty.common.Contans; | 
| import com.xcong.excoin.netty.common.NettyTools; | 
| import com.xcong.excoin.netty.dispatch.MsgDispatch; | 
| import io.netty.buffer.ByteBuf; | 
| import io.netty.buffer.Unpooled; | 
| import io.netty.channel.*; | 
| import io.netty.handler.codec.http.DefaultFullHttpResponse; | 
| import io.netty.handler.codec.http.FullHttpRequest; | 
| import io.netty.handler.codec.http.HttpResponseStatus; | 
| import io.netty.handler.codec.http.HttpVersion; | 
| import io.netty.handler.codec.http.websocketx.*; | 
| import io.netty.handler.timeout.IdleState; | 
| import io.netty.handler.timeout.IdleStateEvent; | 
| import io.netty.util.CharsetUtil; | 
| import lombok.extern.slf4j.Slf4j; | 
| import org.springframework.stereotype.Component; | 
|   | 
| import javax.annotation.Resource; | 
| import java.net.InetAddress; | 
| import java.net.InetSocketAddress; | 
| import java.util.Date; | 
| import java.util.concurrent.ConcurrentHashMap; | 
| import java.util.concurrent.ConcurrentMap; | 
|   | 
| import static io.netty.handler.codec.http.HttpUtil.isKeepAlive; | 
|   | 
| /** | 
|  * @author wzy | 
|  * @email wangdoubleone@gmail.com | 
|  * @date 2019-05-06 | 
|  */ | 
| @Slf4j | 
| @Component | 
| @ChannelHandler.Sharable | 
| public class WebSocketServerHandler extends ChannelInboundHandlerAdapter { | 
|   | 
|     private final ConcurrentMap<String, Integer> pingTimes = new ConcurrentHashMap<>(); | 
|   | 
|     private static final int MAX_UN_REC_PING_TIMES = 3; | 
|   | 
|     private WebSocketServerHandshaker handshaker; | 
|   | 
|     @Resource(name = "msgDispatch") | 
|     private MsgDispatch msgDispatch; | 
|   | 
|     @Override | 
|     public void channelActive(ChannelHandlerContext ctx) throws Exception { | 
|         log.info("[websocket客户端连入服务器]-->{}", ctx.channel().id()); | 
|         ChannelManager.addWebSocketChannel(ctx.channel()); | 
|     } | 
|   | 
|     @Override | 
|     public void channelInactive(ChannelHandlerContext ctx) throws Exception { | 
|         log.info("[离开websocket服务器]-->{}", ctx.channel().id()); | 
|         ChannelManager.removeWebSocketChannel(ctx.channel()); | 
|     } | 
|   | 
|     @Override | 
|     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { | 
| //        LogUtil.info("[websocket服务器收到消息]-->{}, {}", ctx.channel().id(), msg); | 
|         if (msg instanceof FullHttpRequest) { | 
|             // 以http请求形式接入,但是走的是websocket | 
|             handleHttpRequest(ctx, (FullHttpRequest) msg); | 
|         } else if (msg instanceof WebSocketFrame) { | 
|             // 处理websocket客户端的消息 | 
|             handlerWebSocketFrame(ctx, (WebSocketFrame) msg); | 
|   | 
|             WebSocketFrame frame = (WebSocketFrame) msg; | 
|             String content; | 
|             try { | 
|                 content = ((TextWebSocketFrame) frame).text(); | 
|                 if (content.contains(Contans.HEART_BEAT)) { | 
|                     resetTimes(ctx.channel()); | 
|                 } else { | 
|                     this.msgDispatch.webSocketDispatch(ctx, content); | 
|                 } | 
|             } catch (ClassCastException e) { | 
|                 content = ((CloseWebSocketFrame) frame).reasonText(); | 
|                 ctx.writeAndFlush(new TextWebSocketFrame(content)); | 
|             } | 
|         } | 
|     } | 
|   | 
|     @Override | 
|     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { | 
|         log.info("[触发器触发]"); | 
| //        super.userEventTriggered(ctx, evt); | 
|         if (evt instanceof IdleStateEvent) { | 
|             IdleStateEvent event = (IdleStateEvent) evt; | 
|             if (event.state() == IdleState.READER_IDLE) { | 
|   | 
|             } else if (event.state() == IdleState.WRITER_IDLE) { | 
|                 /*写超时*/ | 
|                 ctx.channel().writeAndFlush(NettyTools.webSocketBytes(Contans.HEART_BEAT)); | 
|                 Integer times = pingTimes.get(ctx.channel().id().asShortText()); | 
|                 if (times == null) { | 
|                     times = 0; | 
|                 } | 
|                 /*读超时*/ | 
|                 log.info("===服务端===({}写超时, {})", ctx.channel().id().asShortText(), times); | 
|                 // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连 | 
|                 if (times >= MAX_UN_REC_PING_TIMES) { | 
|                     log.info("===服务端===(写超时,关闭chanel)"); | 
|                     // 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连 | 
|                     ctx.channel().close(); | 
|                 } else { | 
|                     // 失败计数器加1 | 
|                     times++; | 
|                     pingTimes.remove(ctx.channel().id().asShortText()); | 
|                     pingTimes.put(ctx.channel().id().asShortText(), times); | 
|                 } | 
|             } else if (event.state() == IdleState.ALL_IDLE) { | 
|                 /*总超时*/ | 
|                 System.out.println("===服务端===(ALL_IDLE 总超时)"); | 
|             } | 
|         } | 
|     } | 
|   | 
|     private void resetTimes(Channel channel) { | 
|         pingTimes.remove(channel.id().asShortText()); | 
|         pingTimes.put(channel.id().asShortText(), 0); | 
|     } | 
|   | 
|     @Override | 
|     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { | 
|         log.error("[websocket服务器发生异常]-->{},{}#", ctx.channel().id(), cause); | 
|         super.exceptionCaught(ctx, cause); | 
|     } | 
|   | 
|   | 
|     private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { | 
|         // 判断是否关闭链路的指令 | 
|         if (frame instanceof CloseWebSocketFrame) { | 
|             handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); | 
|             return; | 
|         } | 
|         // 判断是否ping消息 | 
|         if (frame instanceof PingWebSocketFrame) { | 
|             ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); | 
|             return; | 
|         } | 
|         // 本例程仅支持文本消息,不支持二进制消息 | 
|         if (!(frame instanceof TextWebSocketFrame)) { | 
|             System.out.println("本例程仅支持文本消息,不支持二进制消息"); | 
|             throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName())); | 
|         } | 
|         // 返回应答消息 | 
|         String request = ((TextWebSocketFrame) frame).text(); | 
|         TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + ":" + request); | 
|         // 群发 | 
|         // ChannelSupervise.send2All(tws); | 
|         // 返回【谁发的发给谁】 | 
|         // ctx.channel().writeAndFlush(tws); | 
|     } | 
|   | 
|     /** | 
|      * 唯一的一次http请求,用于创建websocket | 
|      */ | 
|     private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { | 
|         InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); | 
|         InetAddress inetAddress = inetSocketAddress.getAddress(); | 
|         String ip = inetAddress.getHostAddress(); | 
|         int port = inetSocketAddress.getPort(); | 
|   | 
|         //要求Upgrade为websocket,过滤掉get/Post | 
|         if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) { | 
|             //若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端 | 
|             sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); | 
|             return; | 
|         } | 
|         WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://" + ip + ":" + port + "/websocket", null, false); | 
|         handshaker = wsFactory.newHandshaker(req); | 
|         if (handshaker == null) { | 
|             WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); | 
|         } else { | 
|             handshaker.handshake(ctx.channel(), req); | 
|         } | 
|     } | 
|   | 
|     /** | 
|      * 拒绝不合法的请求,并返回错误信息 | 
|      */ | 
|     private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) { | 
|         // 返回应答给客户端 | 
|         if (res.status().code() != 200) { | 
|             ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), | 
|                     CharsetUtil.UTF_8); | 
|             res.content().writeBytes(buf); | 
|             buf.release(); | 
|         } | 
|         //服务端向客户端发送数据 | 
|         ChannelFuture f = ctx.channel().writeAndFlush(res); | 
|         // 如果是非Keep-Alive,关闭连接 | 
|         if (!isKeepAlive(req) || res.status().code() != 200) { | 
|             f.addListener(ChannelFutureListener.CLOSE); | 
|         } | 
|     } | 
| } |