Administrator
2026-06-05 70b000665c80284571ed653afbbd6d10af74739d
feat(okx): 添加WebSocket订阅确认机制和优化网格交易配置

- 在OkxGridChannelHandler接口中添加onSubscribed默认回调方法
- 添加candle1m、positions、orders三个频道订阅就绪标志位
- 实现订阅就绪检查逻辑,确保所有频道就绪后才开仓
- 添加订阅确认时的价格缓存机制,避免消息丢失
- 在OkxWsClient中解析subscribe事件并通知对应处理器
- 为K线、订单、持仓三个频道处理器实现订阅确认回调
- 修改WebSocket客户端管理器中的默认交易对和网格参数配置
7 files modified
130 ■■■■ changed files
src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridTradeService.java 75 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridWsClient.java 15 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxWebSocketClientManager.java 10 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxGridChannelHandler.java 6 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxKlineChannelHandler.java 8 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxOrdersChannelHandler.java 8 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxPositionsChannelHandler.java 8 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridTradeService.java
@@ -79,6 +79,16 @@
    /** 基底空头是否已开 */
    private volatile boolean baseShortOpened = false;
    // ---- WS 订阅就绪标志 ----
    /** candle1m (Business WS) 订阅已确认 */
    private volatile boolean candle1mSubscribed = false;
    /** positions (Private WS) 订阅已确认 */
    private volatile boolean positionsSubscribed = false;
    /** orders (Private WS) 订阅已确认 */
    private volatile boolean ordersSubscribed = false;
    /** 等待所有订阅就绪期间缓存的最新 K 线价格 */
    private volatile BigDecimal pendingKlinePrice = null;
    private volatile boolean shortActive = false;
    private volatile boolean longActive = false;
@@ -195,6 +205,10 @@
        baseShortOpened = false;
        longActive = false;
        shortActive = false;
        candle1mSubscribed = false;
        positionsSubscribed = false;
        ordersSubscribed = false;
        pendingKlinePrice = null;
        shortPriceQueue.clear();
        longPriceQueue.clear();
        currentLongOrderIds.clear();
@@ -262,18 +276,16 @@
        }
        if (state == StrategyState.WAITING_KLINE) {
            // 等待所有 WS 订阅就绪后再开仓
            if (!allSubscriptionsReady()) {
                pendingKlinePrice = closePrice;
                log.info("[OKX] 等待所有 WS 订阅就绪(candle1m={}, positions={}, orders={}), 当前价: {}",
                        candle1mSubscribed, positionsSubscribed, ordersSubscribed, closePrice);
                return;
            }
            state = StrategyState.OPENING;
            log.info("[OKX] 首根K线到达,开基底仓位 多空各{}张...", config.getBaseQuantity());
            executor.openLong(config.getBaseQuantity(), (orderId) -> {
                OkxTraderParam baseLongTp = OkxTraderParam.builder().entryOrderId(orderId).build();
                config.setBaseLongTraderParam(baseLongTp);
                tryGenerateQueues(); // 异步回调到达后重试,防止 WS 推送先到导致 NPE
            }, null);
            executor.openShort(config.getBaseQuantity(), (orderId) -> {
                OkxTraderParam baseShortTp = OkxTraderParam.builder().entryOrderId(orderId).build();
                config.setBaseShortTraderParam(baseShortTp);
                tryGenerateQueues(); // 异步回调到达后重试,防止 WS 推送先到导致 NPE
            }, null);
            log.info("[OKX] 首根K线到达,所有订阅就绪,开基底仓位 多空各{}张...", config.getBaseQuantity());
            openBasePositions();
            return;
        }
