package com.xcong.excoin.modules.okxNewPrice;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.okxNewPrice.okxpi.config.OKXAccount;
import com.xcong.excoin.modules.okxNewPrice.okxpi.config.enums.HttpMethod;
import com.xcong.excoin.modules.okxNewPrice.okxpi.config.utils.OKXContants;
import com.xcong.excoin.utils.dingtalk.DingTalkUtils;
import lombok.extern.slf4j.Slf4j;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.*;
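/**
 * OKX grid trading strategy engine — long/short hedged grid.
 *
 * <h3>Strategy principle</h3>
 * <p>Aligned with the Gate version: the base short position's entry price is used as the
 * price reference, and a price-grid queue is generated both above and below it.
 * When the price reaches a grid level a conditional order is placed; once that order
 * fills, a take-profit order is placed automatically.
 *
 * <h3>Full lifecycle</h3>
 * <pre>
 * startGrid() → WAITING_KLINE
 *     ↓
 * onKline(first kline) → OPENING → asynchronously market-open both base positions (long + short)
 *     ↓
 * onPositionUpdate() → base positions filled → baseLongOpened &amp;&amp; baseShortOpened
 *     ↓
 * tryGenerateQueues() → generateShortQueue()+generateLongQueue()+updateGridElements()
 *     → place base stop-loss orders + state=ACTIVE
 *     ↓
 * On every kline while ACTIVE:
 * processShortGrid() + processLongGrid() → match queue → place conditional order
 *     → place take-profit after the order fills
 *     ↓
 * onPositionClose() → accumulate cumulativePnl → check stop conditions
 * </pre>
 *
 * <h3>OKX vs Gate differences</h3>
 * <ul>
 *   <li>OKX orders use side (buy/sell) + posSide (long/short) instead of a signed size.</li>
 *   <li>Conditional orders use algo orders (ordType=conditional).</li>
 *   <li>Take-profit and stop-loss are implemented with algo orders.</li>
 * </ul>
 *
 * @author Administrator
 */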
@Slf4j
public class OkxGridTradeService {
/** Lifecycle phases of the grid strategy, in the order they are normally traversed. */
public enum StrategyState {
    /** Started, but base positions have not been requested yet. */
    WAITING_KLINE,
    /** Base long/short market orders have been requested; grid not built yet. */
    OPENING,
    /** Grid queues are built and the strategy is running. */
    ACTIVE,
    /** Strategy is halted. */
    STOPPED
}
// ---- Collaborators (assigned once in the constructor) ----
private final OkxConfig config;
private final OkxTradeExecutor executor;
private final OKXAccount okxAccount;
private final StopLossManager stopLossManager;
/** Current lifecycle state; a new instance starts in WAITING_KLINE. */
private volatile StrategyState state = StrategyState.WAITING_KLINE;
// NOTE(review): the collections below are declared with raw List/Map types;
// the element types are not visible in this file — confirm before adding generics.
/** Short-side price queue, sorted descending (high → low). */
private final List shortPriceQueue = Collections.synchronizedList(new ArrayList<>());
/** Long-side price queue, sorted ascending (low → high). */
private final List longPriceQueue = Collections.synchronizedList(new ArrayList<>());
/** Current long conditional orders: algoId → take-profit price. */
private final Map currentLongOrderIds = Collections.synchronizedMap(new LinkedHashMap<>());
/** Current short conditional orders: algoId → take-profit price. */
private final Map currentShortOrderIds = Collections.synchronizedMap(new LinkedHashMap<>());
/** Entry price of the base short position. */
private BigDecimal shortBaseEntryPrice;
/** Entry price of the base long position. */
private BigDecimal longBaseEntryPrice;
/** Whether the base long position has been opened. */
private volatile boolean baseLongOpened = false;
/** Whether the base short position has been opened. */
private volatile boolean baseShortOpened = false;
// ---- WS subscription readiness flags ----
/** candle1m (Business WS) subscription has been confirmed. */
private volatile boolean candle1mSubscribed = false;
/** positions (Private WS) subscription has been confirmed. */
private volatile boolean positionsSubscribed = false;
/** orders (Private WS) subscription has been confirmed. */
private volatile boolean ordersSubscribed = false;
/** Latest kline price cached while waiting for all subscriptions to become ready. */
private volatile BigDecimal pendingKlinePrice = null;
// ---- Live position / PnL snapshot, updated from the kline and position callbacks ----
private volatile boolean shortActive = false;
private volatile boolean longActive = false;
private volatile BigDecimal lastKlinePrice;
private volatile BigDecimal cumulativePnl = BigDecimal.ZERO;
private volatile BigDecimal unrealizedPnl = BigDecimal.ZERO;
private volatile BigDecimal longEntryPrice = BigDecimal.ZERO;
private volatile BigDecimal shortEntryPrice = BigDecimal.ZERO;
private volatile BigDecimal longPositionSize = BigDecimal.ZERO;
private volatile BigDecimal shortPositionSize = BigDecimal.ZERO;
/** USDT equity read from the balance endpoint; baseline for the profit-target check. */
private volatile BigDecimal initialPrincipal = BigDecimal.ZERO;
public OkxGridTradeService(OkxConfig config, OKXAccount okxAccount) {
this.config = config;
this.okxAccount = okxAccount;
this.executor = new OkxTradeExecutor(okxAccount, config.getInstId(), config.getTdMode());
this.stopLossManager = new StopLossManager(config, executor);
}
// ---- 初始化 ----
/**
 * Prepares the account for a fresh strategy run.
 *
 * <p>Steps, in order: read the USDT equity from the balance response and store it as the
 * initial principal, cancel all algo orders, close positions already open on the
 * configured instrument, then set the configured leverage. Any exception is logged and
 * swallowed, so a failure in one step skips the steps after it.
 */
public void init() {
    try {
        // 1. Locate the per-currency "details" array of the first account entry.
        JSONArray currencyDetails = null;
        String balanceResp = executor.getBalance();
        if (balanceResp != null) {
            JSONObject root = JSON.parseObject(balanceResp);
            if ("0".equals(root.getString("code"))) {
                JSONArray accounts = root.getJSONArray("data");
                if (accounts != null && !accounts.isEmpty()) {
                    currencyDetails = accounts.getJSONObject(0).getJSONArray("details");
                }
            }
        }
        // Only the first USDT entry is considered; its "eq" becomes the principal.
        if (currencyDetails != null) {
            for (int idx = 0; idx < currencyDetails.size(); idx++) {
                JSONObject entry = currencyDetails.getJSONObject(idx);
                if (!"USDT".equals(entry.getString("ccy"))) {
                    continue;
                }
                String equity = entry.getString("eq");
                if (equity != null) {
                    this.initialPrincipal = new BigDecimal(equity);
                    log.info("[OKX] 初始本金(USDT合约): {} USDT", initialPrincipal);
                }
                break;
            }
        }

        // 2. Remove conditional orders left over from a previous run.
        executor.cancelAllAlgoOrders();
        log.info("[OKX] 旧条件单已清除");

        // 3. Flatten positions that are already open.
        closeExistingPositions();

        // 4. Apply the configured leverage.
        executor.setLeverage(config.getLeverage());
        log.info("[OKX] 初始化完成, 合约:{}, 杠杆:{}x", config.getInstId(), config.getLeverage());
    } catch (Exception e) {
        log.error("[OKX] 初始化失败", e);
    }
}
/**
 * Market-closes every open position on the configured instrument.
 *
 * <p>Positions of other instruments are skipped. A position whose size is missing,
 * blank, or numerically zero (including forms such as "0.0") is treated as flat and
 * skipped. Failures are logged and swallowed so callers can continue their
 * reset/initialization sequence.
 */
private void closeExistingPositions() {
    try {
        String posResp = executor.getPositions();
        if (posResp == null) {
            return;
        }
        JSONObject json = JSON.parseObject(posResp);
        if (!"0".equals(json.getString("code"))) {
            // Surface the failure instead of returning silently: positions may still be open.
            log.warn("[OKX] 查询仓位失败, 未执行平仓, resp:{}", posResp);
            return;
        }
        JSONArray data = json.getJSONArray("data");
        if (data == null || data.isEmpty()) {
            log.info("[OKX] 无已有仓位");
            return;
        }
        for (int i = 0; i < data.size(); i++) {
            JSONObject pos = data.getJSONObject(i);
            String instId = pos.getString("instId");
            if (!config.getInstId().equals(instId)) {
                continue;
            }
            String posSide = pos.getString("posSide");
            String posSz = pos.getString("pos");
            if (isFlatPositionSize(posSz)) {
                continue;
            }
            // Closing a long means selling; anything else is closed with a buy.
            String side = "long".equals(posSide) ? "sell" : "buy";
            executor.marketClose(side, posSide, posSz);
            log.info("[OKX] 平已有仓位, posSide:{}, sz:{}, side:{}", posSide, posSz, side);
        }
    } catch (Exception e) {
        log.warn("[OKX] 平仓位异常", e);
    }
}

/**
 * Tells whether a position-size string represents "no position".
 *
 * @param posSz raw size string from the positions response, may be {@code null}
 * @return {@code true} for null, blank, or a numeric zero in any notation; {@code false}
 *         otherwise, including non-numeric text (which is passed through as before)
 */
private static boolean isFlatPositionSize(String posSz) {
    if (posSz == null) {
        return true;
    }
    String trimmed = posSz.trim();
    if (trimmed.isEmpty()) {
        return true;
    }
    try {
        return new BigDecimal(trimmed).signum() == 0;
    } catch (NumberFormatException ignored) {
        // Not a number: keep the previous behavior of not treating it as zero.
        return false;
    }
}
// ---- 启动/停止 ----
/**
 * Starts (or restarts) the strategy: resets all runtime state to its initial values,
 * moves to WAITING_KLINE and refreshes the initial principal.
 *
 * <p>Does nothing except log a warning when the strategy is in any state other than
 * WAITING_KLINE or STOPPED.
 */
public void startGrid() {
    if (state != StrategyState.WAITING_KLINE && state != StrategyState.STOPPED) {
        log.warn("[OKX] 策略已在运行中, state:{}", state);
        return;
    }
    state = StrategyState.WAITING_KLINE;
    // Reset PnL and position snapshot.
    cumulativePnl = BigDecimal.ZERO;
    unrealizedPnl = BigDecimal.ZERO;
    longEntryPrice = BigDecimal.ZERO;
    shortEntryPrice = BigDecimal.ZERO;
    longPositionSize = BigDecimal.ZERO;
    shortPositionSize = BigDecimal.ZERO;
    baseLongOpened = false;
    baseShortOpened = false;
    longActive = false;
    shortActive = false;
    // NOTE(review): the subscription flags are cleared here and only set again by
    // onSubscriptionConfirmed(). On an automatic restart, confirm that the WS layer
    // re-confirms the channels; otherwise onKline() keeps waiting in WAITING_KLINE.
    candle1mSubscribed = false;
    positionsSubscribed = false;
    ordersSubscribed = false;
    pendingKlinePrice = null;
    // Drop grid queues and tracked conditional orders from the previous run.
    shortPriceQueue.clear();
    longPriceQueue.clear();
    currentLongOrderIds.clear();
    currentShortOrderIds.clear();
    stopLossManager.resetAllEntryQuantities();
    // Re-read the current principal on every (re)start so the PnL baseline is correct.
    refreshInitialPrincipal();
    log.info("[OKX] 网格策略已启动, 当前本金: {} USDT", initialPrincipal);
}
/**
 * Re-reads the account's USDT equity and stores it as the initial principal.
 *
 * <p>The value is taken from the "eq" field of the first USDT entry in the "details"
 * array of the first account in the balance response. If the response is missing,
 * unsuccessful, or has no such entry, the previous principal is kept. Exceptions are
 * logged (with their cause) and the previous value is kept.
 */
private void refreshInitialPrincipal() {
    try {
        String balanceResp = executor.getBalance();
        if (balanceResp == null) {
            return;
        }
        JSONObject json = JSON.parseObject(balanceResp);
        if (!"0".equals(json.getString("code"))) {
            return;
        }
        JSONArray data = json.getJSONArray("data");
        if (data == null || data.isEmpty()) {
            return;
        }
        JSONArray details = data.getJSONObject(0).getJSONArray("details");
        if (details == null) {
            return;
        }
        for (int i = 0; i < details.size(); i++) {
            JSONObject detail = details.getJSONObject(i);
            if (!"USDT".equals(detail.getString("ccy"))) {
                continue;
            }
            String eq = detail.getString("eq");
            if (eq != null) {
                this.initialPrincipal = new BigDecimal(eq);
            }
            // Only the first USDT entry is considered.
            break;
        }
    } catch (Exception e) {
        // Pass the exception as the trailing argument so the cause is not lost.
        log.warn("[OKX] 获取初始化本金失败,使用旧值: {}", initialPrincipal, e);
    }
}
/**
 * Stops the strategy: marks it STOPPED, cancels all algo orders and shuts the trade
 * executor down, then logs the cumulative realized PnL.
 */
public void stopGrid() {
    // Set the state first so concurrent callbacks stop acting on new events.
    state = StrategyState.STOPPED;
    executor.cancelAllAlgoOrders();
    // NOTE(review): the executor is shut down here; confirm startGrid() is not expected
    // to reuse it after a manual stop.
    executor.shutdown();
    log.info("[OKX] 策略已停止, 累计盈亏: {}", cumulativePnl);
}
// ---- K线回调 ----
/**
 * Kline callback: records the latest close price, refreshes the unrealized PnL and
 * drives the state machine.
 *
 * <ul>
 *   <li>STOPPED: only logs the PnL summary.</li>
 *   <li>WAITING_KLINE: if not all WS subscriptions are confirmed, caches the price and
 *       waits; otherwise moves to OPENING and opens the base positions.</li>
 *   <li>ACTIVE: runs the profit-target check.</li>
 *   <li>Any other state: no further action.</li>
 * </ul>
 *
 * @param closePrice close price of the received kline
 */
public void onKline(BigDecimal closePrice) {
    lastKlinePrice = closePrice;
    updateUnrealizedPnl();
    if (state == StrategyState.STOPPED) {
        // stopGrid() already cleaned up; only log here, do not repeat any action.
        BigDecimal totalPnl = cumulativePnl.add(unrealizedPnl);
        log.info("[OKX] 已实现:{}, 未实现:{}, 合计:{}", cumulativePnl, unrealizedPnl, totalPnl);
        return;
    }
    if (state == StrategyState.WAITING_KLINE) {
        // Do not open positions until every WS subscription is confirmed.
        if (!allSubscriptionsReady()) {
            pendingKlinePrice = closePrice;
            log.info("[OKX] 等待所有 WS 订阅就绪(candle1m={}, positions={}, orders={}), 当前价: {}",
                    candle1mSubscribed, positionsSubscribed, ordersSubscribed, closePrice);
            return;
        }
        // Only the caller that wins the transition opens the base positions; the
        // subscription-confirmation callback may be attempting the same thing concurrently.
        if (tryEnterOpening()) {
            log.info("[OKX] 首根K线到达,所有订阅就绪,开基底仓位 多空各{}张...", config.getBaseQuantity());
            openBasePositions();
        }
        return;
    }
    if (state != StrategyState.ACTIVE) {
        return;
    }
    checkProfitAndReset();
}

/**
 * WS subscription confirmation callback, triggered by each
 * {@link OkxGridChannelHandler#onSubscribed()}.
 *
 * <p>Marks the given channel as ready. When all channels are ready and a kline price
 * was cached while waiting, the base positions are opened — unless another thread has
 * already performed the WAITING_KLINE → OPENING transition.
 *
 * @param channel confirmed channel name: "candle1m", "positions" or "orders";
 *                other values only produce the log line
 */
public void onSubscriptionConfirmed(String channel) {
    if ("candle1m".equals(channel)) {
        candle1mSubscribed = true;
    } else if ("positions".equals(channel)) {
        positionsSubscribed = true;
    } else if ("orders".equals(channel)) {
        ordersSubscribed = true;
    }
    log.info("[OKX] 订阅就绪: {}, 全部就绪: {}", channel, allSubscriptionsReady());

    if (!allSubscriptionsReady()) {
        return;
    }
    BigDecimal price = pendingKlinePrice;
    if (price == null) {
        // No kline seen yet; the next onKline() call will open the positions.
        return;
    }
    if (!tryEnterOpening()) {
        return;
    }
    pendingKlinePrice = null;
    log.info("[OKX] 所有 WS 订阅就绪,触发开仓, 价格: {}", price);
    log.info("[OKX] 首根K线到达,所有订阅就绪,开基底仓位 多空各{}张...", config.getBaseQuantity());
    openBasePositions();
}

/** Serializes the WAITING_KLINE → OPENING transition between the kline and subscription callbacks. */
private final Object openingTransitionLock = new Object();

/**
 * Atomically moves the strategy from WAITING_KLINE to OPENING.
 *
 * @return {@code true} if this call performed the transition; {@code false} if the
 *         strategy was not in WAITING_KLINE (for example another thread already won)
 */
private boolean tryEnterOpening() {
    synchronized (openingTransitionLock) {
        if (state != StrategyState.WAITING_KLINE) {
            return false;
        }
        state = StrategyState.OPENING;
        return true;
    }
}
/**
 * Checks the three subscription flags.
 *
 * @return {@code true} only when candle1m, positions and orders are all confirmed
 */
private boolean allSubscriptionsReady() {
    if (!candle1mSubscribed) {
        return false;
    }
    if (!positionsSubscribed) {
        return false;
    }
    return ordersSubscribed;
}
/**
 * Opens both base positions (long + short) with the configured base quantity,
 * aligned with the Gate version.
 *
 * <p>Each open call receives a callback that stores the entry order id in the config
 * as the base trader param and then retries queue generation. The base trader params
 * of a previous cycle are cleared first: {@code tryGenerateQueues()} treats a non-null
 * param as "callback has run", so stale values from an earlier run would defeat that
 * guard and attach old order ids to the base grid element.
 */
private void openBasePositions() {
    // Clear leftovers from a previous cycle before any new callback can fire.
    config.setBaseLongTraderParam(null);
    config.setBaseShortTraderParam(null);

    // NOTE(review): the third argument is passed as null on both calls; if it is a
    // failure callback, a rejected base order leaves the strategy in OPENING — confirm.
    executor.openLong(config.getBaseQuantity(), (orderId) -> {
        OkxTraderParam baseLongTp = OkxTraderParam.builder().entryOrderId(orderId).build();
        config.setBaseLongTraderParam(baseLongTp);
        tryGenerateQueues();
    }, null);
    executor.openShort(config.getBaseQuantity(), (orderId) -> {
        OkxTraderParam baseShortTp = OkxTraderParam.builder().entryOrderId(orderId).build();
        config.setBaseShortTraderParam(baseShortTp);
        tryGenerateQueues();
    }, null);
}
// ---- 仓位推送回调 ----
/**
 * Position push callback: tracks the long/short position snapshot, detects the base
 * fills, and resets the strategy when an active side drops to zero while ACTIVE.
 *
 * <p>Ignored while STOPPED or WAITING_KLINE. Pushes for an instrument other than the
 * configured one, and pushes without a position size, are ignored as well so they
 * cannot overwrite the snapshot or trigger a reset.
 *
 * @param instId  instrument id of the push; {@code null} is accepted and not filtered
 * @param posSide "long" or "short"; other values are ignored
 * @param posSize position size; a value greater than zero means a position is held
 * @param avgPx   average entry price reported with the push
 */
public void onPositionUpdate(String instId, String posSide, BigDecimal posSize, BigDecimal avgPx) {
    if (state == StrategyState.STOPPED || state == StrategyState.WAITING_KLINE) {
        return;
    }
    // Same instrument filter that closeExistingPositions() applies to REST data.
    if (instId != null && !instId.equals(config.getInstId())) {
        return;
    }
    if (posSize == null) {
        // A missing size must not be read as "position closed": that would reset the strategy.
        log.warn("[OKX] 仓位推送缺少持仓数量, 已忽略, instId:{}, posSide:{}", instId, posSide);
        return;
    }
    boolean hasPosition = posSize.compareTo(BigDecimal.ZERO) > 0;
    if ("long".equals(posSide)) {
        if (hasPosition) {
            longActive = true;
            longEntryPrice = avgPx;
            longPositionSize = posSize;
            if (!baseLongOpened) {
                // First non-zero push on this side is taken as the base fill.
                longBaseEntryPrice = avgPx;
                baseLongOpened = true;
                log.info("[OKX] 基底多成交价: {}", longBaseEntryPrice);
            }
            // Also called on later pushes: retries queue generation if an earlier attempt failed.
            tryGenerateQueues();
        } else {
            if (longActive && state == StrategyState.ACTIVE) {
                handlePositionZeroAndReset("多仓");
            }
            longActive = false;
            longPositionSize = BigDecimal.ZERO;
        }
    } else if ("short".equals(posSide)) {
        if (hasPosition) {
            shortActive = true;
            shortEntryPrice = avgPx;
            shortPositionSize = posSize;
            if (!baseShortOpened) {
                // First non-zero push on this side is taken as the base fill.
                shortBaseEntryPrice = avgPx;
                baseShortOpened = true;
                log.info("[OKX] 基底空成交价: {}", shortBaseEntryPrice);
            }
            // Also called on later pushes: retries queue generation if an earlier attempt failed.
            tryGenerateQueues();
        } else {
            if (shortActive && state == StrategyState.ACTIVE) {
                handlePositionZeroAndReset("空仓");
            }
            shortActive = false;
            shortPositionSize = BigDecimal.ZERO;
        }
    }
}
// ---- 平仓推送回调 ----
/**
 * Close push callback: adds the realized PnL to the running total, refreshes the
 * unrealized PnL and sends a DingTalk risk alert once the combined PnL is at or below
 * the negated configured maximum loss. Ignored while STOPPED.
 *
 * @param side not used by this method
 * @param pnl  realized PnL of the closed position
 */
public void onPositionClose(String side, BigDecimal pnl) {
    if (state == StrategyState.STOPPED) {
        return;
    }

    cumulativePnl = cumulativePnl.add(pnl);
    updateUnrealizedPnl();

    BigDecimal combined = cumulativePnl.add(unrealizedPnl);
    log.info("[OKX] 已实现:{}, 未实现:{}, 合计:{}", cumulativePnl, unrealizedPnl, combined);

    // The alert threshold is the configured max loss expressed as a negative number.
    BigDecimal lossFloor = config.getMaxLoss().negate();
    if (combined.compareTo(lossFloor) > 0) {
        return;
    }

    String alert = StrUtil.format("[OKX] 已达亏损风险值(合计{}), 已实现:{}, 未实现:{}",
            combined, cumulativePnl, unrealizedPnl);
    log.info(alert);
    DingTalkUtils.getDefault().sendActionCard("风险提醒", alert, config.getApiKey(), "");
}
// ---- 订单/条件单推送回调 ----
/**
 * Order / conditional-order push callback.
 *
 * <p>Only the states "filled" and "canceled" are processed. The order id is matched, in
 * this order, against: long stop-loss orders, short stop-loss orders, short entry
 * orders, long entry orders. The first match is handled and the method returns.
 *
 * <p>NOTE(review): "canceled" takes the same path as "filled", so a cancelled entry
 * order also extends the stop-loss — confirm this is intended.
 *
 * @param algoId  id of the order in the push
 * @param state   order state from the push (shadows the strategy state field)
 * @param ordType not used by this method
 */
public void onOrderUpdate(String algoId, String state, String ordType) {
    boolean terminal = "filled".equals(state) || "canceled".equals(state);
    if (!terminal) {
        return;
    }

    // Stop-loss orders are delegated to the StopLossManager.
    OkxGridElement longStopOwner = OkxGridElement.findByLongStopLossOrderId(algoId);
    if (longStopOwner != null) {
        stopLossManager.handleLongStopLossTriggered(longStopOwner, longEntryPrice);
        return;
    }

    OkxGridElement shortStopOwner = OkxGridElement.findByShortStopLossOrderId(algoId);
    if (shortStopOwner != null) {
        stopLossManager.handleShortStopLossTriggered(shortStopOwner, shortEntryPrice);
        return;
    }

    // Entry orders: clear the pending-order state, then extend the stop-loss by the quantity.
    OkxGridElement shortEntryOwner = OkxGridElement.findByShortOrderId(algoId);
    if (shortEntryOwner != null && shortEntryOwner.isHasShortOrder()) {
        int shortQty = Integer.parseInt(shortEntryOwner.getShortTraderParam().getQuantity());
        stopLossManager.clearShortEntryState(shortEntryOwner);
        stopLossManager.resetShortEntryQty();
        stopLossManager.extendShortStopLoss(shortQty);
        log.info("[OKX] 空单成交 gridId:{}, qty:{}, 追挂止损", shortEntryOwner.getId(), shortQty);
        return;
    }

    OkxGridElement longEntryOwner = OkxGridElement.findByLongOrderId(algoId);
    if (longEntryOwner == null || !longEntryOwner.isHasLongOrder()) {
        return;
    }
    int longQty = Integer.parseInt(longEntryOwner.getLongTraderParam().getQuantity());
    stopLossManager.clearLongEntryState(longEntryOwner);
    stopLossManager.resetLongEntryQty();
    stopLossManager.extendLongStopLoss(longQty);
    log.info("[OKX] 多单成交 gridId:{}, qty:{}, 追挂止损", longEntryOwner.getId(), longQty);
}
// ---- 网格队列处理 ----
/**
 * Builds the grid once both base positions are open and both open-order callbacks have
 * stored their trader params, then places the base stop-losses and switches to ACTIVE.
 *
 * <p>This method is reached from the open-order callbacks and from position pushes, so
 * it is {@code synchronized}: without that, two callers could both pass the state check
 * and build the grid / place the base stop-loss orders twice. It only acts while the
 * strategy is OPENING, so a late callback after a stop or after activation is a no-op.
 * If a prerequisite is missing the method returns and a later call retries.
 */
private synchronized void tryGenerateQueues() {
    // Initialization belongs to the OPENING phase only (not ACTIVE, STOPPED or WAITING_KLINE).
    if (state != StrategyState.OPENING) {
        return;
    }
    if (!baseLongOpened || !baseShortOpened) {
        return;
    }
    // Guard against the async race: the openLong/openShort callbacks may not have
    // stored their trader params yet.
    OkxTraderParam baseLongTp = config.getBaseLongTraderParam();
    OkxTraderParam baseShortTp = config.getBaseShortTraderParam();
    if (baseLongTp == null || baseShortTp == null) {
        log.warn("[OKX] tryGenerateQueues 等待异步回调: longTp={}, shortTp={}",
                baseLongTp != null, baseShortTp != null);
        return;
    }
    // Delegate to GridQueueBuilder. Both queues use the base short entry price as reference.
    List tmpShort = GridQueueBuilder.buildShortQueue(config, shortBaseEntryPrice);
    List tmpLong = GridQueueBuilder.buildLongQueue(config, shortBaseEntryPrice);
    synchronized (shortPriceQueue) {
        shortPriceQueue.clear();
        shortPriceQueue.addAll(tmpShort);
    }
    synchronized (longPriceQueue) {
        longPriceQueue.clear();
        longPriceQueue.addAll(tmpLong);
    }
    GridQueueBuilder.buildGridElements(config, shortPriceQueue, longPriceQueue, shortBaseEntryPrice);
    // Mark the base grid element (id 0) as holding both base orders.
    OkxGridElement baseGridElement = OkxGridElement.findById(0);
    if (baseGridElement == null) {
        // Stay in OPENING so the next position push retries instead of failing with an NPE.
        log.warn("[OKX] tryGenerateQueues 未找到基座网格(id=0), 等待后续推送重试");
        return;
    }
    baseGridElement.setLongOrderId(baseLongTp.getEntryOrderId());
    baseGridElement.setHasLongOrder(true);
    baseGridElement.setShortOrderId(baseShortTp.getEntryOrderId());
    baseGridElement.setHasShortOrder(true);
    // Delegate base stop-loss placement to the StopLossManager.
    stopLossManager.setupBaseStopLosses();
    state = StrategyState.ACTIVE;
}
// ---- 盈亏计算 ----
/**
 * Recomputes the unrealized PnL from the last kline price and the tracked positions.
 *
 * <p>Per side: size × contract value × price difference (last − entry for long,
 * entry − last for short). A side contributes zero unless both its size and its entry
 * price are positive. Nothing is done when no (non-zero) kline price is available.
 */
private void updateUnrealizedPnl() {
    if (lastKlinePrice == null || lastKlinePrice.signum() == 0) {
        return;
    }
    BigDecimal contractValue = config.getCtVal();

    boolean holdsLong = longPositionSize.signum() > 0 && longEntryPrice.signum() > 0;
    BigDecimal longLeg = holdsLong
            ? longPositionSize.multiply(contractValue).multiply(lastKlinePrice.subtract(longEntryPrice))
            : BigDecimal.ZERO;

    boolean holdsShort = shortPositionSize.signum() > 0 && shortEntryPrice.signum() > 0;
    BigDecimal shortLeg = holdsShort
            ? shortPositionSize.multiply(contractValue).multiply(shortEntryPrice.subtract(lastKlinePrice))
            : BigDecimal.ZERO;

    unrealizedPnl = longLeg.add(shortLeg);
    log.info("[OKX] 未实现盈亏: {}", unrealizedPnl);
}
/**
 * Checks whether the used margin is still below the configured share of the principal.
 *
 * <p>Reads "imr" from the first entry of the balance response and divides it by the
 * initial principal (4 decimals, HALF_UP). Whenever the data needed for the check is
 * unavailable — no response, unsuccessful code, no data, no "imr", zero principal, or
 * any exception — the method lets the caller proceed by returning {@code true}.
 *
 * @return {@code true} if the ratio is strictly below the configured limit or cannot be determined
 */
private boolean isMarginSafe() {
    try {
        String balanceResp = executor.getBalance();
        if (balanceResp == null) {
            return true;
        }
        JSONObject root = JSON.parseObject(balanceResp);
        JSONArray rows = "0".equals(root.getString("code")) ? root.getJSONArray("data") : null;
        if (rows == null || rows.isEmpty()) {
            return true;
        }
        String imr = rows.getJSONObject(0).getString("imr");
        if (imr == null) {
            return true;
        }
        BigDecimal usedMargin = new BigDecimal(imr);
        if (initialPrincipal.signum() == 0) {
            // Avoid dividing by zero when no principal was recorded.
            return true;
        }
        BigDecimal usage = usedMargin.divide(initialPrincipal, 4, RoundingMode.HALF_UP);
        log.debug("[OKX] 保证金比例: {}/{}={}", usedMargin, initialPrincipal, usage);
        return config.getMarginRatioLimit().compareTo(usage) > 0;
    } catch (Exception e) {
        log.warn("[OKX] 查保证金失败,默认放行", e);
        return true;
    }
}
/**
 * Profit-target check, run on every kline while ACTIVE.
 *
 * <p>Reads "upl" and "availEq" from the first entry of the balance response and compares
 * their sum with initial principal + configured expected profit. When the sum is
 * strictly greater, the strategy is stopped, algo orders are cancelled, positions are
 * closed and a restart is queued on the executor. Missing data ends the check quietly;
 * exceptions are logged and swallowed.
 */
private void checkProfitAndReset() {
    try {
        String balanceResp = executor.getBalance();
        if (balanceResp == null) {
            return;
        }
        JSONObject json = JSON.parseObject(balanceResp);
        if (!"0".equals(json.getString("code"))) {
            return;
        }
        JSONArray data = json.getJSONArray("data");
        if (data == null || data.isEmpty()) {
            return;
        }
        JSONObject detail = data.getJSONObject(0);
        String upl = detail.getString("upl");
        String availEq = detail.getString("availEq");
        if (upl == null || availEq == null) {
            return;
        }
        BigDecimal unrealisedPnl = new BigDecimal(upl);
        BigDecimal available = new BigDecimal(availEq);
        BigDecimal totalEquity = unrealisedPnl.add(available);
        BigDecimal target = initialPrincipal.add(config.getExpectedProfit());
        log.info("[OKX] 盈亏检查 upl:{}, availEq:{}, 合计:{}, 目标:{}", unrealisedPnl, available, totalEquity, target);
        if (totalEquity.compareTo(target) > 0) {
            log.info("[OKX] 盈亏达标({}>{}),重置策略", totalEquity, target);
            state = StrategyState.STOPPED;
            // Cancel pending algo orders BEFORE flattening, matching init() and
            // handlePositionZeroAndReset(): otherwise a conditional order could trigger
            // after the close and leave a position open across the restart.
            executor.cancelAllAlgoOrders();
            closeExistingPositions();
            // Queued at the executor's tail: single-thread FIFO ensures the cancel/close
            // tasks above finish before the strategy is restarted.
            executor.submitTask(() -> {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                startGrid();
            });
        }
    } catch (Exception e) {
        log.warn("[OKX] 盈亏检查失败", e);
    }
}
/**
 * Resets the strategy after one side's position dropped to zero: stops the strategy,
 * cancels algo orders, closes remaining positions and queues a restart.
 *
 * @param direction label of the side that went flat, used only in the log line
 */
private void handlePositionZeroAndReset(String direction) {
    log.info("[OKX] {}持仓归零,重置策略", direction);

    state = StrategyState.STOPPED;
    executor.cancelAllAlgoOrders();
    closeExistingPositions();

    // Queued at the executor's tail: FIFO ordering lets the close finish before the restart.
    executor.submitTask(() -> {
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        startGrid();
    });
}
// ---- getters ----
/** @return close price of the most recently received kline, or {@code null} if none arrived yet */
public BigDecimal getLastKlinePrice() {
    return lastKlinePrice;
}

/** @return {@code true} while the strategy is neither STOPPED nor WAITING_KLINE */
public boolean isStrategyActive() {
    return !(state == StrategyState.STOPPED || state == StrategyState.WAITING_KLINE);
}

/** @return realized PnL accumulated from close pushes since the last start */
public BigDecimal getCumulativePnl() {
    return cumulativePnl;
}

/** @return unrealized PnL as of the last recomputation */
public BigDecimal getUnrealizedPnl() {
    return unrealizedPnl;
}

/** @return current lifecycle state */
public StrategyState getState() {
    return state;
}
}