From 91484bb4417567ec8f953bc52b0ee0a8f47309d8 Mon Sep 17 00:00:00 2001
From: wzy <wzy19931122ai@163.com>
Date: Tue, 19 May 2020 22:22:35 +0800
Subject: [PATCH] init netty code
---
src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java | 41 +++
src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java | 89 ++++++
src/main/java/com/xcong/excoin/netty/common/ChannelManager.java | 56 ++++
src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java | 46 +++
pom.xml | 12
src/main/java/com/xcong/excoin/netty/bean/RequestBean.java | 90 ++++++
src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java | 39 +++
src/main/java/com/xcong/excoin/netty/logic/MsgLogic.java | 13 +
src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java | 36 ++
src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java | 68 +++++
src/main/java/com/xcong/excoin/netty/common/NettyTools.java | 27 ++
src/main/java/com/xcong/excoin/netty/common/Contans.java | 23 +
src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java | 202 +++++++++++++++
src/main/java/com/xcong/excoin/netty/ChatServer.java | 11
14 files changed, 747 insertions(+), 6 deletions(-)
diff --git a/pom.xml b/pom.xml
index dbdfbfe..20f2f96 100644
--- a/pom.xml
+++ b/pom.xml
@@ -147,12 +147,6 @@
<version>${mapstruct.version}</version>
</dependency>
-<!-- <dependency>-->
-<!-- <groupId>org.mapstruct</groupId>-->
-<!-- <artifactId>mapstruct-processor</artifactId>-->
-<!-- <version>${mapstruct.version}</version>-->
-<!-- </dependency>-->
-
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
@@ -175,6 +169,12 @@
<version>1.6.1</version>
</dependency>
+
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>4.1.33.Final</version>
+ </dependency>
</dependencies>
<build>
diff --git a/src/main/java/com/xcong/excoin/netty/ChatServer.java b/src/main/java/com/xcong/excoin/netty/ChatServer.java
new file mode 100644
index 0000000..58f0cd7
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/ChatServer.java
@@ -0,0 +1,11 @@
+package com.xcong.excoin.netty;
+
+/**
+ * @author wzy
+ * @date 2019-05-06
+ */
+public interface ChatServer {
+ void start() throws Exception;
+
+ void shutdown();
+}
diff --git a/src/main/java/com/xcong/excoin/netty/bean/RequestBean.java b/src/main/java/com/xcong/excoin/netty/bean/RequestBean.java
new file mode 100644
index 0000000..289b4ae
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/bean/RequestBean.java
@@ -0,0 +1,90 @@
+package com.xcong.excoin.netty.bean;
+
+
+import java.io.Serializable;
+
+/**
+ * @author wzy
+ * @email wangdoubleone@gmail.com
+ * @date 2019-05-09
+ */
+public class RequestBean implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * 请求类型
+ */
+ private String type;
+
+ /**
+ * 当前通道id
+ */
+ private String channelId;
+
+ /**
+ * web端通道ID
+ */
+ private String reqId;
+
+ /**
+ * 请求参数
+ */
+ private String params;
+
+ /**
+ * 手持端是否同意连接 0-否 1-是
+ */
+ private String tag;
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getChannelId() {
+ return channelId;
+ }
+
+ public void setChannelId(String channelId) {
+ this.channelId = channelId;
+ }
+
+ public String getParams() {
+ return params;
+ }
+
+ public void setParams(String params) {
+ this.params = params;
+ }
+
+ public String getReqId() {
+ return reqId;
+ }
+
+ public void setReqId(String reqId) {
+ this.reqId = reqId;
+ }
+
+ public String getTag() {
+ return tag;
+ }
+
+ public void setTag(String tag) {
+ this.tag = tag;
+ }
+
+ @Override
+ public String toString() {
+ return "RequestBean{" +
+ "type='" + type + '\'' +
+ ", channelId='" + channelId + '\'' +
+ ", reqId='" + reqId + '\'' +
+ ", params='" + params + '\'' +
+ ", tag='" + tag + '\'' +
+ '}';
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java b/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java
new file mode 100644
index 0000000..b595122
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java
@@ -0,0 +1,89 @@
+package com.xcong.excoin.netty.bean;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author wzy
+ * @date 2020-04-18 16:17
+ **/
+public class ResponseBean implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public static final String SUCCESS = "200";
+
+ private String status;
+
+ private String type;
+
+ private String info;
+
+ private String channelId;
+
+ private Map<Object, Object> mapInfo = new HashMap<>();
+
+ private List<?> row;
+
+ public static ResponseBean ok(String type, String info) {
+ ResponseBean responseBean = new ResponseBean();
+ responseBean.status = SUCCESS;
+ responseBean.type = type;
+ responseBean.info = info;
+ return responseBean;
+ }
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+
+ public String getInfo() {
+ return info;
+ }
+
+ public void setInfo(String info) {
+ this.info = info;
+ }
+
+ public String getChannelId() {
+ return channelId;
+ }
+
+ public void setChannelId(String channelId) {
+ this.channelId = channelId;
+ }
+
+ public Map<Object, Object> getMapInfo() {
+ return mapInfo;
+ }
+
+ public void setMapInfo(Map<Object, Object> mapInfo) {
+ this.mapInfo = mapInfo;
+ }
+
+ public void putInfo(Object key, Object value) {
+ this.mapInfo.put(key, value);
+ }
+
+ public List<?> getRow() {
+ return row;
+ }
+
+ public void setRow(List<?> row) {
+ this.row = row;
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
new file mode 100644
index 0000000..5d158f6
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -0,0 +1,56 @@
+package com.xcong.excoin.netty.common;
+
+import com.alibaba.fastjson.JSONObject;
+import com.xcong.excoin.netty.bean.ResponseBean;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelId;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * @author wzy
+ * @email wangdoubleone@gmail.com
+ * @date 2019-05-06
+ */
+public class ChannelManager {
+
+ private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+ // 当前连接到服务器的通道(tcp和websocket)
+ private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>();
+
+ public static void addWebSocketChannel(Channel channel) {
+ WEBSOCKET_GROUP.add(channel);
+ CHANNEL_MAP.put(channel.id().asShortText(), channel.id());
+ }
+
+ public static void removeWebSocketChannel(Channel channel) {
+ WEBSOCKET_GROUP.remove(channel);
+ CHANNEL_MAP.remove(channel.id().asShortText());
+ }
+
+ public static Channel findWebSocketChannel(String id){
+ ChannelId channelId = CHANNEL_MAP.get(id);
+ return WEBSOCKET_GROUP.find(channelId);
+ }
+
+ public static ChannelGroup getWebSocketGroup() {
+ return WEBSOCKET_GROUP;
+ }
+
+ public static void send2All(Object object, String type) {
+ if (WEBSOCKET_GROUP.size() == 0) {
+ return;
+ }
+ ResponseBean responseBean = ResponseBean.ok(type, null);
+ responseBean.putInfo("data", object);
+ String msg = JSONObject.toJSONString(responseBean);
+ WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg));
+ }
+
+
+}
diff --git a/src/main/java/com/xcong/excoin/netty/common/Contans.java b/src/main/java/com/xcong/excoin/netty/common/Contans.java
new file mode 100644
index 0000000..1a64913
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/common/Contans.java
@@ -0,0 +1,23 @@
+package com.xcong.excoin.netty.common;
+
+/**
+ * @author wzy
+ * @date 2019-05-06
+ */
+public class Contans {
+
+ public static final String HEART_BEAT = "ping pong pang";
+
+ public static final String WEB_REQ_CONNECTION = "000_000";
+
+ public static final String HOME_SYMBOLS = "001_001";
+
+ public static final String ORDER_COIN_PRE_ORDER_DATA = "002_001";
+
+ public static final String ORDER_COIN_FIND_TRUST_ORDER = "002_002";
+
+ public static final String ORDER_PRE_ORDER_DATA = "003_001";
+
+ public static final String ORDER_FIND_TRUST_ORDER = "003_002";
+
+}
diff --git a/src/main/java/com/xcong/excoin/netty/common/NettyTools.java b/src/main/java/com/xcong/excoin/netty/common/NettyTools.java
new file mode 100644
index 0000000..ead90db
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/common/NettyTools.java
@@ -0,0 +1,27 @@
+package com.xcong.excoin.netty.common;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+
+/**
+ * @author wzy
+ * @date 2019-05-14
+ */
+public class NettyTools {
+
+
+ /**
+ * socket字符串传输转码
+ *
+ * @param msg
+ * @return
+ */
+ public static ByteBuf textBytes(String msg) {
+ return Unpooled.copiedBuffer((msg + "_split").getBytes());
+ }
+
+ public static TextWebSocketFrame webSocketBytes(String msg) {
+ return new TextWebSocketFrame(msg);
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
new file mode 100644
index 0000000..540fd36
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
@@ -0,0 +1,46 @@
+package com.xcong.excoin.netty.dispatch;
+
+import com.alibaba.fastjson.JSONObject;
+import com.xcong.excoin.netty.bean.RequestBean;
+import com.xcong.excoin.netty.common.NettyTools;
+import com.xcong.excoin.netty.logic.MsgLogic;
+import io.netty.channel.ChannelHandlerContext;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+
+/**
+ * @author wzy
+ * @date 2019-05-08
+ */
+@Slf4j
+@Component("msgDispatch")
+public class MsgDispatch implements ApplicationContextAware {
+
+ private ApplicationContext applicationContext;
+
+ @Autowired
+ private MsgLogic msgLogic;
+
+ public void webSocketDispatch(ChannelHandlerContext ctx, String msg) {
+ RequestBean requestBean = null;
+ try {
+ requestBean = JSONObject.parseObject(msg, RequestBean.class);
+ requestBean.setChannelId(ctx.channel().id().asShortText());
+ msgLogic.webSocketMsgLogic(requestBean);
+ } catch (Exception e) {
+ log.info("#websocket json error:{}#", e);
+ ctx.channel().writeAndFlush(NettyTools.webSocketBytes("params error"));
+ }
+ }
+
+
+ @Override
+ public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+ this.applicationContext = applicationContext;
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
new file mode 100644
index 0000000..3e57246
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
@@ -0,0 +1,202 @@
+package com.xcong.excoin.netty.handler;
+
+
+import com.xcong.excoin.netty.common.ChannelManager;
+import com.xcong.excoin.netty.common.Contans;
+import com.xcong.excoin.netty.common.NettyTools;
+import com.xcong.excoin.netty.dispatch.MsgDispatch;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.websocketx.*;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.CharsetUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Date;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;
+
+/**
+ * @author wzy
+ * @email wangdoubleone@gmail.com
+ * @date 2019-05-06
+ */
+@Slf4j
+@Component
+@ChannelHandler.Sharable
+public class WebSocketServerHandler extends ChannelInboundHandlerAdapter {
+
+ private final ConcurrentMap<String, Integer> pingTimes = new ConcurrentHashMap<>();
+
+ private static final int MAX_UN_REC_PING_TIMES = 3;
+
+ private WebSocketServerHandshaker handshaker;
+
+ @Resource(name = "msgDispatch")
+ private MsgDispatch msgDispatch;
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ log.info("[websocket客户端连入服务器]-->{}", ctx.channel().id());
+ ChannelManager.addWebSocketChannel(ctx.channel());
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ log.info("[离开websocket服务器]-->{}", ctx.channel().id());
+ ChannelManager.removeWebSocketChannel(ctx.channel());
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+// LogUtil.info("[websocket服务器收到消息]-->{}, {}", ctx.channel().id(), msg);
+ if (msg instanceof FullHttpRequest) {
+ // 以http请求形式接入,但是走的是websocket
+ handleHttpRequest(ctx, (FullHttpRequest) msg);
+ } else if (msg instanceof WebSocketFrame) {
+ // 处理websocket客户端的消息
+ handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
+
+ WebSocketFrame frame = (WebSocketFrame) msg;
+ String content;
+ try {
+ content = ((TextWebSocketFrame) frame).text();
+ if (content.contains(Contans.HEART_BEAT)) {
+ resetTimes(ctx.channel());
+ } else {
+ this.msgDispatch.webSocketDispatch(ctx, content);
+ }
+ } catch (ClassCastException e) {
+ content = ((CloseWebSocketFrame) frame).reasonText();
+ ctx.writeAndFlush(new TextWebSocketFrame(content));
+ }
+ }
+ }
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ log.info("[触发器触发]");
+// super.userEventTriggered(ctx, evt);
+ if (evt instanceof IdleStateEvent) {
+ IdleStateEvent event = (IdleStateEvent) evt;
+ if (event.state() == IdleState.READER_IDLE) {
+
+ } else if (event.state() == IdleState.WRITER_IDLE) {
+ /*写超时*/
+ ctx.channel().writeAndFlush(NettyTools.webSocketBytes(Contans.HEART_BEAT));
+ Integer times = pingTimes.get(ctx.channel().id().asShortText());
+ if (times == null) {
+ times = 0;
+ }
+ /*读超时*/
+ log.info("===服务端===({}写超时, {})", ctx.channel().id().asShortText(), times);
+ // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连
+ if (times >= MAX_UN_REC_PING_TIMES) {
+ log.info("===服务端===(写超时,关闭chanel)");
+ // 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连
+ ctx.channel().close();
+ } else {
+ // 失败计数器加1
+ times++;
+ pingTimes.remove(ctx.channel().id().asShortText());
+ pingTimes.put(ctx.channel().id().asShortText(), times);
+ }
+ } else if (event.state() == IdleState.ALL_IDLE) {
+ /*总超时*/
+ System.out.println("===服务端===(ALL_IDLE 总超时)");
+ }
+ }
+ }
+
+ private void resetTimes(Channel channel) {
+ pingTimes.remove(channel.id().asShortText());
+ pingTimes.put(channel.id().asShortText(), 0);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ log.error("[websocket服务器发生异常]-->{},{}#", ctx.channel().id(), cause);
+ super.exceptionCaught(ctx, cause);
+ }
+
+
+ private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
+ // 判断是否关闭链路的指令
+ if (frame instanceof CloseWebSocketFrame) {
+ handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
+ return;
+ }
+ // 判断是否ping消息
+ if (frame instanceof PingWebSocketFrame) {
+ ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
+ return;
+ }
+ // 本例程仅支持文本消息,不支持二进制消息
+ if (!(frame instanceof TextWebSocketFrame)) {
+ System.out.println("本例程仅支持文本消息,不支持二进制消息");
+ throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
+ }
+ // 返回应答消息
+ String request = ((TextWebSocketFrame) frame).text();
+ TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + ":" + request);
+ // 群发
+ // ChannelSupervise.send2All(tws);
+ // 返回【谁发的发给谁】
+ // ctx.channel().writeAndFlush(tws);
+ }
+
+ /**
+ * 唯一的一次http请求,用于创建websocket
+ */
+ private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
+ InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
+ InetAddress inetAddress = inetSocketAddress.getAddress();
+ String ip = inetAddress.getHostAddress();
+ int port = inetSocketAddress.getPort();
+
+ //要求Upgrade为websocket,过滤掉get/Post
+ if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
+ //若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端
+ sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
+ return;
+ }
+ WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://" + ip + ":" + port + "/websocket", null, false);
+ handshaker = wsFactory.newHandshaker(req);
+ if (handshaker == null) {
+ WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
+ } else {
+ handshaker.handshake(ctx.channel(), req);
+ }
+ }
+
+ /**
+ * 拒绝不合法的请求,并返回错误信息
+ */
+ private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
+ // 返回应答给客户端
+ if (res.status().code() != 200) {
+ ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(),
+ CharsetUtil.UTF_8);
+ res.content().writeBytes(buf);
+ buf.release();
+ }
+ //服务端向客户端发送数据
+ ChannelFuture f = ctx.channel().writeAndFlush(res);
+ // 如果是非Keep-Alive,关闭连接
+ if (!isKeepAlive(req) || res.status().code() != 200) {
+ f.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java b/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java
new file mode 100644
index 0000000..54cb224
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java
@@ -0,0 +1,39 @@
+package com.xcong.excoin.netty.initalizer;
+
+import com.xcong.excoin.netty.handler.WebSocketServerHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.handler.timeout.IdleStateHandler;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author wzy
+ * @email wangdoubleone@gmail.com
+ * @date 2019-05-06
+ */
+@Component
+public class WebSocketServerInitializer extends ChannelInitializer<NioSocketChannel> {
+
+ @Autowired
+ private WebSocketServerHandler webSocketServerHandler;
+
+ @Override
+ protected void initChannel(NioSocketChannel ch) throws Exception {
+ ChannelPipeline cp = ch.pipeline();
+
+ // http编码器
+ cp.addLast(new HttpServerCodec());
+ // 聚合器,使用websocket会用到
+ cp.addLast(new HttpObjectAggregator(65536));
+ cp.addLast(new ChunkedWriteHandler());
+ // 心跳
+ ch.pipeline().addLast(new IdleStateHandler(0, 10, 0));
+ // 自定义业务handler
+ cp.addLast(webSocketServerHandler);
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/netty/logic/MsgLogic.java b/src/main/java/com/xcong/excoin/netty/logic/MsgLogic.java
new file mode 100644
index 0000000..2fc8280
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/logic/MsgLogic.java
@@ -0,0 +1,13 @@
+package com.xcong.excoin.netty.logic;
+
+
+import com.xcong.excoin.netty.bean.RequestBean;
+
+/**
+ * @author wzy
+ * @email wangdoubleone@gmail.com
+ * @date 2019-05-09
+ */
+public interface MsgLogic {
+ void webSocketMsgLogic(RequestBean requestBean);
+}
diff --git a/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java b/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
new file mode 100644
index 0000000..d1c9322
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
@@ -0,0 +1,41 @@
+package com.xcong.excoin.netty.logic;
+
+import com.alibaba.fastjson.JSONObject;
+import com.xcong.excoin.netty.bean.RequestBean;
+import com.xcong.excoin.netty.bean.ResponseBean;
+import com.xcong.excoin.netty.common.ChannelManager;
+import com.xcong.excoin.netty.common.NettyTools;
+import io.netty.channel.Channel;
+import org.springframework.stereotype.Component;
+
+
+/**
+ * @author wzy
+ * @email wangdoubleone@gmail.com
+ * @date 2019-05-09
+ */
+@Component
+public class WebSocketLogic {
+
+
+ public void webReqConnection(RequestBean requestBean) {
+ Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId());
+ channel.writeAndFlush(NettyTools.webSocketBytes("this is ok"));
+ }
+
+ public void reqHomeSymbols(RequestBean requestBean) {
+ String params = requestBean.getParams();
+ JSONObject jsonObject = JSONObject.parseObject(params);
+ String token = jsonObject.getString("token");
+ String type = jsonObject.getString("type");
+ ResponseBean responseBean = ResponseBean.ok(requestBean.getType(), null);
+
+ Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId());
+ channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(responseBean)));
+ }
+
+ public void defaultReq(RequestBean requestBean) {
+ Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId());
+ channel.writeAndFlush("this is error type");
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java b/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java
new file mode 100644
index 0000000..c1fbb4c
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java
@@ -0,0 +1,36 @@
+package com.xcong.excoin.netty.logic.impl;
+
+import com.xcong.excoin.netty.bean.RequestBean;
+import com.xcong.excoin.netty.common.Contans;
+import com.xcong.excoin.netty.logic.MsgLogic;
+import com.xcong.excoin.netty.logic.WebSocketLogic;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+
+/**
+ * @author wzy
+ * @date 2019-05-09
+ */
+@Component
+public class MsgLogicImpl implements MsgLogic {
+
+ @Autowired
+ private WebSocketLogic webSocketLogic;
+
+ @Override
+ public void webSocketMsgLogic(RequestBean requestBean) {
+ switch (requestBean.getType()) {
+ case Contans.WEB_REQ_CONNECTION :
+ webSocketLogic.webReqConnection(requestBean);
+ break;
+ case Contans.HOME_SYMBOLS:
+ webSocketLogic.reqHomeSymbols(requestBean);
+ default:
+ webSocketLogic.defaultReq(requestBean);
+ break;
+ }
+ }
+
+
+}
diff --git a/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java b/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java
new file mode 100644
index 0000000..84b1290
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java
@@ -0,0 +1,68 @@
+package com.xcong.excoin.netty.server;
+
+import com.xcong.excoin.netty.ChatServer;
+import com.xcong.excoin.netty.initalizer.WebSocketServerInitializer;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author wzy
+ * @date 2019-05-06
+ */
+@Slf4j
+@Component("webSocketServer")
+public class WebSocketServer implements ChatServer {
+
+
+ private EventLoopGroup boss = new NioEventLoopGroup();
+ private EventLoopGroup work = new NioEventLoopGroup();
+
+ private ChannelFuture channelFuture;
+
+ @Autowired
+ private WebSocketServerInitializer webSocketServerInitializer;
+
+ @Override
+ public void start() throws Exception {
+ log.info("[websocket服务器启动]");
+ try {
+ ServerBootstrap b = new ServerBootstrap();
+ b.group(boss, work)
+ .channel(NioServerSocketChannel.class)
+ .childHandler(webSocketServerInitializer);
+
+ channelFuture = b.bind(9999).sync();
+
+ log.info("[websocket服务器启动完成]-->{}", channelFuture.channel().localAddress());
+ } finally {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ shutdown();
+ }
+ });
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ if (channelFuture != null) {
+ channelFuture.channel().close().syncUninterruptibly();
+ }
+
+ if (boss != null) {
+ boss.shutdownGracefully();
+ }
+
+ if (work != null) {
+ work.shutdownGracefully();
+ }
+ }
+
+}
--
Gitblit v1.9.1