package com.xcong.excoin.modules.okxApi; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.okxApi.enums.OkxEnums; import com.xcong.excoin.modules.okxApi.param.TradeRequestParam; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import java.math.BigDecimal; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * OKX WebSocket 交易执行器。 * *

设计目的

* WebSocket 消息在回调线程中处理。下单操作参数构建等逻辑 * 提交到独立线程池异步执行,避免阻塞 WS 回调线程。 * *

线程模型

* * * @author Administrator */ @Slf4j public class OkxTradeExecutor { private final OkxConfig config; private final String accountName; private final ExecutorService executor; public OkxTradeExecutor(OkxConfig config, String accountName) { this.config = config; this.accountName = accountName; this.executor = new ThreadPoolExecutor( 1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(64), r -> { Thread t = new Thread(r, "okxApi-trade-worker"); t.setDaemon(true); return t; }, new ThreadPoolExecutor.CallerRunsPolicy() ); ((ThreadPoolExecutor) executor).allowCoreThreadTimeOut(true); } public void shutdown() { executor.shutdown(); try { executor.awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); executor.shutdownNow(); } } public void openLong(WebSocketClient wsClient, Runnable onSuccess) { openPosition(wsClient, config.getQuantity(), OkxEnums.POSSIDE_LONG, OkxEnums.SIDE_BUY, "开多", onSuccess); } public void openShort(WebSocketClient wsClient, Runnable onSuccess) { openPosition(wsClient, config.getQuantity(), OkxEnums.POSSIDE_SHORT, OkxEnums.SIDE_SELL, "开空", onSuccess); } private void openPosition(WebSocketClient wsClient, String sz, String posSide, String side, String label, Runnable onSuccess) { executor.execute(() -> { try { TradeRequestParam param = buildParam(side, posSide, sz, OkxEnums.ORDTYPE_MARKET); sendOrder(wsClient, param); log.info("[TradeExec] {}已发送, 方向:{}, 数量:{}", label, posSide, sz); if (onSuccess != null) { onSuccess.run(); } } catch (Exception e) { log.error("[TradeExec] {}发送失败", label, e); } }); } public void placeTakeProfit(WebSocketClient wsClient, String posSide, BigDecimal triggerPrice, String size) { executor.execute(() -> { try { String side = OkxEnums.POSSIDE_LONG.equals(posSide) ? OkxEnums.SIDE_SELL : OkxEnums.SIDE_BUY; TradeRequestParam param = buildParam(side, posSide, size, OkxEnums.ORDTYPE_LIMIT); param.setMarkPx(triggerPrice.toString()); List params = new ArrayList<>(); params.add(param); sendBatchOrders(wsClient, params); log.info("[TradeExec] 止盈单已发送, 方向:{}, 触发价:{}, 数量:{}", posSide, triggerPrice, size); } catch (Exception e) { log.error("[TradeExec] 止盈单发送失败", e); } }); } private TradeRequestParam buildParam(String side, String posSide, String sz, String ordType) { TradeRequestParam param = new TradeRequestParam(); param.setAccountName(accountName); param.setInstId(config.getContract()); param.setTdMode(config.getMarginMode()); param.setPosSide(posSide); param.setOrdType(ordType); param.setSide(side); param.setClOrdId(OkxWsUtil.getOrderNum(side)); param.setSz(sz); param.setTradeType("1"); return param; } private void sendOrder(WebSocketClient wsClient, TradeRequestParam param) { if (wsClient == null || !wsClient.isOpen()) { log.warn("[TradeExec] WS未连接,跳过下单"); return; } if (BigDecimal.ZERO.compareTo(new BigDecimal(param.getSz())) >= 0) { log.warn("[TradeExec] 下单数量<=0,跳过"); return; } JSONArray argsArray = new JSONArray(); JSONObject args = new JSONObject(); args.put("instId", param.getInstId()); args.put("tdMode", param.getTdMode()); args.put("clOrdId", param.getClOrdId()); args.put("side", param.getSide()); args.put("posSide", param.getPosSide()); args.put("ordType", param.getOrdType()); args.put("sz", param.getSz()); argsArray.add(args); String connId = OkxWsUtil.getOrderNum("order"); JSONObject msg = OkxWsUtil.buildJsonObject(connId, "order", argsArray); wsClient.send(msg.toJSONString()); log.info("[TradeExec] 发送下单: side={}, sz={}", param.getSide(), param.getSz()); } private void sendBatchOrders(WebSocketClient wsClient, List params) { if (wsClient == null || !wsClient.isOpen() || params == null || params.isEmpty()) { log.warn("[TradeExec] WS未连接或参数为空,跳过批量下单"); return; } JSONArray argsArray = new JSONArray(); for (TradeRequestParam p : params) { JSONObject args = new JSONObject(); args.put("instId", p.getInstId()); args.put("tdMode", p.getTdMode()); args.put("clOrdId", p.getClOrdId()); args.put("side", p.getSide()); args.put("posSide", p.getPosSide()); args.put("ordType", p.getOrdType()); args.put("sz", p.getSz()); args.put("px", p.getMarkPx()); argsArray.add(args); } String connId = OkxWsUtil.getOrderNum(null); JSONObject msg = OkxWsUtil.buildJsonObject(connId, "batch-orders", argsArray); wsClient.send(msg.toJSONString()); log.info("[TradeExec] 发送批量下单: {}条", params.size()); } }