From d8d653b40cc6565c72cccd28de831474e5d5c512 Mon Sep 17 00:00:00 2001 From: Helius <wangdoubleone@gmail.com> Date: Sun, 28 Feb 2021 15:16:18 +0800 Subject: [PATCH] add netty --- src/main/java/com/xcong/excoin/netty/server/TcpServer.java | 67 +++++++++++ src/main/java/com/xcong/excoin/quartz/job/NettyServerStartUp.java | 35 +++++ src/main/java/com/xcong/excoin/netty/common/ChannelManager.java | 15 ++ src/main/java/com/xcong/excoin/netty/initalizer/TcpServerInitializer.java | 33 +++++ src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java | 30 +++++ src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java | 7 - src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java | 49 ++++++++ src/main/java/com/xcong/excoin/netty/client/ClientServer.java | 55 +++++++++ src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java | 14 +- 9 files changed, 293 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java b/src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java new file mode 100644 index 0000000..33bc9b8 --- /dev/null +++ b/src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java @@ -0,0 +1,30 @@ +package com.xcong.excoin.netty.client; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; + +/** + * @author wzy + * @email wangdoubleone@gmail.com + * @date 2019-05-14 + */ +@Slf4j +public class ClientChannelHandler extends ChannelInboundHandlerAdapter { + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + log.error("",cause); + } + + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + log.info("消息收到"); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + log.info("消息收到"); + } + +} \ No newline at end of file diff --git a/src/main/java/com/xcong/excoin/netty/client/ClientServer.java b/src/main/java/com/xcong/excoin/netty/client/ClientServer.java new file mode 100644 index 0000000..58a9249 --- /dev/null +++ b/src/main/java/com/xcong/excoin/netty/client/ClientServer.java @@ -0,0 +1,55 @@ +package com.xcong.excoin.netty.client; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; + +import java.net.InetSocketAddress; + +/** + * @author wzy + * @email wangdoubleone@gmail.com + * @date 2019-05-14 + */ +public class ClientServer { + + public static void start() throws InterruptedException { + EventLoopGroup group = new NioEventLoopGroup(); + try { + Bootstrap b = new Bootstrap(); + b.group(group) + .channel(NioSocketChannel.class) + .remoteAddress(new InetSocketAddress("localhost", 9998)) + .handler(new ChannelInitializer<SocketChannel>() { //5 + @Override + public void initChannel(SocketChannel ch) + throws Exception { + ByteBuf buf = Unpooled.copiedBuffer("_split".getBytes()); + ch.pipeline().addLast(new DelimiterBasedFrameDecoder(10240, buf)); + ch.pipeline().addLast(new StringDecoder()); + ch.pipeline().addLast(new StringEncoder()); + ch.pipeline().addLast(new ClientChannelHandler()); + } + }); + + ChannelFuture f = b.connect().sync(); + + f.channel().closeFuture().sync(); + } finally { + group.shutdownGracefully(); + } + } + + public static void main(String args[]) throws InterruptedException { + start(); + } +} diff --git a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java index e6f6336..7932c00 100644 --- a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java +++ b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java @@ -22,6 +22,8 @@ private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + private static final ChannelGroup TCP_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + private static final ConcurrentMap<String, ChannelGroup> DEPTH_MAP = new ConcurrentHashMap<>(); private static final ConcurrentMap<String, ChannelGroup> TRADE_MAP = new ConcurrentHashMap<>(); @@ -31,11 +33,24 @@ // 当前连接到服务器的通道(tcp和websocket) private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>(); + + public static void addTcpChannel(Channel channel) { + TCP_GROUP.add(channel); + } + + public static void removeTcpChannel(Channel channel) { + TCP_GROUP.remove(channel); + } + public static void addWebSocketChannel(Channel channel) { WEBSOCKET_GROUP.add(channel); CHANNEL_MAP.put(channel.id().asShortText(), channel.id()); } + public static ChannelGroup getTcpGroup() { + return TCP_GROUP; + } + public static void removeWebSocketChannel(Channel channel) { WEBSOCKET_GROUP.remove(channel); CHANNEL_MAP.remove(channel.id().asShortText()); diff --git a/src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java b/src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java new file mode 100644 index 0000000..bc93484 --- /dev/null +++ b/src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java @@ -0,0 +1,49 @@ +package com.xcong.excoin.netty.handler; + +import com.xcong.excoin.netty.common.ChannelManager; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Component; + +/** + * @author wzy + * @email wangdoubleone@gmail.com + * @date 2019-05-06 + */ +@Slf4j +@Component +@ChannelHandler.Sharable +public class TcpServerHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelActive(ChannelHandlerContext ctx) { + log.info("[tcp客户端连入服务器]"); + ChannelManager.addTcpChannel(ctx.channel()); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) { + log.info("[tcp客户端断开服务器]"); + ChannelManager.removeTcpChannel(ctx.channel()); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + } + + @Override + public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + super.channelReadComplete(ctx); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + super.userEventTriggered(ctx, evt); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + } +} diff --git a/src/main/java/com/xcong/excoin/netty/initalizer/TcpServerInitializer.java b/src/main/java/com/xcong/excoin/netty/initalizer/TcpServerInitializer.java new file mode 100644 index 0000000..885b032 --- /dev/null +++ b/src/main/java/com/xcong/excoin/netty/initalizer/TcpServerInitializer.java @@ -0,0 +1,33 @@ +package com.xcong.excoin.netty.initalizer; + +import com.xcong.excoin.netty.handler.TcpServerHandler; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.handler.codec.DelimiterBasedFrameDecoder; +import io.netty.handler.codec.string.StringDecoder; +import io.netty.handler.codec.string.StringEncoder; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * @author wzy + * @email wangdoubleone@gmail.com + * @date 2019-05-06 + */ +@Component +public class TcpServerInitializer extends ChannelInitializer<NioSocketChannel> { + + @Autowired + private TcpServerHandler tcpServerHandler; + + @Override + protected void initChannel(NioSocketChannel ch) throws Exception { + ByteBuf buf = Unpooled.copiedBuffer("_split".getBytes()); + ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); + ch.pipeline().addLast(new StringDecoder()); + ch.pipeline().addLast(new StringEncoder()); + ch.pipeline().addLast(tcpServerHandler); + } +} diff --git a/src/main/java/com/xcong/excoin/netty/server/TcpServer.java b/src/main/java/com/xcong/excoin/netty/server/TcpServer.java new file mode 100644 index 0000000..b07760b --- /dev/null +++ b/src/main/java/com/xcong/excoin/netty/server/TcpServer.java @@ -0,0 +1,67 @@ +package com.xcong.excoin.netty.server; + +import com.xcong.excoin.netty.ChatServer; +import com.xcong.excoin.netty.initalizer.TcpServerInitializer; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * @author wzy + * @email wangdoubleone@gmail.com + * @date 2019-05-06 + */ +@Slf4j +@Component("tcpServer") +public class TcpServer implements ChatServer { + + + private EventLoopGroup boss = new NioEventLoopGroup(); + private EventLoopGroup work = new NioEventLoopGroup(); + + private ChannelFuture channelFuture; + + @Autowired + private TcpServerInitializer tcpServerInitializer; + + @Override + public void start() throws Exception { + log.info("[tcp服务器启动]"); + try { + ServerBootstrap b = new ServerBootstrap(); + b.group(boss, work) + .channel(NioServerSocketChannel.class) + .childHandler(tcpServerInitializer); + + channelFuture = b.bind(9998).sync(); + log.info("[tcp服务器启动完成]-->{}", channelFuture.channel().localAddress()); + } finally { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + shutdown(); + } + }); + } + } + + @Override + public void shutdown() { + if (channelFuture != null) { + channelFuture.channel().close().syncUninterruptibly(); + } + + if (boss != null) { + boss.shutdownGracefully(); + } + + if (work != null) { + work.shutdownGracefully(); + } + } +} diff --git a/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java b/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java index 2463a60..2b7da4f 100644 --- a/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java +++ b/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java @@ -7,6 +7,7 @@ import com.huobi.client.model.enums.CandlestickInterval; import com.huobi.client.model.enums.MBPLevelEnums; import com.huobi.client.model.event.PriceDepthEvent; +import com.xcong.excoin.netty.server.TcpServer; import com.xcong.excoin.netty.server.WebSocketServer; import com.xcong.excoin.utils.CoinTypeConvert; import lombok.extern.slf4j.Slf4j; @@ -22,18 +23,12 @@ @Slf4j @Component public class KLineDataJob { - - @Autowired - WebSocketServer webSocketServer; - @Autowired private SubscriptionClient subscriptionClient; // @PostConstruct public void data() throws Exception { - webSocketServer.start(); log.info("=================="); - subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> { Candlestick data = candlestickEvent.getData(); }); diff --git a/src/main/java/com/xcong/excoin/quartz/job/NettyServerStartUp.java b/src/main/java/com/xcong/excoin/quartz/job/NettyServerStartUp.java new file mode 100644 index 0000000..490ab86 --- /dev/null +++ b/src/main/java/com/xcong/excoin/quartz/job/NettyServerStartUp.java @@ -0,0 +1,35 @@ +package com.xcong.excoin.quartz.job; + +import com.xcong.excoin.netty.server.TcpServer; +import com.xcong.excoin.netty.server.WebSocketServer; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContextAware; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +/** + * @author wzy + * @date 2021-02-26 + **/ + +@Slf4j +@Component +public class NettyServerStartUp { + + @Autowired + WebSocketServer webSocketServer; + + @Autowired + TcpServer tcpServer; + + @Order(0) + @PostConstruct + public void start() throws Exception { + log.info("启动netty服务器"); + webSocketServer.start(); + tcpServer.start(); + } +} diff --git a/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java b/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java index 32131b6..603e2c6 100644 --- a/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java +++ b/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java @@ -6,6 +6,8 @@ import com.huobi.client.model.Candlestick; import com.huobi.client.model.enums.CandlestickInterval; import com.xcong.excoin.modules.symbols.service.SymbolsService; +import com.xcong.excoin.netty.common.ChannelManager; +import com.xcong.excoin.netty.common.NettyTools; import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService; import com.xcong.excoin.utils.CoinTypeConvert; import com.xcong.excoin.utils.RedisUtils; @@ -49,12 +51,12 @@ log.info("#=======价格更新开启=======#"); subscriptionClient.subscribeTradeEvent("btcusdt,ethusdt,xrpusdt,ltcusdt,bchusdt,eosusdt,etcusdt", tradeEvent -> { - String symbol = tradeEvent.getSymbol(); + ChannelManager.getTcpGroup().writeAndFlush(NettyTools.textBytes(JSONObject.toJSONString(tradeEvent))); +// String symbol = tradeEvent.getSymbol(); // // 根据symbol判断做什么操作 - symbol = CoinTypeConvert.convert(symbol); - if (null != symbol) { - String price = tradeEvent.getTradeList().get(0).getPrice().toPlainString(); - redisTemplate.convertAndSend("channel:newprice", symbol + "_" + price); +// symbol = CoinTypeConvert.convert(symbol); +// if (null != symbol) { +// String price = tradeEvent.getTradeList().get(0).getPrice().toPlainString(); // // TODO 测试环境关闭这个插入redis // redisUtils.set(CoinTypeConvert.convertToKey(symbol), price); // // 比较 @@ -63,7 +65,7 @@ // websocketPriceService.wholeBomb(); // //System.out.println("比较完毕:"+symbol+"-"+price); // - } +// } }); // subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> { -- Gitblit v1.9.1