Administrator
4 days ago e692f08fddcfb73b8a830957a14309917deccf24
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
package com.xcong.excoin.modules.gateApi;
 
import io.gate.gateapi.ApiClient;
import io.gate.gateapi.GateApiException;
import io.gate.gateapi.api.FuturesApi;
import io.gate.gateapi.models.FuturesInitialOrder;
import io.gate.gateapi.models.FuturesOrder;
import io.gate.gateapi.models.FuturesPriceTrigger;
import io.gate.gateapi.models.FuturesPriceTriggeredOrder;
import io.gate.gateapi.models.TriggerOrderResponse;
import lombok.extern.slf4j.Slf4j;
 
import java.math.BigDecimal;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
/**
 * Gate REST API 执行器。
 *
 * <h3>设计目的</h3>
 * WebSocket 消息在回调线程中处理(如 {@code WebSocketClient} 的 {@code onMessage} 线程)。
 * 下单 REST API 调用可能耗时数百毫秒,若同步执行会阻塞 WS 回调线程,导致心跳超时误判。
 * 本类将所有 REST 调用提交到独立线程池异步执行。
 *
 * <h3>回调设计</h3>
 * 每个下单方法接受 onSuccess/onFailure 两个 Runnable。
 * 基底开仓时 onSuccess 用于标记基底已开,网格触发时通常为 null(成交状态由仓位推送驱动)。
 *
 * <h3>线程模型</h3>
 * <ul>
 *   <li><b>单线程</b>:保证下单顺序(开多→开空→止盈单),避免并发竞争</li>
 *   <li><b>有界队列 64</b>:防止堆积。极端行情下最多累积 64 个任务</li>
 *   <li><b>CallerRunsPolicy</b>:队列满时由提交线程直接同步执行,形成自然背压</li>
 *   <li><b>allowCoreThreadTimeOut</b>:60s 空闲后线程回收,不浪费资源</li>
 * </ul>
 *
 * <h3>调用链</h3>
 * <pre>
 *   GateGridTradeService.onKline → executor.openLong/openShort (基底双开 + 网格触发)
 *   GateGridTradeService.onPositionUpdate → executor.placeTakeProfit (开仓成交后设止盈)
 *   GateGridTradeService.stopGrid → executor.cancelAllPriceTriggeredOrders
 * </pre>
 *
 * @author Administrator
 */
@Slf4j
public class GateTradeExecutor {
 
    private static final String SETTLE = "usdt";
 
    private final FuturesApi futuresApi;
    private final String contract;
 
    /** 交易线程池:单线程 + 有界队列 + 背压策略 */
    private final ExecutorService executor;
 
