From 70b000665c80284571ed653afbbd6d10af74739d Mon Sep 17 00:00:00 2001
From: Administrator <15274802129@163.com>
Date: Fri, 05 Jun 2026 14:24:10 +0800
Subject: [PATCH] feat(okx): 添加WebSocket订阅确认机制和优化网格交易配置

---
 src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridTradeService.java               |   75 +++++++++++++++++++++---
 src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxWebSocketClientManager.java         |   10 +-
 src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridWsClient.java                   |   15 +++++
 src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxGridChannelHandler.java      |    6 ++
 src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxOrdersChannelHandler.java    |    8 ++
 src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxKlineChannelHandler.java     |    8 ++
 src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxPositionsChannelHandler.java |    8 ++
 7 files changed, 114 insertions(+), 16 deletions(-)

diff --git a/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridTradeService.java b/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridTradeService.java
index aa9f09b..841cfb0 100644
--- a/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridTradeService.java
+++ b/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridTradeService.java
@@ -79,6 +79,16 @@
     /** 基底空头是否已开 */
     private volatile boolean baseShortOpened = false;
 
+    // ---- WS 订阅就绪标志 ----
+    /** candle1m (Business WS) 订阅已确认 */
+    private volatile boolean candle1mSubscribed = false;
+    /** positions (Private WS) 订阅已确认 */
+    private volatile boolean positionsSubscribed = false;
+    /** orders (Private WS) 订阅已确认 */
+    private volatile boolean ordersSubscribed = false;
+    /** 等待所有订阅就绪期间缓存的最新 K 线价格 */
+    private volatile BigDecimal pendingKlinePrice = null;
+
     private volatile boolean shortActive = false;
     private volatile boolean longActive = false;
 
@@ -195,6 +205,10 @@
         baseShortOpened = false;
         longActive = false;
         shortActive = false;
+        candle1mSubscribed = false;
+        positionsSubscribed = false;
+        ordersSubscribed = false;
+        pendingKlinePrice = null;
         shortPriceQueue.clear();
         longPriceQueue.clear();
         currentLongOrderIds.clear();
@@ -262,18 +276,16 @@
         }
 
         if (state == StrategyState.WAITING_KLINE) {
+            // 等待所有 WS 订阅就绪后再开仓
+            if (!allSubscriptionsReady()) {
+                pendingKlinePrice = closePrice;
+                log.info("[OKX] 等待所有 WS 订阅就绪(candle1m={}, positions={}, orders={}), 当前价: {}",
+                        candle1mSubscribed, positionsSubscribed, ordersSubscribed, closePrice);
+                return;
+            }
             state = StrategyState.OPENING;
-            log.info("[OKX] 首根K线到达,开基底仓位 多空各{}张...", config.getBaseQuantity());
-            executor.openLong(config.getBaseQuantity(), (orderId) -> {
-                OkxTraderParam baseLongTp = OkxTraderParam.builder().entryOrderId(orderId).build();
-                config.setBaseLongTraderParam(baseLongTp);
-                tryGenerateQueues(); // 异步回调到达后重试,防止 WS 推送先到导致 NPE
-            }, null);
-            executor.openShort(config.getBaseQuantity(), (orderId) -> {
-                OkxTraderParam baseShortTp = OkxTraderParam.builder().entryOrderId(orderId).build();
-                config.setBaseShortTraderParam(baseShortTp);
-                tryGenerateQueues(); // 异步回调到达后重试,防止 WS 推送先到导致 NPE
-            }, null);
+            log.info("[OKX] 首根K线到达,所有订阅就绪,开基底仓位 多空各{}张...", config.getBaseQuantity());
+            openBasePositions();
             return;
         }
 
@@ -283,6 +295,47 @@
         checkProfitAndReset();
     }
 
