package com.xcong.farmer.cms.netty.handler;
|
|
|
import com.xcong.farmer.cms.netty.common.ChannelManager;
|
import com.xcong.farmer.cms.netty.common.Contans;
|
import com.xcong.farmer.cms.netty.common.NettyTools;
|
import com.xcong.farmer.cms.netty.dispatch.MsgDispatch;
|
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.Unpooled;
|
import io.netty.channel.*;
|
import io.netty.handler.codec.http.DefaultFullHttpResponse;
|
import io.netty.handler.codec.http.FullHttpRequest;
|
import io.netty.handler.codec.http.HttpResponseStatus;
|
import io.netty.handler.codec.http.HttpVersion;
|
import io.netty.handler.codec.http.websocketx.*;
|
import io.netty.handler.timeout.IdleState;
|
import io.netty.handler.timeout.IdleStateEvent;
|
import io.netty.util.CharsetUtil;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.stereotype.Component;
|
|
import javax.annotation.Resource;
|
import java.net.InetAddress;
|
import java.net.InetSocketAddress;
|
import java.util.Date;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentMap;
|
|
import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;
|
|
/**
|
* @author wzy
|
* @email wangdoubleone@gmail.com
|
* @date 2019-05-06
|
*/
|
@Slf4j
|
@Component
|
@ChannelHandler.Sharable
|
public class WebSocketServerHandler extends ChannelInboundHandlerAdapter {
|
|
private final ConcurrentMap<String, Integer> pingTimes = new ConcurrentHashMap<>();
|
|
private static final int MAX_UN_REC_PING_TIMES = 3;
|
|
private WebSocketServerHandshaker handshaker;
|
|
@Resource(name = "msgDispatch")
|
private MsgDispatch msgDispatch;
|
|
@Override
|
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
log.info("[websocket客户端连入服务器]-->{}", ctx.channel().id());
|
ChannelManager.addWebSocketChannel(ctx.channel());
|
}
|
|
@Override
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
log.info("[离开websocket服务器]-->{}", ctx.channel().id());
|
ChannelManager.removeWebSocketChannel(ctx.channel());
|
}
|
|
@Override
|
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
|
// LogUtil.info("[websocket服务器收到消息]-->{}, {}", ctx.channel().id(), msg);
|
if (msg instanceof FullHttpRequest) {
|
// 以http请求形式接入,但是走的是websocket
|
handleHttpRequest(ctx, (FullHttpRequest) msg);
|
} else if (msg instanceof WebSocketFrame) {
|
// 处理websocket客户端的消息
|
handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
|
|
WebSocketFrame frame = (WebSocketFrame) msg;
|
String content;
|
try {
|
content = ((TextWebSocketFrame) frame).text();
|
if (content.contains(Contans.HEART_BEAT)) {
|
resetTimes(ctx.channel());
|
} else {
|
this.msgDispatch.webSocketDispatch(ctx, content);
|
}
|
} catch (ClassCastException e) {
|
content = ((CloseWebSocketFrame) frame).reasonText();
|
ctx.writeAndFlush(new TextWebSocketFrame(content));
|
}
|
}
|
}
|
|
@Override
|
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
log.info("[触发器触发]");
|
if (evt instanceof IdleStateEvent) {
|
IdleStateEvent event = (IdleStateEvent) evt;
|
if (event.state() == IdleState.READER_IDLE) {
|
|
} else if (event.state() == IdleState.WRITER_IDLE) {
|
/*写超时*/
|
ctx.channel().writeAndFlush(NettyTools.webSocketBytes(Contans.HEART_BEAT));
|
Integer times = pingTimes.get(ctx.channel().id().asShortText());
|
if (times == null) {
|
times = 0;
|
}
|
/*读超时*/
|
log.info("===服务端===({}写超时, {})", ctx.channel().id().asShortText(), times);
|
// 失败计数器次数大于等于3次的时候,关闭链接,等待client重连
|
if (times >= MAX_UN_REC_PING_TIMES) {
|
log.info("===服务端===(写超时,关闭chanel)");
|
// 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连
|
ctx.channel().close();
|
} else {
|
// 失败计数器加1
|
times++;
|
pingTimes.remove(ctx.channel().id().asShortText());
|
pingTimes.put(ctx.channel().id().asShortText(), times);
|
}
|
} else if (event.state() == IdleState.ALL_IDLE) {
|
/*总超时*/
|
System.out.println("===服务端===(ALL_IDLE 总超时)");
|
}
|
}
|
}
|
|
private void resetTimes(Channel channel) {
|
pingTimes.remove(channel.id().asShortText());
|
pingTimes.put(channel.id().asShortText(), 0);
|
}
|
|
@Override
|
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
|
log.error("[websocket服务器发生异常]-->{},{}#", ctx.channel().id(), cause);
|
super.exceptionCaught(ctx, cause);
|
}
|
|
|
private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
|
// 判断是否关闭链路的指令
|
if (frame instanceof CloseWebSocketFrame) {
|
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
|
return;
|
}
|
// 判断是否ping消息
|
if (frame instanceof PingWebSocketFrame) {
|
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
|
return;
|
}
|
// 本例程仅支持文本消息,不支持二进制消息
|
if (!(frame instanceof TextWebSocketFrame)) {
|
System.out.println("本例程仅支持文本消息,不支持二进制消息");
|
throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
|
}
|
// 返回应答消息
|
String request = ((TextWebSocketFrame) frame).text();
|
TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + ":" + request);
|
// 群发
|
// ChannelSupervise.send2All(tws);
|
// 返回【谁发的发给谁】
|
// ctx.channel().writeAndFlush(tws);
|
}
|
|
/**
|
* 唯一的一次http请求,用于创建websocket
|
*/
|
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
|
InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
|
InetAddress inetAddress = inetSocketAddress.getAddress();
|
String ip = inetAddress.getHostAddress();
|
int port = inetSocketAddress.getPort();
|
|
//要求Upgrade为websocket,过滤掉get/Post
|
if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
|
//若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端
|
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
|
return;
|
}
|
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://" + ip + ":" + port + "/websocket", null, false);
|
handshaker = wsFactory.newHandshaker(req);
|
if (handshaker == null) {
|
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
|
} else {
|
handshaker.handshake(ctx.channel(), req);
|
}
|
}
|
|
/**
|
* 拒绝不合法的请求,并返回错误信息
|
*/
|
private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
|
// 返回应答给客户端
|
if (res.status().code() != 200) {
|
ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(),
|
CharsetUtil.UTF_8);
|
res.content().writeBytes(buf);
|
buf.release();
|
}
|
//服务端向客户端发送数据
|
ChannelFuture f = ctx.channel().writeAndFlush(res);
|
// 如果是非Keep-Alive,关闭连接
|
if (!isKeepAlive(req) || res.status().code() != 200) {
|
f.addListener(ChannelFutureListener.CLOSE);
|
}
|
}
|
}
|