Helius
2021-02-24 5285d249e702eeab3ad3847aaa24523ef0f1a544
add websocket
5 files added
3 files modified
256 ■■■■■ changed files
src/main/java/com/xcong/excoin/netty/bean/SubRequest.java 18 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/bean/SubResponse.java 21 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/bean/UnSubRequest.java 18 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/bean/UnSubResponse.java 21 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/common/ChannelManager.java 82 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java 44 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java 5 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java 47 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/bean/SubRequest.java
New file
@@ -0,0 +1,18 @@
package com.xcong.excoin.netty.bean;
import lombok.Data;
import java.io.Serializable;
/**
 * Bean for a client "subscribe" request; Lombok's {@code @Data} generates the accessors.
 *
 * @author wzy
 * @date 2021-02-24
 **/
@Data
public class SubRequest implements Serializable {
    private static final long serialVersionUID = -5661972007013485438L;
    /** Subscription topic — presumably the same dot-separated string carried under the "sub" JSON key; confirm against the dispatcher. */
    private String sub;
    /** Request id supplied by the client — presumably echoed back in the response; verify against caller. */
    private String id;
}
src/main/java/com/xcong/excoin/netty/bean/SubResponse.java
New file
@@ -0,0 +1,21 @@
package com.xcong.excoin.netty.bean;
import lombok.Data;
import java.io.Serializable;
/**
 * @author wzy
 * @date 2021-02-24
 **/
@Data
public class SubResponse implements Serializable {
    private String id;
    private String status;
    private String subbed;
    private long ts;
}
src/main/java/com/xcong/excoin/netty/bean/UnSubRequest.java
New file
@@ -0,0 +1,18 @@
package com.xcong.excoin.netty.bean;
import lombok.Data;
import java.io.Serializable;
/**
 * Bean for a client "unsubscribe" request; Lombok's {@code @Data} generates the accessors.
 *
 * @author wzy
 * @date 2021-02-24
 **/
@Data
public class UnSubRequest implements Serializable {
    private static final long serialVersionUID = -7082425849051404678L;
    /** Topic to unsubscribe from — presumably the same dot-separated string carried under the "unsub" JSON key; confirm against the dispatcher. */
    private String unsub;
    /** Request id supplied by the client — presumably echoed back in the response; verify against caller. */
    private String id;
}
src/main/java/com/xcong/excoin/netty/bean/UnSubResponse.java
New file
@@ -0,0 +1,21 @@
package com.xcong.excoin.netty.bean;
import lombok.Data;
import java.io.Serializable;
/**
 * @author wzy
 * @date 2021-02-24
 **/
