// Last modified by Administrator — commit 025c66091b6b6903b5e830c5bde981fdbacbc744
package com.xcong.excoin.modules.okxApi;
 
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.okxApi.enums.OkxEnums;
import com.xcong.excoin.modules.okxApi.param.TradeRequestParam;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
 
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
/**
 * OKX WebSocket 交易执行器。
 *
 * <h3>设计目的</h3>
 * WebSocket 消息在回调线程中处理。下单操作参数构建等逻辑
 * 提交到独立线程池异步执行,避免阻塞 WS 回调线程。
 *
 * <h3>回调设计</h3>
 * 每个下单方法接受 onSuccess/onFailure 两个 Runnable。
 * 基底开仓时 onSuccess 用于标记基底已开,网格触发时通常为 null(成交状态由仓位推送驱动)。
 *
 * <h3>线程模型</h3>
 * <ul>
 *   <li><b>单线程</b>:保证下单顺序(开多→开空→止盈单),避免并发竞争</li>
 *   <li><b>有界队列 64</b>:防止堆积。极端行情下最多累积 64 个任务</li>
 *   <li><b>CallerRunsPolicy</b>:队列满时由提交线程直接同步执行,形成自然背压</li>
 *   <li><b>allowCoreThreadTimeOut</b>:60s 空闲后线程回收,不浪费资源</li>
 * </ul>
 *
 * <h3>调用链</h3>
 * <pre>
 *   OkxGridTradeService.onKline → executor.openLong/openShort (基底双开 + 网格触发)
 *   OkxGridTradeService.onPositionUpdate → executor.placeTakeProfit (开仓成交后设止盈)
 *   OkxGridTradeService.stopGrid → executor.cancelAllPriceTriggeredOrders
 * </pre>
 *
 * @author Administrator
 */
@Slf4j
public class OkxTradeExecutor {
 
    private static final String ORDER_TYPE_CLOSE_LONG = "plan-close-long-position";
    private static final String ORDER_TYPE_CLOSE_SHORT = "plan-close-short-position";
 
    private final String contract;
    private final String marginMode;
    private final String accountName;
 
    private volatile WebSocketClient wsClient;
 
    private final ExecutorService executor;
 
