From d8d653b40cc6565c72cccd28de831474e5d5c512 Mon Sep 17 00:00:00 2001
From: Helius <wangdoubleone@gmail.com>
Date: Sun, 28 Feb 2021 15:16:18 +0800
Subject: [PATCH] add netty
---
src/main/java/com/xcong/excoin/netty/server/TcpServer.java | 67 +++++++++++
src/main/java/com/xcong/excoin/quartz/job/NettyServerStartUp.java | 35 +++++
src/main/java/com/xcong/excoin/netty/common/ChannelManager.java | 15 ++
src/main/java/com/xcong/excoin/netty/initalizer/TcpServerInitializer.java | 33 +++++
src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java | 30 +++++
src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java | 7 -
src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java | 49 ++++++++
src/main/java/com/xcong/excoin/netty/client/ClientServer.java | 55 +++++++++
src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java | 14 +-
9 files changed, 293 insertions(+), 12 deletions(-)
diff --git a/src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java b/src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java
new file mode 100644
index 0000000..33bc9b8
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java
@@ -0,0 +1,30 @@
+package com.xcong.excoin.netty.client;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * @author wzy
+ * @email wangdoubleone@gmail.com
+ * @date 2019-05-14
+ */
+@Slf4j
+public class ClientChannelHandler extends ChannelInboundHandlerAdapter {
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ log.error("",cause);
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ log.info("消息收到");
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ log.info("消息收到");
+ }
+
+}
\ No newline at end of file
diff --git a/src/main/java/com/xcong/excoin/netty/client/ClientServer.java b/src/main/java/com/xcong/excoin/netty/client/ClientServer.java
new file mode 100644
index 0000000..58a9249
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/client/ClientServer.java
@@ -0,0 +1,55 @@
+package com.xcong.excoin.netty.client;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.DelimiterBasedFrameDecoder;
+import io.netty.handler.codec.string.StringDecoder;
+import io.netty.handler.codec.string.StringEncoder;
+
+import java.net.InetSocketAddress;
+
+/**
+ * @author wzy
+ * @email wangdoubleone@gmail.com
+ * @date 2019-05-14
+ */
+public class ClientServer {
+
+ public static void start() throws InterruptedException {
+ EventLoopGroup group = new NioEventLoopGroup();
+ try {
+ Bootstrap b = new Bootstrap();
+ b.group(group)
+ .channel(NioSocketChannel.class)
+ .remoteAddress(new InetSocketAddress("localhost", 9998))
+ .handler(new ChannelInitializer<SocketChannel>() { //5
+ @Override
+ public void initChannel(SocketChannel ch)
+ throws Exception {
+ ByteBuf buf = Unpooled.copiedBuffer("_split".getBytes());
+ ch.pipeline().addLast(new DelimiterBasedFrameDecoder(10240, buf));
+ ch.pipeline().addLast(new StringDecoder());
+ ch.pipeline().addLast(new StringEncoder());
+ ch.pipeline().addLast(new ClientChannelHandler());
+ }
+ });
+
+ ChannelFuture f = b.connect().sync();
+
+ f.channel().closeFuture().sync();
+ } finally {
+ group.shutdownGracefully();
+ }
+ }
+
+ public static void main(String args[]) throws InterruptedException {
+ start();
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
index e6f6336..7932c00 100644
--- a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
+++ b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -22,6 +22,8 @@
private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+ private static final ChannelGroup TCP_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
private static final ConcurrentMap<String, ChannelGroup> DEPTH_MAP = new ConcurrentHashMap<>();
private static final ConcurrentMap<String, ChannelGroup> TRADE_MAP = new ConcurrentHashMap<>();
@@ -31,11 +33,24 @@
// 当前连接到服务器的通道(tcp和websocket)
private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>();
+
+ public static void addTcpChannel(Channel channel) {
+ TCP_GROUP.add(channel);
+ }
+
+ public static void removeTcpChannel(Channel channel) {
+ TCP_GROUP.remove(channel);
+ }
+
public static void addWebSocketChannel(Channel channel) {
WEBSOCKET_GROUP.add(channel);
CHANNEL_MAP.put(channel.id().asShortText(), channel.id());
}
+ public static ChannelGroup getTcpGroup() {
+ return TCP_GROUP;
+ }
+
public static void removeWebSocketChannel(Channel channel) {
WEBSOCKET_GROUP.remove(channel);
CHANNEL_MAP.remove(channel.id().asShortText());
diff --git a/src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java b/src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java
new file mode 100644
index 0000000..bc93484
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java
@@ -0,0 +1,49 @@
+package com.xcong.excoin.netty.handler;
+
+import com.xcong.excoin.netty.common.ChannelManager;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author wzy
+ * @email wangdoubleone@gmail.com
+ * @date 2019-05-06
+ */
+@Slf4j
+@Component
+@ChannelHandler.Sharable
+public class TcpServerHandler extends ChannelInboundHandlerAdapter {
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) {
+ log.info("[tcp客户端连入服务器]");
+ ChannelManager.addTcpChannel(ctx.channel());
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) {
+ log.info("[tcp客户端断开服务器]");
+ ChannelManager.removeTcpChannel(ctx.channel());
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ }
+
+ @Override
+ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+ super.channelReadComplete(ctx);
+ }
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ super.userEventTriggered(ctx, evt);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/netty/initalizer/TcpServerInitializer.java b/src/main/java/com/xcong/excoin/netty/initalizer/TcpServerInitializer.java
new file mode 100644
index 0000000..885b032
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/initalizer/TcpServerInitializer.java
@@ -0,0 +1,33 @@
+package com.xcong.excoin.netty.initalizer;
+
+import com.xcong.excoin.netty.handler.TcpServerHandler;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.DelimiterBasedFrameDecoder;
+import io.netty.handler.codec.string.StringDecoder;
+import io.netty.handler.codec.string.StringEncoder;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author wzy
+ * @email wangdoubleone@gmail.com
+ * @date 2019-05-06
+ */
+@Component
+public class TcpServerInitializer extends ChannelInitializer<NioSocketChannel> {
+
+ @Autowired
+ private TcpServerHandler tcpServerHandler;
+
+ @Override
+ protected void initChannel(NioSocketChannel ch) throws Exception {
+ ByteBuf buf = Unpooled.copiedBuffer("_split".getBytes());
+ ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
+ ch.pipeline().addLast(new StringDecoder());
+ ch.pipeline().addLast(new StringEncoder());
+ ch.pipeline().addLast(tcpServerHandler);
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/netty/server/TcpServer.java b/src/main/java/com/xcong/excoin/netty/server/TcpServer.java
new file mode 100644
index 0000000..b07760b
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/server/TcpServer.java
@@ -0,0 +1,67 @@
+package com.xcong.excoin.netty.server;
+
+import com.xcong.excoin.netty.ChatServer;
+import com.xcong.excoin.netty.initalizer.TcpServerInitializer;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author wzy
+ * @email wangdoubleone@gmail.com
+ * @date 2019-05-06
+ */
+@Slf4j
+@Component("tcpServer")
+public class TcpServer implements ChatServer {
+
+
+ private EventLoopGroup boss = new NioEventLoopGroup();
+ private EventLoopGroup work = new NioEventLoopGroup();
+
+ private ChannelFuture channelFuture;
+
+ @Autowired
+ private TcpServerInitializer tcpServerInitializer;
+
+ @Override
+ public void start() throws Exception {
+ log.info("[tcp服务器启动]");
+ try {
+ ServerBootstrap b = new ServerBootstrap();
+ b.group(boss, work)
+ .channel(NioServerSocketChannel.class)
+ .childHandler(tcpServerInitializer);
+
+ channelFuture = b.bind(9998).sync();
+ log.info("[tcp服务器启动完成]-->{}", channelFuture.channel().localAddress());
+ } finally {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ shutdown();
+ }
+ });
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ if (channelFuture != null) {
+ channelFuture.channel().close().syncUninterruptibly();
+ }
+
+ if (boss != null) {
+ boss.shutdownGracefully();
+ }
+
+ if (work != null) {
+ work.shutdownGracefully();
+ }
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java b/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java
index 2463a60..2b7da4f 100644
--- a/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java
+++ b/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java
@@ -7,6 +7,7 @@
import com.huobi.client.model.enums.CandlestickInterval;
import com.huobi.client.model.enums.MBPLevelEnums;
import com.huobi.client.model.event.PriceDepthEvent;
+import com.xcong.excoin.netty.server.TcpServer;
import com.xcong.excoin.netty.server.WebSocketServer;
import com.xcong.excoin.utils.CoinTypeConvert;
import lombok.extern.slf4j.Slf4j;
@@ -22,18 +23,12 @@
@Slf4j
@Component
public class KLineDataJob {
-
- @Autowired
- WebSocketServer webSocketServer;
-
@Autowired
private SubscriptionClient subscriptionClient;
// @PostConstruct
public void data() throws Exception {
- webSocketServer.start();
log.info("==================");
-
subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> {
Candlestick data = candlestickEvent.getData();
});
diff --git a/src/main/java/com/xcong/excoin/quartz/job/NettyServerStartUp.java b/src/main/java/com/xcong/excoin/quartz/job/NettyServerStartUp.java
new file mode 100644
index 0000000..490ab86
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/quartz/job/NettyServerStartUp.java
@@ -0,0 +1,35 @@
+package com.xcong.excoin.quartz.job;
+
+import com.xcong.excoin.netty.server.TcpServer;
+import com.xcong.excoin.netty.server.WebSocketServer;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+
+/**
+ * @author wzy
+ * @date 2021-02-26
+ **/
+
+@Slf4j
+@Component
+public class NettyServerStartUp {
+
+ @Autowired
+ WebSocketServer webSocketServer;
+
+ @Autowired
+ TcpServer tcpServer;
+
+ @Order(0)
+ @PostConstruct
+ public void start() throws Exception {
+ log.info("启动netty服务器");
+ webSocketServer.start();
+ tcpServer.start();
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java b/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java
index 32131b6..603e2c6 100644
--- a/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java
+++ b/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java
@@ -6,6 +6,8 @@
import com.huobi.client.model.Candlestick;
import com.huobi.client.model.enums.CandlestickInterval;
import com.xcong.excoin.modules.symbols.service.SymbolsService;
+import com.xcong.excoin.netty.common.ChannelManager;
+import com.xcong.excoin.netty.common.NettyTools;
import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService;
import com.xcong.excoin.utils.CoinTypeConvert;
import com.xcong.excoin.utils.RedisUtils;
@@ -49,12 +51,12 @@
log.info("#=======价格更新开启=======#");
subscriptionClient.subscribeTradeEvent("btcusdt,ethusdt,xrpusdt,ltcusdt,bchusdt,eosusdt,etcusdt", tradeEvent -> {
- String symbol = tradeEvent.getSymbol();
+ ChannelManager.getTcpGroup().writeAndFlush(NettyTools.textBytes(JSONObject.toJSONString(tradeEvent)));
+// String symbol = tradeEvent.getSymbol();
// // 根据symbol判断做什么操作
- symbol = CoinTypeConvert.convert(symbol);
- if (null != symbol) {
- String price = tradeEvent.getTradeList().get(0).getPrice().toPlainString();
- redisTemplate.convertAndSend("channel:newprice", symbol + "_" + price);
+// symbol = CoinTypeConvert.convert(symbol);
+// if (null != symbol) {
+// String price = tradeEvent.getTradeList().get(0).getPrice().toPlainString();
// // TODO 测试环境关闭这个插入redis
// redisUtils.set(CoinTypeConvert.convertToKey(symbol), price);
// // 比较
@@ -63,7 +65,7 @@
// websocketPriceService.wholeBomb();
// //System.out.println("比较完毕:"+symbol+"-"+price);
//
- }
+// }
});
// subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> {
--
Gitblit v1.9.1