From c8b80dc38d75e89aa44574659b154ddea2e8fce5 Mon Sep 17 00:00:00 2001
From: Administrator <15274802129@163.com>
Date: Wed, 07 Jan 2026 14:16:32 +0800
Subject: [PATCH] refactor(okx): 重构WebSocket客户端实现止盈止损逻辑
---
src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxQuantWebSocketClient.java | 65 +++++++++++++++++++-------------
1 files changed, 38 insertions(+), 27 deletions(-)
diff --git a/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxQuantWebSocketClient.java b/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxQuantWebSocketClient.java
index 9a79fdc..03b9d7e 100644
--- a/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxQuantWebSocketClient.java
+++ b/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxQuantWebSocketClient.java
@@ -1,10 +1,13 @@
package com.xcong.excoin.modules.okxNewPrice;
+import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.okxNewPrice.celue.CaoZuoService;
import com.xcong.excoin.modules.okxNewPrice.okxWs.*;
import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.ExchangeInfoEnum;
+import com.xcong.excoin.modules.okxNewPrice.okxWs.param.TradeRequestParam;
+import com.xcong.excoin.modules.okxNewPrice.okxWs.wanggeList.WangGeListService;
import com.xcong.excoin.modules.okxNewPrice.utils.SSLConfig;
import com.xcong.excoin.modules.okxNewPrice.wangge.WangGeService;
import com.xcong.excoin.utils.RedisUtils;
@@ -19,6 +22,7 @@
import javax.annotation.PreDestroy;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -31,10 +35,11 @@
*/
@Slf4j
public class OkxQuantWebSocketClient {
- private final WangGeService wangGeService;
- private final CaoZuoService caoZuoService;
private final RedisUtils redisUtils;
private final ExchangeInfoEnum account;
+
+ private final CaoZuoService caoZuoService;
+ private final WangGeListService wangGeListService;
private WebSocketClient webSocketClient;
private ScheduledExecutorService heartbeatExecutor;
@@ -45,12 +50,32 @@
private final AtomicBoolean isConnected = new AtomicBoolean(false);
private final AtomicBoolean isConnecting = new AtomicBoolean(false);
- public OkxQuantWebSocketClient(ExchangeInfoEnum account, WangGeService wangGeService,
- CaoZuoService caoZuoService, RedisUtils redisUtils) {
+ /**
+ * 获取WebSocketClient实例
+ * @return WebSocketClient实例
+ */
+ public WebSocketClient getWebSocketClient() {
+ return webSocketClient;
+ }
+
+ /**
+ * 获取账号名称
+ * @return 账号名称
+ */
+ public String getAccountName() {
+ return account.name();
+ }
+
+ public OkxQuantWebSocketClient(
+ ExchangeInfoEnum account,
+ RedisUtils redisUtils,
+ CaoZuoService caoZuoService,
+ WangGeListService wangGeListService
+ ) {
this.account = account;
- this.wangGeService = wangGeService;
- this.caoZuoService = caoZuoService;
this.redisUtils = redisUtils;
+ this.caoZuoService = caoZuoService;
+ this.wangGeListService = wangGeListService;
}
private static final String WS_URL_MONIPAN = "wss://wspap.okx.com:8443/ws/v5/private";
@@ -95,22 +120,6 @@
* 销毁方法,在 Spring Bean 销毁前执行。
* 关闭 WebSocket 连接、停止心跳定时器及相关的线程资源。
*/
-// @PreDestroy
-// public void destroy() {
-// if (webSocketClient != null && webSocketClient.isOpen()) {
-// subscribeAccountChannel(UNSUBSCRIBE);
-// subscribePositionChannel(UNSUBSCRIBE);
-// subscribeOrderInfoChannel(UNSUBSCRIBE);
-// webSocketClient.close();
-// }
-// shutdownExecutorGracefully(heartbeatExecutor);
-// if (pongTimeoutFuture != null) {
-// pongTimeoutFuture.cancel(true);
-// }
-// shutdownExecutorGracefully(sharedExecutor);
-//
-// // 移除了 reconnectScheduler 的关闭操作
-// }
@PreDestroy
public void destroy() {
log.info("开始销毁OkxQuantWebSocketClient");
@@ -174,7 +183,6 @@
try {
InstrumentsWs.handleEvent(account.name());
- wangGeService.initWangGe();
SSLConfig.configureSSL();
System.setProperty("https.protocols", "TLSv1.2,TLSv1.3");
String WS_URL = WS_URL_MONIPAN;
@@ -370,14 +378,17 @@
// 注意:当前实现中,OrderInfoWs等类使用静态Map存储数据
// 这会导致多账号之间的数据冲突。需要进一步修改这些类的设计,让数据存储与特定账号关联
if (OrderInfoWs.ORDERINFOWS_CHANNEL.equals(channel)) {
- OrderInfoWs.handleEvent(response, redisUtils, account.name());
+
+ TradeRequestParam tradeRequestParam = OrderInfoWs.handleEvent(response, redisUtils, account.name());
+ TradeOrderWs.orderZhiYingEvent(webSocketClient, tradeRequestParam);
}else if (AccountWs.ACCOUNTWS_CHANNEL.equals(channel)) {
+
AccountWs.handleEvent(response, account.name());
-// String side = caoZuoService.caoZuo(account.name());
-// TradeOrderWs.orderEvent(webSocketClient, side, account.name());
} else if (PositionsWs.POSITIONSWS_CHANNEL.equals(channel)) {
+
PositionsWs.handleEvent(response, account.name());
} else if (BalanceAndPositionWs.CHANNEL_NAME.equals(channel)) {
+
BalanceAndPositionWs.handleEvent(response);
}
}
@@ -474,7 +485,7 @@
}
int attempt = 0;
- int maxAttempts = 5;
+ int maxAttempts = 3;
long delayMs = 1000;
while (attempt < maxAttempts && !isConnected.get()) {
--
Gitblit v1.9.1