@Data
public class UnSubResponse implements Serializable {
    private String id;
    private String status;
    private String subbed;
    private long ts;
}
src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -8,6 +8,8 @@
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -19,6 +21,12 @@
public class ChannelManager {
    // Group that broadcast messages are written to.
    private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    // Per-symbol subscriber groups for the "depth" feed type (key: symbol).
    private static final ConcurrentMap<String, ChannelGroup> DEPTH_MAP = new ConcurrentHashMap<>();
    // Per-symbol subscriber groups for the "trade" feed type (key: symbol).
    private static final ConcurrentMap<String, ChannelGroup> TRADE_MAP = new ConcurrentHashMap<>();
    // Per-symbol subscriber groups for the "kline" feed type (key: symbol).
    private static final ConcurrentMap<String, ChannelGroup> KLINE_MAP = new ConcurrentHashMap<>();
    // Channels currently connected to the server (TCP and WebSocket).
    private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>();
@@ -52,5 +60,79 @@
        WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg));
    }
    public static void putSymbolSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
        switch (type) {
            case "kline" :
                ChannelGroup kline = KLINE_MAP.get(symbol);
                if (kline == null) {
                    kline = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
                kline.add(channel);
                KLINE_MAP.put(symbol, kline);
                break;
            case "depth" :
                ChannelGroup depth = DEPTH_MAP.get(symbol);
                if (depth == null) {
                    depth = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
                depth.add(channel);
                DEPTH_MAP.put(symbol, depth);
                break;
            case "trade" :
                ChannelGroup trade = TRADE_MAP.get(symbol);
                if (trade == null) {
                    trade = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
                trade.add(channel);
                TRADE_MAP.put(symbol, trade);
                break;
            default:
                break;
        }
    }
    public static void removeSymbolUnSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
        switch (type) {
            case "kline" :
                ChannelGroup kline = KLINE_MAP.get(symbol);
                if (kline == null) {
                    return;
                }
                kline.remove(channel);
                KLINE_MAP.put(symbol, kline);
                break;
            case "depth" :
                ChannelGroup depth = DEPTH_MAP.get(symbol);
                if (depth == null) {
                    return;
                }
                depth.remove(channel);
                DEPTH_MAP.put(symbol, depth);
                break;
            case "trade" :
                ChannelGroup trade = TRADE_MAP.get(symbol);
                if (trade == null) {
                    return;
                }
                trade.remove(channel);
                TRADE_MAP.put(symbol, trade);
                break;
            default:
                break;
        }
    }
    public static ChannelGroup getChannelGroup(@NotBlank String symbol, @NotBlank String type) {
        switch (type) {
            case "kline" :
                return KLINE_MAP.get(symbol);
            case "depth" :
                return DEPTH_MAP.get(symbol);
            case "trade" :
                return TRADE_MAP.get(symbol);
            default:
                return new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        }
    }
}
src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
@@ -2,6 +2,10 @@
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.netty.bean.RequestBean;
import com.xcong.excoin.netty.bean.SubRequest;
import com.xcong.excoin.netty.bean.SubResponse;
import com.xcong.excoin.netty.bean.UnSubResponse;
import com.xcong.excoin.netty.common.ChannelManager;
import com.xcong.excoin.netty.common.NettyTools;
import com.xcong.excoin.netty.logic.MsgLogic;
import io.netty.channel.ChannelHandlerContext;
@@ -27,14 +31,38 @@
    private MsgLogic msgLogic;
    public void webSocketDispatch(ChannelHandlerContext ctx, String msg) {
        RequestBean requestBean = null;
        try {
            requestBean = JSONObject.parseObject(msg, RequestBean.class);
            requestBean.setChannelId(ctx.channel().id().asShortText());
            msgLogic.webSocketMsgLogic(requestBean);
        } catch (Exception e) {
            log.info("#websocket json error:{}#", e);
            ctx.channel().writeAndFlush(NettyTools.webSocketBytes("params error"));
        JSONObject jsonObject = JSONObject.parseObject(msg);
        if (jsonObject.containsKey("sub")) {
            String sub = jsonObject.getString("sub");
            log.info("{}", sub);
            String[] split = sub.split("\\.");
            if (split.length != 4) {
                ctx.writeAndFlush("error");
                return;
            }
            ChannelManager.putSymbolSubChannel(split[1], ctx.channel(), split[2]);
            SubResponse subResponse = new SubResponse();
            subResponse.setSubbed(sub);
            subResponse.setId(jsonObject.getString("id"));
            subResponse.setTs(System.currentTimeMillis());
            subResponse.setStatus("ok");
            ctx.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(subResponse)));
        } else if (jsonObject.containsKey("unsub")) {
            String sub = jsonObject.getString("unsub");
            String[] split = sub.split("\\.");
            if (split.length != 4) {
                ctx.writeAndFlush("error");
                return;
            }
            ChannelManager.removeSymbolUnSubChannel(split[1], ctx.channel(), split[2]);
            UnSubResponse resp = new UnSubResponse();
            resp.setSubbed(sub);
            resp.setId(jsonObject.getString("id"));
            resp.setTs(System.currentTimeMillis());
            resp.setStatus("ok");
            ctx.writeAndFlush(resp);
        }
    }
src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
@@ -61,7 +61,7 @@
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//        LogUtil.info("[websocket服务器收到消息]-->{}, {}", ctx.channel().id(), msg);
//        log.info("[websocket服务器收到消息]-->{}, {}", ctx.channel().id(), msg);
        if (msg instanceof FullHttpRequest) {
            // 以http请求形式接入,但是走的是websocket
            handleHttpRequest(ctx, (FullHttpRequest) msg);
@@ -88,7 +88,6 @@
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.info("[触发器触发]");
//        super.userEventTriggered(ctx, evt);
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
@@ -101,7 +100,7 @@
                    times = 0;
                }
                /*读超时*/
                log.info("===服务端===({}写超时, {})", ctx.channel().id().asShortText(), times);
//                log.info("===服务端===({}写超时, {})", ctx.channel().id().asShortText(), times);
                // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连
                if (times >= MAX_UN_REC_PING_TIMES) {
                    log.info("===服务端===(写超时,关闭chanel)");
src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java
New file
@@ -0,0 +1,47 @@
package com.xcong.excoin.quartz.job;
import com.alibaba.fastjson.JSONObject;
import com.huobi.client.SubscriptionClient;
import com.huobi.client.SubscriptionOptions;
import com.huobi.client.model.Candlestick;
import com.huobi.client.model.enums.CandlestickInterval;
import com.huobi.client.model.enums.MBPLevelEnums;
import com.huobi.client.model.event.PriceDepthEvent;
import com.xcong.excoin.netty.server.WebSocketServer;
import com.xcong.excoin.utils.CoinTypeConvert;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
/**
 * @author wzy
 * @date 2021-02-23
 **/
@Slf4j
@Component
public class KLineDataJob {
    @Autowired
    WebSocketServer webSocketServer;
    @PostConstruct
    public void data() throws Exception {
        webSocketServer.start();
        log.info("==================");
        SubscriptionOptions subscriptionOptions = new SubscriptionOptions();
        subscriptionOptions.setConnectionDelayOnFailure(5);
        subscriptionOptions.setUri("wss://api.hadax.com/ws");
        SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions);
        subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> {
            Candlestick data = candlestickEvent.getData();
        });
        subscriptionClient.subscribePriceDepthEvent("btcusdt", priceDepthEvent -> {
//            log.info("bids:{}", JSONObject.toJSONString(priceDepthEvent.getData().getBids()));
//            log.info("asks:{}", JSONObject.toJSONString(priceDepthEvent.getData().getAsks()));
        });
    }
}