    public GateTradeExecutor(ApiClient apiClient, String contract) {
        this.futuresApi = new FuturesApi(apiClient);
        this.contract = contract;
        this.executor = new ThreadPoolExecutor(
                1, 1,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(64),
                r -> {
                    Thread t = new Thread(r, "gate-trade-worker");
                    t.setDaemon(true);
                    return t;
                },
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
        ((ThreadPoolExecutor) executor).allowCoreThreadTimeOut(true);
    }
 
    /**
     * 优雅关闭:等待 10 秒,超时则强制中断。
     */
    public void shutdown() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            executor.shutdownNow();
        }
    }
 
    /**
     * 异步市价开多。quantity 为正数(如 "10")。
     */
    public void openLong(String quantity, Runnable onSuccess, Runnable onFailure) {
        openPosition(quantity, "t-grid-long", "开多", onSuccess, onFailure);
    }
 
    /**
     * 异步市价开空。quantity 为负数(如 "-10")。
     */
    public void openShort(String quantity, Runnable onSuccess, Runnable onFailure) {
        openPosition(quantity, "t-grid-short", "开空", onSuccess, onFailure);
    }
 
    private void openPosition(String size, String text, String label, Runnable onSuccess, Runnable onFailure) {
        executor.execute(() -> {
            try {
                FuturesOrder order = new FuturesOrder();
                order.setContract(contract);
                order.setSize(size);
                order.setPrice("0");
                order.setTif(FuturesOrder.TifEnum.IOC);
                order.setText(text);
                FuturesOrder result = futuresApi.createFuturesOrder(SETTLE, order, null);
                log.info("[TradeExec] {}成功, 价格:{}, id:{}", label, result.getFillPrice(), result.getId());
                if (onSuccess != null) {
                    onSuccess.run();
                }
            } catch (Exception e) {
                log.error("[TradeExec] {}失败", label, e);
                if (onFailure != null) {
                    onFailure.run();
                }
            }
        });
    }
 
    /**
     * 异步创建止盈条件单。
     * <p>使用 Gate 的 PriceTriggeredOrder:服务器监控价格,达到触发价后自动平仓。
     * 如果账户已有同方向同规则的条件单(label=UNIQUE),自动清除后重试一次。
     *
     * @param triggerPrice 触发价格
     * @param rule         触发规则(NUMBER_1: ≥ 触发价,NUMBER_2: ≤ 触发价)
     * @param orderType    stop 类型(close-long-position / close-short-position)
     * @param autoSize     双仓平仓方向(close_long / close_short)
     */
    public void placeTakeProfit(BigDecimal triggerPrice,
                                 FuturesPriceTrigger.RuleEnum rule,
                                 String orderType,
                                 String autoSize) {
        executor.execute(() -> {
            FuturesPriceTriggeredOrder order = buildTriggeredOrder(triggerPrice, rule, orderType, autoSize);
            try {
                TriggerOrderResponse response = futuresApi.createPriceTriggeredOrder(SETTLE, order);
                log.info("[TradeExec] 止盈单已创建, 触发价:{}, 类型:{}, id:{}",
                        triggerPrice, orderType, response.getId());
            } catch (GateApiException e) {
                if ("AUTO_USER_EXIST_POSITION_ORDER".equals(e.getErrorLabel())) {
                    log.warn("[TradeExec] 止盈单已存在,清除旧单后重试");
                    try {
                        futuresApi.cancelPriceTriggeredOrderList(SETTLE, contract);
                        TriggerOrderResponse response = futuresApi.createPriceTriggeredOrder(SETTLE, order);
                        log.info("[TradeExec] 止盈单重试成功, 触发价:{}, id:{}", triggerPrice, response.getId());
                    } catch (Exception retryEx) {
                        log.error("[TradeExec] 止盈单重试失败", retryEx);
                    }
                } else {
                    log.error("[TradeExec] 止盈单创建失败, 触发价:{}", triggerPrice, e);
                }
            } catch (Exception e) {
                log.error("[TradeExec] 止盈单创建失败, 触发价:{}", triggerPrice, e);
            }
        });
    }
 
    /**
     * 异步清除指定合约的所有止盈止损条件单。
     */
    public void cancelAllPriceTriggeredOrders() {
        executor.execute(() -> {
            try {
                futuresApi.cancelPriceTriggeredOrderList(SETTLE, contract);
                log.info("[TradeExec] 已清除所有止盈止损条件单");
            } catch (Exception e) {
                log.error("[TradeExec] 清除止盈止损条件单失败", e);
            }
        });
    }
 
    /**
     * 构建 FuturesPriceTriggeredOrder 对象。
     * <p>策略=0(价格触发),price_type=0(最新价),expiration=0(永不过期),
     * tif=IOC(立即成交或取消),reduce_only=true(只减仓不开新仓)。
     */
    private FuturesPriceTriggeredOrder buildTriggeredOrder(BigDecimal triggerPrice,
                                                            FuturesPriceTrigger.RuleEnum rule,
                                                            String orderType,
                                                            String autoSize) {
        FuturesPriceTrigger trigger = new FuturesPriceTrigger();
        trigger.setStrategyType(FuturesPriceTrigger.StrategyTypeEnum.NUMBER_0);
        trigger.setPriceType(FuturesPriceTrigger.PriceTypeEnum.NUMBER_0);
        trigger.setPrice(triggerPrice.toString());
        trigger.setRule(rule);
        trigger.setExpiration(0);
 
        FuturesInitialOrder initial = new FuturesInitialOrder();
        initial.setContract(contract);
        initial.setSize(0L);
        initial.setPrice("0");
        initial.setTif(FuturesInitialOrder.TifEnum.IOC);
        initial.setReduceOnly(true);
        initial.setAutoSize(autoSize);
 
        FuturesPriceTriggeredOrder order = new FuturesPriceTriggeredOrder();
        order.setTrigger(trigger);
        order.setInitial(initial);
        order.setOrderType(orderType);
        return order;
    }
}