package com.xcong.excoin.modules.okxApi; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.okxApi.enums.OkxEnums; import com.xcong.excoin.modules.okxApi.param.TradeRequestParam; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import java.math.BigDecimal; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * OKX WebSocket 交易执行器。 * *

设计目的

* WebSocket 消息在回调线程中处理。下单操作参数构建等逻辑 * 提交到独立线程池异步执行,避免阻塞 WS 回调线程。 * *

回调设计

* 每个下单方法接受 onSuccess/onFailure 两个 Runnable。 * 基底开仓时 onSuccess 用于标记基底已开,网格触发时通常为 null(成交状态由仓位推送驱动)。 * *

线程模型

* * *

调用链

*
 *   OkxGridTradeService.onKline → executor.openLong/openShort (基底双开 + 网格触发)
 *   OkxGridTradeService.onPositionUpdate → executor.placeTakeProfit (开仓成交后设止盈)
 *   OkxGridTradeService.stopGrid → executor.cancelAllPriceTriggeredOrders
 * 
* * @author Administrator */ @Slf4j public class OkxTradeExecutor { private final String contract; private final String marginMode; private final String accountName; private volatile WebSocketClient wsClient; private final ExecutorService executor; public OkxTradeExecutor(String contract, String marginMode, String accountName) { this.contract = contract; this.marginMode = marginMode; this.accountName = accountName; this.executor = new ThreadPoolExecutor( 1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(64), r -> { Thread t = new Thread(r, "okxApi-trade-worker"); t.setDaemon(true); return t; }, new ThreadPoolExecutor.CallerRunsPolicy() ); ((ThreadPoolExecutor) executor).allowCoreThreadTimeOut(true); } public void setWebSocketClient(WebSocketClient wsClient) { this.wsClient = wsClient; } public void shutdown() { executor.shutdown(); try { executor.awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); executor.shutdownNow(); } } /** * 异步市价开多。quantity 为正数(如 "1")。 * * @param quantity 开仓张数(正数) * @param onSuccess 成交成功回调(可为 null) * @param onFailure 成交失败回调(可为 null) */ public void openLong(String quantity, Runnable onSuccess, Runnable onFailure) { openPosition(quantity, OkxEnums.POSSIDE_LONG, OkxEnums.SIDE_BUY, "开多", onSuccess, onFailure); } /** * 异步市价开空。quantity 为正数(如 "1")。 * * @param quantity 开仓张数(正数) * @param onSuccess 成交成功回调(可为 null) * @param onFailure 成交失败回调(可为 null) */ public void openShort(String quantity, Runnable onSuccess, Runnable onFailure) { openPosition(quantity, OkxEnums.POSSIDE_SHORT, OkxEnums.SIDE_SELL, "开空", onSuccess, onFailure); } private void openPosition(String sz, String posSide, String side, String label, Runnable onSuccess, Runnable onFailure) { executor.execute(() -> { try { TradeRequestParam param = buildParam(side, posSide, sz, OkxEnums.ORDTYPE_MARKET); sendOrder(param); log.info("[TradeExec] {}已发送, 方向:{}, 数量:{}", label, posSide, sz); if (onSuccess != null) { onSuccess.run(); } } catch (Exception e) { log.error("[TradeExec] {}发送失败", label, e); if (onFailure != null) { onFailure.run(); } } }); } /** * 异步创建止盈条件单(仓位计划止盈止损)。 * *

通过 OKX WebSocket 发送 algo order(conditional order),服务器监控价格, * 达到触发价后自动平指定张数。 * *

orderType 说明

* * *

止盈单创建失败时,立即市价平仓兜底(marketClose)。 * * @param triggerPrice 触发价格 * @param orderType stop 类型(plan-close-long-position / plan-close-short-position) * @param size 平仓张数(正数) */ public void placeTakeProfit(BigDecimal triggerPrice, String orderType, String size) { executor.execute(() -> { String posSide; String side; if (OkxEnums.ORDER_TYPE_CLOSE_LONG.equals(orderType)) { posSide = OkxEnums.POSSIDE_LONG; side = OkxEnums.SIDE_SELL; } else if (OkxEnums.ORDER_TYPE_CLOSE_SHORT.equals(orderType)) { posSide = OkxEnums.POSSIDE_SHORT; side = OkxEnums.SIDE_BUY; } else { log.error("[TradeExec] 未知止盈类型: {}", orderType); return; } try { if (wsClient == null || !wsClient.isOpen()) { log.warn("[TradeExec] WS未连接,跳过止盈单"); return; } if (BigDecimal.ZERO.compareTo(new BigDecimal(size)) >= 0) { log.warn("[TradeExec] 止盈数量<=0,跳过"); return; } JSONArray argsArray = new JSONArray(); JSONObject args = new JSONObject(); args.put("instId", contract); args.put("tdMode", marginMode); args.put("side", side); args.put("posSide", posSide); args.put("ordType", OkxEnums.ORDTYPE_CONDITIONAL); args.put("sz", size); args.put("tpTriggerPx", triggerPrice.toString()); args.put("tpOrdPx", "-1"); argsArray.add(args); String connId = OkxWsUtil.getOrderNum("algo"); JSONObject msg = OkxWsUtil.buildJsonObject(connId, "order-algo", argsArray); wsClient.send(msg.toJSONString()); log.info("[TradeExec] 止盈单已发送, 触发价:{}, 类型:{}, size:{}", triggerPrice, orderType, size); } catch (Exception e) { log.error("[TradeExec] 止盈单发送失败, 触发价:{}, size:{}, 立即市价止盈", triggerPrice, size, e); marketClose(side, posSide, size); } }); } /** * 市价止盈:在止盈条件单创建失败时立即市价平仓。 */ private void marketClose(String side, String posSide, String size) { try { TradeRequestParam param = buildParam(side, posSide, size, OkxEnums.ORDTYPE_MARKET); param.setTradeType("3"); sendOrder(param); log.info("[TradeExec] 市价止盈已发送, posSide:{}, size:{}", posSide, size); } catch (Exception e) { log.error("[TradeExec] 市价止盈也失败, posSide:{}, size:{}", posSide, size, e); } } /** * 异步清除指定合约的所有止盈止损条件单。 */ public void cancelAllPriceTriggeredOrders() { executor.execute(() -> { try { if (wsClient == null || !wsClient.isOpen()) { log.warn("[TradeExec] WS未连接,跳过撤销条件单"); return; } JSONArray argsArray = new JSONArray(); JSONObject args = new JSONObject(); args.put("instId", contract); args.put("algoOrdType", "conditional"); argsArray.add(args); String connId = OkxWsUtil.getOrderNum("cancel"); JSONObject msg = OkxWsUtil.buildJsonObject(connId, "cancel-algos", argsArray); wsClient.send(msg.toJSONString()); log.info("[TradeExec] 已发送撤销所有条件单请求"); } catch (Exception e) { log.error("[TradeExec] 撤销条件单失败", e); } }); } private TradeRequestParam buildParam(String side, String posSide, String sz, String ordType) { TradeRequestParam param = new TradeRequestParam(); param.setAccountName(accountName); param.setInstId(contract); param.setTdMode(marginMode); param.setPosSide(posSide); param.setOrdType(ordType); param.setSide(side); param.setClOrdId(OkxWsUtil.getOrderNum(side)); param.setSz(sz); param.setTradeType("1"); return param; } private JSONObject buildOrderArgs(TradeRequestParam param) { JSONObject args = new JSONObject(); args.put("instId", param.getInstId()); args.put("tdMode", param.getTdMode()); args.put("clOrdId", param.getClOrdId()); args.put("side", param.getSide()); args.put("posSide", param.getPosSide()); args.put("ordType", param.getOrdType()); args.put("sz", param.getSz()); return args; } private void sendOrder(TradeRequestParam param) { if (wsClient == null || !wsClient.isOpen()) { log.warn("[TradeExec] WS未连接,跳过下单"); return; } if (BigDecimal.ZERO.compareTo(new BigDecimal(param.getSz())) >= 0) { log.warn("[TradeExec] 下单数量<=0,跳过"); return; } JSONArray argsArray = new JSONArray(); argsArray.add(buildOrderArgs(param)); String connId = OkxWsUtil.getOrderNum("order"); JSONObject msg = OkxWsUtil.buildJsonObject(connId, "order", argsArray); wsClient.send(msg.toJSONString()); log.info("[TradeExec] 发送下单: side={}, sz={}", param.getSide(), param.getSz()); } private void sendBatchOrders(List params) { if (wsClient == null || !wsClient.isOpen() || params == null || params.isEmpty()) { log.warn("[TradeExec] WS未连接或参数为空,跳过批量下单"); return; } JSONArray argsArray = new JSONArray(); for (TradeRequestParam p : params) { argsArray.add(buildOrderArgs(p)); } String connId = OkxWsUtil.getOrderNum(null); JSONObject msg = OkxWsUtil.buildJsonObject(connId, "batch-orders", argsArray); wsClient.send(msg.toJSONString()); log.info("[TradeExec] 发送批量下单: {}条", params.size()); } }