From 91484bb4417567ec8f953bc52b0ee0a8f47309d8 Mon Sep 17 00:00:00 2001
From: wzy <wzy19931122ai@163.com>
Date: Tue, 19 May 2020 22:22:35 +0800
Subject: [PATCH] init netty code

---
 src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java                  |   41 +++
 src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java                     |   89 ++++++
 src/main/java/com/xcong/excoin/netty/common/ChannelManager.java                 |   56 ++++
 src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java                  |   46 +++
 pom.xml                                                                         |   12 
 src/main/java/com/xcong/excoin/netty/bean/RequestBean.java                      |   90 ++++++
 src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java |   39 +++
 src/main/java/com/xcong/excoin/netty/logic/MsgLogic.java                        |   13 +
 src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java               |   36 ++
 src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java                |   68 +++++
 src/main/java/com/xcong/excoin/netty/common/NettyTools.java                     |   27 ++
 src/main/java/com/xcong/excoin/netty/common/Contans.java                        |   23 +
 src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java        |  202 +++++++++++++++
 src/main/java/com/xcong/excoin/netty/ChatServer.java                            |   11 
 14 files changed, 747 insertions(+), 6 deletions(-)

diff --git a/pom.xml b/pom.xml
index dbdfbfe..20f2f96 100644
--- a/pom.xml
+++ b/pom.xml
@@ -147,12 +147,6 @@
             <version>${mapstruct.version}</version>
         </dependency>
 
-<!--        <dependency>-->
-<!--            <groupId>org.mapstruct</groupId>-->
-<!--            <artifactId>mapstruct-processor</artifactId>-->
-<!--            <version>${mapstruct.version}</version>-->
-<!--        </dependency>-->
-
         <dependency>
             <groupId>cn.hutool</groupId>
             <artifactId>hutool-all</artifactId>
@@ -175,6 +169,12 @@
             <version>1.6.1</version>
         </dependency>
 