@@ -283,6 +295,47 @@
        checkProfitAndReset();
    }
    /**
     * WS 订阅成功确认回调,由各 {@link OkxGridChannelHandler#onSubscribed()} 触发。
     * 当所有订阅就绪且已有缓存的 K 线价格时,自动触发开仓。
     */
    public void onSubscriptionConfirmed(String channel) {
        if ("candle1m".equals(channel)) candle1mSubscribed = true;
        else if ("positions".equals(channel)) positionsSubscribed = true;
        else if ("orders".equals(channel)) ordersSubscribed = true;
        log.info("[OKX] 订阅就绪: {}, 全部就绪: {}", channel, allSubscriptionsReady());
        // 所有订阅就绪 + 有缓存 K 线价格 + 仍处于等待状态 → 触发开仓
        if (allSubscriptionsReady() && pendingKlinePrice != null && state == StrategyState.WAITING_KLINE) {
            BigDecimal price = pendingKlinePrice;
            pendingKlinePrice = null;
            log.info("[OKX] 所有 WS 订阅就绪,触发开仓, 价格: {}", price);
            state = StrategyState.OPENING;
            log.info("[OKX] 首根K线到达,所有订阅就绪,开基底仓位 多空各{}张...", config.getBaseQuantity());
            openBasePositions();
        }
    }
    /** @return true 表示 candle1m + positions + orders 三个频道均已订阅成功 */
    private boolean allSubscriptionsReady() {
        return candle1mSubscribed && positionsSubscribed && ordersSubscribed;
    }
    /** 市价双开基底仓位(多 + 空),对齐 Gate 版本逻辑 */
    private void openBasePositions() {
        executor.openLong(config.getBaseQuantity(), (orderId) -> {
            OkxTraderParam baseLongTp = OkxTraderParam.builder().entryOrderId(orderId).build();
            config.setBaseLongTraderParam(baseLongTp);
            tryGenerateQueues();
        }, null);
        executor.openShort(config.getBaseQuantity(), (orderId) -> {
            OkxTraderParam baseShortTp = OkxTraderParam.builder().entryOrderId(orderId).build();
            config.setBaseShortTraderParam(baseShortTp);
            tryGenerateQueues();
        }, null);
    }
    // ---- 仓位推送回调 ----
    public void onPositionUpdate(String instId, String posSide, BigDecimal posSize, BigDecimal avgPx) {
src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridWsClient.java
@@ -229,6 +229,21 @@
            if ("subscribe".equals(event) || "unsubscribe".equals(event)) {
                log.info("[{}] {}事件: {}", logPrefix, event, response.getString("arg"));
                // 订阅成功确认:解析频道名并通知对应 handler
                if ("subscribe".equals(event)) {
                    JSONObject argObj = response.getJSONObject("arg");
                    if (argObj != null) {
                        String channel = argObj.getString("channel");
                        if (channel != null) {
                            for (OkxGridChannelHandler handler : channelHandlers) {
                                if (channel.equals(handler.getChannelName())) {
                                    handler.onSubscribed();
                                    break;
                                }
                            }
                        }
                    }
                }
                return;
            }
src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxWebSocketClientManager.java
@@ -86,14 +86,14 @@
                    .apiKey(primaryAccount.getApiKey())
                    .secretKey(primaryAccount.getSecretKey())
                    .passphrase(primaryAccount.getPassphrase())
                    .instId("BTC-USDT-SWAP")
                    .instId("ETH-USDT-SWAP")
                    .leverage("100")
                    .tdMode("cross")
                    .gridRate(new BigDecimal("0.001"))
                    .expectedProfit(new BigDecimal("200"))
                    .gridRate(new BigDecimal("0.002"))
                    .expectedProfit(new BigDecimal("180"))
                    .maxLoss(new BigDecimal("30"))
                    .quantity("1")
                    .baseQuantity("10")
                    .quantity("5")
                    .baseQuantity("50")
                    .priceScale(2)
                    .ctVal(new BigDecimal("0.01"))
                    .isSimulate(!primaryAccount.isAccountType())
src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxGridChannelHandler.java
@@ -27,4 +27,10 @@
     * @return true 表示已处理(循环停止),false 表示频道不匹配
     */
    boolean handleMessage(JSONObject response);
    /**
     * 订阅成功确认回调,收到 OKX 服务器 subscribe 事件时触发。
     * 子类可重写以通知策略引擎订阅就绪。
     */
    default void onSubscribed() {}
}
src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxKlineChannelHandler.java
@@ -46,6 +46,14 @@
    }
    @Override
    public void onSubscribed() {
        log.info("[OKX-WS] {} 订阅确认, instId:{}", CHANNEL_NAME, instId);
        if (gridTradeService != null) {
            gridTradeService.onSubscriptionConfirmed(CHANNEL_NAME);
        }
    }
    @Override
    public void unsubscribe(WebSocketClient ws) {
        JSONObject msg = new JSONObject();
        JSONObject arg = new JSONObject();
src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxOrdersChannelHandler.java
@@ -41,6 +41,14 @@
    }
    @Override
    public void onSubscribed() {
        log.info("[OKX-WS] {} 订阅确认", CHANNEL_NAME);
        if (gridTradeService != null) {
            gridTradeService.onSubscriptionConfirmed(CHANNEL_NAME);
        }
    }
    @Override
    public void unsubscribe(WebSocketClient ws) {
        JSONObject msg = new JSONObject();
        JSONObject arg = new JSONObject();
src/main/java/com/xcong/excoin/modules/okxNewPrice/gridWs/OkxPositionsChannelHandler.java
@@ -45,6 +45,14 @@
    }
    @Override
    public void onSubscribed() {
        log.info("[OKX-WS] {} 订阅确认", CHANNEL_NAME);
        if (gridTradeService != null) {
            gridTradeService.onSubscriptionConfirmed(CHANNEL_NAME);
        }
    }
    @Override
    public void unsubscribe(WebSocketClient ws) {
        JSONObject msg = new JSONObject();
        JSONObject arg = new JSONObject();