From 5285d249e702eeab3ad3847aaa24523ef0f1a544 Mon Sep 17 00:00:00 2001 From: Helius <wangdoubleone@gmail.com> Date: Wed, 24 Feb 2021 18:11:06 +0800 Subject: [PATCH] add websocket --- src/main/java/com/xcong/excoin/netty/common/ChannelManager.java | 82 ++++++++++++++++ src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java | 47 +++++++++ src/main/java/com/xcong/excoin/netty/bean/SubRequest.java | 18 +++ src/main/java/com/xcong/excoin/netty/bean/UnSubRequest.java | 18 +++ src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java | 44 +++++++- src/main/java/com/xcong/excoin/netty/bean/UnSubResponse.java | 21 ++++ src/main/java/com/xcong/excoin/netty/bean/SubResponse.java | 21 ++++ src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java | 5 8 files changed, 245 insertions(+), 11 deletions(-) diff --git a/src/main/java/com/xcong/excoin/netty/bean/SubRequest.java b/src/main/java/com/xcong/excoin/netty/bean/SubRequest.java new file mode 100644 index 0000000..39699b2 --- /dev/null +++ b/src/main/java/com/xcong/excoin/netty/bean/SubRequest.java @@ -0,0 +1,18 @@ +package com.xcong.excoin.netty.bean; + +import lombok.Data; + +import java.io.Serializable; + +/** + * @author wzy + * @date 2021-02-24 + **/ +@Data +public class SubRequest implements Serializable { + private static final long serialVersionUID = -5661972007013485438L; + private String sub; + + private String id; + +} diff --git a/src/main/java/com/xcong/excoin/netty/bean/SubResponse.java b/src/main/java/com/xcong/excoin/netty/bean/SubResponse.java new file mode 100644 index 0000000..845b9d5 --- /dev/null +++ b/src/main/java/com/xcong/excoin/netty/bean/SubResponse.java @@ -0,0 +1,21 @@ +package com.xcong.excoin.netty.bean; + +import lombok.Data; + +import java.io.Serializable; + +/** + * @author wzy + * @date 2021-02-24 + **/ +@Data +public class SubResponse implements Serializable { + + private String id; + + private String status; + + private String subbed; + + private long ts; +} diff --git a/src/main/java/com/xcong/excoin/netty/bean/UnSubRequest.java b/src/main/java/com/xcong/excoin/netty/bean/UnSubRequest.java new file mode 100644 index 0000000..2515065 --- /dev/null +++ b/src/main/java/com/xcong/excoin/netty/bean/UnSubRequest.java @@ -0,0 +1,18 @@ +package com.xcong.excoin.netty.bean; + +import lombok.Data; + +import java.io.Serializable; + +/** + * @author wzy + * @date 2021-02-24 + **/ +@Data +public class UnSubRequest implements Serializable { + + private static final long serialVersionUID = -7082425849051404678L; + private String unsub; + + private String id; +} diff --git a/src/main/java/com/xcong/excoin/netty/bean/UnSubResponse.java b/src/main/java/com/xcong/excoin/netty/bean/UnSubResponse.java new file mode 100644 index 0000000..313939d --- /dev/null +++ b/src/main/java/com/xcong/excoin/netty/bean/UnSubResponse.java @@ -0,0 +1,21 @@ +package com.xcong.excoin.netty.bean; + +import lombok.Data; + +import java.io.Serializable; + +/** + * @author wzy + * @date 2021-02-24 + **/ +@Data +public class UnSubResponse implements Serializable { + + private String id; + + private String status; + + private String subbed; + + private long ts; +} diff --git a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java index 5d158f6..3937137 100644 --- a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java +++ b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java @@ -8,6 +8,8 @@ import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -19,6 +21,12 @@ public class ChannelManager { private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + + private static final ConcurrentMap<String, ChannelGroup> DEPTH_MAP = new ConcurrentHashMap<>(); + + private static final ConcurrentMap<String, ChannelGroup> TRADE_MAP = new ConcurrentHashMap<>(); + + private static final ConcurrentMap<String, ChannelGroup> KLINE_MAP = new ConcurrentHashMap<>(); // 当前连接到服务器的通道(tcp和websocket) private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>(); @@ -52,5 +60,79 @@ WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg)); } + public static void putSymbolSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) { + switch (type) { + case "kline" : + ChannelGroup kline = KLINE_MAP.get(symbol); + if (kline == null) { + kline = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + } + kline.add(channel); + KLINE_MAP.put(symbol, kline); + break; + case "depth" : + ChannelGroup depth = DEPTH_MAP.get(symbol); + if (depth == null) { + depth = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + } + depth.add(channel); + DEPTH_MAP.put(symbol, depth); + break; + case "trade" : + ChannelGroup trade = TRADE_MAP.get(symbol); + if (trade == null) { + trade = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + } + trade.add(channel); + TRADE_MAP.put(symbol, trade); + break; + default: + break; + } + } + + public static void removeSymbolUnSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) { + switch (type) { + case "kline" : + ChannelGroup kline = KLINE_MAP.get(symbol); + if (kline == null) { + return; + } + kline.remove(channel); + KLINE_MAP.put(symbol, kline); + break; + case "depth" : + ChannelGroup depth = DEPTH_MAP.get(symbol); + if (depth == null) { + return; + } + depth.remove(channel); + DEPTH_MAP.put(symbol, depth); + break; + case "trade" : + ChannelGroup trade = TRADE_MAP.get(symbol); + if (trade == null) { + return; + } + trade.remove(channel); + TRADE_MAP.put(symbol, trade); + break; + default: + break; + } + } + + public static ChannelGroup getChannelGroup(@NotBlank String symbol, @NotBlank String type) { + switch (type) { + case "kline" : + return KLINE_MAP.get(symbol); + case "depth" : + return DEPTH_MAP.get(symbol); + case "trade" : + return TRADE_MAP.get(symbol); + default: + return new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); + } + } } diff --git a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java index 540fd36..5471a7f 100644 --- a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java +++ b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java @@ -2,6 +2,10 @@ import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.netty.bean.RequestBean; +import com.xcong.excoin.netty.bean.SubRequest; +import com.xcong.excoin.netty.bean.SubResponse; +import com.xcong.excoin.netty.bean.UnSubResponse; +import com.xcong.excoin.netty.common.ChannelManager; import com.xcong.excoin.netty.common.NettyTools; import com.xcong.excoin.netty.logic.MsgLogic; import io.netty.channel.ChannelHandlerContext; @@ -27,14 +31,38 @@ private MsgLogic msgLogic; public void webSocketDispatch(ChannelHandlerContext ctx, String msg) { - RequestBean requestBean = null; - try { - requestBean = JSONObject.parseObject(msg, RequestBean.class); - requestBean.setChannelId(ctx.channel().id().asShortText()); - msgLogic.webSocketMsgLogic(requestBean); - } catch (Exception e) { - log.info("#websocket json error:{}#", e); - ctx.channel().writeAndFlush(NettyTools.webSocketBytes("params error")); + JSONObject jsonObject = JSONObject.parseObject(msg); + if (jsonObject.containsKey("sub")) { + String sub = jsonObject.getString("sub"); + log.info("{}", sub); + String[] split = sub.split("\\."); + if (split.length != 4) { + ctx.writeAndFlush("error"); + return; + } + + ChannelManager.putSymbolSubChannel(split[1], ctx.channel(), split[2]); + SubResponse subResponse = new SubResponse(); + subResponse.setSubbed(sub); + subResponse.setId(jsonObject.getString("id")); + subResponse.setTs(System.currentTimeMillis()); + subResponse.setStatus("ok"); + ctx.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(subResponse))); + } else if (jsonObject.containsKey("unsub")) { + String sub = jsonObject.getString("unsub"); + String[] split = sub.split("\\."); + if (split.length != 4) { + ctx.writeAndFlush("error"); + return; + } + + ChannelManager.removeSymbolUnSubChannel(split[1], ctx.channel(), split[2]); + UnSubResponse resp = new UnSubResponse(); + resp.setSubbed(sub); + resp.setId(jsonObject.getString("id")); + resp.setTs(System.currentTimeMillis()); + resp.setStatus("ok"); + ctx.writeAndFlush(resp); } } diff --git a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java index 3e57246..a8b448e 100644 --- a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java +++ b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java @@ -61,7 +61,7 @@ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { -// LogUtil.info("[websocket服务器收到消息]-->{}, {}", ctx.channel().id(), msg); +// log.info("[websocket服务器收到消息]-->{}, {}", ctx.channel().id(), msg); if (msg instanceof FullHttpRequest) { // 以http请求形式接入,但是走的是websocket handleHttpRequest(ctx, (FullHttpRequest) msg); @@ -88,7 +88,6 @@ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { log.info("[触发器触发]"); -// super.userEventTriggered(ctx, evt); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { @@ -101,7 +100,7 @@ times = 0; } /*读超时*/ - log.info("===服务端===({}写超时, {})", ctx.channel().id().asShortText(), times); +// log.info("===服务端===({}写超时, {})", ctx.channel().id().asShortText(), times); // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连 if (times >= MAX_UN_REC_PING_TIMES) { log.info("===服务端===(写超时,关闭chanel)"); diff --git a/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java b/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java new file mode 100644 index 0000000..cd6cae0 --- /dev/null +++ b/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java @@ -0,0 +1,47 @@ +package com.xcong.excoin.quartz.job; + +import com.alibaba.fastjson.JSONObject; +import com.huobi.client.SubscriptionClient; +import com.huobi.client.SubscriptionOptions; +import com.huobi.client.model.Candlestick; +import com.huobi.client.model.enums.CandlestickInterval; +import com.huobi.client.model.enums.MBPLevelEnums; +import com.huobi.client.model.event.PriceDepthEvent; +import com.xcong.excoin.netty.server.WebSocketServer; +import com.xcong.excoin.utils.CoinTypeConvert; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; + +/** + * @author wzy + * @date 2021-02-23 + **/ +@Slf4j +@Component +public class KLineDataJob { + + @Autowired + WebSocketServer webSocketServer; + + @PostConstruct + public void data() throws Exception { + webSocketServer.start(); + log.info("=================="); + SubscriptionOptions subscriptionOptions = new SubscriptionOptions(); + subscriptionOptions.setConnectionDelayOnFailure(5); + subscriptionOptions.setUri("wss://api.hadax.com/ws"); + SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions); + + subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> { + Candlestick data = candlestickEvent.getData(); + }); + + subscriptionClient.subscribePriceDepthEvent("btcusdt", priceDepthEvent -> { +// log.info("bids:{}", JSONObject.toJSONString(priceDepthEvent.getData().getBids())); +// log.info("asks:{}", JSONObject.toJSONString(priceDepthEvent.getData().getAsks())); + }); + } +} -- Gitblit v1.9.1