+
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-all</artifactId>
+            <version>4.1.33.Final</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/src/main/java/com/xcong/excoin/netty/ChatServer.java b/src/main/java/com/xcong/excoin/netty/ChatServer.java
new file mode 100644
index 0000000..58f0cd7
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/ChatServer.java
@@ -0,0 +1,11 @@
+package com.xcong.excoin.netty;
+
+/**
+ * @author wzy
+ * @date 2019-05-06
+ */
+public interface ChatServer {
+    void start() throws  Exception;
+
+    void shutdown();
+}
diff --git a/src/main/java/com/xcong/excoin/netty/bean/RequestBean.java b/src/main/java/com/xcong/excoin/netty/bean/RequestBean.java
new file mode 100644
index 0000000..289b4ae
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/bean/RequestBean.java
@@ -0,0 +1,90 @@
+package com.xcong.excoin.netty.bean;
+
+
+import java.io.Serializable;
+
+/**
+ * @author wzy
+ * @email wangdoubleone@gmail.com
+ * @date 2019-05-09
+ */
+public class RequestBean implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * 请求类型
+     */
+    private String type;
+
+    /**
+     * 当前通道id
+     */
+    private String channelId;
+
+    /**
+     * web端通道ID
+     */
+    private String reqId;
+
+    /**
+     * 请求参数
+     */
+    private String params;
+
+    /**
+     * 手持端是否同意连接 0-否 1-是
+     */
+    private String tag;
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getChannelId() {
+        return channelId;
+    }
+
+    public void setChannelId(String channelId) {
+        this.channelId = channelId;
+    }
+
+    public String getParams() {
+        return params;
+    }
+
+    public void setParams(String params) {
+        this.params = params;
+    }
+
+    public String getReqId() {
+        return reqId;
+    }
+
+    public void setReqId(String reqId) {
+        this.reqId = reqId;
+    }
+
+    public String getTag() {
+        return tag;
+    }
+
+    public void setTag(String tag) {
+        this.tag = tag;
+    }
+
+    @Override
+    public String toString() {
+        return "RequestBean{" +
+                "type='" + type + '\'' +
+                ", channelId='" + channelId + '\'' +
+                ", reqId='" + reqId + '\'' +
+                ", params='" + params + '\'' +
+                ", tag='" + tag + '\'' +
+                '}';
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java b/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java
new file mode 100644
index 0000000..b595122
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java
@@ -0,0 +1,89 @@
+package com.xcong.excoin.netty.bean;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author wzy
+ * @date 2020-04-18 16:17
+ **/
+public class ResponseBean implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final String SUCCESS = "200";
+
+    private String status;
+
+    private String type;
+
+    private String info;
+
+    private String channelId;
+
+    private Map<Object, Object> mapInfo = new HashMap<>();
+
+    private List<?> row;
+
+    public static ResponseBean ok(String type, String info) {
+        ResponseBean responseBean = new ResponseBean();
+        responseBean.status = SUCCESS;
+        responseBean.type = type;
+        responseBean.info = info;
+        return responseBean;
+    }
+
+    public String getStatus() {
+        return status;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getInfo() {
+        return info;
+    }
+
+    public void setInfo(String info) {
+        this.info = info;
+    }
+
+    public String getChannelId() {
+        return channelId;
+    }
+
+    public void setChannelId(String channelId) {
+        this.channelId = channelId;
+    }
+
+    public Map<Object, Object> getMapInfo() {
+        return mapInfo;
+    }
+
+    public void setMapInfo(Map<Object, Object> mapInfo) {
+        this.mapInfo = mapInfo;
+    }
+
+    public void putInfo(Object key, Object value) {
+        this.mapInfo.put(key, value);
+    }
+
+    public List<?> getRow() {
+        return row;
+    }
+
+    public void setRow(List<?> row) {
+        this.row = row;
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
new file mode 100644
index 0000000..5d158f6
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -0,0 +1,56 @@
+package com.xcong.excoin.netty.common;
+
+import com.alibaba.fastjson.JSONObject;
+import com.xcong.excoin.netty.bean.ResponseBean;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelId;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * @author wzy
+ * @email wangdoubleone@gmail.com
+ * @date 2019-05-06
+ */
+public class ChannelManager {
+
+    private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+    // 当前连接到服务器的通道(tcp和websocket)
+    private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>();
+
+    public static void addWebSocketChannel(Channel channel) {
+        WEBSOCKET_GROUP.add(channel);
+        CHANNEL_MAP.put(channel.id().asShortText(), channel.id());
+    }
+
+    public static void removeWebSocketChannel(Channel channel) {
+        WEBSOCKET_GROUP.remove(channel);
+        CHANNEL_MAP.remove(channel.id().asShortText());
+    }
+
+    public static Channel findWebSocketChannel(String id){
+        ChannelId channelId = CHANNEL_MAP.get(id);
+        return WEBSOCKET_GROUP.find(channelId);
+    }
+
+    public static ChannelGroup getWebSocketGroup() {
+        return WEBSOCKET_GROUP;
+    }
+
+    public static void send2All(Object object, String type) {
+        if (WEBSOCKET_GROUP.size() == 0) {
+            return;
+        }
+        ResponseBean responseBean = ResponseBean.ok(type, null);
+        responseBean.putInfo("data", object);
+        String msg = JSONObject.toJSONString(responseBean);
+        WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg));
+    }
+
+
+}
diff --git a/src/main/java/com/xcong/excoin/netty/common/Contans.java b/src/main/java/com/xcong/excoin/netty/common/Contans.java
new file mode 100644
index 0000000..1a64913
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/common/Contans.java
@@ -0,0 +1,23 @@
+package com.xcong.excoin.netty.common;
+
+/**
+ * @author wzy
+ * @date 2019-05-06
+ */
+public class Contans {
+
+    public static final String HEART_BEAT = "ping pong pang";
+
+    public static final String WEB_REQ_CONNECTION = "000_000";
+
+    public static final String HOME_SYMBOLS = "001_001";
+
+    public static final String ORDER_COIN_PRE_ORDER_DATA = "002_001";
+
+    public static final String ORDER_COIN_FIND_TRUST_ORDER = "002_002";
+
+    public static final String ORDER_PRE_ORDER_DATA = "003_001";
+
+    public static final String ORDER_FIND_TRUST_ORDER = "003_002";
+
+}
diff --git a/src/main/java/com/xcong/excoin/netty/common/NettyTools.java b/src/main/java/com/xcong/excoin/netty/common/NettyTools.java
new file mode 100644
index 0000000..ead90db
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/common/NettyTools.java
@@ -0,0 +1,27 @@
+package com.xcong.excoin.netty.common;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+
+/**
+ * @author wzy
+ * @date 2019-05-14
+ */
+public class NettyTools {
+
+
+    /**
+     * socket字符串传输转码
+     *
+     * @param msg
+     * @return
+     */
+    public static ByteBuf textBytes(String msg) {
+        return Unpooled.copiedBuffer((msg + "_split").getBytes());
+    }
+
+    public static TextWebSocketFrame webSocketBytes(String msg) {
+        return new TextWebSocketFrame(msg);
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
new file mode 100644
index 0000000..540fd36
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
@@ -0,0 +1,46 @@
+package com.xcong.excoin.netty.dispatch;
+
+import com.alibaba.fastjson.JSONObject;
+import com.xcong.excoin.netty.bean.RequestBean;
+import com.xcong.excoin.netty.common.NettyTools;
+import com.xcong.excoin.netty.logic.MsgLogic;
+import io.netty.channel.ChannelHandlerContext;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+
+/**
+ * @author wzy
+ * @date 2019-05-08
+ */
+@Slf4j
+@Component("msgDispatch")
+public class MsgDispatch implements ApplicationContextAware {
+
+    private ApplicationContext applicationContext;
+
+    @Autowired
+    private MsgLogic msgLogic;
+
+    public void webSocketDispatch(ChannelHandlerContext ctx, String msg) {
+        RequestBean requestBean = null;
+        try {
+            requestBean = JSONObject.parseObject(msg, RequestBean.class);
+            requestBean.setChannelId(ctx.channel().id().asShortText());
+            msgLogic.webSocketMsgLogic(requestBean);
+        } catch (Exception e) {
+            log.info("#websocket json error:{}#", e);
+            ctx.channel().writeAndFlush(NettyTools.webSocketBytes("params error"));
+        }
+    }
+
+
+    @Override
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        this.applicationContext = applicationContext;
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
new file mode 100644
index 0000000..3e57246
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
@@ -0,0 +1,202 @@
+package com.xcong.excoin.netty.handler;
+
+
+import com.xcong.excoin.netty.common.ChannelManager;
+import com.xcong.excoin.netty.common.Contans;
+import com.xcong.excoin.netty.common.NettyTools;
+import com.xcong.excoin.netty.dispatch.MsgDispatch;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.*;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.websocketx.*;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.CharsetUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Date;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import static io.netty.handler.codec.http.HttpUtil.isKeepAlive;
+
+/**
+ * @author wzy
+ * @email wangdoubleone@gmail.com
+ * @date 2019-05-06
+ */
+@Slf4j
+@Component
+@ChannelHandler.Sharable
+public class WebSocketServerHandler extends ChannelInboundHandlerAdapter {
+
+    private final ConcurrentMap<String, Integer> pingTimes = new ConcurrentHashMap<>();
+
+    private static final int MAX_UN_REC_PING_TIMES = 3;
+
+    private WebSocketServerHandshaker handshaker;
+
+    @Resource(name = "msgDispatch")
+    private MsgDispatch msgDispatch;
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        log.info("[websocket客户端连入服务器]-->{}", ctx.channel().id());
+        ChannelManager.addWebSocketChannel(ctx.channel());
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        log.info("[离开websocket服务器]-->{}", ctx.channel().id());
+        ChannelManager.removeWebSocketChannel(ctx.channel());
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+//        LogUtil.info("[websocket服务器收到消息]-->{}, {}", ctx.channel().id(), msg);
+        if (msg instanceof FullHttpRequest) {
+            // 以http请求形式接入,但是走的是websocket
+            handleHttpRequest(ctx, (FullHttpRequest) msg);
+        } else if (msg instanceof WebSocketFrame) {
+            // 处理websocket客户端的消息
+            handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
+
+            WebSocketFrame frame = (WebSocketFrame) msg;
+            String content;
+            try {
+                content = ((TextWebSocketFrame) frame).text();
+                if (content.contains(Contans.HEART_BEAT)) {
+                    resetTimes(ctx.channel());
+                } else {
+                    this.msgDispatch.webSocketDispatch(ctx, content);
+                }
+            } catch (ClassCastException e) {
+                content = ((CloseWebSocketFrame) frame).reasonText();
+                ctx.writeAndFlush(new TextWebSocketFrame(content));
+            }
+        }
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        log.info("[触发器触发]");
+//        super.userEventTriggered(ctx, evt);
+        if (evt instanceof IdleStateEvent) {
+            IdleStateEvent event = (IdleStateEvent) evt;
+            if (event.state() == IdleState.READER_IDLE) {
+
+            } else if (event.state() == IdleState.WRITER_IDLE) {
+                /*写超时*/
+                ctx.channel().writeAndFlush(NettyTools.webSocketBytes(Contans.HEART_BEAT));
+                Integer times = pingTimes.get(ctx.channel().id().asShortText());
+                if (times == null) {
+                    times = 0;
+                }
+                /*读超时*/
+                log.info("===服务端===({}写超时, {})", ctx.channel().id().asShortText(), times);
+                // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连
+                if (times >= MAX_UN_REC_PING_TIMES) {
+                    log.info("===服务端===(写超时,关闭chanel)");
+                    // 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连
+                    ctx.channel().close();
+                } else {
+                    // 失败计数器加1
+                    times++;
+                    pingTimes.remove(ctx.channel().id().asShortText());
+                    pingTimes.put(ctx.channel().id().asShortText(), times);
+                }
+            } else if (event.state() == IdleState.ALL_IDLE) {
+                /*总超时*/
+                System.out.println("===服务端===(ALL_IDLE 总超时)");
+            }
+        }
+    }
+
+    private void resetTimes(Channel channel) {
+        pingTimes.remove(channel.id().asShortText());
+        pingTimes.put(channel.id().asShortText(), 0);
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        log.error("[websocket服务器发生异常]-->{},{}#", ctx.channel().id(), cause);
+        super.exceptionCaught(ctx, cause);
+    }
+
+
+    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
+        // 判断是否关闭链路的指令
+        if (frame instanceof CloseWebSocketFrame) {
+            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
+            return;
+        }
+        // 判断是否ping消息
+        if (frame instanceof PingWebSocketFrame) {
+            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
+            return;
+        }
+        // 本例程仅支持文本消息,不支持二进制消息
+        if (!(frame instanceof TextWebSocketFrame)) {
+            System.out.println("本例程仅支持文本消息,不支持二进制消息");
+            throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
+        }
+        // 返回应答消息
+        String request = ((TextWebSocketFrame) frame).text();
+        TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + ctx.channel().id() + ":" + request);
+        // 群发
+        // ChannelSupervise.send2All(tws);
+        // 返回【谁发的发给谁】
+        // ctx.channel().writeAndFlush(tws);
+    }
+
+    /**
+     * 唯一的一次http请求,用于创建websocket
+     */
+    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
+        InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
+        InetAddress inetAddress = inetSocketAddress.getAddress();
+        String ip = inetAddress.getHostAddress();
+        int port = inetSocketAddress.getPort();
+
+        //要求Upgrade为websocket,过滤掉get/Post
+        if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
+            //若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端
+            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
+            return;
+        }
+        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://" + ip + ":" + port + "/websocket", null, false);
+        handshaker = wsFactory.newHandshaker(req);
+        if (handshaker == null) {
+            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
+        } else {
+            handshaker.handshake(ctx.channel(), req);
+        }
+    }
+
+    /**
+     * 拒绝不合法的请求,并返回错误信息
+     */
+    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
+        // 返回应答给客户端
+        if (res.status().code() != 200) {
+            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(),
+                    CharsetUtil.UTF_8);
+            res.content().writeBytes(buf);
+            buf.release();
+        }
+        //服务端向客户端发送数据
+        ChannelFuture f = ctx.channel().writeAndFlush(res);
+        // 如果是非Keep-Alive,关闭连接
+        if (!isKeepAlive(req) || res.status().code() != 200) {
+            f.addListener(ChannelFutureListener.CLOSE);
+        }
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java b/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java
new file mode 100644
index 0000000..54cb224
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java
@@ -0,0 +1,39 @@
+package com.xcong.excoin.netty.initalizer;
+
+import com.xcong.excoin.netty.handler.WebSocketServerHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.handler.timeout.IdleStateHandler;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author wzy
+ * @email wangdoubleone@gmail.com
+ * @date 2019-05-06
+ */
+@Component
+public class WebSocketServerInitializer extends ChannelInitializer<NioSocketChannel> {
+
+    @Autowired
+    private WebSocketServerHandler webSocketServerHandler;
+
+    @Override
+    protected void initChannel(NioSocketChannel ch) throws Exception {
+        ChannelPipeline cp = ch.pipeline();
+
+        // http编码器
+        cp.addLast(new HttpServerCodec());
+        // 聚合器,使用websocket会用到
+        cp.addLast(new HttpObjectAggregator(65536));
+        cp.addLast(new ChunkedWriteHandler());
+        // 心跳
+        ch.pipeline().addLast(new IdleStateHandler(0, 10, 0));
+        // 自定义业务handler
+        cp.addLast(webSocketServerHandler);
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/netty/logic/MsgLogic.java b/src/main/java/com/xcong/excoin/netty/logic/MsgLogic.java
new file mode 100644
index 0000000..2fc8280
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/logic/MsgLogic.java
@@ -0,0 +1,13 @@
+package com.xcong.excoin.netty.logic;
+
+
+import com.xcong.excoin.netty.bean.RequestBean;
+
+/**
+ * @author wzy
+ * @email wangdoubleone@gmail.com
+ * @date 2019-05-09
+ */
+public interface MsgLogic {
+    void webSocketMsgLogic(RequestBean requestBean);
+}
diff --git a/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java b/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
new file mode 100644
index 0000000..d1c9322
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
@@ -0,0 +1,41 @@
+package com.xcong.excoin.netty.logic;
+
+import com.alibaba.fastjson.JSONObject;
+import com.xcong.excoin.netty.bean.RequestBean;
+import com.xcong.excoin.netty.bean.ResponseBean;
+import com.xcong.excoin.netty.common.ChannelManager;
+import com.xcong.excoin.netty.common.NettyTools;
+import io.netty.channel.Channel;
+import org.springframework.stereotype.Component;
+
+
+/**
+ * @author wzy
+ * @email wangdoubleone@gmail.com
+ * @date 2019-05-09
+ */
+@Component
+public class WebSocketLogic {
+
+
+    public void webReqConnection(RequestBean requestBean) {
+        Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId());
+        channel.writeAndFlush(NettyTools.webSocketBytes("this is ok"));
+    }
+
+    public void reqHomeSymbols(RequestBean requestBean) {
+        String params = requestBean.getParams();
+        JSONObject jsonObject = JSONObject.parseObject(params);
+        String token = jsonObject.getString("token");
+        String type = jsonObject.getString("type");
+        ResponseBean responseBean = ResponseBean.ok(requestBean.getType(), null);
+
+        Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId());
+        channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(responseBean)));
+    }
+
+    public void defaultReq(RequestBean requestBean) {
+        Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId());
+        channel.writeAndFlush("this is error type");
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java b/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java
new file mode 100644
index 0000000..c1fbb4c
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java
@@ -0,0 +1,36 @@
+package com.xcong.excoin.netty.logic.impl;
+
+import com.xcong.excoin.netty.bean.RequestBean;
+import com.xcong.excoin.netty.common.Contans;
+import com.xcong.excoin.netty.logic.MsgLogic;
+import com.xcong.excoin.netty.logic.WebSocketLogic;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+
+/**
+ * @author wzy
+ * @date 2019-05-09
+ */
+@Component
+public class MsgLogicImpl implements MsgLogic {
+
+    @Autowired
+    private WebSocketLogic webSocketLogic;
+
+    @Override
+    public void webSocketMsgLogic(RequestBean requestBean) {
+        switch (requestBean.getType()) {
+            case Contans.WEB_REQ_CONNECTION :
+                webSocketLogic.webReqConnection(requestBean);
+                break;
+            case Contans.HOME_SYMBOLS:
+                webSocketLogic.reqHomeSymbols(requestBean);
+            default:
+                webSocketLogic.defaultReq(requestBean);
+                break;
+        }
+    }
+
+
+}
diff --git a/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java b/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java
new file mode 100644
index 0000000..84b1290
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java
@@ -0,0 +1,68 @@
+package com.xcong.excoin.netty.server;
+
+import com.xcong.excoin.netty.ChatServer;
+import com.xcong.excoin.netty.initalizer.WebSocketServerInitializer;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author wzy
+ * @date 2019-05-06
+ */
+@Slf4j
+@Component("webSocketServer")
+public class WebSocketServer implements ChatServer {
+
+
+    private EventLoopGroup boss = new NioEventLoopGroup();
+    private EventLoopGroup work = new NioEventLoopGroup();
+
+    private ChannelFuture channelFuture;
+
+    @Autowired
+    private WebSocketServerInitializer webSocketServerInitializer;
+
+    @Override
+    public void start() throws Exception {
+        log.info("[websocket服务器启动]");
+        try {
+            ServerBootstrap b = new ServerBootstrap();
+            b.group(boss, work)
+                    .channel(NioServerSocketChannel.class)
+                    .childHandler(webSocketServerInitializer);
+
+            channelFuture = b.bind(9999).sync();
+
+            log.info("[websocket服务器启动完成]-->{}", channelFuture.channel().localAddress());
+        } finally {
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                @Override
+                public void run() {
+                    shutdown();
+                }
+            });
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        if (channelFuture != null) {
+            channelFuture.channel().close().syncUninterruptibly();
+        }
+
+        if (boss != null) {
+            boss.shutdownGracefully();
+        }
+
+        if (work != null) {
+            work.shutdownGracefully();
+        }
+    }
+
+}

--
Gitblit v1.9.1