+    /**
+     * WS 订阅成功确认回调,由各 {@link OkxGridChannelHandler#onSubscribed()} 触发。
+     * 当所有订阅就绪且已有缓存的 K 线价格时,自动触发开仓。
+     */
+    public void onSubscriptionConfirmed(String channel) {
+        if ("candle1m".equals(channel)) candle1mSubscribed = true;
+        else if ("positions".equals(channel)) positionsSubscribed = true;
+        else if ("orders".equals(channel)) ordersSubscribed = true;
+
+        log.info("[OKX] 订阅就绪: {}, 全部就绪: {}", channel, allSubscriptionsReady());
+
+        // 所有订阅就绪 + 有缓存 K 线价格 + 仍处于等待状态 → 触发开仓
+        if (allSubscriptionsReady() && pendingKlinePrice != null && state == StrategyState.WAITING_KLINE) {
+            BigDecimal price = pendingKlinePrice;
+            pendingKlinePrice = null;
+            log.info("[OKX] 所有 WS 订阅就绪,触发开仓, 价格: {}", price);
+            state = StrategyState.OPENING;
+            log.info("[OKX] 首根K线到达,所有订阅就绪,开基底仓位 多空各{}张...", config.getBaseQuantity());
+            openBasePositions();
+        }
+    }
+
+    /** @return true 表示 candle1m + positions + orders 三个频道均已订阅成功 */
+    private boolean allSubscriptionsReady() {
+        return candle1mSubscribed && positionsSubscribed && ordersSubscribed;
+    }
+
+    /** 市价双开基底仓位(多 + 空),对齐 Gate 版本逻辑 */
+    private void openBasePositions() {
+        executor.openLong(config.getBaseQuantity(), (orderId) -> {
+            OkxTraderParam baseLongTp = OkxTraderParam.builder().entryOrderId(orderId).build();
+            config.setBaseLongTraderParam(baseLongTp);
+            tryGenerateQueues();
+        }, null);
+        executor.openShort(config.getBaseQuantity(), (orderId) -> {
+            OkxTraderParam baseShortTp = OkxTraderParam.builder().entryOrderId(orderId).build();
+            config.setBaseShortTraderParam(baseShortTp);
+            tryGenerateQueues();
+        }, null);
+    }
+
     // ---- 仓位推送回调 ----
 
     public void onPositionUpdate(String instId, String posSide, BigDecimal posSize, BigDecimal avgPx) {
diff --git a/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridWsClient.java b/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridWsClient.java
index 46b5ac8..8d5329b 100644
--- a/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridWsClient.java
+++ b/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridWsClient.java
@@ -229,6 +229,21 @@
 
             if ("subscribe".equals(event) || "unsubscribe".equals(event)) {
                 log.info("[{}] {}事件: {}", logPrefix, event, response.getString("arg"));
+                // 订阅成功确认:解析频道名并通知对应 handler
+                if ("subscribe".equals(event)) {
+                    JSONObject argObj = response.getJSONObject("arg");
+                    if (argObj != null) {
+                        String channel = argObj.getString("channel");
+                        if (channel != null) {
+                            for (OkxGridChannelHandler handler : channelHandlers) {
+                                if (channel.equals(handler.getChannelName())) {
+                                    handler.onSubscribed();
+                                    break;
+                                }
+                            }
+                        }
+                    }
+                }
                 return;
             }
 
diff --git a/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxWebSocketClientManager.java b/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxWebSocketClientManager.java
index 845f43d..439fa8d 100644
--- a/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxWebSocketClientManager.java
+++ b/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxWebSocketClientManager.java
@@ -86,14 +86,14 @@
                     .apiKey(primaryAccount.getApiKey())
                     .secretKey(primaryAccount.getSecretKey())
                     .passphrase(primaryAccount.getPassphrase())
-                    .instId("BTC-USDT-SWAP")
+                    .instId("ETH-USDT-SWAP")
                     .leverage("100")
                     .tdMode("cross")
-                    .gridRate(new BigDecimal("0.001"))
-                    .expectedProfit(new BigDecimal("200"))
+                    .gridRate(new BigDecimal("0.002"))
+                    .expectedProfit(new BigDecimal("180"))
                     .maxLoss(new BigDecimal("30"))
-                    .quantity("1")
-                    .baseQuantity("10")
+                    .quantity("5")
+                    .baseQuantity("50")
                     .priceScale(2)
                     .ctVal(new BigDecimal("0.01"))
                     .isSimulate(!primaryAccount.isAccountType())
diff --git a/src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxGridChannelHandler.java b/src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxGridChannelHandler.java
index b0d065f..ad7a6c5 100644
--- a/src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxGridChannelHandler.java
+++ b/src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxGridChannelHandler.java
@@ -27,4 +27,10 @@
      * @return true 表示已处理(循环停止),false 表示频道不匹配
      */
     boolean handleMessage(JSONObject response);
+
+    /**
+     * 订阅成功确认回调,收到 OKX 服务器 subscribe 事件时触发。
+     * 子类可重写以通知策略引擎订阅就绪。
+     */
+    default void onSubscribed() {}
 }
diff --git a/src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxKlineChannelHandler.java b/src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxKlineChannelHandler.java
index 401d236..c9cd5c3 100644
--- a/src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxKlineChannelHandler.java
+++ b/src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxKlineChannelHandler.java
@@ -46,6 +46,14 @@
     }
 
     @Override
+    public void onSubscribed() {
+        log.info("[OKX-WS] {} 订阅确认, instId:{}", CHANNEL_NAME, instId);
+        if (gridTradeService != null) {
+            gridTradeService.onSubscriptionConfirmed(CHANNEL_NAME);
+        }
+    }
+
+    @Override
     public void unsubscribe(WebSocketClient ws) {
         JSONObject msg = new JSONObject();
         JSONObject arg = new JSONObject();
diff --git a/src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxOrdersChannelHandler.java b/src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxOrdersChannelHandler.java
index d679957..3d400de 100644
--- a/src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxOrdersChannelHandler.java
+++ b/src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxOrdersChannelHandler.java
@@ -41,6 +41,14 @@
     }
 
     @Override
+    public void onSubscribed() {
+        log.info("[OKX-WS] {} 订阅确认", CHANNEL_NAME);
+        if (gridTradeService != null) {
+            gridTradeService.onSubscriptionConfirmed(CHANNEL_NAME);
+        }
+    }
+
+    @Override
     public void unsubscribe(WebSocketClient ws) {
         JSONObject msg = new JSONObject();
         JSONObject arg = new JSONObject();
diff --git a/src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxPositionsChannelHandler.java b/src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxPositionsChannelHandler.java
index e3aaed2..2a77f76 100644
--- a/src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxPositionsChannelHandler.java
+++ b/src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxPositionsChannelHandler.java
@@ -45,6 +45,14 @@
     }
 
     @Override
+    public void onSubscribed() {
+        log.info("[OKX-WS] {} 订阅确认", CHANNEL_NAME);
+        if (gridTradeService != null) {
+            gridTradeService.onSubscriptionConfirmed(CHANNEL_NAME);
+        }
+    }
+
+    @Override
     public void unsubscribe(WebSocketClient ws) {
         JSONObject msg = new JSONObject();
         JSONObject arg = new JSONObject();

--
Gitblit v1.9.1