package com.xcong.excoin.modules.okxNewPrice; import com.alibaba.fastjson.JSON; import com.xcong.excoin.modules.okxNewPrice.okxpi.config.OKXAccount; import com.xcong.excoin.modules.okxNewPrice.okxpi.config.enums.HttpMethod; import com.xcong.excoin.modules.okxNewPrice.okxpi.config.utils.OKXContants; import lombok.extern.slf4j.Slf4j; import java.util.LinkedHashMap; import java.util.concurrent.*; import java.util.function.Consumer; /** * OKX REST API 异步执行器,所有下单/撤单操作经此类提交。 * *

 * <h3>设计目的</h3>

* REST API 调用可能耗时数百毫秒,若在 WebSocket 回调线程中同步执行会阻塞消息处理。 * 本类将所有网络 I/O 提交到独立单线程池异步执行。 * *

 * <h3>线程模型</h3>

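 * <ul>
 *   <li>单个守护线程 "okx-trade-worker" 串行执行队列中的任务,空闲 60 秒后回收</li>
 *   <li>有界队列容量为 64;队列已满时由提交任务的线程直接执行该任务(CallerRunsPolicy),以此形成背压</li>
 * </ul>
 *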

 * <h3>OKX API 适配说明</h3>

* OKX 使用 algo order(条件单)代替 Gate 的 FuturesPriceTriggeredOrder: * * * @author Administrator */ @Slf4j public class OkxTradeExecutor { /** OKX 账户配置(用于 REST API 调用) */ private final OKXAccount okxAccount; /** 合约名称(如 ETH-USDT-SWAP) */ private final String instId; /** 保证金模式 */ private final String tdMode; /** 交易线程池:单线程 + 有界队列 + 背压策略 */ private final ExecutorService executor; public OkxTradeExecutor(OKXAccount okxAccount, String instId, String tdMode) { this.okxAccount = okxAccount; this.instId = instId; this.tdMode = tdMode; this.executor = new ThreadPoolExecutor( 1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(64), r -> { Thread t = new Thread(r, "okx-trade-worker"); t.setDaemon(true); return t; }, new ThreadPoolExecutor.CallerRunsPolicy() ); ((ThreadPoolExecutor) executor).allowCoreThreadTimeOut(true); } /** * 优雅关闭:等待 10 秒让队列中的任务执行完毕,超时则强制中断。 */ public void shutdown() { executor.shutdown(); try { executor.awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); executor.shutdownNow(); } } /** * 异步 IOC 市价开多。 * * @param quantity 开仓张数(正数) * @param onSuccess 成功回调,接收 orderId * @param onFailure 失败回调 */ public void openLong(String quantity, Consumer onSuccess, Runnable onFailure) { submitOrder("buy", "long", quantity, "market", null, false, null, onSuccess, onFailure); } /** * 异步 IOC 市价开空。 * * @param quantity 开仓张数(正数) * @param onSuccess 成功回调,接收 orderId * @param onFailure 失败回调 */ public void openShort(String quantity, Consumer onSuccess, Runnable onFailure) { submitOrder("sell", "short", quantity, "market", null, false, null, onSuccess, onFailure); } /** * 异步市价平仓(reduceOnly=true)。 * * @param side "buy" 平空 / "sell" 平多 * @param posSide "long" / "short" * @param quantity 平仓张数(正数) */ public void marketClose(String side, String posSide, String quantity) { executor.execute(() -> { try { LinkedHashMap params = buildBaseParams(side, posSide, quantity, "market"); params.put("reduceOnly", true); String resp = 
okxAccount.requestHandler.sendSignedRequest( okxAccount.baseUrl, OKXContants.ORDER, params, HttpMethod.POST, okxAccount.isSimluate()); log.info("[OkxExec] 市价平仓成功, side:{}, posSide:{}, sz:{}, resp:{}", side, posSide, quantity, resp); } catch (Exception e) { log.error("[OkxExec] 市价平仓失败, side:{}, posSide:{}, sz:{}", side, posSide, quantity, e); } }); } /** * 异步创建条件开仓单(价格触发后市价开仓)。 * OKX 使用 algo order: /api/v5/trade/order-algo, ordType=conditional * * @param triggerPrice 触发价格 * @param side "buy" 开多 / "sell" 开空 * @param posSide "long" / "short" * @param size 开仓张数 * @param onSuccess 成功回调,接收 algoId * @param onFailure 失败回调 */ public void placeConditionalEntryOrder(String triggerPrice, String side, String posSide, String size, Consumer onSuccess, Runnable onFailure) { executor.execute(() -> { try { LinkedHashMap params = new LinkedHashMap<>(); params.put("instId", instId); params.put("tdMode", tdMode); params.put("side", side); params.put("posSide", posSide); params.put("ordType", "conditional"); params.put("sz", size); params.put("triggerPx", triggerPrice); params.put("triggerPxType", "last"); params.put("orderPx", "-1"); // 市价成交 String resp = okxAccount.requestHandler.sendSignedRequest( okxAccount.baseUrl, "/api/v5/trade/order-algo", params, HttpMethod.POST, okxAccount.isSimluate()); log.info("[OkxExec] 条件开仓单已创建, trigger:{}, side:{}, posSide:{}, sz:{}, resp:{}", triggerPrice, side, posSide, size, resp); String algoId = parseAlgoId(resp); if (algoId != null && onSuccess != null) { onSuccess.accept(algoId); } } catch (Exception e) { log.error("[OkxExec] 条件开仓单创建失败, trigger:{}, side:{}, sz:{}", triggerPrice, side, size, e); if (onFailure != null) { onFailure.run(); } } }); } /** * 异步创建止盈条件单。 * 使用 algo order: ordType=conditional, 触发后市价平仓 (reduceOnly=true) * * @param triggerPrice 止盈触发价 * @param side "sell" 平多 / "buy" 平空 * @param posSide "long" / "short" * @param size 平仓张数 * @param onSuccess 成功回调,接收 algoId */ public void placeTakeProfit(String triggerPrice, String side, String 
posSide, String size, Consumer onSuccess) { executor.execute(() -> { try { LinkedHashMap params = new LinkedHashMap<>(); params.put("instId", instId); params.put("tdMode", tdMode); params.put("side", side); params.put("posSide", posSide); params.put("ordType", "conditional"); params.put("sz", size); params.put("triggerPx", triggerPrice); params.put("triggerPxType", "last"); params.put("orderPx", "-1"); // 市价成交 String resp = okxAccount.requestHandler.sendSignedRequest( okxAccount.baseUrl, "/api/v5/trade/order-algo", params, HttpMethod.POST, okxAccount.isSimluate()); log.info("[OkxExec] 止盈单已创建, trigger:{}, side:{}, posSide:{}, sz:{}, resp:{}", triggerPrice, side, posSide, size, resp); String algoId = parseAlgoId(resp); if (algoId != null && onSuccess != null) { onSuccess.accept(algoId); } } catch (Exception e) { log.error("[OkxExec] 止盈单创建失败, trigger:{}, side:{}, sz:{}, 立即市价止盈", triggerPrice, side, size, e); // 止盈单创建失败 → 立即市价平仓 marketClose(side, posSide, size); } }); } /** * 异步取消单个条件单(algo order)。 * * @param algoId 条件单 ID,为 null 时跳过 */ public void cancelAlgoOrder(String algoId, Consumer onSuccess) { if (algoId == null) { return; } executor.execute(() -> { try { String body = "[{\"instId\":\"" + instId + "\",\"algoId\":\"" + algoId + "\"}]"; String resp = okxAccount.requestHandler.sendSignedRequestRaw( okxAccount.baseUrl, "/api/v5/trade/cancel-algos", body, HttpMethod.POST, okxAccount.isSimluate()); log.info("[OkxExec] 条件单已取消, algoId:{}", algoId); if (onSuccess != null) { onSuccess.accept(algoId); } } catch (Exception e) { log.warn("[OkxExec] 取消条件单失败(可能已触发), algoId:{}", algoId); } }); } /** * 异步取消所有未完成的 algo 订单(best-effort,失败仅警告)。 */ public void cancelAllAlgoOrders() { executor.execute(() -> { try { String body = "[{\"instId\":\"" + instId + "\",\"instType\":\"SWAP\"}]"; String resp = okxAccount.requestHandler.sendSignedRequestRaw( okxAccount.baseUrl, "/api/v5/trade/cancel-algos", body, HttpMethod.POST, okxAccount.isSimluate()); log.info("[OkxExec] 已尝试清除条件单, resp:{}", 
resp); } catch (Exception e) { log.warn("[OkxExec] 清除条件单失败(若无挂单可忽略), msg:{}", e.getMessage()); } }); } /** * 查询账户余额(同步方法,在策略线程中调用)。 */ public String getBalance() { try { LinkedHashMap params = new LinkedHashMap<>(); params.put("ccy", "USDT"); return okxAccount.requestHandler.sendSignedRequest( okxAccount.baseUrl, OKXContants.BALANCE, params, HttpMethod.GET, okxAccount.isSimluate()); } catch (Exception e) { log.error("[OkxExec] 查询余额失败", e); return null; } } /** * 查询持仓信息(同步方法)。 */ public String getPositions() { try { LinkedHashMap params = new LinkedHashMap<>(); params.put("instId", instId); return okxAccount.requestHandler.sendSignedRequest( okxAccount.baseUrl, OKXContants.POSITIONS, params, HttpMethod.GET, okxAccount.isSimluate()); } catch (Exception e) { log.error("[OkxExec] 查询持仓失败", e); return null; } } /** * 设置杠杆倍数(同步方法)。 */ public void setLeverage(String leverage) { try { LinkedHashMap params = new LinkedHashMap<>(); params.put("instId", instId); params.put("lever", leverage); params.put("mgnMode", tdMode); String resp = okxAccount.requestHandler.sendSignedRequest( okxAccount.baseUrl, OKXContants.SETLEVERAGE, params, HttpMethod.POST, okxAccount.isSimluate()); log.info("[OkxExec] 设置杠杆成功, lever:{}, resp:{}", leverage, resp); } catch (Exception e) { log.error("[OkxExec] 设置杠杆失败, lever:{}", leverage, e); } } // ==================== 私有方法 ==================== private LinkedHashMap buildBaseParams(String side, String posSide, String sz, String ordType) { LinkedHashMap params = new LinkedHashMap<>(); params.put("instId", instId); params.put("tdMode", tdMode); params.put("side", side); params.put("posSide", posSide); params.put("ordType", ordType); params.put("sz", sz); return params; } private void submitOrder(String side, String posSide, String quantity, String ordType, String price, boolean reduceOnly, String tag, Consumer onSuccess, Runnable onFailure) { executor.execute(() -> { try { LinkedHashMap params = buildBaseParams(side, posSide, quantity, ordType); if (price 
!= null) { params.put("px", price); } if (reduceOnly) { params.put("reduceOnly", true); } if (tag != null) { params.put("tag", tag); } String resp = okxAccount.requestHandler.sendSignedRequest( okxAccount.baseUrl, OKXContants.ORDER, params, HttpMethod.POST, okxAccount.isSimluate()); log.info("[OkxExec] 下单成功, side:{}, posSide:{}, sz:{}, ordType:{}, resp:{}", side, posSide, quantity, ordType, resp); String orderId = parseOrderId(resp); if (orderId != null && onSuccess != null) { onSuccess.accept(orderId); } } catch (Exception e) { log.error("[OkxExec] 下单失败, side:{}, posSide:{}, sz:{}", side, posSide, quantity, e); if (onFailure != null) { onFailure.run(); } } }); } /** * 从 OKX 响应中解析订单 ID。 * 响应格式: {"code":"0","data":[{"ordId":"xxx",...}]} */ private String parseOrderId(String resp) { try { com.alibaba.fastjson.JSONObject json = JSON.parseObject(resp); if ("0".equals(json.getString("code"))) { com.alibaba.fastjson.JSONArray data = json.getJSONArray("data"); if (data != null && !data.isEmpty()) { return data.getJSONObject(0).getString("ordId"); } } } catch (Exception e) { log.warn("[OkxExec] 解析订单ID失败, resp:{}", resp); } return null; } /** * 从 OKX 响应中解析 algo 订单 ID。 */ private String parseAlgoId(String resp) { try { com.alibaba.fastjson.JSONObject json = JSON.parseObject(resp); if ("0".equals(json.getString("code"))) { com.alibaba.fastjson.JSONArray data = json.getJSONArray("data"); if (data != null && !data.isEmpty()) { return data.getJSONObject(0).getString("algoId"); } } } catch (Exception e) { log.warn("[OkxExec] 解析algoId失败, resp:{}", resp); } return null; } }