Helius
2021-02-28 d8d653b40cc6565c72cccd28de831474e5d5c512
add netty
6 files added
3 files modified
305 ■■■■■ changed files
src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java 30 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/client/ClientServer.java 55 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/common/ChannelManager.java 15 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java 49 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/initalizer/TcpServerInitializer.java 33 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/server/TcpServer.java 67 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java 7 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/quartz/job/NettyServerStartUp.java 35 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java 14 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/client/ClientChannelHandler.java
New file
@@ -0,0 +1,30 @@
package com.xcong.excoin.netty.client;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
/**
 * @author wzy
 * @email wangdoubleone@gmail.com
 * @date 2019-05-14
 */
@Slf4j
public class ClientChannelHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("",cause);
    }
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("消息收到");
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        log.info("消息收到");
    }
}
src/main/java/com/xcong/excoin/netty/client/ClientServer.java
New file
@@ -0,0 +1,55 @@
package com.xcong.excoin.netty.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.net.InetSocketAddress;
/**
 * @author wzy
 * @email wangdoubleone@gmail.com
 * @date 2019-05-14
 */
public class ClientServer {
    public static void start() throws InterruptedException {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress("localhost", 9998))
                    .handler(new ChannelInitializer<SocketChannel>() {    //5
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            ByteBuf buf = Unpooled.copiedBuffer("_split".getBytes());
                            ch.pipeline().addLast(new DelimiterBasedFrameDecoder(10240, buf));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new StringEncoder());
                            ch.pipeline().addLast(new ClientChannelHandler());
                        }
                    });
            ChannelFuture f = b.connect().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
    public static void main(String args[]) throws InterruptedException {
        start();
    }
}
src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -22,6 +22,8 @@
    // Channels registered through addWebSocketChannel.
    private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    // Channels registered through addTcpChannel.
    private static final ChannelGroup TCP_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    // NOTE(review): presumably maps a subscription key to the channels receiving depth pushes — usage not visible here, confirm.
    private static final ConcurrentMap<String, ChannelGroup> DEPTH_MAP = new ConcurrentHashMap<>();
    // NOTE(review): presumably maps a subscription key to the channels receiving trade pushes — usage not visible here, confirm.
    private static final ConcurrentMap<String, ChannelGroup> TRADE_MAP = new ConcurrentHashMap<>();
@@ -31,11 +33,24 @@
    // Channels currently connected to the server (tcp and websocket),
    // keyed by the short text form of the channel id.
    private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>();
    public static void addTcpChannel(Channel channel) {
        TCP_GROUP.add(channel);
    }
    public static void removeTcpChannel(Channel channel) {
        TCP_GROUP.remove(channel);
    }
    public static void addWebSocketChannel(Channel channel) {
        WEBSOCKET_GROUP.add(channel);
        CHANNEL_MAP.put(channel.id().asShortText(), channel.id());
    }
    /**
     * Exposes the group of registered TCP channels.
     *
     * @return the shared TCP channel group itself, not a copy
     */
    public static ChannelGroup getTcpGroup() {
        return TCP_GROUP;
    }
    public static void removeWebSocketChannel(Channel channel) {
        WEBSOCKET_GROUP.remove(channel);
        CHANNEL_MAP.remove(channel.id().asShortText());
src/main/java/com/xcong/excoin/netty/handler/TcpServerHandler.java
New file
@@ -0,0 +1,49 @@
package com.xcong.excoin.netty.handler;
import com.xcong.excoin.netty.common.ChannelManager;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
 * @author wzy
 * @email wangdoubleone@gmail.com
 * @date 2019-05-06
 */
@Slf4j
@Component
@ChannelHandler.Sharable
public class TcpServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("[tcp客户端连入服务器]");
        ChannelManager.addTcpChannel(ctx.channel());
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.info("[tcp客户端断开服务器]");
        ChannelManager.removeTcpChannel(ctx.channel());
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
    }
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    }
}
src/main/java/com/xcong/excoin/netty/initalizer/TcpServerInitializer.java
New file
@@ -0,0 +1,33 @@
package com.xcong.excoin.netty.initalizer;
import com.xcong.excoin.netty.handler.TcpServerHandler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * @author wzy
 * @email wangdoubleone@gmail.com
 * @date 2019-05-06
 */
@Component
public class TcpServerInitializer extends ChannelInitializer<NioSocketChannel> {
    @Autowired
    private TcpServerHandler tcpServerHandler;
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ByteBuf buf = Unpooled.copiedBuffer("_split".getBytes());
        ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf));
        ch.pipeline().addLast(new StringDecoder());
        ch.pipeline().addLast(new StringEncoder());
        ch.pipeline().addLast(tcpServerHandler);
    }
}
src/main/java/com/xcong/excoin/netty/server/TcpServer.java
New file
@@ -0,0 +1,67 @@
package com.xcong.excoin.netty.server;
import com.xcong.excoin.netty.ChatServer;
import com.xcong.excoin.netty.initalizer.TcpServerInitializer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * @author wzy
 * @email wangdoubleone@gmail.com
 * @date 2019-05-06
 */
