package com.xcong.excoin.modules.okxNewPrice;
|
|
import com.alibaba.fastjson.JSON;
|
import com.xcong.excoin.modules.okxNewPrice.okxpi.config.OKXAccount;
|
import com.xcong.excoin.modules.okxNewPrice.okxpi.config.enums.HttpMethod;
|
import com.xcong.excoin.modules.okxNewPrice.okxpi.config.utils.OKXContants;
|
import lombok.extern.slf4j.Slf4j;
|
|
import java.util.LinkedHashMap;
|
import java.util.concurrent.*;
|
import java.util.function.Consumer;
|
|
/**
|
* OKX REST API 异步执行器,所有下单/撤单操作经此类提交。
|
*
|
* <h3>设计目的</h3>
|
* REST API 调用可能耗时数百毫秒,若在 WebSocket 回调线程中同步执行会阻塞消息处理。
|
* 本类将所有网络 I/O 提交到独立单线程池异步执行。
|
*
|
* <h3>线程模型</h3>
|
* <ul>
|
* <li><b>单线程 + 有界队列(64)</b> — 保证下单顺序,避免并发竞争</li>
|
* <li><b>CallerRunsPolicy</b> — 队列满时由提交线程直接执行,形成自然背压</li>
|
* <li><b>Daemon 线程</b> — 60s 空闲自动回收</li>
|
* </ul>
|
*
|
* <h3>OKX API 适配说明</h3>
|
* OKX 使用 algo order(条件单)代替 Gate 的 FuturesPriceTriggeredOrder:
|
* <ul>
|
* <li>开仓条件单:ordType=conditional, side=buy/sell, posSide=long/short</li>
|
* <li>止盈止损单:可通过订单附带的 tpTriggerPx/slTriggerPx 或单独的 algo order 实现</li>
|
* <li>市价单:ordType=market, price不传</li>
|
* </ul>
|
*
|
* @author Administrator
|
*/
|
@Slf4j
|
public class OkxTradeExecutor {
|
|
/** OKX 账户配置(用于 REST API 调用) */
|
private final OKXAccount okxAccount;
|
/** 合约名称(如 ETH-USDT-SWAP) */
|
private final String instId;
|
/** 保证金模式 */
|
private final String tdMode;
|
|
/** 交易线程池:单线程 + 有界队列 + 背压策略 */
|
private final ExecutorService executor;
|
|
public OkxTradeExecutor(OKXAccount okxAccount, String instId, String tdMode) {
|
this.okxAccount = okxAccount;
|
this.instId = instId;
|
this.tdMode = tdMode;
|
this.executor = new ThreadPoolExecutor(
|
1, 1,
|
60L, TimeUnit.SECONDS,
|
new LinkedBlockingQueue<>(64),
|
r -> {
|
Thread t = new Thread(r, "okx-trade-worker");
|
t.setDaemon(true);
|
return t;
|
},
|
new ThreadPoolExecutor.CallerRunsPolicy()
|
);
|
((ThreadPoolExecutor) executor).allowCoreThreadTimeOut(true);
|
}
|
|
/**
|
* 优雅关闭:等待 10 秒让队列中的任务执行完毕,超时则强制中断。
|
*/
|
public void shutdown() {
|
executor.shutdown();
|
try {
|
executor.awaitTermination(10, TimeUnit.SECONDS);
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
executor.shutdownNow();
|
}
|
}
|
|
/**
|
* 异步 IOC 市价开多。
|
*
|
* @param quantity 开仓张数(正数)
|
* @param onSuccess 成功回调,接收 orderId
|
* @param onFailure 失败回调
|
*/
|
public void openLong(String quantity, Consumer<String> onSuccess, Runnable onFailure) {
|
submitOrder("buy", "long", quantity, "market", null, false, null, onSuccess, onFailure);
|
}
|
|
/**
|
* 异步 IOC 市价开空。
|
*
|
* @param quantity 开仓张数(正数)
|
* @param onSuccess 成功回调,接收 orderId
|
* @param onFailure 失败回调
|
*/
|
public void openShort(String quantity, Consumer<String> onSuccess, Runnable onFailure) {
|
submitOrder("sell", "short", quantity, "market", null, false, null, onSuccess, onFailure);
|
}
|
|
/**
|
* 异步市价平仓(reduceOnly=true)。
|
*
|
* @param side "buy" 平空 / "sell" 平多
|
* @param posSide "long" / "short"
|
* @param quantity 平仓张数(正数)
|
*/
|
public void marketClose(String side, String posSide, String quantity) {
|
executor.execute(() -> {
|
try {
|
LinkedHashMap<String, Object> params = buildBaseParams(side, posSide, quantity, "market");
|
params.put("reduceOnly", true);
|
String resp = okxAccount.requestHandler.sendSignedRequest(
|
okxAccount.baseUrl, OKXContants.ORDER, params, HttpMethod.POST, okxAccount.isSimluate());
|
log.info("[OkxExec] 市价平仓成功, side:{}, posSide:{}, sz:{}, resp:{}", side, posSide, quantity, resp);
|
} catch (Exception e) {
|
log.error("[OkxExec] 市价平仓失败, side:{}, posSide:{}, sz:{}", side, posSide, quantity, e);
|
}
|
});
|
}
|
|
/**
|
* 异步创建条件开仓单(价格触发后市价开仓)。
|
* OKX 使用 algo order: /api/v5/trade/order-algo, ordType=conditional
|
*
|
* @param triggerPrice 触发价格
|
* @param side "buy" 开多 / "sell" 开空
|
* @param posSide "long" / "short"
|
* @param size 开仓张数
|
* @param onSuccess 成功回调,接收 algoId
|
* @param onFailure 失败回调
|
*/
|
public void placeConditionalEntryOrder(String triggerPrice, String side, String posSide,
|
String size, Consumer<String> onSuccess, Runnable onFailure) {
|
executor.execute(() -> {
|
try {
|
LinkedHashMap<String, Object> params = new LinkedHashMap<>();
|
params.put("instId", instId);
|
params.put("tdMode", tdMode);
|
params.put("side", side);
|
params.put("posSide", posSide);
|
params.put("ordType", "trigger");
|
params.put("sz", size);
|
params.put("triggerPx", triggerPrice);
|
params.put("triggerPxType", "last");
|
params.put("orderPx", "-1");
|
|
String resp = okxAccount.requestHandler.sendSignedRequest(
|
okxAccount.baseUrl, "/api/v5/trade/order-algo", params, HttpMethod.POST, okxAccount.isSimluate());
|
log.info("[OkxExec] 条件开仓单已创建, trigger:{}, side:{}, posSide:{}, sz:{}, resp:{}",
|
triggerPrice, side, posSide, size, resp);
|
|
String algoId = parseAlgoId(resp);
|
if (algoId != null && onSuccess != null) {
|
onSuccess.accept(algoId);
|
}
|
} catch (Exception e) {
|
log.error("[OkxExec] 条件开仓单创建失败, trigger:{}, side:{}, sz:{}", triggerPrice, side, size, e);
|
if (onFailure != null) {
|
onFailure.run();
|
}
|
}
|
});
|
}
|
|
/**
|
* 异步创建止盈条件单。
|
* 使用 algo order: ordType=conditional, 触发后市价平仓 (reduceOnly=true)
|
*
|
* @param triggerPrice 止盈触发价
|
* @param side "sell" 平多 / "buy" 平空
|
* @param posSide "long" / "short"
|
* @param size 平仓张数
|
* @param onSuccess 成功回调,接收 algoId
|
*/
|
public void placeTakeProfit(String triggerPrice, String side, String posSide,
|
String size, Consumer<String> onSuccess) {
|
executor.execute(() -> {
|
try {
|
LinkedHashMap<String, Object> params = new LinkedHashMap<>();
|
params.put("instId", instId);
|
params.put("tdMode", tdMode);
|
params.put("side", side);
|
params.put("posSide", posSide);
|
params.put("ordType", "conditional");
|
params.put("sz", size);
|
params.put("slTriggerPx", triggerPrice);
|
params.put("slTriggerPxType", "last");
|
params.put("slOrdPx", "-1");
|
|
String resp = okxAccount.requestHandler.sendSignedRequest(
|
okxAccount.baseUrl, "/api/v5/trade/order-algo", params, HttpMethod.POST, okxAccount.isSimluate());
|
log.info("[OkxExec] 止盈单已创建, trigger:{}, side:{}, posSide:{}, sz:{}, resp:{}",
|
triggerPrice, side, posSide, size, resp);
|
|
String algoId = parseAlgoId(resp);
|
if (algoId != null && onSuccess != null) {
|
onSuccess.accept(algoId);
|
}
|
} catch (Exception e) {
|
log.error("[OkxExec] 止盈单创建失败, trigger:{}, side:{}, sz:{}, 立即市价止盈",
|
triggerPrice, side, size, e);
|
// 止盈单创建失败 → 立即市价平仓
|
marketClose(side, posSide, size);
|
}
|
});
|
}
|
|
/**
|
* 异步取消单个条件单(algo order)。
|
*
|
* @param algoId 条件单 ID,为 null 时跳过
|
*/
|
public void cancelAlgoOrder(String algoId, Consumer<String> onSuccess) {
|
if (algoId == null) {
|
return;
|
}
|
executor.execute(() -> {
|
try {
|
String body = "[{\"instId\":\"" + instId + "\",\"algoId\":\"" + algoId + "\"}]";
|
String resp = okxAccount.requestHandler.sendSignedRequestRaw(
|
okxAccount.baseUrl, "/api/v5/trade/cancel-algos", body, HttpMethod.POST, okxAccount.isSimluate());
|
log.info("[OkxExec] 条件单已取消, algoId:{}", algoId);
|
if (onSuccess != null) {
|
onSuccess.accept(algoId);
|
}
|
} catch (Exception e) {
|
log.warn("[OkxExec] 取消条件单失败(可能已触发), algoId:{}", algoId);
|
}
|
});
|
}
|
|
/**
|
* 异步取消所有未完成的 algo 订单(best-effort,失败仅警告)。
|
*/
|
public void cancelAllAlgoOrders() {
|
executor.execute(() -> {
|
try {
|
String body = "[{\"instId\":\"" + instId + "\",\"instType\":\"SWAP\"}]";
|
String resp = okxAccount.requestHandler.sendSignedRequestRaw(
|
okxAccount.baseUrl, "/api/v5/trade/cancel-algos", body, HttpMethod.POST, okxAccount.isSimluate());
|
log.info("[OkxExec] 已尝试清除条件单, resp:{}", resp);
|
} catch (Exception e) {
|
log.warn("[OkxExec] 清除条件单失败(若无挂单可忽略), msg:{}", e.getMessage());
|
}
|
});
|
}
|
|
/**
|
* 查询账户余额(同步方法,在策略线程中调用)。
|
*/
|
public String getBalance() {
|
try {
|
LinkedHashMap<String, Object> params = new LinkedHashMap<>();
|
params.put("ccy", "USDT");
|
return okxAccount.requestHandler.sendSignedRequest(
|
okxAccount.baseUrl, OKXContants.BALANCE, params, HttpMethod.GET, okxAccount.isSimluate());
|
} catch (Exception e) {
|
log.error("[OkxExec] 查询余额失败", e);
|
return null;
|
}
|
}
|
|
/**
|
* 查询持仓信息(同步方法)。
|
*/
|
public String getPositions() {
|
try {
|
LinkedHashMap<String, Object> params = new LinkedHashMap<>();
|
params.put("instId", instId);
|
return okxAccount.requestHandler.sendSignedRequest(
|
okxAccount.baseUrl, OKXContants.POSITIONS, params, HttpMethod.GET, okxAccount.isSimluate());
|
} catch (Exception e) {
|
log.error("[OkxExec] 查询持仓失败", e);
|
return null;
|
}
|
}
|
|
/**
|
* 设置杠杆倍数(同步方法)。
|
*/
|
public void setLeverage(String leverage) {
|
try {
|
LinkedHashMap<String, Object> params = new LinkedHashMap<>();
|
params.put("instId", instId);
|
params.put("lever", leverage);
|
params.put("mgnMode", tdMode);
|
String resp = okxAccount.requestHandler.sendSignedRequest(
|
okxAccount.baseUrl, OKXContants.SETLEVERAGE, params, HttpMethod.POST, okxAccount.isSimluate());
|
log.info("[OkxExec] 设置杠杆成功, lever:{}, resp:{}", leverage, resp);
|
} catch (Exception e) {
|
log.error("[OkxExec] 设置杠杆失败, lever:{}", leverage, e);
|
}
|
}
|
|
// ==================== 私有方法 ====================
|
|
private LinkedHashMap<String, Object> buildBaseParams(String side, String posSide, String sz, String ordType) {
|
LinkedHashMap<String, Object> params = new LinkedHashMap<>();
|
params.put("instId", instId);
|
params.put("tdMode", tdMode);
|
params.put("side", side);
|
params.put("posSide", posSide);
|
params.put("ordType", ordType);
|
params.put("sz", sz);
|
return params;
|
}
|
|
private void submitOrder(String side, String posSide, String quantity, String ordType,
|
String price, boolean reduceOnly, String tag,
|
Consumer<String> onSuccess, Runnable onFailure) {
|
executor.execute(() -> {
|
try {
|
LinkedHashMap<String, Object> params = buildBaseParams(side, posSide, quantity, ordType);
|
if (price != null) {
|
params.put("px", price);
|
}
|
if (reduceOnly) {
|
params.put("reduceOnly", true);
|
}
|
if (tag != null) {
|
params.put("tag", tag);
|
}
|
String resp = okxAccount.requestHandler.sendSignedRequest(
|
okxAccount.baseUrl, OKXContants.ORDER, params, HttpMethod.POST, okxAccount.isSimluate());
|
log.info("[OkxExec] 下单成功, side:{}, posSide:{}, sz:{}, ordType:{}, resp:{}",
|
side, posSide, quantity, ordType, resp);
|
|
String orderId = parseOrderId(resp);
|
if (orderId != null && onSuccess != null) {
|
onSuccess.accept(orderId);
|
}
|
} catch (Exception e) {
|
log.error("[OkxExec] 下单失败, side:{}, posSide:{}, sz:{}", side, posSide, quantity, e);
|
if (onFailure != null) {
|
onFailure.run();
|
}
|
}
|
});
|
}
|
|
/**
|
* 从 OKX 响应中解析订单 ID。
|
* 响应格式: {"code":"0","data":[{"ordId":"xxx",...}]}
|
*/
|
private String parseOrderId(String resp) {
|
try {
|
com.alibaba.fastjson.JSONObject json = JSON.parseObject(resp);
|
if ("0".equals(json.getString("code"))) {
|
com.alibaba.fastjson.JSONArray data = json.getJSONArray("data");
|
if (data != null && !data.isEmpty()) {
|
return data.getJSONObject(0).getString("ordId");
|
}
|
}
|
} catch (Exception e) {
|
log.warn("[OkxExec] 解析订单ID失败, resp:{}", resp);
|
}
|
return null;
|
}
|
|
/**
|
* 从 OKX 响应中解析 algo 订单 ID。
|
*/
|
private String parseAlgoId(String resp) {
|
try {
|
com.alibaba.fastjson.JSONObject json = JSON.parseObject(resp);
|
if ("0".equals(json.getString("code"))) {
|
com.alibaba.fastjson.JSONArray data = json.getJSONArray("data");
|
if (data != null && !data.isEmpty()) {
|
return data.getJSONObject(0).getString("algoId");
|
}
|
}
|
} catch (Exception e) {
|
log.warn("[OkxExec] 解析algoId失败, resp:{}", resp);
|
}
|
return null;
|
}
|
}
|