package com.xcong.excoin.modules.okxApi;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.okxApi.enums.OkxEnums;
import com.xcong.excoin.modules.okxApi.param.TradeRequestParam;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* OKX WebSocket 交易执行器。
*
*
设计目的
* WebSocket 消息在回调线程中处理。下单操作参数构建等逻辑
* 提交到独立线程池异步执行,避免阻塞 WS 回调线程。
*
* 线程模型
*
* - 单线程:保证下单顺序(开多→开空→止盈单),避免并发竞争
* - 有界队列 64:防止堆积
* - CallerRunsPolicy:队列满时由提交线程直接同步执行,形成自然背压
* - allowCoreThreadTimeOut:60s 空闲后线程回收
*
*
* @author Administrator
*/
@Slf4j
public class OkxTradeExecutor {
private final OkxConfig config;
private final String accountName;
private final ExecutorService executor;
public OkxTradeExecutor(OkxConfig config, String accountName) {
this.config = config;
this.accountName = accountName;
this.executor = new ThreadPoolExecutor(
1, 1,
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(64),
r -> {
Thread t = new Thread(r, "okxApi-trade-worker");
t.setDaemon(true);
return t;
},
new ThreadPoolExecutor.CallerRunsPolicy()
);
((ThreadPoolExecutor) executor).allowCoreThreadTimeOut(true);
}
public void shutdown() {
executor.shutdown();
try {
executor.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
executor.shutdownNow();
}
}
public void openLong(WebSocketClient wsClient, Runnable onSuccess) {
openPosition(wsClient, config.getQuantity(), OkxEnums.POSSIDE_LONG, OkxEnums.SIDE_BUY, "开多", onSuccess);
}
public void openShort(WebSocketClient wsClient, Runnable onSuccess) {
openPosition(wsClient, config.getQuantity(), OkxEnums.POSSIDE_SHORT, OkxEnums.SIDE_SELL, "开空", onSuccess);
}
private void openPosition(WebSocketClient wsClient, String sz, String posSide, String side, String label, Runnable onSuccess) {
executor.execute(() -> {
try {
TradeRequestParam param = buildParam(side, posSide, sz, OkxEnums.ORDTYPE_MARKET);
sendOrder(wsClient, param);
log.info("[TradeExec] {}已发送, 方向:{}, 数量:{}", label, posSide, sz);
if (onSuccess != null) {
onSuccess.run();
}
} catch (Exception e) {
log.error("[TradeExec] {}发送失败", label, e);
}
});
}
public void placeTakeProfit(WebSocketClient wsClient, String posSide, BigDecimal triggerPrice, String size) {
executor.execute(() -> {
try {
String side = OkxEnums.POSSIDE_LONG.equals(posSide) ? OkxEnums.SIDE_SELL : OkxEnums.SIDE_BUY;
TradeRequestParam param = buildParam(side, posSide, size, OkxEnums.ORDTYPE_LIMIT);
param.setMarkPx(triggerPrice.toString());
List params = new ArrayList<>();
params.add(param);
sendBatchOrders(wsClient, params);
log.info("[TradeExec] 止盈单已发送, 方向:{}, 触发价:{}, 数量:{}", posSide, triggerPrice, size);
} catch (Exception e) {
log.error("[TradeExec] 止盈单发送失败", e);
}
});
}
private TradeRequestParam buildParam(String side, String posSide, String sz, String ordType) {
TradeRequestParam param = new TradeRequestParam();
param.setAccountName(accountName);
param.setInstId(config.getContract());
param.setTdMode(config.getMarginMode());
param.setPosSide(posSide);
param.setOrdType(ordType);
param.setSide(side);
param.setClOrdId(OkxWsUtil.getOrderNum(side));
param.setSz(sz);
param.setTradeType("1");
return param;
}
private void sendOrder(WebSocketClient wsClient, TradeRequestParam param) {
if (wsClient == null || !wsClient.isOpen()) {
log.warn("[TradeExec] WS未连接,跳过下单");
return;
}
if (BigDecimal.ZERO.compareTo(new BigDecimal(param.getSz())) >= 0) {
log.warn("[TradeExec] 下单数量<=0,跳过");
return;
}
JSONArray argsArray = new JSONArray();
JSONObject args = new JSONObject();
args.put("instId", param.getInstId());
args.put("tdMode", param.getTdMode());
args.put("clOrdId", param.getClOrdId());
args.put("side", param.getSide());
args.put("posSide", param.getPosSide());
args.put("ordType", param.getOrdType());
args.put("sz", param.getSz());
argsArray.add(args);
String connId = OkxWsUtil.getOrderNum("order");
JSONObject msg = OkxWsUtil.buildJsonObject(connId, "order", argsArray);
wsClient.send(msg.toJSONString());
log.info("[TradeExec] 发送下单: side={}, sz={}", param.getSide(), param.getSz());
}
private void sendBatchOrders(WebSocketClient wsClient, List params) {
if (wsClient == null || !wsClient.isOpen() || params == null || params.isEmpty()) {
log.warn("[TradeExec] WS未连接或参数为空,跳过批量下单");
return;
}
JSONArray argsArray = new JSONArray();
for (TradeRequestParam p : params) {
JSONObject args = new JSONObject();
args.put("instId", p.getInstId());
args.put("tdMode", p.getTdMode());
args.put("clOrdId", p.getClOrdId());
args.put("side", p.getSide());
args.put("posSide", p.getPosSide());
args.put("ordType", p.getOrdType());
args.put("sz", p.getSz());
args.put("px", p.getMarkPx());
argsArray.add(args);
}
String connId = OkxWsUtil.getOrderNum(null);
JSONObject msg = OkxWsUtil.buildJsonObject(connId, "batch-orders", argsArray);
wsClient.send(msg.toJSONString());
log.info("[TradeExec] 发送批量下单: {}条", params.size());
}
}