| | |
| | | /** 基底空头是否已开 */ |
| | | private volatile boolean baseShortOpened = false; |
| | | |
| | | // ---- WS 订阅就绪标志 ---- |
| | | /** candle1m (Business WS) 订阅已确认 */ |
| | | private volatile boolean candle1mSubscribed = false; |
| | | /** positions (Private WS) 订阅已确认 */ |
| | | private volatile boolean positionsSubscribed = false; |
| | | /** orders (Private WS) 订阅已确认 */ |
| | | private volatile boolean ordersSubscribed = false; |
| | | /** 等待所有订阅就绪期间缓存的最新 K 线价格 */ |
| | | private volatile BigDecimal pendingKlinePrice = null; |
| | | |
| | | private volatile boolean shortActive = false; |
| | | private volatile boolean longActive = false; |
| | | |
| | |
| | | baseShortOpened = false; |
| | | longActive = false; |
| | | shortActive = false; |
| | | candle1mSubscribed = false; |
| | | positionsSubscribed = false; |
| | | ordersSubscribed = false; |
| | | pendingKlinePrice = null; |
| | | shortPriceQueue.clear(); |
| | | longPriceQueue.clear(); |
| | | currentLongOrderIds.clear(); |
| | |
| | | } |
| | | |
| | | if (state == StrategyState.WAITING_KLINE) { |
| | | // 等待所有 WS 订阅就绪后再开仓 |
| | | if (!allSubscriptionsReady()) { |
| | | pendingKlinePrice = closePrice; |
| | | log.info("[OKX] 等待所有 WS 订阅就绪(candle1m={}, positions={}, orders={}), 当前价: {}", |
| | | candle1mSubscribed, positionsSubscribed, ordersSubscribed, closePrice); |
| | | return; |
| | | } |
| | | state = StrategyState.OPENING; |
| | | log.info("[OKX] 首根K线到达,开基底仓位 多空各{}张...", config.getBaseQuantity()); |
| | | executor.openLong(config.getBaseQuantity(), (orderId) -> { |
| | | OkxTraderParam baseLongTp = OkxTraderParam.builder().entryOrderId(orderId).build(); |
| | | config.setBaseLongTraderParam(baseLongTp); |
| | | tryGenerateQueues(); // 异步回调到达后重试,防止 WS 推送先到导致 NPE |
| | | }, null); |
| | | executor.openShort(config.getBaseQuantity(), (orderId) -> { |
| | | OkxTraderParam baseShortTp = OkxTraderParam.builder().entryOrderId(orderId).build(); |
| | | config.setBaseShortTraderParam(baseShortTp); |
| | | tryGenerateQueues(); // 异步回调到达后重试,防止 WS 推送先到导致 NPE |
| | | }, null); |
| | | log.info("[OKX] 首根K线到达,所有订阅就绪,开基底仓位 多空各{}张...", config.getBaseQuantity()); |
| | | openBasePositions(); |
| | | return; |
| | | } |
| | | |
| | |
| | | checkProfitAndReset(); |
| | | } |
| | | |
| | | /** |
| | | * WS 订阅成功确认回调,由各 {@link OkxGridChannelHandler#onSubscribed()} 触发。 |
| | | * 当所有订阅就绪且已有缓存的 K 线价格时,自动触发开仓。 |
| | | */ |
| | | public void onSubscriptionConfirmed(String channel) { |
| | | if ("candle1m".equals(channel)) candle1mSubscribed = true; |
| | | else if ("positions".equals(channel)) positionsSubscribed = true; |
| | | else if ("orders".equals(channel)) ordersSubscribed = true; |
| | | |
| | | log.info("[OKX] 订阅就绪: {}, 全部就绪: {}", channel, allSubscriptionsReady()); |
| | | |
| | | // 所有订阅就绪 + 有缓存 K 线价格 + 仍处于等待状态 → 触发开仓 |
| | | if (allSubscriptionsReady() && pendingKlinePrice != null && state == StrategyState.WAITING_KLINE) { |
| | | BigDecimal price = pendingKlinePrice; |
| | | pendingKlinePrice = null; |
| | | log.info("[OKX] 所有 WS 订阅就绪,触发开仓, 价格: {}", price); |
| | | state = StrategyState.OPENING; |
| | | log.info("[OKX] 首根K线到达,所有订阅就绪,开基底仓位 多空各{}张...", config.getBaseQuantity()); |
| | | openBasePositions(); |
| | | } |
| | | } |
| | | |
| | | /** @return true 表示 candle1m + positions + orders 三个频道均已订阅成功 */ |
| | | private boolean allSubscriptionsReady() { |
| | | return candle1mSubscribed && positionsSubscribed && ordersSubscribed; |
| | | } |
| | | |
| | | /** 市价双开基底仓位(多 + 空),对齐 Gate 版本逻辑 */ |
| | | private void openBasePositions() { |
| | | executor.openLong(config.getBaseQuantity(), (orderId) -> { |
| | | OkxTraderParam baseLongTp = OkxTraderParam.builder().entryOrderId(orderId).build(); |
| | | config.setBaseLongTraderParam(baseLongTp); |
| | | tryGenerateQueues(); |
| | | }, null); |
| | | executor.openShort(config.getBaseQuantity(), (orderId) -> { |
| | | OkxTraderParam baseShortTp = OkxTraderParam.builder().entryOrderId(orderId).build(); |
| | | config.setBaseShortTraderParam(baseShortTp); |
| | | tryGenerateQueues(); |
| | | }, null); |
| | | } |
| | | |
| | | // ---- 仓位推送回调 ---- |
| | | |
| | | public void onPositionUpdate(String instId, String posSide, BigDecimal posSize, BigDecimal avgPx) { |