From 70b000665c80284571ed653afbbd6d10af74739d Mon Sep 17 00:00:00 2001
From: Administrator <15274802129@163.com>
Date: Fri, 05 Jun 2026 14:24:10 +0800
Subject: [PATCH] feat(okx): 添加WebSocket订阅确认机制和优化网格交易配置
---
src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridTradeService.java | 75 ++++++++++++++++++++++++++++++++-----
1 files changed, 64 insertions(+), 11 deletions(-)
diff --git a/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridTradeService.java b/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridTradeService.java
index aa9f09b..841cfb0 100644
--- a/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridTradeService.java
+++ b/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridTradeService.java
@@ -79,6 +79,16 @@
/** 基底空头是否已开 */
private volatile boolean baseShortOpened = false;
+ // ---- WS 订阅就绪标志 ----
+ /** candle1m (Business WS) 订阅已确认 */
+ private volatile boolean candle1mSubscribed = false;
+ /** positions (Private WS) 订阅已确认 */
+ private volatile boolean positionsSubscribed = false;
+ /** orders (Private WS) 订阅已确认 */
+ private volatile boolean ordersSubscribed = false;
+ /** 等待所有订阅就绪期间缓存的最新 K 线价格 */
+ private volatile BigDecimal pendingKlinePrice = null;
+
private volatile boolean shortActive = false;
private volatile boolean longActive = false;
@@ -195,6 +205,10 @@
baseShortOpened = false;
longActive = false;
shortActive = false;
+ candle1mSubscribed = false;
+ positionsSubscribed = false;
+ ordersSubscribed = false;
+ pendingKlinePrice = null;
shortPriceQueue.clear();
longPriceQueue.clear();
currentLongOrderIds.clear();
@@ -262,18 +276,16 @@
}
if (state == StrategyState.WAITING_KLINE) {
+ // 等待所有 WS 订阅就绪后再开仓
+ if (!allSubscriptionsReady()) {
+ pendingKlinePrice = closePrice;
+ log.info("[OKX] 等待所有 WS 订阅就绪(candle1m={}, positions={}, orders={}), 当前价: {}",
+ candle1mSubscribed, positionsSubscribed, ordersSubscribed, closePrice);
+ return;
+ }
state = StrategyState.OPENING;
- log.info("[OKX] 首根K线到达,开基底仓位 多空各{}张...", config.getBaseQuantity());
- executor.openLong(config.getBaseQuantity(), (orderId) -> {
- OkxTraderParam baseLongTp = OkxTraderParam.builder().entryOrderId(orderId).build();
- config.setBaseLongTraderParam(baseLongTp);
- tryGenerateQueues(); // 异步回调到达后重试,防止 WS 推送先到导致 NPE
- }, null);
- executor.openShort(config.getBaseQuantity(), (orderId) -> {
- OkxTraderParam baseShortTp = OkxTraderParam.builder().entryOrderId(orderId).build();
- config.setBaseShortTraderParam(baseShortTp);
- tryGenerateQueues(); // 异步回调到达后重试,防止 WS 推送先到导致 NPE
- }, null);
+ log.info("[OKX] 首根K线到达,所有订阅就绪,开基底仓位 多空各{}张...", config.getBaseQuantity());
+ openBasePositions();
return;
}
@@ -283,6 +295,47 @@
checkProfitAndReset();
}
+ /**
+ * WS 订阅成功确认回调,由各 {@link OkxGridChannelHandler#onSubscribed()} 触发。
+ * 当所有订阅就绪且已有缓存的 K 线价格时,自动触发开仓。
+ */
+ public void onSubscriptionConfirmed(String channel) {
+ if ("candle1m".equals(channel)) candle1mSubscribed = true;
+ else if ("positions".equals(channel)) positionsSubscribed = true;
+ else if ("orders".equals(channel)) ordersSubscribed = true;
+
+ log.info("[OKX] 订阅就绪: {}, 全部就绪: {}", channel, allSubscriptionsReady());
+
+ // 所有订阅就绪 + 有缓存 K 线价格 + 仍处于等待状态 → 触发开仓
+ if (allSubscriptionsReady() && pendingKlinePrice != null && state == StrategyState.WAITING_KLINE) {
+ BigDecimal price = pendingKlinePrice;
+ pendingKlinePrice = null;
+ log.info("[OKX] 所有 WS 订阅就绪,触发开仓, 价格: {}", price);
+ state = StrategyState.OPENING;
+ log.info("[OKX] 首根K线到达,所有订阅就绪,开基底仓位 多空各{}张...", config.getBaseQuantity());
+ openBasePositions();
+ }
+ }
+
+ /** @return true 表示 candle1m + positions + orders 三个频道均已订阅成功 */
+ private boolean allSubscriptionsReady() {
+ return candle1mSubscribed && positionsSubscribed && ordersSubscribed;
+ }
+
+ /** 市价双开基底仓位(多 + 空),对齐 Gate 版本逻辑 */
+ private void openBasePositions() {
+ executor.openLong(config.getBaseQuantity(), (orderId) -> {
+ OkxTraderParam baseLongTp = OkxTraderParam.builder().entryOrderId(orderId).build();
+ config.setBaseLongTraderParam(baseLongTp);
+ tryGenerateQueues();
+ }, null);
+ executor.openShort(config.getBaseQuantity(), (orderId) -> {
+ OkxTraderParam baseShortTp = OkxTraderParam.builder().entryOrderId(orderId).build();
+ config.setBaseShortTraderParam(baseShortTp);
+ tryGenerateQueues();
+ }, null);
+ }
+
// ---- 仓位推送回调 ----
public void onPositionUpdate(String instId, String posSide, BigDecimal posSize, BigDecimal avgPx) {
--
Gitblit v1.9.1