package com.xcong.excoin.modules.okxNewPrice;
import com.alibaba.fastjson.JSON;
import com.xcong.excoin.modules.okxNewPrice.okxpi.config.OKXAccount;
import com.xcong.excoin.modules.okxNewPrice.okxpi.config.enums.HttpMethod;
import com.xcong.excoin.modules.okxNewPrice.okxpi.config.utils.OKXContants;
import lombok.extern.slf4j.Slf4j;
import java.util.LinkedHashMap;
import java.util.concurrent.*;
import java.util.function.Consumer;
/**
* OKX REST API 异步执行器,所有下单/撤单操作经此类提交。
*
*
设计目的
* REST API 调用可能耗时数百毫秒,若在 WebSocket 回调线程中同步执行会阻塞消息处理。
* 本类将所有网络 I/O 提交到独立单线程池异步执行。
*
* 线程模型
*
* - 单线程 + 有界队列(64) — 保证下单顺序,避免并发竞争
* - CallerRunsPolicy — 队列满时由提交线程直接执行,形成自然背压
* - Daemon 线程 — 60s 空闲自动回收
*
*
* OKX API 适配说明
* OKX 使用 algo order(条件单)代替 Gate 的 FuturesPriceTriggeredOrder:
*
* - 开仓条件单:ordType=conditional, side=buy/sell, posSide=long/short
* - 止盈止损单:可通过订单附带的 tpTriggerPx/slTriggerPx 或单独的 algo order 实现
* - 市价单:ordType=market, price不传
*
*
* @author Administrator
*/
@Slf4j
public class OkxTradeExecutor {
/** OKX 账户配置(用于 REST API 调用) */
private final OKXAccount okxAccount;
/** 合约名称(如 ETH-USDT-SWAP) */
private final String instId;
/** 保证金模式 */
private final String tdMode;
/** 交易线程池:单线程 + 有界队列 + 背压策略 */
private final ExecutorService executor;
public OkxTradeExecutor(OKXAccount okxAccount, String instId, String tdMode) {
this.okxAccount = okxAccount;
this.instId = instId;
this.tdMode = tdMode;
this.executor = new ThreadPoolExecutor(
1, 1,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(64),
r -> {
Thread t = new Thread(r, "okx-trade-worker");
t.setDaemon(true);
return t;
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
((ThreadPoolExecutor) executor).allowCoreThreadTimeOut(true);
}
/**
* 优雅关闭:等待 10 秒让队列中的任务执行完毕,超时则强制中断。
*/
public void shutdown() {
executor.shutdown();
try {
executor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
executor.shutdownNow();
}
}
/**
* 异步 IOC 市价开多。
*
* @param quantity 开仓张数(正数)
* @param onSuccess 成功回调,接收 orderId
* @param onFailure 失败回调
*/
public void openLong(String quantity, Consumer onSuccess, Runnable onFailure) {
submitOrder("buy", "long", quantity, "market", null, false, null, onSuccess, onFailure);
}
/**
* 异步 IOC 市价开空。
*
* @param quantity 开仓张数(正数)
* @param onSuccess 成功回调,接收 orderId
* @param onFailure 失败回调
*/
public void openShort(String quantity, Consumer onSuccess, Runnable onFailure) {
submitOrder("sell", "short", quantity, "market", null, false, null, onSuccess, onFailure);
}
/**
* 异步市价平仓(reduceOnly=true)。
*
* @param side "buy" 平空 / "sell" 平多
* @param posSide "long" / "short"
* @param quantity 平仓张数(正数)
*/
public void marketClose(String side, String posSide, String quantity) {
executor.execute(() -> {
try {
LinkedHashMap params = buildBaseParams(side, posSide, quantity, "market");
params.put("reduceOnly", true);
String resp = okxAccount.requestHandler.sendSignedRequest(
okxAccount.baseUrl, OKXContants.ORDER, params, HttpMethod.POST, okxAccount.isSimluate());
log.info("[OkxExec] 市价平仓成功, side:{}, posSide:{}, sz:{}, resp:{}", side, posSide, quantity, resp);
} catch (Exception e) {
log.error("[OkxExec] 市价平仓失败, side:{}, posSide:{}, sz:{}", side, posSide, quantity, e);
}
});
}
/**
* 异步创建条件开仓单(价格触发后市价开仓)。
* OKX 使用 algo order: /api/v5/trade/order-algo, ordType=conditional
*
* @param triggerPrice 触发价格
* @param side "buy" 开多 / "sell" 开空
* @param posSide "long" / "short"
* @param size 开仓张数
* @param onSuccess 成功回调,接收 algoId
* @param onFailure 失败回调
*/
public void placeConditionalEntryOrder(String triggerPrice, String side, String posSide,
String size, Consumer onSuccess, Runnable onFailure) {
executor.execute(() -> {
try {
LinkedHashMap params = new LinkedHashMap<>();
params.put("instId", instId);
params.put("tdMode", tdMode);
params.put("side", side);
params.put("posSide", posSide);
params.put("ordType", "conditional");
params.put("sz", size);
params.put("slTriggerPx", triggerPrice);
params.put("slTriggerPxType", "last");
params.put("slOrdPx", "-1");
String resp = okxAccount.requestHandler.sendSignedRequest(
okxAccount.baseUrl, "/api/v5/trade/order-algo", params, HttpMethod.POST, okxAccount.isSimluate());
log.info("[OkxExec] 条件开仓单已创建, trigger:{}, side:{}, posSide:{}, sz:{}, resp:{}",
triggerPrice, side, posSide, size, resp);
String algoId = parseAlgoId(resp);
if (algoId != null && onSuccess != null) {
onSuccess.accept(algoId);
}
} catch (Exception e) {
log.error("[OkxExec] 条件开仓单创建失败, trigger:{}, side:{}, sz:{}", triggerPrice, side, size, e);
if (onFailure != null) {
onFailure.run();
}
}
});
}
/**
* 异步创建止盈条件单。
* 使用 algo order: ordType=conditional, 触发后市价平仓 (reduceOnly=true)
*
* @param triggerPrice 止盈触发价
* @param side "sell" 平多 / "buy" 平空
* @param posSide "long" / "short"
* @param size 平仓张数
* @param onSuccess 成功回调,接收 algoId
*/
public void placeTakeProfit(String triggerPrice, String side, String posSide,
String size, Consumer onSuccess) {
executor.execute(() -> {
try {
LinkedHashMap params = new LinkedHashMap<>();
params.put("instId", instId);
params.put("tdMode", tdMode);
params.put("side", side);
params.put("posSide", posSide);
params.put("ordType", "conditional");
params.put("sz", size);
params.put("slTriggerPx", triggerPrice);
params.put("slTriggerPxType", "last");
params.put("slOrdPx", "-1");
String resp = okxAccount.requestHandler.sendSignedRequest(
okxAccount.baseUrl, "/api/v5/trade/order-algo", params, HttpMethod.POST, okxAccount.isSimluate());
log.info("[OkxExec] 止盈单已创建, trigger:{}, side:{}, posSide:{}, sz:{}, resp:{}",
triggerPrice, side, posSide, size, resp);
String algoId = parseAlgoId(resp);
if (algoId != null && onSuccess != null) {
onSuccess.accept(algoId);
}
} catch (Exception e) {
log.error("[OkxExec] 止盈单创建失败, trigger:{}, side:{}, sz:{}, 立即市价止盈",
triggerPrice, side, size, e);
// 止盈单创建失败 → 立即市价平仓
marketClose(side, posSide, size);
}
});
}
/**
* 异步取消单个条件单(algo order)。
*
* @param algoId 条件单 ID,为 null 时跳过
*/
public void cancelAlgoOrder(String algoId, Consumer onSuccess) {
if (algoId == null) {
return;
}
executor.execute(() -> {
try {
String body = "[{\"instId\":\"" + instId + "\",\"algoId\":\"" + algoId + "\"}]";
String resp = okxAccount.requestHandler.sendSignedRequestRaw(
okxAccount.baseUrl, "/api/v5/trade/cancel-algos", body, HttpMethod.POST, okxAccount.isSimluate());
log.info("[OkxExec] 条件单已取消, algoId:{}", algoId);
if (onSuccess != null) {
onSuccess.accept(algoId);
}
} catch (Exception e) {
log.warn("[OkxExec] 取消条件单失败(可能已触发), algoId:{}", algoId);
}
});
}
/**
* 异步取消所有未完成的 algo 订单(best-effort,失败仅警告)。
*/
public void cancelAllAlgoOrders() {
executor.execute(() -> {
try {
String body = "[{\"instId\":\"" + instId + "\",\"instType\":\"SWAP\"}]";
String resp = okxAccount.requestHandler.sendSignedRequestRaw(
okxAccount.baseUrl, "/api/v5/trade/cancel-algos", body, HttpMethod.POST, okxAccount.isSimluate());
log.info("[OkxExec] 已尝试清除条件单, resp:{}", resp);
} catch (Exception e) {
log.warn("[OkxExec] 清除条件单失败(若无挂单可忽略), msg:{}", e.getMessage());
}
});
}
/**
* 查询账户余额(同步方法,在策略线程中调用)。
*/
public String getBalance() {
try {
LinkedHashMap params = new LinkedHashMap<>();
params.put("ccy", "USDT");
return okxAccount.requestHandler.sendSignedRequest(
okxAccount.baseUrl, OKXContants.BALANCE, params, HttpMethod.GET, okxAccount.isSimluate());
} catch (Exception e) {
log.error("[OkxExec] 查询余额失败", e);
return null;
}
}
/**
* 查询持仓信息(同步方法)。
*/
public String getPositions() {
try {
LinkedHashMap params = new LinkedHashMap<>();
params.put("instId", instId);
return okxAccount.requestHandler.sendSignedRequest(
okxAccount.baseUrl, OKXContants.POSITIONS, params, HttpMethod.GET, okxAccount.isSimluate());
} catch (Exception e) {
log.error("[OkxExec] 查询持仓失败", e);
return null;
}
}
/**
* 设置杠杆倍数(同步方法)。
*/
public void setLeverage(String leverage) {
try {
LinkedHashMap params = new LinkedHashMap<>();
params.put("instId", instId);
params.put("lever", leverage);
params.put("mgnMode", tdMode);
String resp = okxAccount.requestHandler.sendSignedRequest(
okxAccount.baseUrl, OKXContants.SETLEVERAGE, params, HttpMethod.POST, okxAccount.isSimluate());
log.info("[OkxExec] 设置杠杆成功, lever:{}, resp:{}", leverage, resp);
} catch (Exception e) {
log.error("[OkxExec] 设置杠杆失败, lever:{}", leverage, e);
}
}
// ==================== 私有方法 ====================
private LinkedHashMap buildBaseParams(String side, String posSide, String sz, String ordType) {
LinkedHashMap params = new LinkedHashMap<>();
params.put("instId", instId);
params.put("tdMode", tdMode);
params.put("side", side);
params.put("posSide", posSide);
params.put("ordType", ordType);
params.put("sz", sz);
return params;
}
private void submitOrder(String side, String posSide, String quantity, String ordType,
String price, boolean reduceOnly, String tag,
Consumer onSuccess, Runnable onFailure) {
executor.execute(() -> {
try {
LinkedHashMap params = buildBaseParams(side, posSide, quantity, ordType);
if (price != null) {
params.put("px", price);
}
if (reduceOnly) {
params.put("reduceOnly", true);
}
if (tag != null) {
params.put("tag", tag);
}
String resp = okxAccount.requestHandler.sendSignedRequest(
okxAccount.baseUrl, OKXContants.ORDER, params, HttpMethod.POST, okxAccount.isSimluate());
log.info("[OkxExec] 下单成功, side:{}, posSide:{}, sz:{}, ordType:{}, resp:{}",
side, posSide, quantity, ordType, resp);
String orderId = parseOrderId(resp);
if (orderId != null && onSuccess != null) {
onSuccess.accept(orderId);
}
} catch (Exception e) {
log.error("[OkxExec] 下单失败, side:{}, posSide:{}, sz:{}", side, posSide, quantity, e);
if (onFailure != null) {
onFailure.run();
}
}
});
}
/**
* 从 OKX 响应中解析订单 ID。
* 响应格式: {"code":"0","data":[{"ordId":"xxx",...}]}
*/
private String parseOrderId(String resp) {
try {
com.alibaba.fastjson.JSONObject json = JSON.parseObject(resp);
if ("0".equals(json.getString("code"))) {
com.alibaba.fastjson.JSONArray data = json.getJSONArray("data");
if (data != null && !data.isEmpty()) {
return data.getJSONObject(0).getString("ordId");
}
}
} catch (Exception e) {
log.warn("[OkxExec] 解析订单ID失败, resp:{}", resp);
}
return null;
}
/**
* 从 OKX 响应中解析 algo 订单 ID。
*/
private String parseAlgoId(String resp) {
try {
com.alibaba.fastjson.JSONObject json = JSON.parseObject(resp);
if ("0".equals(json.getString("code"))) {
com.alibaba.fastjson.JSONArray data = json.getJSONArray("data");
if (data != null && !data.isEmpty()) {
return data.getJSONObject(0).getString("algoId");
}
}
} catch (Exception e) {
log.warn("[OkxExec] 解析algoId失败, resp:{}", resp);
}
return null;
}
}