From 91484bb4417567ec8f953bc52b0ee0a8f47309d8 Mon Sep 17 00:00:00 2001 From: wzy <wzy19931122ai@163.com> Date: Tue, 19 May 2020 22:22:35 +0800 Subject: [PATCH] init netty code --- src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java | 41 +++ src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java | 89 ++++++ src/main/java/com/xcong/excoin/netty/common/ChannelManager.java | 56 ++++ src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java | 46 +++ pom.xml | 12 src/main/java/com/xcong/excoin/netty/bean/RequestBean.java | 90 ++++++ src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java | 39 +++ src/main/java/com/xcong/excoin/netty/logic/MsgLogic.java | 13 + src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java | 36 ++ src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java | 68 +++++ src/main/java/com/xcong/excoin/netty/common/NettyTools.java | 27 ++ src/main/java/com/xcong/excoin/netty/common/Contans.java | 23 + src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java | 202 +++++++++++++++ src/main/java/com/xcong/excoin/netty/ChatServer.java | 11 14 files changed, 747 insertions(+), 6 deletions(-) diff --git a/pom.xml b/pom.xml index dbdfbfe..20f2f96 100644 --- a/pom.xml +++ b/pom.xml @@ -147,12 +147,6 @@ <version>${mapstruct.version}</version> </dependency> -<!-- <dependency>--> -<!-- <groupId>org.mapstruct</groupId>--> -<!-- <artifactId>mapstruct-processor</artifactId>--> -<!-- <version>${mapstruct.version}</version>--> -<!-- </dependency>--> - <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> @@ -175,6 +169,12 @@ <version>1.6.1</version> </dependency> + + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-all</artifactId> + <version>4.1.33.Final</version> + </dependency> </dependencies> <build> diff --git a/src/main/java/com/xcong/excoin/netty/ChatServer.java b/src/main/java/com/xcong/excoin/netty/ChatServer.java new file mode 100644 index 0000000..58f0cd7 --- /dev/null +++ b/src/main/java/com/xcong/excoin/netty/ChatServer.java @@ -0,0 +1,11 @@ +package com.xcong.excoin.netty; + +/** + * @author wzy + * @date 2019-05-06 + */ +public interface ChatServer { + void start() throws Exception; + + void shutdown(); +} diff --git a/src/main/java/com/xcong/excoin/netty/bean/RequestBean.java b/src/main/java/com/xcong/excoin/netty/bean/RequestBean.java new file mode 100644 index 0000000..289b4ae --- /dev/null +++ b/src/main/java/com/xcong/excoin/netty/bean/RequestBean.java @@ -0,0 +1,90 @@ +package com.xcong.excoin.netty.bean; + + +import java.io.Serializable; + +/** + * @author wzy + * @email wangdoubleone@gmail.com + * @date 2019-05-09 + */ +public class RequestBean implements Serializable { + + private static final long serialVersionUID = 1L; + + /** + * 请求类型 + */ + private String type; + + /** + * 当前通道id + */ + private String channelId; + + /** + * web端通道ID + */ + private String reqId; + + /** + * 请求参数 + */ + private String params; + + /** + * 手持端是否同意连接 0-否 1-是 + */ + private String tag; + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getChannelId() { + return channelId; + } + + public void setChannelId(String channelId) { + this.channelId = channelId; + } + + public String getParams() { + return params; + } + + public void setParams(String params) { + this.params = params; + } + + public String getReqId() { + return reqId; + } + + public void setReqId(String reqId) { + this.reqId = reqId; + } + + public String getTag() { + return tag; + } + + public void setTag(String tag) { + this.tag = tag; + } + + @Override + public String toString() { + return "RequestBean{" + + "type='" + type + '\'' + + ", channelId='" + channelId + '\'' + + ", reqId='" + reqId + '\'' + + ", params='" + params + '\'' + + ", tag='" + tag + '\'' + + '}'; + } +} diff --git a/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java b/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java new file mode 100644 index 0000000..b595122 --- /dev/null +++ b/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java @@ -0,0 +1,89 @@ +package com.xcong.excoin.netty.bean; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author wzy + * @date 2020-04-18 16:17 + **/ +public class ResponseBean implements Serializable { + + private static final long serialVersionUID = 1L; + + public static final String SUCCESS = "200"; + + private String status; + + private String type; + + private String info; + + private String channelId; + + private Map<Object, Object> mapInfo = new HashMap<>(); + + private List<?> row; + + public static ResponseBean ok(String type, String info) { + ResponseBean responseBean = new ResponseBean(); + responseBean.status = SUCCESS; + responseBean.type = type; + responseBean.info = info; + return responseBean; + } + + public String getStatus() { + return status; + } + + public void setStatus(String status) { + this.status = status; + } + + public String getType() { + return type; + } + + public void setType(String type) { + this.type = type; + } + + public String getInfo() { + return info; + } + + public void setInfo(String info) { + this.info = info; + } + + public String getChannelId() { + return channelId; + } + + public void setChannelId(String channelId) { + this.channelId = channelId; + } + + public Map<Object, Object> getMapInfo() { + return mapInfo; + } + + public void setMapInfo(Map<Object, Object> mapInfo) { + this.mapInfo = mapInfo; + } + + public void putInfo(Object key, Object value) { + this.mapInfo.put(key, value); + } + + public List<?> getRow() { + return row; + } + + public void setRow(List<?> row) { + this.row = row; + } +} diff --git a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java new file mode 100644 index 0000000..5d158f6 --- /dev/null +++ b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java @@ -0,0 +1,56 @@ +package com.xcong.excoin.netty.common; + +import com.alibaba.fastjson.JSONObject; +import com.xcong.excoin.netty.bean.ResponseBean; +import io.netty.channel.Channel; +import io.netty.channel.ChannelId; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.util.concurrent.GlobalEventExecutor; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * @author wzy + * @email wangdoubleone@gmail.com + * @date 2019-05-06 + */ +public class ChannelManager { + + private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + + // 当前连接到服务器的通道(tcp和websocket) + private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>(); + + public static void addWebSocketChannel(Channel channel) { + WEBSOCKET_GROUP.add(channel); + CHANNEL_MAP.put(channel.id().asShortText(), channel.id()); + } + + public static void removeWebSocketChannel(Channel channel) { + WEBSOCKET_GROUP.remove(channel); + CHANNEL_MAP.remove(channel.id().asShortText()); + } + + public static Channel findWebSocketChannel(String id){ + ChannelId channelId = CHANNEL_MAP.get(id); + return WEBSOCKET_GROUP.find(channelId); + } + + public static ChannelGroup getWebSocketGroup() { + return WEBSOCKET_GROUP; + } + + public static void send2All(Object object, String type) { + if (WEBSOCKET_GROUP.size() == 0) { + return; + } + ResponseBean responseBean = ResponseBean.ok(type, null); + responseBean.putInfo("data", object); + String msg = JSONObject.toJSONString(responseBean); + WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg)); + } + + +} diff --git a/src/main/java/com/xcong/excoin/netty/common/Contans.java b/src/main/java/com/xcong/excoin/netty/common/Contans.java new file mode 100644 index 0000000..1a64913 --- /dev/null +++ b/src/main/java/com/xcong/excoin/netty/common/Contans.java @@ -0,0 +1,23 @@ +package com.xcong.excoin.netty.common; + +/** + * @author wzy + * @date 2019-05-06 + */ +public class Contans { + + public static final String HEART_BEAT = "ping pong pang"; + + public static final String WEB_REQ_CONNECTION = "000_000"; + + public static final String HOME_SYMBOLS = "001_001"; + + public static final String ORDER_COIN_PRE_ORDER_DATA = "002_001"; + + public static final String ORDER_COIN_FIND_TRUST_ORDER = "002_002"; + + public static final String ORDER_PRE_ORDER_DATA = "003_001"; + + public static final String ORDER_FIND_TRUST_ORDER = "003_002"; + +} diff --git a/src/main/java/com/xcong/excoin/netty/common/NettyTools.java b/src/main/java/com/xcong/excoin/netty/common/NettyTools.java new file mode 100644 index 0000000..ead90db --- /dev/null +++ b/src/main/java/com/xcong/excoin/netty/common/NettyTools.java @@ -0,0 +1,27 @@ +package com.xcong.excoin.netty.common; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; + +/** + * @author wzy + * @date 2019-05-14 + */ +public class NettyTools { + + + /** + * socket字符串传输转码 + * + * @param msg + * @return + */ + public static ByteBuf textBytes(String msg) { + return Unpooled.copiedBuffer((msg + "_split").getBytes()); + } + + public static TextWebSocketFrame webSocketBytes(String msg) { + return new TextWebSocketFrame(msg); + } +} diff --git a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java new file mode 100644 index 0000000..540fd36 --- /dev/null +++ b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java @@ -0,0 +1,46 @@ +package com.xcong.excoin.netty.dispatch; + +import com.alibaba.fastjson.JSONObject; +import com.xcong.excoin.netty.bean.RequestBean; +import com.xcong.excoin.netty.common.NettyTools; +import com.xcong.excoin.netty.logic.MsgLogic; +import io.netty.channel.ChannelHandlerContext; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.context.ApplicationContextAware; +import org.springframework.stereotype.Component; + + +/** + * @author wzy + * @date 2019-05-08 + */ +@Slf4j +@Component("msgDispatch") +public class MsgDispatch implements ApplicationContextAware { + + private ApplicationContext applicationContext; + + @Autowired + private MsgLogic msgLogic; + + public void webSocketDispatch(ChannelHandlerContext ctx, String msg) { + RequestBean requestBean = null; + try { + requestBean = JSONObject.parseObject(msg, RequestBean.class); + requestBean.setChannelId(ctx.channel().id().asShortText()); + msgLogic.webSocketMsgLogic(requestBean); + } catch (Exception e) { + log.info("#websocket json error:{}#", e); + ctx.channel().writeAndFlush(NettyTools.webSocketBytes("params error")); + } + } + + + @Override + public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { + this.applicationContext = applicationContext; + } +} diff --git a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java new file mode 100644 index 0000000..3e57246 --- /dev/null +++ b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java @@ -0,0 +1,202 @@ +package com.xcong.excoin.netty.handler; + + +import com.xcong.excoin.netty.common.ChannelManager; +import com.xcong.excoin.netty.common.Contans; +import com.xcong.excoin.netty.common.NettyTools; +import com.xcong.excoin.netty.dispatch.MsgDispatch; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.websocketx.*; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.CharsetUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.Date; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static io.netty.handler.codec.http.HttpUtil.isKeepAlive; + +/** + * @author wzy + * @email wangdoubleone@gmail.com + * @date 2019-05-06 + */ +@Slf4j +@Component +@ChannelHandler.Sharable +public class WebSocketServerHandler extends ChannelInboundHandlerAdapter { + + private final ConcurrentMap<String, Integer> pingTimes = new ConcurrentHashMap<>(); + + private static final int MAX_UN_REC_PING_TIMES = 3; + + private WebSocketServerHandshaker handshaker; + + @Resource(name = "msgDispatch") + private MsgDispatch msgDispatch; + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + log.info("[websocket客户端连入服务器]-->{}", ctx.channel().id()); + ChannelManager.addWebSocketChannel(ctx.channel()); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + log.info("[离开websocket服务器]-->{}", ctx.channel().id()); + ChannelManager.removeWebSocketChannel(ctx.channel()); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { +// LogUtil.info("[websocket服务器收到消息]-->{}, {}", ctx.channel().id(), msg); + if (msg instanceof FullHttpRequest) { + // 以http请求形式接入,但是走的是websocket + handleHttpRequest(ctx, (FullHttpRequest) msg); + } else if (msg instanceof WebSocketFrame) { + // 处理websocket客户端的消息 + handlerWebSocketFrame(ctx, (WebSocketFrame) msg); + + WebSocketFrame frame = (WebSocketFrame) msg; + String content; + try { + content = ((TextWebSocketFrame) frame).text(); + if (content.contains(Contans.HEART_BEAT)) { + resetTimes(ctx.channel()); + } else { + this.msgDispatch.webSocketDispatch(ctx, content); + } + } catch (ClassCastException e) { + content = ((CloseWebSocketFrame) frame).reasonText(); + ctx.writeAndFlush(new TextWebSocketFrame(content)); + } + } + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + log.info("[触发器触发]"); +// super.userEventTriggered(ctx, evt); + if (evt instanceof IdleStateEvent) { + IdleStateEvent event = (IdleStateEvent) evt; + if (event.state() == IdleState.READER_IDLE) { + + } else if (event.state() == IdleState.WRITER_IDLE) { + /*写超时*/ + ctx.channel().writeAndFlush(NettyTools.webSocketBytes(Contans.HEART_BEAT)); + Integer times = pingTimes.get(ctx.channel().id().asShortText()); + if (times == null) { + times = 0; + } + /*读超时*/ + log.info("===服务端===({}写超时, {})", ctx.channel().id().asShortText(), times); + // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连 + if (times >= MAX_UN_REC_PING_TIMES) { + log.info("===服务端===(写超时,关闭chanel)"); + // 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连 + ctx.channel().close(); + } else { + // 失败计数器加1 + times++; + pingTimes.remove(ctx.channel().id().asShortText()); + pingTimes.put(ctx.channel().id().asShortText(), times); + } + } else if (event.state() == IdleState.ALL_IDLE) { + /*总超时*/ + System.out.println("===服务端===(ALL_IDLE 总超时)"); + } + } + } + + private void resetTimes(Channel channel) { + pingTimes.remove(channel.id().asShortText()); + pingTimes.put(channel.id().asShortText(), 0); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + log.error("[websocket服务器发生异常]-->{},{}#", ctx.channel().id(), cause); + super.exceptionCaught(ctx, cause); + } + + + private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { + // 判断是否关闭链路的指令 + if (frame instanceof CloseWebSocketFrame) { + handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); + return; + } + // 判断是否ping消息 + if (frame instanceof PingWebSocketFrame) { + ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); + return; + } + // 本例程仅支持文本消息,不支持二进制消息 + if (!(frame instanceof TextWebSocketFrame)) { + System.out.println("本例程仅支持文本消息,不支持二进制消息"); + throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName())); + } + // 返回应答消息 + String request = ((TextWebSocketFrame) frame).text(); + TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + ":" + request); + // 群发 + // ChannelSupervise.send2All(tws); + // 返回【谁发的发给谁】 + // ctx.channel().writeAndFlush(tws); + } + + /** + * 唯一的一次http请求,用于创建websocket + */ + private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) { + InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress(); + InetAddress inetAddress = inetSocketAddress.getAddress(); + String ip = inetAddress.getHostAddress(); + int port = inetSocketAddress.getPort(); + + //要求Upgrade为websocket,过滤掉get/Post + if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) { + //若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端 + sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); + return; + } + WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://" + ip + ":" + port + "/websocket", null, false); + handshaker = wsFactory.newHandshaker(req); + if (handshaker == null) { + WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); + } else { + handshaker.handshake(ctx.channel(), req); + } + } + + /** + * 拒绝不合法的请求,并返回错误信息 + */ + private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) { + // 返回应答给客户端 + if (res.status().code() != 200) { + ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), + CharsetUtil.UTF_8); + res.content().writeBytes(buf); + buf.release(); + } + //服务端向客户端发送数据 + ChannelFuture f = ctx.channel().writeAndFlush(res); + // 如果是非Keep-Alive,关闭连接 + if (!isKeepAlive(req) || res.status().code() != 200) { + f.addListener(ChannelFutureListener.CLOSE); + } + } +} diff --git a/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java b/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java new file mode 100644 index 0000000..54cb224 --- /dev/null +++ b/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java @@ -0,0 +1,39 @@ +package com.xcong.excoin.netty.initalizer; + +import com.xcong.excoin.netty.handler.WebSocketServerHandler; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.http.HttpObjectAggregator; +import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.stream.ChunkedWriteHandler; +import io.netty.handler.timeout.IdleStateHandler; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * @author wzy + * @email wangdoubleone@gmail.com + * @date 2019-05-06 + */ +@Component +public class WebSocketServerInitializer extends ChannelInitializer<NioSocketChannel> { + + @Autowired + private WebSocketServerHandler webSocketServerHandler; + + @Override + protected void initChannel(NioSocketChannel ch) throws Exception { + ChannelPipeline cp = ch.pipeline(); + + // http编码器 + cp.addLast(new HttpServerCodec()); + // 聚合器,使用websocket会用到 + cp.addLast(new HttpObjectAggregator(65536)); + cp.addLast(new ChunkedWriteHandler()); + // 心跳 + ch.pipeline().addLast(new IdleStateHandler(0, 10, 0)); + // 自定义业务handler + cp.addLast(webSocketServerHandler); + } +} diff --git a/src/main/java/com/xcong/excoin/netty/logic/MsgLogic.java b/src/main/java/com/xcong/excoin/netty/logic/MsgLogic.java new file mode 100644 index 0000000..2fc8280 --- /dev/null +++ b/src/main/java/com/xcong/excoin/netty/logic/MsgLogic.java @@ -0,0 +1,13 @@ +package com.xcong.excoin.netty.logic; + + +import com.xcong.excoin.netty.bean.RequestBean; + +/** + * @author wzy + * @email wangdoubleone@gmail.com + * @date 2019-05-09 + */ +public interface MsgLogic { + void webSocketMsgLogic(RequestBean requestBean); +} diff --git a/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java b/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java new file mode 100644 index 0000000..d1c9322 --- /dev/null +++ b/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java @@ -0,0 +1,41 @@ +package com.xcong.excoin.netty.logic; + +import com.alibaba.fastjson.JSONObject; +import com.xcong.excoin.netty.bean.RequestBean; +import com.xcong.excoin.netty.bean.ResponseBean; +import com.xcong.excoin.netty.common.ChannelManager; +import com.xcong.excoin.netty.common.NettyTools; +import io.netty.channel.Channel; +import org.springframework.stereotype.Component; + + +/** + * @author wzy + * @email wangdoubleone@gmail.com + * @date 2019-05-09 + */ +@Component +public class WebSocketLogic { + + + public void webReqConnection(RequestBean requestBean) { + Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId()); + channel.writeAndFlush(NettyTools.webSocketBytes("this is ok")); + } + + public void reqHomeSymbols(RequestBean requestBean) { + String params = requestBean.getParams(); + JSONObject jsonObject = JSONObject.parseObject(params); + String token = jsonObject.getString("token"); + String type = jsonObject.getString("type"); + ResponseBean responseBean = ResponseBean.ok(requestBean.getType(), null); + + Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId()); + channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(responseBean))); + } + + public void defaultReq(RequestBean requestBean) { + Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId()); + channel.writeAndFlush("this is error type"); + } +} diff --git a/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java b/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java new file mode 100644 index 0000000..c1fbb4c --- /dev/null +++ b/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java @@ -0,0 +1,36 @@ +package com.xcong.excoin.netty.logic.impl; + +import com.xcong.excoin.netty.bean.RequestBean; +import com.xcong.excoin.netty.common.Contans; +import com.xcong.excoin.netty.logic.MsgLogic; +import com.xcong.excoin.netty.logic.WebSocketLogic; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + + +/** + * @author wzy + * @date 2019-05-09 + */ +@Component +public class MsgLogicImpl implements MsgLogic { + + @Autowired + private WebSocketLogic webSocketLogic; + + @Override + public void webSocketMsgLogic(RequestBean requestBean) { + switch (requestBean.getType()) { + case Contans.WEB_REQ_CONNECTION : + webSocketLogic.webReqConnection(requestBean); + break; + case Contans.HOME_SYMBOLS: + webSocketLogic.reqHomeSymbols(requestBean); + default: + webSocketLogic.defaultReq(requestBean); + break; + } + } + + +} diff --git a/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java b/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java new file mode 100644 index 0000000..84b1290 --- /dev/null +++ b/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java @@ -0,0 +1,68 @@ +package com.xcong.excoin.netty.server; + +import com.xcong.excoin.netty.ChatServer; +import com.xcong.excoin.netty.initalizer.WebSocketServerInitializer; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * @author wzy + * @date 2019-05-06 + */ +@Slf4j +@Component("webSocketServer") +public class WebSocketServer implements ChatServer { + + + private EventLoopGroup boss = new NioEventLoopGroup(); + private EventLoopGroup work = new NioEventLoopGroup(); + + private ChannelFuture channelFuture; + + @Autowired + private WebSocketServerInitializer webSocketServerInitializer; + + @Override + public void start() throws Exception { + log.info("[websocket服务器启动]"); + try { + ServerBootstrap b = new ServerBootstrap(); + b.group(boss, work) + .channel(NioServerSocketChannel.class) + .childHandler(webSocketServerInitializer); + + channelFuture = b.bind(9999).sync(); + + log.info("[websocket服务器启动完成]-->{}", channelFuture.channel().localAddress()); + } finally { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + shutdown(); + } + }); + } + } + + @Override + public void shutdown() { + if (channelFuture != null) { + channelFuture.channel().close().syncUninterruptibly(); + } + + if (boss != null) { + boss.shutdownGracefully(); + } + + if (work != null) { + work.shutdownGracefully(); + } + } + +} -- Gitblit v1.9.1