From d8d653b40cc6565c72cccd28de831474e5d5c512 Mon Sep 17 00:00:00 2001
From: Helius <wangdoubleone@gmail.com>
Date: Sun, 28 Feb 2021 15:16:18 +0800
Subject: [PATCH] add netty

---
 src/main/java/com/xcong/excoin/netty/server/TcpServer.java                |   67 +++++++++++
 src/main/java/com/xcong/excoin/quartz/job/NettyServerStartUp.java         |   35 +++++
 src/main/java/com/xcong/excoin/netty/common/ChannelManager.java           |   15 ++
 src/main/java/com/xcong/excoin/netty/initalizer/TcpServerInitializer.java |   33 +++++
 src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java     |   30 +++++
 src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java               |    7 -
 src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java        |   49 ++++++++
 src/main/java/com/xcong/excoin/netty/client/ClientServer.java             |   55 +++++++++
 src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java       |   14 +-
 9 files changed, 293 insertions(+), 12 deletions(-)

diff --git a/src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java b/src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java
new file mode 100644
index 0000000..33bc9b8
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java
@@ -0,0 +1,30 @@
+package com.xcong.excoin.netty.client;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * @author wzy
+ * @email wangdoubleone@gmail.com
+ * @date 2019-05-14
+ */
+@Slf4j
+public class ClientChannelHandler extends ChannelInboundHandlerAdapter {
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+        log.error("",cause);
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        log.info("消息收到");
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        log.info("消息收到");
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/xcong/excoin/netty/client/ClientServer.java b/src/main/java/com/xcong/excoin/netty/client/ClientServer.java
new file mode 100644
index 0000000..58a9249
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/client/ClientServer.java
@@ -0,0 +1,55 @@
+package com.xcong.excoin.netty.client;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.DelimiterBasedFrameDecoder;
+import io.netty.handler.codec.string.StringDecoder;
+import io.netty.handler.codec.string.StringEncoder;
+
+import java.net.InetSocketAddress;
+
+/**
+ * @author wzy
+ * @email wangdoubleone@gmail.com
+ * @date 2019-05-14
+ */
+public class ClientServer {
+
+    public static void start() throws InterruptedException {
+        EventLoopGroup group = new NioEventLoopGroup();
+        try {
+            Bootstrap b = new Bootstrap();
+            b.group(group)
+                    .channel(NioSocketChannel.class)
+                    .remoteAddress(new InetSocketAddress("localhost", 9998))
+                    .handler(new ChannelInitializer<SocketChannel>() {    //5
+                        @Override
+                        public void initChannel(SocketChannel ch)
+                                throws Exception {
+                            ByteBuf buf = Unpooled.copiedBuffer("_split".getBytes());
+                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(10240, buf));
+                            ch.pipeline().addLast(new StringDecoder());
+                            ch.pipeline().addLast(new StringEncoder());
+                            ch.pipeline().addLast(new ClientChannelHandler());
+                        }
+                    });
+
+            ChannelFuture f = b.connect().sync();
+
+            f.channel().closeFuture().sync();
+        } finally {
+            group.shutdownGracefully();
+        }
+    }
+
+    public static void main(String args[]) throws InterruptedException {
+        start();
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
index e6f6336..7932c00 100644
--- a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
+++ b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -22,6 +22,8 @@
 
     private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
 
+    private static final ChannelGroup TCP_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
     private static final ConcurrentMap<String, ChannelGroup> DEPTH_MAP = new ConcurrentHashMap<>();
 
     private static final ConcurrentMap<String, ChannelGroup> TRADE_MAP = new ConcurrentHashMap<>();
@@ -31,11 +33,24 @@
     // 当前连接到服务器的通道(tcp和websocket)
     private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>();
 
+
+    public static void addTcpChannel(Channel channel) {
+        TCP_GROUP.add(channel);
+    }
+
+    public static void removeTcpChannel(Channel channel) {
+        TCP_GROUP.remove(channel);
+    }
+
     public static void addWebSocketChannel(Channel channel) {
         WEBSOCKET_GROUP.add(channel);
         CHANNEL_MAP.put(channel.id().asShortText(), channel.id());
     }
 
+    public static ChannelGroup getTcpGroup() {
+        return TCP_GROUP;
+    }
+
     public static void removeWebSocketChannel(Channel channel) {
         WEBSOCKET_GROUP.remove(channel);
         CHANNEL_MAP.remove(channel.id().asShortText());
diff --git a/src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java b/src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java
new file mode 100644
index 0000000..bc93484
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java
@@ -0,0 +1,49 @@
+package com.xcong.excoin.netty.handler;
+
+import com.xcong.excoin.netty.common.ChannelManager;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author wzy
+ * @email wangdoubleone@gmail.com
+ * @date 2019-05-06
+ */
+@Slf4j
+@Component
+@ChannelHandler.Sharable
+public class TcpServerHandler extends ChannelInboundHandlerAdapter {
+
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) {
+        log.info("[tcp客户端连入服务器]");
+        ChannelManager.addTcpChannel(ctx.channel());
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) {
+        log.info("[tcp客户端断开服务器]");
+        ChannelManager.removeTcpChannel(ctx.channel());
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) {
+    }
+
+    @Override
+    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+        super.channelReadComplete(ctx);
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+        super.userEventTriggered(ctx, evt);
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/netty/initalizer/TcpServerInitializer.java b/src/main/java/com/xcong/excoin/netty/initalizer/TcpServerInitializer.java
new file mode 100644
index 0000000..885b032
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/initalizer/TcpServerInitializer.java
@@ -0,0 +1,33 @@
+package com.xcong.excoin.netty.initalizer;
+
+import com.xcong.excoin.netty.handler.TcpServerHandler;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.DelimiterBasedFrameDecoder;
+import io.netty.handler.codec.string.StringDecoder;
+import io.netty.handler.codec.string.StringEncoder;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author wzy
+ * @email wangdoubleone@gmail.com
+ * @date 2019-05-06
+ */
+@Component
+public class TcpServerInitializer extends ChannelInitializer<NioSocketChannel> {
+
+    @Autowired
+    private TcpServerHandler tcpServerHandler;
+
+    @Override
+    protected void initChannel(NioSocketChannel ch) throws Exception {
+        ByteBuf buf = Unpooled.copiedBuffer("_split".getBytes());
+        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
+        ch.pipeline().addLast(new StringDecoder());
+        ch.pipeline().addLast(new StringEncoder());
+        ch.pipeline().addLast(tcpServerHandler);
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/netty/server/TcpServer.java b/src/main/java/com/xcong/excoin/netty/server/TcpServer.java
new file mode 100644
index 0000000..b07760b
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/server/TcpServer.java
@@ -0,0 +1,67 @@
+package com.xcong.excoin.netty.server;
+
+import com.xcong.excoin.netty.ChatServer;
+import com.xcong.excoin.netty.initalizer.TcpServerInitializer;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author wzy
+ * @email wangdoubleone@gmail.com
+ * @date 2019-05-06
+ */
+@Slf4j
+@Component("tcpServer")
+public class TcpServer implements ChatServer {
+
+
+    private EventLoopGroup boss = new NioEventLoopGroup();
+    private EventLoopGroup work = new NioEventLoopGroup();
+
+    private ChannelFuture channelFuture;
+
+    @Autowired
+    private TcpServerInitializer tcpServerInitializer;
+
+    @Override
+    public void start() throws Exception {
+        log.info("[tcp服务器启动]");
+        try {
+            ServerBootstrap b = new ServerBootstrap();
+            b.group(boss, work)
+                    .channel(NioServerSocketChannel.class)
+                    .childHandler(tcpServerInitializer);
+
+            channelFuture = b.bind(9998).sync();
+            log.info("[tcp服务器启动完成]-->{}", channelFuture.channel().localAddress());
+        } finally {
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                @Override
+                public void run() {
+                    shutdown();
+                }
+            });
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        if (channelFuture != null) {
+            channelFuture.channel().close().syncUninterruptibly();
+        }
+
+        if (boss != null) {
+            boss.shutdownGracefully();
+        }
+
+        if (work != null) {
+            work.shutdownGracefully();
+        }
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java b/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java
index 2463a60..2b7da4f 100644
--- a/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java
+++ b/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java
@@ -7,6 +7,7 @@
 import com.huobi.client.model.enums.CandlestickInterval;
 import com.huobi.client.model.enums.MBPLevelEnums;
 import com.huobi.client.model.event.PriceDepthEvent;
+import com.xcong.excoin.netty.server.TcpServer;
 import com.xcong.excoin.netty.server.WebSocketServer;
 import com.xcong.excoin.utils.CoinTypeConvert;
 import lombok.extern.slf4j.Slf4j;
@@ -22,18 +23,12 @@
 @Slf4j
 @Component
 public class KLineDataJob {
-
-    @Autowired
-    WebSocketServer webSocketServer;
-
     @Autowired
     private SubscriptionClient subscriptionClient;
 
 //    @PostConstruct
     public void data() throws Exception {
-        webSocketServer.start();
         log.info("==================");
-
         subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> {
             Candlestick data = candlestickEvent.getData();
         });
diff --git a/src/main/java/com/xcong/excoin/quartz/job/NettyServerStartUp.java b/src/main/java/com/xcong/excoin/quartz/job/NettyServerStartUp.java
new file mode 100644
index 0000000..490ab86
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/quartz/job/NettyServerStartUp.java
@@ -0,0 +1,35 @@
+package com.xcong.excoin.quartz.job;
+
+import com.xcong.excoin.netty.server.TcpServer;
+import com.xcong.excoin.netty.server.WebSocketServer;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+
+/**
+ * @author wzy
+ * @date 2021-02-26
+ **/
+
+@Slf4j
+@Component
+public class NettyServerStartUp {
+
+    @Autowired
+    WebSocketServer webSocketServer;
+
+    @Autowired
+    TcpServer tcpServer;
+
+    @Order(0)
+    @PostConstruct
+    public void start() throws Exception {
+        log.info("启动netty服务器");
+        webSocketServer.start();
+        tcpServer.start();
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java b/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java
index 32131b6..603e2c6 100644
--- a/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java
+++ b/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java
@@ -6,6 +6,8 @@
 import com.huobi.client.model.Candlestick;
 import com.huobi.client.model.enums.CandlestickInterval;
 import com.xcong.excoin.modules.symbols.service.SymbolsService;
+import com.xcong.excoin.netty.common.ChannelManager;
+import com.xcong.excoin.netty.common.NettyTools;
 import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService;
 import com.xcong.excoin.utils.CoinTypeConvert;
 import com.xcong.excoin.utils.RedisUtils;
@@ -49,12 +51,12 @@
         log.info("#=======价格更新开启=======#");
 
         subscriptionClient.subscribeTradeEvent("btcusdt,ethusdt,xrpusdt,ltcusdt,bchusdt,eosusdt,etcusdt", tradeEvent -> {
-            String symbol = tradeEvent.getSymbol();
+            ChannelManager.getTcpGroup().writeAndFlush(NettyTools.textBytes(JSONObject.toJSONString(tradeEvent)));
+//            String symbol = tradeEvent.getSymbol();
 //            // 根据symbol判断做什么操作
-            symbol = CoinTypeConvert.convert(symbol);
-            if (null != symbol) {
-                String price = tradeEvent.getTradeList().get(0).getPrice().toPlainString();
-                redisTemplate.convertAndSend("channel:newprice", symbol + "_" + price);
+//            symbol = CoinTypeConvert.convert(symbol);
+//            if (null != symbol) {
+//                String price = tradeEvent.getTradeList().get(0).getPrice().toPlainString();
 //                // TODO 测试环境关闭这个插入redis
 //                redisUtils.set(CoinTypeConvert.convertToKey(symbol), price);
 //                // 比较
@@ -63,7 +65,7 @@
 //                websocketPriceService.wholeBomb();
 //                //System.out.println("比较完毕:"+symbol+"-"+price);
 //
-            }
+//            }
 
         });
 //        subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> {

--
Gitblit v1.9.1