6 files added
3 files modified
New file |
| | |
| | | package com.xcong.excoin.netty.client; |
| | | |
| | | import io.netty.channel.ChannelHandlerContext; |
| | | import io.netty.channel.ChannelInboundHandlerAdapter; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | |
| | | /** |
| | | * @author wzy |
| | | * @email wangdoubleone@gmail.com |
| | | * @date 2019-05-14 |
| | | */ |
| | | @Slf4j |
| | | public class ClientChannelHandler extends ChannelInboundHandlerAdapter { |
| | | |
| | | @Override |
| | | public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { |
| | | log.error("",cause); |
| | | } |
| | | |
| | | @Override |
| | | public void channelActive(ChannelHandlerContext ctx) throws Exception { |
| | | log.info("消息收到"); |
| | | } |
| | | |
| | | @Override |
| | | public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { |
| | | log.info("消息收到"); |
| | | } |
| | | |
| | | } |
New file |
| | |
| | | package com.xcong.excoin.netty.client; |
| | | |
| | | import io.netty.bootstrap.Bootstrap; |
| | | import io.netty.buffer.ByteBuf; |
| | | import io.netty.buffer.Unpooled; |
| | | import io.netty.channel.ChannelFuture; |
| | | import io.netty.channel.ChannelInitializer; |
| | | import io.netty.channel.EventLoopGroup; |
| | | import io.netty.channel.nio.NioEventLoopGroup; |
| | | import io.netty.channel.socket.SocketChannel; |
| | | import io.netty.channel.socket.nio.NioSocketChannel; |
| | | import io.netty.handler.codec.DelimiterBasedFrameDecoder; |
| | | import io.netty.handler.codec.string.StringDecoder; |
| | | import io.netty.handler.codec.string.StringEncoder; |
| | | |
| | | import java.net.InetSocketAddress; |
| | | |
| | | /** |
| | | * @author wzy |
| | | * @email wangdoubleone@gmail.com |
| | | * @date 2019-05-14 |
| | | */ |
| | | public class ClientServer { |
| | | |
| | | public static void start() throws InterruptedException { |
| | | EventLoopGroup group = new NioEventLoopGroup(); |
| | | try { |
| | | Bootstrap b = new Bootstrap(); |
| | | b.group(group) |
| | | .channel(NioSocketChannel.class) |
| | | .remoteAddress(new InetSocketAddress("localhost", 9998)) |
| | | .handler(new ChannelInitializer<SocketChannel>() { //5 |
| | | @Override |
| | | public void initChannel(SocketChannel ch) |
| | | throws Exception { |
| | | ByteBuf buf = Unpooled.copiedBuffer("_split".getBytes()); |
| | | ch.pipeline().addLast(new DelimiterBasedFrameDecoder(10240, buf)); |
| | | ch.pipeline().addLast(new StringDecoder()); |
| | | ch.pipeline().addLast(new StringEncoder()); |
| | | ch.pipeline().addLast(new ClientChannelHandler()); |
| | | } |
| | | }); |
| | | |
| | | ChannelFuture f = b.connect().sync(); |
| | | |
| | | f.channel().closeFuture().sync(); |
| | | } finally { |
| | | group.shutdownGracefully(); |
| | | } |
| | | } |
| | | |
| | | public static void main(String args[]) throws InterruptedException { |
| | | start(); |
| | | } |
| | | } |
| | |
| | | |
| | | private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); |
| | | |
| | | private static final ChannelGroup TCP_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); |
| | | |
| | | private static final ConcurrentMap<String, ChannelGroup> DEPTH_MAP = new ConcurrentHashMap<>(); |
| | | |
| | | private static final ConcurrentMap<String, ChannelGroup> TRADE_MAP = new ConcurrentHashMap<>(); |
| | |
| | | // 当前连接到服务器的通道(tcp和websocket) |
| | | private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>(); |
| | | |
| | | |
| | | public static void addTcpChannel(Channel channel) { |
| | | TCP_GROUP.add(channel); |
| | | } |
| | | |
| | | public static void removeTcpChannel(Channel channel) { |
| | | TCP_GROUP.remove(channel); |
| | | } |
| | | |
| | | public static void addWebSocketChannel(Channel channel) { |
| | | WEBSOCKET_GROUP.add(channel); |
| | | CHANNEL_MAP.put(channel.id().asShortText(), channel.id()); |
| | | } |
| | | |
| | | public static ChannelGroup getTcpGroup() { |
| | | return TCP_GROUP; |
| | | } |
| | | |
| | | public static void removeWebSocketChannel(Channel channel) { |
| | | WEBSOCKET_GROUP.remove(channel); |
| | | CHANNEL_MAP.remove(channel.id().asShortText()); |
New file |
| | |
| | | package com.xcong.excoin.netty.handler; |
| | | |
| | | import com.xcong.excoin.netty.common.ChannelManager; |
| | | import io.netty.channel.ChannelHandler; |
| | | import io.netty.channel.ChannelHandlerContext; |
| | | import io.netty.channel.ChannelInboundHandlerAdapter; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | /** |
| | | * @author wzy |
| | | * @email wangdoubleone@gmail.com |
| | | * @date 2019-05-06 |
| | | */ |
| | | @Slf4j |
| | | @Component |
| | | @ChannelHandler.Sharable |
| | | public class TcpServerHandler extends ChannelInboundHandlerAdapter { |
| | | |
| | | @Override |
| | | public void channelActive(ChannelHandlerContext ctx) { |
| | | log.info("[tcp客户端连入服务器]"); |
| | | ChannelManager.addTcpChannel(ctx.channel()); |
| | | } |
| | | |
| | | @Override |
| | | public void channelInactive(ChannelHandlerContext ctx) { |
| | | log.info("[tcp客户端断开服务器]"); |
| | | ChannelManager.removeTcpChannel(ctx.channel()); |
| | | } |
| | | |
| | | @Override |
| | | public void channelRead(ChannelHandlerContext ctx, Object msg) { |
| | | } |
| | | |
| | | @Override |
| | | public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { |
| | | super.channelReadComplete(ctx); |
| | | } |
| | | |
| | | @Override |
| | | public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { |
| | | super.userEventTriggered(ctx, evt); |
| | | } |
| | | |
| | | @Override |
| | | public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { |
| | | } |
| | | } |
New file |
| | |
| | | package com.xcong.excoin.netty.initalizer; |
| | | |
| | | import com.xcong.excoin.netty.handler.TcpServerHandler; |
| | | import io.netty.buffer.ByteBuf; |
| | | import io.netty.buffer.Unpooled; |
| | | import io.netty.channel.ChannelInitializer; |
| | | import io.netty.channel.socket.nio.NioSocketChannel; |
| | | import io.netty.handler.codec.DelimiterBasedFrameDecoder; |
| | | import io.netty.handler.codec.string.StringDecoder; |
| | | import io.netty.handler.codec.string.StringEncoder; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | /** |
| | | * @author wzy |
| | | * @email wangdoubleone@gmail.com |
| | | * @date 2019-05-06 |
| | | */ |
| | | @Component |
| | | public class TcpServerInitializer extends ChannelInitializer<NioSocketChannel> { |
| | | |
| | | @Autowired |
| | | private TcpServerHandler tcpServerHandler; |
| | | |
| | | @Override |
| | | protected void initChannel(NioSocketChannel ch) throws Exception { |
| | | ByteBuf buf = Unpooled.copiedBuffer("_split".getBytes()); |
| | | ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); |
| | | ch.pipeline().addLast(new StringDecoder()); |
| | | ch.pipeline().addLast(new StringEncoder()); |
| | | ch.pipeline().addLast(tcpServerHandler); |
| | | } |
| | | } |
New file |
| | |
| | | package com.xcong.excoin.netty.server; |
| | | |
| | | import com.xcong.excoin.netty.ChatServer; |
| | | import com.xcong.excoin.netty.initalizer.TcpServerInitializer; |
| | | import io.netty.bootstrap.ServerBootstrap; |
| | | import io.netty.channel.ChannelFuture; |
| | | import io.netty.channel.EventLoopGroup; |
| | | import io.netty.channel.nio.NioEventLoopGroup; |
| | | import io.netty.channel.socket.nio.NioServerSocketChannel; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | /** |
| | | * @author wzy |
| | | * @email wangdoubleone@gmail.com |
| | | * @date 2019-05-06 |
| | | */ |
| | | @Slf4j |
| | | @Component("tcpServer") |
| | | public class TcpServer implements ChatServer { |
| | | |
| | | |
| | | private EventLoopGroup boss = new NioEventLoopGroup(); |
| | | private EventLoopGroup work = new NioEventLoopGroup(); |
| | | |
| | | private ChannelFuture channelFuture; |
| | | |
| | | @Autowired |
| | | private TcpServerInitializer tcpServerInitializer; |
| | | |
| | | @Override |
| | | public void start() throws Exception { |
| | | log.info("[tcp服务器启动]"); |
| | | try { |
| | | ServerBootstrap b = new ServerBootstrap(); |
| | | b.group(boss, work) |
| | | .channel(NioServerSocketChannel.class) |
| | | .childHandler(tcpServerInitializer); |
| | | |
| | | channelFuture = b.bind(9998).sync(); |
| | | log.info("[tcp服务器启动完成]-->{}", channelFuture.channel().localAddress()); |
| | | } finally { |
| | | Runtime.getRuntime().addShutdownHook(new Thread() { |
| | | @Override |
| | | public void run() { |
| | | shutdown(); |
| | | } |
| | | }); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void shutdown() { |
| | | if (channelFuture != null) { |
| | | channelFuture.channel().close().syncUninterruptibly(); |
| | | } |
| | | |
| | | if (boss != null) { |
| | | boss.shutdownGracefully(); |
| | | } |
| | | |
| | | if (work != null) { |
| | | work.shutdownGracefully(); |
| | | } |
| | | } |
| | | } |
| | |
| | | import com.huobi.client.model.enums.CandlestickInterval; |
| | | import com.huobi.client.model.enums.MBPLevelEnums; |
| | | import com.huobi.client.model.event.PriceDepthEvent; |
| | | import com.xcong.excoin.netty.server.TcpServer; |
| | | import com.xcong.excoin.netty.server.WebSocketServer; |
| | | import com.xcong.excoin.utils.CoinTypeConvert; |
| | | import lombok.extern.slf4j.Slf4j; |
| | |
| | | @Slf4j |
| | | @Component |
| | | public class KLineDataJob { |
| | | |
| | | @Autowired |
| | | WebSocketServer webSocketServer; |
| | | |
| | | @Autowired |
| | | private SubscriptionClient subscriptionClient; |
| | | |
| | | // @PostConstruct |
| | | public void data() throws Exception { |
| | | webSocketServer.start(); |
| | | log.info("=================="); |
| | | |
| | | subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> { |
| | | Candlestick data = candlestickEvent.getData(); |
| | | }); |
New file |
| | |
| | | package com.xcong.excoin.quartz.job; |
| | | |
| | | import com.xcong.excoin.netty.server.TcpServer; |
| | | import com.xcong.excoin.netty.server.WebSocketServer; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.context.ApplicationContextAware; |
| | | import org.springframework.core.annotation.Order; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import javax.annotation.PostConstruct; |
| | | |
| | | /** |
| | | * @author wzy |
| | | * @date 2021-02-26 |
| | | **/ |
| | | |
| | | @Slf4j |
| | | @Component |
| | | public class NettyServerStartUp { |
| | | |
| | | @Autowired |
| | | WebSocketServer webSocketServer; |
| | | |
| | | @Autowired |
| | | TcpServer tcpServer; |
| | | |
| | | @Order(0) |
| | | @PostConstruct |
| | | public void start() throws Exception { |
| | | log.info("启动netty服务器"); |
| | | webSocketServer.start(); |
| | | tcpServer.start(); |
| | | } |
| | | } |
| | |
| | | import com.huobi.client.model.Candlestick; |
| | | import com.huobi.client.model.enums.CandlestickInterval; |
| | | import com.xcong.excoin.modules.symbols.service.SymbolsService; |
| | | import com.xcong.excoin.netty.common.ChannelManager; |
| | | import com.xcong.excoin.netty.common.NettyTools; |
| | | import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService; |
| | | import com.xcong.excoin.utils.CoinTypeConvert; |
| | | import com.xcong.excoin.utils.RedisUtils; |
| | |
| | | log.info("#=======价格更新开启=======#"); |
| | | |
| | | subscriptionClient.subscribeTradeEvent("btcusdt,ethusdt,xrpusdt,ltcusdt,bchusdt,eosusdt,etcusdt", tradeEvent -> { |
| | | String symbol = tradeEvent.getSymbol(); |
| | | ChannelManager.getTcpGroup().writeAndFlush(NettyTools.textBytes(JSONObject.toJSONString(tradeEvent))); |
| | | // String symbol = tradeEvent.getSymbol(); |
| | | // // 根据symbol判断做什么操作 |
| | | symbol = CoinTypeConvert.convert(symbol); |
| | | if (null != symbol) { |
| | | String price = tradeEvent.getTradeList().get(0).getPrice().toPlainString(); |
| | | redisTemplate.convertAndSend("channel:newprice", symbol + "_" + price); |
| | | // symbol = CoinTypeConvert.convert(symbol); |
| | | // if (null != symbol) { |
| | | // String price = tradeEvent.getTradeList().get(0).getPrice().toPlainString(); |
| | | // // TODO 测试环境关闭这个插入redis |
| | | // redisUtils.set(CoinTypeConvert.convertToKey(symbol), price); |
| | | // // 比较 |
| | |
| | | // websocketPriceService.wholeBomb(); |
| | | // //System.out.println("比较完毕:"+symbol+"-"+price); |
| | | // |
| | | } |
| | | // } |
| | | |
| | | }); |
| | | // subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> { |