Administrator
yesterday 1cbbbb86a6ed0036540ff40f9d0051d400d692b4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package com.xcong.excoin.modules.okxApi;
 
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.okxApi.enums.OkxEnums;
import com.xcong.excoin.modules.okxApi.param.TradeRequestParam;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
 
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
/**
 * OKX WebSocket 交易执行器。
 *
 * <h3>设计目的</h3>
 * WebSocket 消息在回调线程中处理。下单操作参数构建等逻辑
 * 提交到独立线程池异步执行,避免阻塞 WS 回调线程。
 *
 * <h3>线程模型</h3>
 * <ul>
 *   <li><b>单线程</b>:保证下单顺序(开多→开空→止盈单),避免并发竞争</li>
 *   <li><b>有界队列 64</b>:防止堆积</li>
 *   <li><b>CallerRunsPolicy</b>:队列满时由提交线程直接同步执行,形成自然背压</li>
 *   <li><b>allowCoreThreadTimeOut</b>:60s 空闲后线程回收</li>
 * </ul>
 *
 * @author Administrator
 */
@Slf4j
public class OkxTradeExecutor {
 
    private final OkxConfig config;
    private final String accountName;
 
    private final ExecutorService executor;
 
    public OkxTradeExecutor(OkxConfig config, String accountName) {
        this.config = config;
        this.accountName = accountName;
        this.executor = new ThreadPoolExecutor(
                1, 1,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(64),
                r -> {
                    Thread t = new Thread(r, "okxApi-trade-worker");
                    t.setDaemon(true);
                    return t;
                },
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
        ((ThreadPoolExecutor) executor).allowCoreThreadTimeOut(true);
    }
 
    public void shutdown() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            executor.shutdownNow();
        }
    }
 
    public void openLong(WebSocketClient wsClient, Runnable onSuccess) {
        openPosition(wsClient, config.getQuantity(), OkxEnums.POSSIDE_LONG, OkxEnums.SIDE_BUY, "开多", onSuccess);
    }
 
    public void openShort(WebSocketClient wsClient, Runnable onSuccess) {
        openPosition(wsClient, config.getQuantity(), OkxEnums.POSSIDE_SHORT, OkxEnums.SIDE_SELL, "开空", onSuccess);
    }
 
    private void openPosition(WebSocketClient wsClient, String sz, String posSide, String side, String label, Runnable onSuccess) {
        executor.execute(() -> {
            try {
                TradeRequestParam param = buildParam(side, posSide, sz, OkxEnums.ORDTYPE_MARKET);
                sendOrder(wsClient, param);
 
                log.info("[TradeExec] {}已发送, 方向:{}, 数量:{}", label, posSide, sz);
                if (onSuccess != null) {
                    onSuccess.run();
                }
            } catch (Exception e) {
                log.error("[TradeExec] {}发送失败", label, e);
            }
        });
    }
 
    public void placeTakeProfit(WebSocketClient wsClient, String posSide, BigDecimal triggerPrice, String size) {
        executor.execute(() -> {
            try {
                String side = OkxEnums.POSSIDE_LONG.equals(posSide) ? OkxEnums.SIDE_SELL : OkxEnums.SIDE_BUY;
 
                TradeRequestParam param = buildParam(side, posSide, size, OkxEnums.ORDTYPE_LIMIT);
                param.setMarkPx(triggerPrice.toString());
 
                List<TradeRequestParam> params = new ArrayList<>();
                params.add(param);
                sendBatchOrders(wsClient, params);
                log.info("[TradeExec] 止盈单已发送, 方向:{}, 触发价:{}, 数量:{}", posSide, triggerPrice, size);
            } catch (Exception e) {
                log.error("[TradeExec] 止盈单发送失败", e);
            }
        });
    }
 
    private TradeRequestParam buildParam(String side, String posSide, String sz, String ordType) {
        TradeRequestParam param = new TradeRequestParam();
        param.setAccountName(accountName);
        param.setInstId(config.getContract());
        param.setTdMode(config.getMarginMode());
        param.setPosSide(posSide);
        param.setOrdType(ordType);
        param.setSide(side);
        param.setClOrdId(OkxWsUtil.getOrderNum(side));
        param.setSz(sz);
        param.setTradeType("1");
        return param;
    }
 
    private void sendOrder(WebSocketClient wsClient, TradeRequestParam param) {
        if (wsClient == null || !wsClient.isOpen()) {
            log.warn("[TradeExec] WS未连接,跳过下单");
            return;
        }
        if (BigDecimal.ZERO.compareTo(new BigDecimal(param.getSz())) >= 0) {
            log.warn("[TradeExec] 下单数量<=0,跳过");
            return;
        }
 
        JSONArray argsArray = new JSONArray();
        JSONObject args = new JSONObject();
        args.put("instId", param.getInstId());
        args.put("tdMode", param.getTdMode());
        args.put("clOrdId", param.getClOrdId());
        args.put("side", param.getSide());
        args.put("posSide", param.getPosSide());
        args.put("ordType", param.getOrdType());
        args.put("sz", param.getSz());
        argsArray.add(args);
 
        String connId = OkxWsUtil.getOrderNum("order");
        JSONObject msg = OkxWsUtil.buildJsonObject(connId, "order", argsArray);
        wsClient.send(msg.toJSONString());
        log.info("[TradeExec] 发送下单: side={}, sz={}", param.getSide(), param.getSz());
    }
 
    private void sendBatchOrders(WebSocketClient wsClient, List<TradeRequestParam> params) {
        if (wsClient == null || !wsClient.isOpen() || params == null || params.isEmpty()) {
            log.warn("[TradeExec] WS未连接或参数为空,跳过批量下单");
            return;
        }
 
        JSONArray argsArray = new JSONArray();
        for (TradeRequestParam p : params) {
            JSONObject args = new JSONObject();
            args.put("instId", p.getInstId());
            args.put("tdMode", p.getTdMode());
            args.put("clOrdId", p.getClOrdId());
            args.put("side", p.getSide());
            args.put("posSide", p.getPosSide());
            args.put("ordType", p.getOrdType());
            args.put("sz", p.getSz());
            args.put("px", p.getMarkPx());
            argsArray.add(args);
        }
 
        String connId = OkxWsUtil.getOrderNum(null);
        JSONObject msg = OkxWsUtil.buildJsonObject(connId, "batch-orders", argsArray);
        wsClient.send(msg.toJSONString());
        log.info("[TradeExec] 发送批量下单: {}条", params.size());
    }
}