package com.xcong.excoin.modules.okxApi; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import okhttp3.MediaType; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.RequestBody; import okhttp3.Response; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; import java.io.IOException; import java.math.BigDecimal; import java.text.SimpleDateFormat; import java.util.Base64; import java.util.Date; import java.util.TimeZone; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; /** * OKX REST API 异步执行器,所有下单/撤单操作经此类提交。 * *

设计目的

* REST API 调用可能耗时数百毫秒,若在 WebSocket 回调线程中同步执行会阻塞消息处理, * 导致心跳超时误判。本类将所有网络 I/O 提交到独立单线程池异步执行。 * *

与 GateTradeExecutor 的主要差异

* * *

线程模型

* * *

对外接口

* * * * * * * *
方法用途
openLong / openShort市价基底开仓
placeConditionalEntryOrder挂条件开仓单(价格触发后市价开仓)
placeTakeProfit挂止盈条件单
cancelConditionalOrder取消单个条件单
cancelAllPriceTriggeredOrders取消所有条件单(策略停止时)
* *

容错

* * * @author Administrator */ @Slf4j public class OkxTradeExecutor { /** JSON content-type */ private static final MediaType JSON_MEDIA_TYPE = MediaType.parse("application/json; charset=utf-8"); /** OKX 配置 */ private final OkxConfig config; /** 合约名称(如 ETH-USDT-SWAP) */ private final String contract; /** OKHttp 客户端 */ private final OkHttpClient httpClient; /** 交易线程池:单线程 + 有界队列 + 背压策略 */ private final ExecutorService executor; /** * 构造 OKX 交易执行器。 * * @param config OKX 配置对象(包含 API 密钥、合约、URL 等信息) */ public OkxTradeExecutor(OkxConfig config) { this.config = config; this.contract = config.getContract(); this.httpClient = new OkHttpClient.Builder() .connectTimeout(10, TimeUnit.SECONDS) .readTimeout(10, TimeUnit.SECONDS) .writeTimeout(10, TimeUnit.SECONDS) .build(); this.executor = new ThreadPoolExecutor( 1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(64), r -> { Thread t = new Thread(r, "okx-trade-worker"); t.setDaemon(true); return t; }, new ThreadPoolExecutor.CallerRunsPolicy() ); ((ThreadPoolExecutor) executor).allowCoreThreadTimeOut(true); } // ==================== 生命周期 ==================== /** * 优雅关闭:等待 10 秒让队列中的任务执行完毕,超时则强制中断。 * 关闭后的 REST 调用将通过 CallerRunsPolicy 直接在提交线程执行。 */ public void shutdown() { executor.shutdown(); try { executor.awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); executor.shutdownNow(); } } /** * 提交一个通用任务到交易线程池末尾。 * 利用单线程池的 FIFO 特性确保任务按提交顺序执行。 * * @param task 待执行的任务 */ public void submitTask(Runnable task) { executor.execute(task); } // ==================== 市价开仓 ==================== /** * 异步市价开多。 * * @param quantity 开仓张数(正数,如 "15") * @param onSuccess 成交成功回调,接收 ordId(可为 null) * @param onFailure 成交失败回调(可为 null) */ public void openLong(String quantity, Consumer onSuccess, Runnable onFailure) { openPosition(quantity, "buy", "开多", onSuccess, onFailure); } /** * 异步市价开空。 * * @param quantity 开仓张数(正数,如 "15") * @param onSuccess 成交成功回调,接收 ordId(可为 null) * @param onFailure 成交失败回调(可为 null) */ public void openShort(String quantity, Consumer onSuccess, Runnable onFailure) { openPosition(quantity, "sell", "开空", onSuccess, onFailure); } /** * 通用异步市价下单。 * * @param sz 下单张数 * @param side 交易方向(buy=开多 / sell=开空) * @param label 日志标签 * @param onSuccess 成功回调,接收 ordId * @param onFailure 失败回调 */ private void openPosition(String sz, String side, String label, Consumer onSuccess, Runnable onFailure) { executor.execute(() -> { try { // long_short_mode 双向持仓下,开仓须指定 posSide String posSide = "buy".equals(side) ? "long" : "short"; JSONObject body = new JSONObject(); body.put("instId", contract); body.put("tdMode", "cross"); body.put("side", side); body.put("posSide", posSide); body.put("ordType", "market"); body.put("sz", sz); JSONObject resp = okPost("/api/v5/trade/order", body.toJSONString()); String code = resp.getString("code"); if (!"0".equals(code)) { log.error("[TradeExec-OKX] {}失败, code:{}, msg:{}", label, code, resp.getString("msg")); if (onFailure != null) { onFailure.run(); } return; } JSONArray data = resp.getJSONArray("data"); String ordId = (data != null && !data.isEmpty()) ? data.getJSONObject(0).getString("ordId") : null; log.info("[TradeExec-OKX] {}成功, sz:{}, ordId:{}", label, sz, ordId); if (onSuccess != null) { onSuccess.accept(ordId); } } catch (Exception e) { log.error("[TradeExec-OKX] {}失败", label, e); if (onFailure != null) { onFailure.run(); } } }); } // ==================== 止盈条件单 ==================== /** * 异步创建止盈条件单(OKX 算法订单 — conditional 类型)。 * *

服务器监控价格,达到触发价后自动以市价平仓。 * 使用 OKX 的 {@code order-algo} 接口,ordType=conditional。 * *

止盈失败兜底

* 止盈单创建失败时立即调用 {@link #marketClose(String, String)} 市价平仓, * 确保仓位不会因止损条件未挂上而无保护。 * * @param triggerPrice 触发价格 * @param orderType 平仓类型:"close_long" 平多 / "close_short" 平空 * @param size 平仓张数(正数,如 "15") * @param onSuccess 成功回调,接收 algoId(可为 null) */ public void placeTakeProfit(BigDecimal triggerPrice, String orderType, String size, Consumer onSuccess) { executor.execute(() -> { String posSide = null; try { String side; if ("close_long".equals(orderType)) { side = "sell"; posSide = "long"; } else if ("close_short".equals(orderType)) { side = "buy"; posSide = "short"; } else { log.error("[TradeExec-OKX] 未知止盈类型: {}", orderType); return; } // OKX conditional 止盈止损使用 tpTriggerPx/tpOrdPx 或 slTriggerPx/slOrdPx JSONObject body = new JSONObject(); body.put("instId", contract); body.put("tdMode", "cross"); body.put("side", side); body.put("posSide", posSide); body.put("ordType", "conditional"); body.put("sz", size); // "close_long"=平多(止盈), "close_short"=平空(止盈) body.put("tpTriggerPx", triggerPrice.stripTrailingZeros().toPlainString()); body.put("tpTriggerPxType", "last"); body.put("tpOrdPx", "-1"); JSONObject resp = okPost("/api/v5/trade/order-algo", body.toJSONString()); String code = resp.getString("code"); if (!"0".equals(code)) { log.error("[TradeExec-OKX] 止盈单创建失败, code:{}, msg:{}, 立即市价止盈", code, resp.getString("msg")); marketClose(size, posSide); return; } JSONArray data = resp.getJSONArray("data"); String algoId = (data != null && !data.isEmpty()) ? data.getJSONObject(0).getString("algoId") : null; log.info("[TradeExec-OKX] 止盈单已创建, triggerPx:{}, type:{}, sz:{}, algoId:{}", triggerPrice, orderType, size, algoId); if (onSuccess != null) { onSuccess.accept(algoId); } } catch (Exception e) { log.error("[TradeExec-OKX] 止盈单创建失败, triggerPx:{}, sz:{}, 立即市价止盈", triggerPrice, size, e); if (posSide != null) { marketClose(size, posSide); } } }); } /** * 市价止盈兜底:在止盈条件单创建失败时立即市价平仓。 * *

通过 posSide 指定平仓方向: *

    *
  • posSide=long:平多(side=sell)
  • *
  • posSide=short:平空(side=buy)
  • *
* * @param size 平仓张数(正数) * @param posSide 持仓方向(long / short) */ private void marketClose(String size, String posSide) { String side = "long".equals(posSide) ? "sell" : "buy"; marketClose(size, side, posSide); } /** * 指定方向的市价平仓。 * * @param sz 平仓张数 * @param side 交易方向(sell=平多 / buy=平空) * @param posSide 持仓方向(long / short) */ private void marketClose(String sz, String side, String posSide) { try { JSONObject body = new JSONObject(); body.put("instId", contract); body.put("tdMode", "cross"); body.put("side", side); body.put("posSide", posSide); body.put("ordType", "market"); body.put("sz", sz); JSONObject resp = okPost("/api/v5/trade/order", body.toJSONString()); String code = resp.getString("code"); if (!"0".equals(code)) { log.warn("[TradeExec-OKX] 市价止盈失败, side:{}, posSide:{}, sz:{}, code:{}, msg:{}", side, posSide, sz, code, resp.getString("msg")); return; } JSONArray data = resp.getJSONArray("data"); String ordId = (data != null && !data.isEmpty()) ? data.getJSONObject(0).getString("ordId") : null; log.info("[TradeExec-OKX] 市价止盈成功, side:{}, posSide:{}, sz:{}, ordId:{}", side, posSide, sz, ordId); } catch (Exception e) { log.error("[TradeExec-OKX] 市价止盈也失败, sz:{}", sz, e); } } // ==================== 条件开仓单 ==================== /** * 异步创建条件开仓单(价格触发后市价开仓)。 * *

使用 OKX 的 {@code order-algo} 接口,ordType=trigger(计划委托)。 * 服务器监控价格,达到触发价后以市价开仓。 * *

与止盈止损的区别

*
    *
  • 开仓 = ordType=trigger,字段 triggerPx + orderPx
  • *
  • 止盈 = ordType=conditional,字段 tpTriggerPx + tpOrdPx
  • *
  • 止损 = ordType=conditional,字段 slTriggerPx + slOrdPx
  • *
* * @param triggerPrice 触发价格 * @param isLong true=开多(side=buy)/ false=开空(side=sell) * @param size 开仓张数(正数,如 "1") * @param onSuccess 成功回调,接收 algoId(可为 null) * @param onFailure 失败回调(可为 null) */ public void placeConditionalEntryOrder(BigDecimal triggerPrice, boolean isLong, String size, Consumer onSuccess, Runnable onFailure) { executor.execute(() -> { try { String side = isLong ? "buy" : "sell"; JSONObject body = new JSONObject(); body.put("instId", contract); body.put("tdMode", "cross"); body.put("side", side); body.put("ordType", "trigger"); // 计划委托 = 触发后开仓 body.put("sz", size); body.put("triggerPx", triggerPrice.stripTrailingZeros().toPlainString()); body.put("triggerPxType", "last"); body.put("orderPx", "-1"); // OKX 使用 orderPx,非 ordPx JSONObject resp = okPost("/api/v5/trade/order-algo", body.toJSONString()); String code = resp.getString("code"); if (!"0".equals(code)) { log.error("[TradeExec-OKX] 条件开仓单创建失败, code:{}, msg:{}", code, resp.getString("msg")); if (onFailure != null) { onFailure.run(); } return; } JSONArray data = resp.getJSONArray("data"); String algoId = (data != null && !data.isEmpty()) ? data.getJSONObject(0).getString("algoId") : null; log.info("[TradeExec-OKX] 条件开仓单已创建, triggerPx:{}, isLong:{}, sz:{}, algoId:{}", triggerPrice, isLong, size, algoId); if (onSuccess != null) { onSuccess.accept(algoId); } } catch (Exception e) { log.error("[TradeExec-OKX] 条件开仓单创建失败, triggerPx:{}, sz:{}", triggerPrice, size, e); if (onFailure != null) { onFailure.run(); } } }); } // ==================== 取消订单 ==================== /** * 异步取消单个算法订单(条件单)。 * * @param algoId 算法订单 ID,为 null 时跳过 * @param onSuccess 成功回调,接收 algoId(可为 null) */ public void cancelConditionalOrder(String algoId, Consumer onSuccess) { if (algoId == null) { return; } executor.execute(() -> { try { JSONArray bodyArr = new JSONArray(); JSONObject item = new JSONObject(); item.put("algoId", algoId); item.put("instId", contract); bodyArr.add(item); JSONObject resp = okPost("/api/v5/trade/cancel-algos", bodyArr.toJSONString()); String code = resp.getString("code"); if (!"0".equals(code)) { log.warn("[TradeExec-OKX] 取消条件单失败(可能已触发), algoId:{}, code:{}, msg:{}", algoId, code, resp.getString("msg")); return; } log.info("[TradeExec-OKX] 条件单已取消, algoId:{}", algoId); if (onSuccess != null) { onSuccess.accept(algoId); } } catch (Exception e) { log.warn("[TradeExec-OKX] 取消条件单失败(可能已触发), algoId:{}", algoId, e); } }); } /** * 异步清除指定合约的所有算法订单(条件单)。 * 发送不含 algoId 的取消请求,OKX 会取消该合约下所有待触发算法单。 */ public void cancelAllPriceTriggeredOrders() { executor.execute(() -> { try { JSONArray bodyArr = new JSONArray(); JSONObject item = new JSONObject(); item.put("instId", contract); bodyArr.add(item); JSONObject resp = okPost("/api/v5/trade/cancel-algos", bodyArr.toJSONString()); String code = resp.getString("code"); if (!"0".equals(code)) { log.warn("[TradeExec-OKX] 清除所有条件单失败, code:{}, msg:{}", code, resp.getString("msg")); return; } log.info("[TradeExec-OKX] 已清除所有条件单"); } catch (Exception e) { log.error("[TradeExec-OKX] 清除条件单失败", e); } }); } // ==================== HTTP 请求帮助方法 ==================== /** * 发送 OKX 签名 POST 请求并返回解析后的 JSONObject。 * *

自动添加 OK-ACCESS-KEY、OK-ACCESS-SIGN、OK-ACCESS-TIMESTAMP、OK-ACCESS-PASSPHRASE * 四个认证头。签名算法:base64(HMAC-SHA256(timestamp + method + path + body))。 * * @param path API 路径(如 /api/v5/trade/order) * @param body 请求体 JSON 字符串 * @return 解析后的响应 JSONObject * @throws IOException 网络异常或业务错误 */ JSONObject okPost(String path, String body) throws IOException { String method = "POST"; String timestamp = getIsoTimestamp(); String sign = null; try { sign = sign(timestamp, method, path, body); } catch (Exception e) { e.printStackTrace(); } Request.Builder builder = new Request.Builder() .url(config.getRestBasePath() + path) .header("OK-ACCESS-KEY", config.getApiKey()) .header("OK-ACCESS-SIGN", sign) .header("OK-ACCESS-TIMESTAMP", timestamp) .header("OK-ACCESS-PASSPHRASE", config.getPassphrase()) .header("Content-Type", "application/json; charset=utf-8") .post(RequestBody.create(JSON_MEDIA_TYPE, body)); // 模拟盘需加 x-simulated-trading 头,与生产网共用同一 REST 地址 if (!config.isProduction()) { builder.header("x-simulated-trading", "1"); } Request request = builder.build(); try (Response response = httpClient.newCall(request).execute()) { String responseBody = response.body() != null ? response.body().string() : "{}"; if (!response.isSuccessful()) { log.error("[TradeExec-OKX] HTTP {} POST {}: {}", response.code(), path, responseBody); throw new IOException("HTTP " + response.code() + ": " + responseBody); } return JSON.parseObject(responseBody); } } /** * 发送 OKX 签名 GET 请求并返回解析后的 JSONObject。 * *

GET 请求的签名中 body 为空字符串。 * * @param path API 路径(如 /api/v5/account/positions) * @return 解析后的响应 JSONObject * @throws IOException 网络异常 */ JSONObject okGet(String path) throws IOException { String method = "GET"; String timestamp = getIsoTimestamp(); String sign = null; try { sign = sign(timestamp, method, path, ""); } catch (Exception e) { e.printStackTrace(); } Request.Builder builder = new Request.Builder() .url(config.getRestBasePath() + path) .header("OK-ACCESS-KEY", config.getApiKey()) .header("OK-ACCESS-SIGN", sign) .header("OK-ACCESS-TIMESTAMP", timestamp) .header("OK-ACCESS-PASSPHRASE", config.getPassphrase()) .get(); // 模拟盘需加 x-simulated-trading 头 if (!config.isProduction()) { builder.header("x-simulated-trading", "1"); } Request request = builder.build(); try (Response response = httpClient.newCall(request).execute()) { String responseBody = response.body() != null ? response.body().string() : "{}"; if (!response.isSuccessful()) { log.error("[TradeExec-OKX] HTTP {} GET {}: {}", response.code(), path, responseBody); throw new IOException("HTTP " + response.code() + ": " + responseBody); } return JSON.parseObject(responseBody); } } // ==================== 签名工具方法 ==================== /** * 生成 OKX API 签名。 * *

签名算法: *

    *
  1. 拼接签名字符串:{@code timestamp + method + path + body}
  2. *
  3. 使用 apiSecret 对签名字符串做 HMAC-SHA256
  4. *
  5. Base64 编码
  6. *
* * @param timestamp OKX 格式时间戳(ISO 8601) * @param method HTTP 方法(GET/POST) * @param path API 路径(如 /api/v5/trade/order) * @param body 请求体(GET 请求传 "") * @return Base64 编码的签名字符串 * @throws Exception 签名计算异常 */ private String sign(String timestamp, String method, String path, String body) throws Exception { String signString = timestamp + method + path + body; Mac sha256Hmac = Mac.getInstance("HmacSHA256"); SecretKeySpec secretKey = new SecretKeySpec(config.getApiSecret().getBytes(), "HmacSHA256"); sha256Hmac.init(secretKey); byte[] signedBytes = sha256Hmac.doFinal(signString.getBytes()); return Base64.getEncoder().encodeToString(signedBytes); } /** * 获取 OKX 格式的 ISO 8601 时间戳。 * *

格式示例:{@code 2023-01-01T00:00:00.000Z} * * @return ISO 8601 格式的 UTC 时间戳字符串 */ private String getIsoTimestamp() { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"); sdf.setTimeZone(TimeZone.getTimeZone("UTC")); return sdf.format(new Date()); } }