@Slf4j
@Component("tcpServer")
public class TcpServer implements ChatServer {
    private EventLoopGroup boss = new NioEventLoopGroup();
    private EventLoopGroup work = new NioEventLoopGroup();
    private ChannelFuture channelFuture;
    @Autowired
    private TcpServerInitializer tcpServerInitializer;
    @Override
    public void start() throws Exception {
        log.info("[tcp服务器启动]");
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(tcpServerInitializer);
            channelFuture = b.bind(9998).sync();
            log.info("[tcp服务器启动完成]-->{}", channelFuture.channel().localAddress());
        } finally {
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    shutdown();
                }
            });
        }
    }
    @Override
    public void shutdown() {
        if (channelFuture != null) {
            channelFuture.channel().close().syncUninterruptibly();
        }
        if (boss != null) {
            boss.shutdownGracefully();
        }
        if (work != null) {
            work.shutdownGracefully();
        }
    }
}
src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java
@@ -7,6 +7,7 @@
import com.huobi.client.model.enums.CandlestickInterval;
import com.huobi.client.model.enums.MBPLevelEnums;
import com.huobi.client.model.event.PriceDepthEvent;
import com.xcong.excoin.netty.server.TcpServer;
import com.xcong.excoin.netty.server.WebSocketServer;
import com.xcong.excoin.utils.CoinTypeConvert;
import lombok.extern.slf4j.Slf4j;
@@ -22,18 +23,12 @@
@Slf4j
@Component
public class KLineDataJob {
    @Autowired
    WebSocketServer webSocketServer;
    @Autowired
    private SubscriptionClient subscriptionClient;
//    @PostConstruct
    public void data() throws Exception {
        webSocketServer.start();
        log.info("==================");
        subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> {
            Candlestick data = candlestickEvent.getData();
        });
src/main/java/com/xcong/excoin/quartz/job/NettyServerStartUp.java
New file
@@ -0,0 +1,35 @@
package com.xcong.excoin.quartz.job;
import com.xcong.excoin.netty.server.TcpServer;
import com.xcong.excoin.netty.server.WebSocketServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContextAware;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
 * @author wzy
 * @date 2021-02-26
 **/
@Slf4j
@Component
public class NettyServerStartUp {
    @Autowired
    WebSocketServer webSocketServer;
    @Autowired
    TcpServer tcpServer;
    @Order(0)
    @PostConstruct
    public void start() throws Exception {
        log.info("启动netty服务器");
        webSocketServer.start();
        tcpServer.start();
    }
}
src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java
@@ -6,6 +6,8 @@
import com.huobi.client.model.Candlestick;
import com.huobi.client.model.enums.CandlestickInterval;
import com.xcong.excoin.modules.symbols.service.SymbolsService;
import com.xcong.excoin.netty.common.ChannelManager;
import com.xcong.excoin.netty.common.NettyTools;
import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService;
import com.xcong.excoin.utils.CoinTypeConvert;
import com.xcong.excoin.utils.RedisUtils;
@@ -49,12 +51,12 @@
        log.info("#=======价格更新开启=======#");
        subscriptionClient.subscribeTradeEvent("btcusdt,ethusdt,xrpusdt,ltcusdt,bchusdt,eosusdt,etcusdt", tradeEvent -> {
            String symbol = tradeEvent.getSymbol();
            ChannelManager.getTcpGroup().writeAndFlush(NettyTools.textBytes(JSONObject.toJSONString(tradeEvent)));
//            String symbol = tradeEvent.getSymbol();
//            // 根据symbol判断做什么操作
            symbol = CoinTypeConvert.convert(symbol);
            if (null != symbol) {
                String price = tradeEvent.getTradeList().get(0).getPrice().toPlainString();
                redisTemplate.convertAndSend("channel:newprice", symbol + "_" + price);
//            symbol = CoinTypeConvert.convert(symbol);
//            if (null != symbol) {
//                String price = tradeEvent.getTradeList().get(0).getPrice().toPlainString();
//                // TODO 测试环境关闭这个插入redis
//                redisUtils.set(CoinTypeConvert.convertToKey(symbol), price);
//                // 比较
@@ -63,7 +65,7 @@
//                websocketPriceService.wholeBomb();
//                //System.out.println("比较完毕:"+symbol+"-"+price);
//
            }
//            }
        });
//        subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> {