From 5285d249e702eeab3ad3847aaa24523ef0f1a544 Mon Sep 17 00:00:00 2001
From: Helius <wangdoubleone@gmail.com>
Date: Wed, 24 Feb 2021 18:11:06 +0800
Subject: [PATCH] add websocket
---
src/main/java/com/xcong/excoin/netty/common/ChannelManager.java | 82 ++++++++++++++++
src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java | 47 +++++++++
src/main/java/com/xcong/excoin/netty/bean/SubRequest.java | 18 +++
src/main/java/com/xcong/excoin/netty/bean/UnSubRequest.java | 18 +++
src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java | 44 +++++++-
src/main/java/com/xcong/excoin/netty/bean/UnSubResponse.java | 21 ++++
src/main/java/com/xcong/excoin/netty/bean/SubResponse.java | 21 ++++
src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java | 5
8 files changed, 245 insertions(+), 11 deletions(-)
diff --git a/src/main/java/com/xcong/excoin/netty/bean/SubRequest.java b/src/main/java/com/xcong/excoin/netty/bean/SubRequest.java
new file mode 100644
index 0000000..39699b2
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/bean/SubRequest.java
@@ -0,0 +1,18 @@
+package com.xcong.excoin.netty.bean;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * @author wzy
+ * @date 2021-02-24
+ **/
+@Data
+public class SubRequest implements Serializable {
+ private static final long serialVersionUID = -5661972007013485438L;
+ private String sub;
+
+ private String id;
+
+}
diff --git a/src/main/java/com/xcong/excoin/netty/bean/SubResponse.java b/src/main/java/com/xcong/excoin/netty/bean/SubResponse.java
new file mode 100644
index 0000000..845b9d5
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/bean/SubResponse.java
@@ -0,0 +1,21 @@
+package com.xcong.excoin.netty.bean;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * @author wzy
+ * @date 2021-02-24
+ **/
+@Data
+public class SubResponse implements Serializable {
+
+ private String id;
+
+ private String status;
+
+ private String subbed;
+
+ private long ts;
+}
diff --git a/src/main/java/com/xcong/excoin/netty/bean/UnSubRequest.java b/src/main/java/com/xcong/excoin/netty/bean/UnSubRequest.java
new file mode 100644
index 0000000..2515065
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/bean/UnSubRequest.java
@@ -0,0 +1,18 @@
+package com.xcong.excoin.netty.bean;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * @author wzy
+ * @date 2021-02-24
+ **/
+@Data
+public class UnSubRequest implements Serializable {
+
+ private static final long serialVersionUID = -7082425849051404678L;
+ private String unsub;
+
+ private String id;
+}
diff --git a/src/main/java/com/xcong/excoin/netty/bean/UnSubResponse.java b/src/main/java/com/xcong/excoin/netty/bean/UnSubResponse.java
new file mode 100644
index 0000000..313939d
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/netty/bean/UnSubResponse.java
@@ -0,0 +1,21 @@
+package com.xcong.excoin.netty.bean;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+/**
+ * @author wzy
+ * @date 2021-02-24
+ **/
+@Data
+public class UnSubResponse implements Serializable {
+
+ private String id;
+
+ private String status;
+
+ private String subbed;
+
+ private long ts;
+}
diff --git a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
index 5d158f6..3937137 100644
--- a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
+++ b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -8,6 +8,8 @@
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -19,6 +21,12 @@
public class ChannelManager {
private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+ private static final ConcurrentMap<String, ChannelGroup> DEPTH_MAP = new ConcurrentHashMap<>();
+
+ private static final ConcurrentMap<String, ChannelGroup> TRADE_MAP = new ConcurrentHashMap<>();
+
+ private static final ConcurrentMap<String, ChannelGroup> KLINE_MAP = new ConcurrentHashMap<>();
// 当前连接到服务器的通道(tcp和websocket)
private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>();
@@ -52,5 +60,79 @@
WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg));
}
+ public static void putSymbolSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
+ switch (type) {
+ case "kline" :
+ ChannelGroup kline = KLINE_MAP.get(symbol);
+ if (kline == null) {
+ kline = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+ }
+ kline.add(channel);
+ KLINE_MAP.put(symbol, kline);
+ break;
+ case "depth" :
+ ChannelGroup depth = DEPTH_MAP.get(symbol);
+ if (depth == null) {
+ depth = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+ }
+ depth.add(channel);
+ DEPTH_MAP.put(symbol, depth);
+ break;
+ case "trade" :
+ ChannelGroup trade = TRADE_MAP.get(symbol);
+ if (trade == null) {
+ trade = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+ }
+ trade.add(channel);
+ TRADE_MAP.put(symbol, trade);
+ break;
+ default:
+ break;
+ }
+ }
+
+ public static void removeSymbolUnSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
+ switch (type) {
+ case "kline" :
+ ChannelGroup kline = KLINE_MAP.get(symbol);
+ if (kline == null) {
+ return;
+ }
+ kline.remove(channel);
+ KLINE_MAP.put(symbol, kline);
+ break;
+ case "depth" :
+ ChannelGroup depth = DEPTH_MAP.get(symbol);
+ if (depth == null) {
+ return;
+ }
+ depth.remove(channel);
+ DEPTH_MAP.put(symbol, depth);
+ break;
+ case "trade" :
+ ChannelGroup trade = TRADE_MAP.get(symbol);
+ if (trade == null) {
+ return;
+ }
+ trade.remove(channel);
+ TRADE_MAP.put(symbol, trade);
+ break;
+ default:
+ break;
+ }
+ }
+
+ public static ChannelGroup getChannelGroup(@NotBlank String symbol, @NotBlank String type) {
+ switch (type) {
+ case "kline" :
+ return KLINE_MAP.get(symbol);
+ case "depth" :
+ return DEPTH_MAP.get(symbol);
+ case "trade" :
+ return TRADE_MAP.get(symbol);
+ default:
+ return new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+ }
+ }
}
diff --git a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
index 540fd36..5471a7f 100644
--- a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
+++ b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
@@ -2,6 +2,10 @@
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.netty.bean.RequestBean;
+import com.xcong.excoin.netty.bean.SubRequest;
+import com.xcong.excoin.netty.bean.SubResponse;
+import com.xcong.excoin.netty.bean.UnSubResponse;
+import com.xcong.excoin.netty.common.ChannelManager;
import com.xcong.excoin.netty.common.NettyTools;
import com.xcong.excoin.netty.logic.MsgLogic;
import io.netty.channel.ChannelHandlerContext;
@@ -27,14 +31,38 @@
private MsgLogic msgLogic;
public void webSocketDispatch(ChannelHandlerContext ctx, String msg) {
- RequestBean requestBean = null;
- try {
- requestBean = JSONObject.parseObject(msg, RequestBean.class);
- requestBean.setChannelId(ctx.channel().id().asShortText());
- msgLogic.webSocketMsgLogic(requestBean);
- } catch (Exception e) {
- log.info("#websocket json error:{}#", e);
- ctx.channel().writeAndFlush(NettyTools.webSocketBytes("params error"));
+ JSONObject jsonObject = JSONObject.parseObject(msg);
+ if (jsonObject.containsKey("sub")) {
+ String sub = jsonObject.getString("sub");
+ log.info("{}", sub);
+ String[] split = sub.split("\\.");
+ if (split.length != 4) {
+ ctx.writeAndFlush("error");
+ return;
+ }
+
+ ChannelManager.putSymbolSubChannel(split[1], ctx.channel(), split[2]);
+ SubResponse subResponse = new SubResponse();
+ subResponse.setSubbed(sub);
+ subResponse.setId(jsonObject.getString("id"));
+ subResponse.setTs(System.currentTimeMillis());
+ subResponse.setStatus("ok");
+ ctx.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(subResponse)));
+ } else if (jsonObject.containsKey("unsub")) {
+ String sub = jsonObject.getString("unsub");
+ String[] split = sub.split("\\.");
+ if (split.length != 4) {
+ ctx.writeAndFlush("error");
+ return;
+ }
+
+ ChannelManager.removeSymbolUnSubChannel(split[1], ctx.channel(), split[2]);
+ UnSubResponse resp = new UnSubResponse();
+ resp.setSubbed(sub);
+ resp.setId(jsonObject.getString("id"));
+ resp.setTs(System.currentTimeMillis());
+ resp.setStatus("ok");
+ ctx.writeAndFlush(resp);
}
}
diff --git a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
index 3e57246..a8b448e 100644
--- a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
+++ b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
@@ -61,7 +61,7 @@
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-// LogUtil.info("[websocket服务器收到消息]-->{}, {}", ctx.channel().id(), msg);
+// log.info("[websocket服务器收到消息]-->{}, {}", ctx.channel().id(), msg);
if (msg instanceof FullHttpRequest) {
// 以http请求形式接入,但是走的是websocket
handleHttpRequest(ctx, (FullHttpRequest) msg);
@@ -88,7 +88,6 @@
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
log.info("[触发器触发]");
-// super.userEventTriggered(ctx, evt);
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
@@ -101,7 +100,7 @@
times = 0;
}
/*读超时*/
- log.info("===服务端===({}写超时, {})", ctx.channel().id().asShortText(), times);
+// log.info("===服务端===({}写超时, {})", ctx.channel().id().asShortText(), times);
// 失败计数器次数大于等于3次的时候,关闭链接,等待client重连
if (times >= MAX_UN_REC_PING_TIMES) {
log.info("===服务端===(写超时,关闭chanel)");
diff --git a/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java b/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java
new file mode 100644
index 0000000..cd6cae0
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java
@@ -0,0 +1,47 @@
+package com.xcong.excoin.quartz.job;
+
+import com.alibaba.fastjson.JSONObject;
+import com.huobi.client.SubscriptionClient;
+import com.huobi.client.SubscriptionOptions;
+import com.huobi.client.model.Candlestick;
+import com.huobi.client.model.enums.CandlestickInterval;
+import com.huobi.client.model.enums.MBPLevelEnums;
+import com.huobi.client.model.event.PriceDepthEvent;
+import com.xcong.excoin.netty.server.WebSocketServer;
+import com.xcong.excoin.utils.CoinTypeConvert;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+
+/**
+ * @author wzy
+ * @date 2021-02-23
+ **/
+@Slf4j
+@Component
+public class KLineDataJob {
+
+ @Autowired
+ WebSocketServer webSocketServer;
+
+ @PostConstruct
+ public void data() throws Exception {
+ webSocketServer.start();
+ log.info("==================");
+ SubscriptionOptions subscriptionOptions = new SubscriptionOptions();
+ subscriptionOptions.setConnectionDelayOnFailure(5);
+ subscriptionOptions.setUri("wss://api.hadax.com/ws");
+ SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions);
+
+ subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> {
+ Candlestick data = candlestickEvent.getData();
+ });
+
+ subscriptionClient.subscribePriceDepthEvent("btcusdt", priceDepthEvent -> {
+// log.info("bids:{}", JSONObject.toJSONString(priceDepthEvent.getData().getBids()));
+// log.info("asks:{}", JSONObject.toJSONString(priceDepthEvent.getData().getAsks()));
+ });
+ }
+}
--
Gitblit v1.9.1