    public OkxTradeExecutor(String contract, String marginMode, String accountName) {
        this.contract = contract;
        this.marginMode = marginMode;
        this.accountName = accountName;
        this.executor = new ThreadPoolExecutor(
                1, 1,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(64),
                r -> {
                    Thread t = new Thread(r, "okxApi-trade-worker");
                    t.setDaemon(true);
                    return t;
                },
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
        ((ThreadPoolExecutor) executor).allowCoreThreadTimeOut(true);
    }
 
    public void setWebSocketClient(WebSocketClient wsClient) {
        this.wsClient = wsClient;
    }
 
    public void shutdown() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            executor.shutdownNow();
        }
    }
 
    /**
     * 异步市价开多。quantity 为正数(如 "1")。
     *
     * @param quantity  开仓张数(正数)
     * @param onSuccess 成交成功回调(可为 null)
     * @param onFailure 成交失败回调(可为 null)
     */
    public void openLong(String quantity, Runnable onSuccess, Runnable onFailure) {
        openPosition(quantity, OkxEnums.POSSIDE_LONG, OkxEnums.SIDE_BUY, "开多", onSuccess, onFailure);
    }
 
    /**
     * 异步市价开空。quantity 为正数(如 "1")。
     *
     * @param quantity  开仓张数(正数)
     * @param onSuccess 成交成功回调(可为 null)
     * @param onFailure 成交失败回调(可为 null)
     */
    public void openShort(String quantity, Runnable onSuccess, Runnable onFailure) {
        openPosition(quantity, OkxEnums.POSSIDE_SHORT, OkxEnums.SIDE_SELL, "开空", onSuccess, onFailure);
    }
 
    private void openPosition(String sz, String posSide, String side, String label, Runnable onSuccess, Runnable onFailure) {
        executor.execute(() -> {
            try {
                TradeRequestParam param = buildParam(side, posSide, sz, OkxEnums.ORDTYPE_MARKET);
                sendOrder(param);
 
                log.info("[TradeExec] {}已发送, 方向:{}, 数量:{}", label, posSide, sz);
                if (onSuccess != null) {
                    onSuccess.run();
                }
            } catch (Exception e) {
                log.error("[TradeExec] {}发送失败", label, e);
                if (onFailure != null) {
                    onFailure.run();
                }
            }
        });
    }
 
    /**
     * 异步创建止盈条件单(仓位计划止盈止损)。
     *
     * <p>通过 OKX WebSocket 发送 algo order(conditional order),服务器监控价格,
     * 达到触发价后自动平指定张数。
     *
     * <h3>orderType 说明</h3>
     * <ul>
     *   <li>plan-close-long-position:平多仓,posSide=long, side=sell</li>
     *   <li>plan-close-short-position:平空仓,posSide=short, side=buy</li>
     * </ul>
     *
     * <p>止盈单创建失败时,立即市价平仓兜底(marketClose)。
     *
     * @param triggerPrice 触发价格
     * @param orderType    stop 类型(plan-close-long-position / plan-close-short-position)
     * @param size         平仓张数(正数)
     */
    public void placeTakeProfit(BigDecimal triggerPrice, String orderType, String size) {
        executor.execute(() -> {
            String posSide;
            String side;
            if (ORDER_TYPE_CLOSE_LONG.equals(orderType)) {
                posSide = OkxEnums.POSSIDE_LONG;
                side = OkxEnums.SIDE_SELL;
            } else {
                posSide = OkxEnums.POSSIDE_SHORT;
                side = OkxEnums.SIDE_BUY;
            }
 
            try {
                TradeRequestParam param = buildParam(side, posSide, size, OkxEnums.ORDTYPE_LIMIT);
                param.setMarkPx(triggerPrice.toString());
 
                List<TradeRequestParam> params = new ArrayList<>();
                params.add(param);
                sendBatchOrders(params);
                log.info("[TradeExec] 止盈单已发送, 触发价:{}, 类型:{}, size:{}", triggerPrice, orderType, size);
            } catch (Exception e) {
                log.error("[TradeExec] 止盈单发送失败, 触发价:{}, size:{}, 立即市价止盈", triggerPrice, size, e);
                marketClose(side, posSide, size);
            }
        });
    }
 
    /**
     * 市价止盈:在止盈条件单创建失败时立即市价平仓。
     */
    private void marketClose(String side, String posSide, String size) {
        try {
            TradeRequestParam param = buildParam(side, posSide, size, OkxEnums.ORDTYPE_MARKET);
            param.setTradeType("3");
            sendOrder(param);
            log.info("[TradeExec] 市价止盈已发送, posSide:{}, size:{}", posSide, size);
        } catch (Exception e) {
            log.error("[TradeExec] 市价止盈也失败, posSide:{}, size:{}", posSide, size, e);
        }
    }
 
    /**
     * 异步清除指定合约的所有止盈止损条件单。
     */
    public void cancelAllPriceTriggeredOrders() {
        executor.execute(() -> {
            try {
                if (wsClient == null || !wsClient.isOpen()) {
                    log.warn("[TradeExec] WS未连接,跳过撤销条件单");
                    return;
                }
                JSONArray argsArray = new JSONArray();
                JSONObject args = new JSONObject();
                args.put("instId", contract);
                args.put("algoOrdType", "conditional");
                argsArray.add(args);
 
                String connId = OkxWsUtil.getOrderNum("cancel");
                JSONObject msg = OkxWsUtil.buildJsonObject(connId, "cancel-algos", argsArray);
                wsClient.send(msg.toJSONString());
                log.info("[TradeExec] 已发送撤销所有条件单请求");
            } catch (Exception e) {
                log.error("[TradeExec] 撤销条件单失败", e);
            }
        });
    }
 
    private TradeRequestParam buildParam(String side, String posSide, String sz, String ordType) {
        TradeRequestParam param = new TradeRequestParam();
        param.setAccountName(accountName);
        param.setInstId(contract);
        param.setTdMode(marginMode);
        param.setPosSide(posSide);
        param.setOrdType(ordType);
        param.setSide(side);
        param.setClOrdId(OkxWsUtil.getOrderNum(side));
        param.setSz(sz);
        param.setTradeType("1");
        return param;
    }
 
    private void sendOrder(TradeRequestParam param) {
        if (wsClient == null || !wsClient.isOpen()) {
            log.warn("[TradeExec] WS未连接,跳过下单");
            return;
        }
        if (BigDecimal.ZERO.compareTo(new BigDecimal(param.getSz())) >= 0) {
            log.warn("[TradeExec] 下单数量<=0,跳过");
            return;
        }
 
        JSONArray argsArray = new JSONArray();
        JSONObject args = new JSONObject();
        args.put("instId", param.getInstId());
        args.put("tdMode", param.getTdMode());
        args.put("clOrdId", param.getClOrdId());
        args.put("side", param.getSide());
        args.put("posSide", param.getPosSide());
        args.put("ordType", param.getOrdType());
        args.put("sz", param.getSz());
        argsArray.add(args);
 
        String connId = OkxWsUtil.getOrderNum("order");
        JSONObject msg = OkxWsUtil.buildJsonObject(connId, "order", argsArray);
        wsClient.send(msg.toJSONString());
        log.info("[TradeExec] 发送下单: side={}, sz={}", param.getSide(), param.getSz());
    }
 
    private void sendBatchOrders(List<TradeRequestParam> params) {
        if (wsClient == null || !wsClient.isOpen() || params == null || params.isEmpty()) {
            log.warn("[TradeExec] WS未连接或参数为空,跳过批量下单");
            return;
        }
 
        JSONArray argsArray = new JSONArray();
        for (TradeRequestParam p : params) {
            JSONObject args = new JSONObject();
            args.put("instId", p.getInstId());
            args.put("tdMode", p.getTdMode());
            args.put("clOrdId", p.getClOrdId());
            args.put("side", p.getSide());
            args.put("posSide", p.getPosSide());
            args.put("ordType", p.getOrdType());
            args.put("sz", p.getSz());
            args.put("px", p.getMarkPx());
            argsArray.add(args);
        }
 
        String connId = OkxWsUtil.getOrderNum(null);
        JSONObject msg = OkxWsUtil.buildJsonObject(connId, "batch-orders", argsArray);
        wsClient.send(msg.toJSONString());
        log.info("[TradeExec] 发送批量下单: {}条", params.size());
    }
}