From ed57e750b5e2cf14fe5d447ff318228f8df77d23 Mon Sep 17 00:00:00 2001
From: Administrator <15274802129@163.com>
Date: Fri, 05 Jun 2026 16:48:51 +0800
Subject: [PATCH] refactor(okx): 移除算法单频道处理器并优化网格交易重置逻辑
---
src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridTradeService.java | 19 ++++++---
src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxWebSocketClientManager.java | 1
/dev/null | 89 --------------------------------------------
src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxTradeExecutor.java | 8 ++++
4 files changed, 20 insertions(+), 97 deletions(-)
diff --git a/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridTradeService.java b/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridTradeService.java
index 611049b..84a5e49 100644
--- a/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridTradeService.java
+++ b/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridTradeService.java
@@ -269,11 +269,9 @@
updateUnrealizedPnl();
if (state == StrategyState.STOPPED) {
- executor.cancelAllAlgoOrders();
- closeExistingPositions();
+ // stopGrid() 已做清理,仅打印日志不重复操作
BigDecimal totalPnl = cumulativePnl.add(unrealizedPnl);
log.info("[OKX] 已实现:{}, 未实现:{}, 合计:{}", cumulativePnl, unrealizedPnl, totalPnl);
- startGrid();
return;
}
@@ -363,7 +361,6 @@
}
} else {
if (longActive && state == StrategyState.ACTIVE) {
- log.info("[OKX] 多仓持仓归零,重置策略");
handlePositionZeroAndReset("多仓");
}
longActive = false;
@@ -385,7 +382,6 @@
}
} else {
if (shortActive && state == StrategyState.ACTIVE) {
- log.info("[OKX] 空仓持仓归零,重置策略");
handlePositionZeroAndReset("空仓");
}
shortActive = false;
@@ -555,7 +551,11 @@
state = StrategyState.STOPPED;
closeExistingPositions();
executor.cancelAllAlgoOrders();
- startGrid();
+ // 提交到 executor 末尾:单线程FIFO保证前面所有平仓/取消任务完成后才重置
+ executor.submitTask(() -> {
+ try { Thread.sleep(3000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
+ startGrid();
+ });
}
} catch (Exception e) {
log.warn("[OKX] 盈亏检查失败", e);
@@ -563,10 +563,15 @@
}
private void handlePositionZeroAndReset(String direction) {
+ log.info("[OKX] {}持仓归零,重置策略", direction);
state = StrategyState.STOPPED;
executor.cancelAllAlgoOrders();
closeExistingPositions();
- startGrid();
+ // 提交到 executor 末尾:FIFO保证平仓完成后再重置
+ executor.submitTask(() -> {
+ try { Thread.sleep(3000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
+ startGrid();
+ });
}
// ---- getters ----
diff --git a/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxTradeExecutor.java b/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxTradeExecutor.java
index 488e8ac..3aa0578 100644
--- a/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxTradeExecutor.java
+++ b/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxTradeExecutor.java
@@ -79,6 +79,14 @@
}
/**
+ * 提交一个通用任务到交易线程池末尾。
+ * 利用单线程池的 FIFO 特性确保任务按提交顺序执行。
+ */
+ public void submitTask(Runnable task) {
+ executor.execute(task);
+ }
+
+ /**
* 异步 IOC 市价开多。
*
* @param quantity 开仓张数(正数)
diff --git a/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxWebSocketClientManager.java b/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxWebSocketClientManager.java
index 439fa8d..95a7096 100644
--- a/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxWebSocketClientManager.java
+++ b/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxWebSocketClientManager.java
@@ -1,6 +1,5 @@
package com.xcong.excoin.modules.okxNewPrice;
-import com.xcong.excoin.modules.okxNewPrice.gridWs.OkxAlgoOrdersChannelHandler;
import com.xcong.excoin.modules.okxNewPrice.gridWs.OkxKlineChannelHandler;
import com.xcong.excoin.modules.okxNewPrice.gridWs.OkxOrdersChannelHandler;
import com.xcong.excoin.modules.okxNewPrice.gridWs.OkxPositionsChannelHandler;
diff --git a/src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxAlgoOrdersChannelHandler.java b/src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxAlgoOrdersChannelHandler.java
deleted file mode 100644
index 852eeaa..0000000
--- a/src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxAlgoOrdersChannelHandler.java
+++ /dev/null
@@ -1,89 +0,0 @@
-package com.xcong.excoin.modules.okxNewPrice.gridWs;
-
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-import com.xcong.excoin.modules.okxNewPrice.OkxGridTradeService;
-import lombok.extern.slf4j.Slf4j;
-import org.java_websocket.client.WebSocketClient;
-
-/**
- * OKX 策略委托(算法单)频道处理器 (orders-algo)。
- * 接收条件单状态变更推送并回调 OkxGridTradeService.onOrderUpdate()。
- *
- * @author Administrator
- */
-@Slf4j
-public class OkxAlgoOrdersChannelHandler implements OkxGridChannelHandler {
-
- private static final String CHANNEL_NAME = "orders-algo";
-
- private final String instId;
- private final OkxGridTradeService gridTradeService;
-
- public OkxAlgoOrdersChannelHandler(String instId, OkxGridTradeService gridTradeService) {
- this.instId = instId;
- this.gridTradeService = gridTradeService;
- }
-
- @Override
- public String getChannelName() { return CHANNEL_NAME; }
-
- @Override
- public void subscribe(WebSocketClient ws) {
- JSONObject msg = new JSONObject();
- JSONObject arg = new JSONObject();
- arg.put("channel", CHANNEL_NAME);
- arg.put("instId", instId);
- msg.put("op", "subscribe");
- JSONArray args = new JSONArray();
- args.add(arg);
- msg.put("args", args);
- ws.send(msg.toJSONString());
- log.info("[OKX-WS] {} 订阅成功, instId:{}", CHANNEL_NAME, instId);
- }
-
- @Override
- public void unsubscribe(WebSocketClient ws) {
- JSONObject msg = new JSONObject();
- JSONObject arg = new JSONObject();
- arg.put("channel", CHANNEL_NAME);
- arg.put("instId", instId);
- msg.put("op", "unsubscribe");
- JSONArray args = new JSONArray();
- args.add(arg);
- msg.put("args", args);
- ws.send(msg.toJSONString());
- log.info("[OKX-WS] {} 取消订阅成功", CHANNEL_NAME);
- }
-
- @Override
- public boolean handleMessage(JSONObject response) {
- JSONObject arg = response.getJSONObject("arg");
- if (arg == null || !CHANNEL_NAME.equals(arg.getString("channel"))) {
- return false;
- }
- try {
- JSONArray data = response.getJSONArray("data");
- if (data == null || data.isEmpty()) return true;
- for (int i = 0; i < data.size(); i++) {
- JSONObject order = data.getJSONObject(i);
- if (!instId.equals(order.getString("instId"))) continue;
-
- String algoId = order.getString("algoId");
- String state = order.getString("state");
- String ordType = order.getString("ordType");
- log.info("[OKX-WS] 算法单更新, algoId:{}, state:{}, ordType:{}, triggerPx:{}, actualPx:{}",
- algoId, state, ordType,
- order.getString("triggerPx"),
- order.getString("actualPx"));
-
- if (gridTradeService != null) {
- gridTradeService.onOrderUpdate(algoId, state, ordType);
- }
- }
- } catch (Exception e) {
- log.error("[OKX-WS] {} 处理数据失败", CHANNEL_NAME, e);
- }
- return true;
- }
-}
--
Gitblit v1.9.1