package com.xcong.excoin.modules.okxNewPrice; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.okxNewPrice.okxpi.config.OKXAccount; import com.xcong.excoin.modules.okxNewPrice.okxpi.config.enums.HttpMethod; import com.xcong.excoin.modules.okxNewPrice.okxpi.config.utils.OKXContants; import com.xcong.excoin.utils.dingtalk.DingTalkUtils; import lombok.extern.slf4j.Slf4j; import java.math.BigDecimal; import java.math.RoundingMode; import java.util.*; /** * OKX 网格交易策略引擎 — 多空对冲网格。 * *

策略原理

* 对齐 Gate 版本逻辑:以空仓基底入场价为价格基准,向上/向下各生成价格网格队列。 * 价格触发网格层级时挂条件单,成交后自动挂止盈单。 * *

完整生命周期

*
 *   startGrid() → WAITING_KLINE
 *     ↓
 *   onKline(首根K线) → OPENING → 异步市价双开基底(开多+开空)
 *     ↓
 *   onPositionUpdate() → 基底成交 → baseLongOpened && baseShortOpened
 *     ↓
 *   tryGenerateQueues() → generateShortQueue()+generateLongQueue()+updateGridElements()
 *     → 挂基座止损单 + state=ACTIVE
 *     ↓
 *   ACTIVE 状态每根K线:
 *     processShortGrid() + processLongGrid() → 匹配队列 → 挂条件单 → 订单成交后挂止盈
 *     ↓
 *   onPositionClose() → cumulativePnl 累加 → 检查停止条件
 * 
* *

OKX vs Gate 差异

* * * @author Administrator */ @Slf4j public class OkxGridTradeService { public enum StrategyState { WAITING_KLINE, OPENING, ACTIVE, STOPPED } private final OkxConfig config; private final OkxTradeExecutor executor; private final OKXAccount okxAccount; private final StopLossManager stopLossManager; private volatile StrategyState state = StrategyState.WAITING_KLINE; /** 空仓价格队列,降序排列(大→小) */ private final List shortPriceQueue = Collections.synchronizedList(new ArrayList<>()); /** 多仓价格队列,升序排列(小→大) */ private final List longPriceQueue = Collections.synchronizedList(new ArrayList<>()); /** 当前多仓条件单映射:algoId → 止盈价格 */ private final Map currentLongOrderIds = Collections.synchronizedMap(new LinkedHashMap<>()); /** 当前空仓条件单映射:algoId → 止盈价格 */ private final Map currentShortOrderIds = Collections.synchronizedMap(new LinkedHashMap<>()); /** 基底空头入场价 */ private BigDecimal shortBaseEntryPrice; /** 基底多头入场价 */ private BigDecimal longBaseEntryPrice; /** 基底多头是否已开 */ private volatile boolean baseLongOpened = false; /** 基底空头是否已开 */ private volatile boolean baseShortOpened = false; // ---- WS 订阅就绪标志 ---- /** candle1m (Business WS) 订阅已确认 */ private volatile boolean candle1mSubscribed = false; /** positions (Private WS) 订阅已确认 */ private volatile boolean positionsSubscribed = false; /** orders (Private WS) 订阅已确认 */ private volatile boolean ordersSubscribed = false; /** 等待所有订阅就绪期间缓存的最新 K 线价格 */ private volatile BigDecimal pendingKlinePrice = null; private volatile boolean shortActive = false; private volatile boolean longActive = false; private volatile BigDecimal lastKlinePrice; private volatile BigDecimal cumulativePnl = BigDecimal.ZERO; private volatile BigDecimal unrealizedPnl = BigDecimal.ZERO; private volatile BigDecimal longEntryPrice = BigDecimal.ZERO; private volatile BigDecimal shortEntryPrice = BigDecimal.ZERO; private volatile BigDecimal longPositionSize = BigDecimal.ZERO; private volatile BigDecimal shortPositionSize = BigDecimal.ZERO; private volatile BigDecimal initialPrincipal = BigDecimal.ZERO; public OkxGridTradeService(OkxConfig config, OKXAccount okxAccount) { this.config = config; this.okxAccount = okxAccount; this.executor = new OkxTradeExecutor(okxAccount, config.getInstId(), config.getTdMode()); this.stopLossManager = new StopLossManager(config, executor); } // ---- 初始化 ---- /** * 初始化策略环境:获取账户 → 清旧条件单 → 平已有仓位 → 设杠杆。 */ public void init() { try { // 1. 查询账户获取初始本金(仅取 USDT 合约账户余额) String balanceResp = executor.getBalance(); if (balanceResp != null) { JSONObject json = JSON.parseObject(balanceResp); if ("0".equals(json.getString("code"))) { JSONArray data = json.getJSONArray("data"); if (data != null && !data.isEmpty()) { JSONObject accountData = data.getJSONObject(0); JSONArray details = accountData.getJSONArray("details"); if (details != null) { for (int i = 0; i < details.size(); i++) { JSONObject detail = details.getJSONObject(i); if ("USDT".equals(detail.getString("ccy"))) { String eq = detail.getString("eq"); if (eq != null) { this.initialPrincipal = new BigDecimal(eq); log.info("[OKX] 初始本金(USDT合约): {} USDT", initialPrincipal); } break; } } } } } } // 2. 清除旧条件单 executor.cancelAllAlgoOrders(); log.info("[OKX] 旧条件单已清除"); // 3. 平掉已有仓位 closeExistingPositions(); // 4. 设置杠杆 executor.setLeverage(config.getLeverage()); log.info("[OKX] 初始化完成, 合约:{}, 杠杆:{}x", config.getInstId(), config.getLeverage()); } catch (Exception e) { log.error("[OKX] 初始化失败", e); } } /** * 平掉当前合约的所有已有仓位。 */ private void closeExistingPositions() { try { String posResp = executor.getPositions(); if (posResp == null) return; JSONObject json = JSON.parseObject(posResp); if (!"0".equals(json.getString("code"))) return; JSONArray data = json.getJSONArray("data"); if (data == null || data.isEmpty()) { log.info("[OKX] 无已有仓位"); return; } for (int i = 0; i < data.size(); i++) { JSONObject pos = data.getJSONObject(i); String instId = pos.getString("instId"); if (!config.getInstId().equals(instId)) continue; String posSide = pos.getString("posSide"); String posSz = pos.getString("pos"); if (posSz == null || "0".equals(posSz)) continue; String side = "long".equals(posSide) ? "sell" : "buy"; executor.marketClose(side, posSide, posSz); log.info("[OKX] 平已有仓位, posSide:{}, sz:{}, side:{}", posSide, posSz, side); } } catch (Exception e) { log.warn("[OKX] 平仓位异常", e); } } // ---- 启动/停止 ---- public void startGrid() { if (state != StrategyState.WAITING_KLINE && state != StrategyState.STOPPED) { log.warn("[OKX] 策略已在运行中, state:{}", state); return; } state = StrategyState.WAITING_KLINE; cumulativePnl = BigDecimal.ZERO; unrealizedPnl = BigDecimal.ZERO; longEntryPrice = BigDecimal.ZERO; shortEntryPrice = BigDecimal.ZERO; longPositionSize = BigDecimal.ZERO; shortPositionSize = BigDecimal.ZERO; baseLongOpened = false; baseShortOpened = false; longActive = false; shortActive = false; candle1mSubscribed = false; positionsSubscribed = false; ordersSubscribed = false; pendingKlinePrice = null; shortPriceQueue.clear(); longPriceQueue.clear(); currentLongOrderIds.clear(); currentShortOrderIds.clear(); // 每次重启重新获取当前本金,确保盈亏对比基准正确 refreshInitialPrincipal(); log.info("[OKX] 网格策略已启动, 当前本金: {} USDT", initialPrincipal); } /** * 重新获取当前账户权益作为初始本金。 */ private void refreshInitialPrincipal() { try { String balanceResp = executor.getBalance(); if (balanceResp != null) { JSONObject json = JSON.parseObject(balanceResp); if ("0".equals(json.getString("code"))) { JSONArray data = json.getJSONArray("data"); if (data != null && !data.isEmpty()) { JSONObject accountData = data.getJSONObject(0); JSONArray details = accountData.getJSONArray("details"); if (details != null) { for (int i = 0; i < details.size(); i++) { JSONObject detail = details.getJSONObject(i); if ("USDT".equals(detail.getString("ccy"))) { String eq = detail.getString("eq"); if (eq != null) { this.initialPrincipal = new BigDecimal(eq); } break; } } } } } } } catch (Exception e) { log.warn("[OKX] 获取初始化本金失败,使用旧值: {}", initialPrincipal); } } public void stopGrid() { state = StrategyState.STOPPED; executor.cancelAllAlgoOrders(); executor.shutdown(); log.info("[OKX] 策略已停止, 累计盈亏: {}", cumulativePnl); } // ---- K线回调 ---- public void onKline(BigDecimal closePrice) { lastKlinePrice = closePrice; updateUnrealizedPnl(); if (state == StrategyState.STOPPED) { // stopGrid() 已做清理,仅打印日志不重复操作 BigDecimal totalPnl = cumulativePnl.add(unrealizedPnl); log.info("[OKX] 已实现:{}, 未实现:{}, 合计:{}", cumulativePnl, unrealizedPnl, totalPnl); return; } if (state == StrategyState.WAITING_KLINE) { // 等待所有 WS 订阅就绪后再开仓 if (!allSubscriptionsReady()) { pendingKlinePrice = closePrice; log.info("[OKX] 等待所有 WS 订阅就绪(candle1m={}, positions={}, orders={}), 当前价: {}", candle1mSubscribed, positionsSubscribed, ordersSubscribed, closePrice); return; } state = StrategyState.OPENING; log.info("[OKX] 首根K线到达,所有订阅就绪,开基底仓位 多空各{}张...", config.getBaseQuantity()); openBasePositions(); return; } if (state != StrategyState.ACTIVE) { return; } checkProfitAndReset(); } /** * WS 订阅成功确认回调,由各 {@link OkxGridChannelHandler#onSubscribed()} 触发。 * 当所有订阅就绪且已有缓存的 K 线价格时,自动触发开仓。 */ public void onSubscriptionConfirmed(String channel) { if ("candle1m".equals(channel)) candle1mSubscribed = true; else if ("positions".equals(channel)) positionsSubscribed = true; else if ("orders".equals(channel)) ordersSubscribed = true; log.info("[OKX] 订阅就绪: {}, 全部就绪: {}", channel, allSubscriptionsReady()); // 所有订阅就绪 + 有缓存 K 线价格 + 仍处于等待状态 → 触发开仓 if (allSubscriptionsReady() && pendingKlinePrice != null && state == StrategyState.WAITING_KLINE) { BigDecimal price = pendingKlinePrice; pendingKlinePrice = null; log.info("[OKX] 所有 WS 订阅就绪,触发开仓, 价格: {}", price); state = StrategyState.OPENING; log.info("[OKX] 首根K线到达,所有订阅就绪,开基底仓位 多空各{}张...", config.getBaseQuantity()); openBasePositions(); } } /** @return true 表示 candle1m + positions + orders 三个频道均已订阅成功 */ private boolean allSubscriptionsReady() { return candle1mSubscribed && positionsSubscribed && ordersSubscribed; } /** 市价双开基底仓位(多 + 空),对齐 Gate 版本逻辑 */ private void openBasePositions() { executor.openLong(config.getBaseQuantity(), (orderId) -> { OkxTraderParam baseLongTp = OkxTraderParam.builder().entryOrderId(orderId).build(); config.setBaseLongTraderParam(baseLongTp); tryGenerateQueues(); }, null); executor.openShort(config.getBaseQuantity(), (orderId) -> { OkxTraderParam baseShortTp = OkxTraderParam.builder().entryOrderId(orderId).build(); config.setBaseShortTraderParam(baseShortTp); tryGenerateQueues(); }, null); } // ---- 仓位推送回调 ---- public void onPositionUpdate(String instId, String posSide, BigDecimal posSize, BigDecimal avgPx) { if (state == StrategyState.STOPPED || state == StrategyState.WAITING_KLINE) { return; } boolean hasPosition = posSize.compareTo(BigDecimal.ZERO) > 0; if ("long".equals(posSide)) { if (hasPosition) { longActive = true; longEntryPrice = avgPx; if (!baseLongOpened) { longPositionSize = posSize; longBaseEntryPrice = avgPx; baseLongOpened = true; log.info("[OKX] 基底多成交价: {}", longBaseEntryPrice); tryGenerateQueues(); } else { longPositionSize = posSize; tryGenerateQueues(); // 后续 WS 推送触发重试,兜底此前 NPE 失败的情况 } } else { if (longActive && state == StrategyState.ACTIVE) { handlePositionZeroAndReset("多仓"); } longActive = false; longPositionSize = BigDecimal.ZERO; } } else if ("short".equals(posSide)) { if (hasPosition) { shortActive = true; shortEntryPrice = avgPx; if (!baseShortOpened) { shortPositionSize = posSize; shortBaseEntryPrice = avgPx; baseShortOpened = true; log.info("[OKX] 基底空成交价: {}", shortBaseEntryPrice); tryGenerateQueues(); } else { shortPositionSize = posSize; tryGenerateQueues(); // 后续 WS 推送触发重试,兜底此前 NPE 失败的情况 } } else { if (shortActive && state == StrategyState.ACTIVE) { handlePositionZeroAndReset("空仓"); } shortActive = false; shortPositionSize = BigDecimal.ZERO; } } } // ---- 平仓推送回调 ---- public void onPositionClose(String side, BigDecimal pnl) { if (state == StrategyState.STOPPED) { return; } cumulativePnl = cumulativePnl.add(pnl); updateUnrealizedPnl(); BigDecimal totalPnl = cumulativePnl.add(unrealizedPnl); log.info("[OKX] 已实现:{}, 未实现:{}, 合计:{}", cumulativePnl, unrealizedPnl, totalPnl); if (totalPnl.compareTo(config.getMaxLoss().negate()) <= 0) { String logMsg = StrUtil.format("[OKX] 已达亏损风险值(合计{}), 已实现:{}, 未实现:{}", totalPnl, cumulativePnl, unrealizedPnl); log.info(logMsg); DingTalkUtils.getDefault().sendActionCard("风险提醒", logMsg, config.getApiKey(), ""); } } // ---- 订单/条件单推送回调 ---- public void onOrderUpdate(String algoId, String state, String ordType) { if (!"filled".equals(state) && !"canceled".equals(state)) { return; } // 匹配止损单 → 委托 StopLossManager OkxGridElement byLongStopLoss = OkxGridElement.findByLongStopLossOrderId(algoId); if (byLongStopLoss != null) { stopLossManager.handleLongStopLossTriggered(byLongStopLoss, longEntryPrice); return; } OkxGridElement byShortStopLoss = OkxGridElement.findByShortStopLossOrderId(algoId); if (byShortStopLoss != null) { stopLossManager.handleShortStopLossTriggered(byShortStopLoss, shortEntryPrice); return; } // 匹配挂单 —— 条件单成交后:清空挂单状态 + 追挂止损 OkxGridElement shortGridElement = OkxGridElement.findByShortOrderId(algoId); if (shortGridElement != null && shortGridElement.isHasShortOrder()) { int filledQty = Integer.parseInt(shortGridElement.getShortTraderParam().getQuantity()); stopLossManager.clearShortEntryState(shortGridElement); stopLossManager.extendShortStopLoss(filledQty); log.info("[OKX] 空单成交 gridId:{}, qty:{}, 追挂止损", shortGridElement.getId(), filledQty); return; } OkxGridElement longGridElement = OkxGridElement.findByLongOrderId(algoId); if (longGridElement != null && longGridElement.isHasLongOrder()) { int filledQty = Integer.parseInt(longGridElement.getLongTraderParam().getQuantity()); stopLossManager.clearLongEntryState(longGridElement); stopLossManager.extendLongStopLoss(filledQty); log.info("[OKX] 多单成交 gridId:{}, qty:{}, 追挂止损", longGridElement.getId(), filledQty); return; } } // ---- 网格队列处理 ---- private void tryGenerateQueues() { // 防止重复执行:一旦已进入 ACTIVE 状态不再重复初始化 if (state == StrategyState.ACTIVE) { return; } if (baseLongOpened && baseShortOpened) { // 防御异步竞态:openLong/openShort 的回调可能还未设置 trader param OkxTraderParam baseLongTp = config.getBaseLongTraderParam(); OkxTraderParam baseShortTp = config.getBaseShortTraderParam(); if (baseLongTp == null || baseShortTp == null) { log.warn("[OKX] tryGenerateQueues 等待异步回调: longTp={}, shortTp={}", baseLongTp != null, baseShortTp != null); return; } // 委托 GridQueueBuilder 构建价格队列 + GridElements List tmpShort = GridQueueBuilder.buildShortQueue(config, shortBaseEntryPrice); List tmpLong = GridQueueBuilder.buildLongQueue(config, shortBaseEntryPrice); synchronized (shortPriceQueue) { shortPriceQueue.clear(); shortPriceQueue.addAll(tmpShort); } synchronized (longPriceQueue) { longPriceQueue.clear(); longPriceQueue.addAll(tmpLong); } GridQueueBuilder.buildGridElements(config, shortPriceQueue, longPriceQueue, shortBaseEntryPrice); // 标记基座挂单 OkxGridElement baseGridElement = OkxGridElement.findById(0); baseGridElement.setLongOrderId(baseLongTp.getEntryOrderId()); baseGridElement.setHasLongOrder(true); baseGridElement.setShortOrderId(baseShortTp.getEntryOrderId()); baseGridElement.setHasShortOrder(true); // 委托 StopLossManager 挂止损单 stopLossManager.setupBaseStopLosses(); state = StrategyState.ACTIVE; } } // ---- 盈亏计算 ---- private void updateUnrealizedPnl() { if (lastKlinePrice == null || lastKlinePrice.compareTo(BigDecimal.ZERO) == 0) return; BigDecimal multiplier = config.getCtVal(); BigDecimal longPnl = BigDecimal.ZERO; BigDecimal shortPnl = BigDecimal.ZERO; if (longPositionSize.compareTo(BigDecimal.ZERO) > 0 && longEntryPrice.compareTo(BigDecimal.ZERO) > 0) { longPnl = longPositionSize.multiply(multiplier).multiply(lastKlinePrice.subtract(longEntryPrice)); } if (shortPositionSize.compareTo(BigDecimal.ZERO) > 0 && shortEntryPrice.compareTo(BigDecimal.ZERO) > 0) { shortPnl = shortPositionSize.multiply(multiplier).multiply(shortEntryPrice.subtract(lastKlinePrice)); } unrealizedPnl = longPnl.add(shortPnl); log.info("[OKX] 未实现盈亏: {}", unrealizedPnl); } private boolean isMarginSafe() { try { String balanceResp = executor.getBalance(); if (balanceResp == null) return true; JSONObject json = JSON.parseObject(balanceResp); if (!"0".equals(json.getString("code"))) return true; JSONArray data = json.getJSONArray("data"); if (data == null || data.isEmpty()) return true; JSONObject detail = data.getJSONObject(0); String imr = detail.getString("imr"); if (imr == null) return true; BigDecimal margin = new BigDecimal(imr); if (initialPrincipal.compareTo(BigDecimal.ZERO) == 0) return true; BigDecimal ratio = margin.divide(initialPrincipal, 4, RoundingMode.HALF_UP); log.debug("[OKX] 保证金比例: {}/{}={}", margin, initialPrincipal, ratio); return ratio.compareTo(config.getMarginRatioLimit()) < 0; } catch (Exception e) { log.warn("[OKX] 查保证金失败,默认放行", e); return true; } } private void checkProfitAndReset() { try { String balanceResp = executor.getBalance(); if (balanceResp == null) return; JSONObject json = JSON.parseObject(balanceResp); if (!"0".equals(json.getString("code"))) return; JSONArray data = json.getJSONArray("data"); if (data == null || data.isEmpty()) return; JSONObject detail = data.getJSONObject(0); String upl = detail.getString("upl"); String availEq = detail.getString("availEq"); if (upl == null || availEq == null) return; BigDecimal unrealisedPnl = new BigDecimal(upl); BigDecimal available = new BigDecimal(availEq); BigDecimal totalEquity = unrealisedPnl.add(available); BigDecimal target = initialPrincipal.add(config.getExpectedProfit()); log.info("[OKX] 盈亏检查 upl:{}, availEq:{}, 合计:{}, 目标:{}", unrealisedPnl, available, totalEquity, target); if (totalEquity.compareTo(target) > 0) { log.info("[OKX] 盈亏达标({}>{}),重置策略", totalEquity, target); state = StrategyState.STOPPED; closeExistingPositions(); executor.cancelAllAlgoOrders(); // 提交到 executor 末尾:单线程FIFO保证前面所有平仓/取消任务完成后才重置 executor.submitTask(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } startGrid(); }); } } catch (Exception e) { log.warn("[OKX] 盈亏检查失败", e); } } private void handlePositionZeroAndReset(String direction) { log.info("[OKX] {}持仓归零,重置策略", direction); state = StrategyState.STOPPED; executor.cancelAllAlgoOrders(); closeExistingPositions(); // 提交到 executor 末尾:FIFO保证平仓完成后再重置 executor.submitTask(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } startGrid(); }); } // ---- getters ---- public BigDecimal getLastKlinePrice() { return lastKlinePrice; } public boolean isStrategyActive() { return state != StrategyState.STOPPED && state != StrategyState.WAITING_KLINE; } public BigDecimal getCumulativePnl() { return cumulativePnl; } public BigDecimal getUnrealizedPnl() { return unrealizedPnl; } public StrategyState getState() { return state; } }