Administrator
4 days ago bfe3af2d95418b326d707834be6c6ba91f86ecb5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
package com.xcong.excoin.modules.gateApi;
 
import io.gate.gateapi.ApiClient;
import io.gate.gateapi.GateApiException;
import io.gate.gateapi.api.FuturesApi;
import io.gate.gateapi.models.FuturesInitialOrder;
import io.gate.gateapi.models.FuturesOrder;
import io.gate.gateapi.models.FuturesPriceTrigger;
import io.gate.gateapi.models.FuturesPriceTriggeredOrder;
import io.gate.gateapi.models.TriggerOrderResponse;
import lombok.extern.slf4j.Slf4j;
 
import java.math.BigDecimal;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
/**
 * Gate REST API 执行器。
 *
 * <h3>设计目的</h3>
 * WebSocket 消息在回调线程中处理(如 {@code WebSocketClient} 的 {@code onMessage} 线程)。
 * 下单 REST API 调用可能耗时数百毫秒,若同步执行会阻塞 WS 回调线程,导致心跳超时误判。
 * 本类将所有 REST 调用提交到独立线程池异步执行。
 *
 * <h3>线程模型</h3>
 * 单线程 ThreadPoolExecutor + 有界队列 64 + CallerRunsPolicy:
 * <ul>
 *   <li><b>单线程</b>:保证下单顺序(开多→开空→止盈单),避免并发竞争</li>
 *   <li><b>有界队列 64</b>:防止堆积。极端行情下最多累积 64 个任务</li>
 *   <li><b>CallerRunsPolicy</b>:队列满时由提交线程直接同步执行,形成自然背压</li>
 *   <li><b>allowCoreThreadTimeOut</b>:60s 空闲后线程回收,不浪费资源</li>
 * </ul>
 *
 * <h3>调用链</h3>
 * <pre>
 *   GateGridTradeService.onKline → executor.openLong/openShort → REST API
 *   GateGridTradeService.onPositionUpdate → executor.openLong/openShort → REST API
 *   (每一次开仓后) → executor.placeTakeProfit → REST API
 * </pre>
 *
 * @author Administrator
 */
@Slf4j
public class GateTradeExecutor {
 
    private static final String SETTLE = "usdt";
 
    private final FuturesApi futuresApi;
    private final String contract;
 
    /** 交易线程池:单线程 + 有界队列 + 背压策略 */
    private final ExecutorService executor;
 
    public GateTradeExecutor(ApiClient apiClient, String contract) {
        this.futuresApi = new FuturesApi(apiClient);
        this.contract = contract;
        this.executor = new ThreadPoolExecutor(
                1, 1,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(64),
                r -> {
                    Thread t = new Thread(r, "gate-trade-worker");
                    t.setDaemon(true);
                    return t;
                },
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
        ((ThreadPoolExecutor) executor).allowCoreThreadTimeOut(true);
    }
 
    /**
     * 优雅关闭:等待 10 秒,超时则强制中断。
     */
    public void shutdown() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            executor.shutdownNow();
        }
    }
 
    /**
     * 异步市价开多。
     * <p>创建 IOC 市价单(price=0),数量为正数。成功后调用 onSuccess 回调。
     *
     * @param quantity  数量(正数,如 "10")
     * @param onSuccess 成功后回调,在交易线程中执行
     */
    public void openLong(String quantity, Runnable onSuccess) {
        executor.execute(() -> {
            try {
                FuturesOrder order = new FuturesOrder();
                order.setContract(contract);
                order.setSize(quantity);
                order.setPrice("0");
                order.setTif(FuturesOrder.TifEnum.IOC);
                order.setText("t-grid-long");
                FuturesOrder result = futuresApi.createFuturesOrder(SETTLE, order, null);
                log.info("[TradeExec] 开多成功, price:{}, id:{}", result.getFillPrice(), result.getId());
                if (onSuccess != null) onSuccess.run();
            } catch (Exception e) {
                log.error("[TradeExec] 开多失败", e);
            }
        });
    }
 
    /**
     * 异步市价开空。
     * <p>创建 IOC 市价单(price=0),size 需为负数。
     *
     * @param negQuantity 负数数量(如 "-10")
     * @param onSuccess   成功后回调
     */
    public void openShort(String negQuantity, Runnable onSuccess) {
        executor.execute(() -> {
            try {
                FuturesOrder order = new FuturesOrder();
                order.setContract(contract);
                order.setSize(negQuantity);
                order.setPrice("0");
                order.setTif(FuturesOrder.TifEnum.IOC);
                order.setText("t-grid-short");
                FuturesOrder result = futuresApi.createFuturesOrder(SETTLE, order, null);
                log.info("[TradeExec] 开空成功, price:{}, id:{}", result.getFillPrice(), result.getId());
                if (onSuccess != null) onSuccess.run();
            } catch (Exception e) {
                log.error("[TradeExec] 开空失败", e);
            }
        });
    }
 
    /**
     * 异步创建止盈条件单。
     * <p>使用 Gate 的 PriceTriggeredOrder:服务器监控价格,达到触发价后自动平仓。
     * 如果账户已有同方向同规则的条件单(label=UNIQUE),自动清除后重试一次。
     *
     * @param triggerPrice 触发价格
     * @param rule         触发规则(NUMBER_1: ≥ 触发价,NUMBER_2: ≤ 触发价)
     * @param orderType    stop 类型(close-long-position / close-short-position)
     * @param autoSize     双仓平仓方向(close_long / close_short)
     */
    public void placeTakeProfit(BigDecimal triggerPrice,
                                 FuturesPriceTrigger.RuleEnum rule,
                                 String orderType,
                                 String autoSize) {
        executor.execute(() -> {
            FuturesPriceTriggeredOrder order = buildTriggeredOrder(triggerPrice, rule, orderType, autoSize);
            try {
                TriggerOrderResponse response = futuresApi.createPriceTriggeredOrder(SETTLE, order);
                log.info("[TradeExec] 止盈单已创建, tp:{}, orderType:{}, id:{}",
                        triggerPrice, orderType, response.getId());
            } catch (GateApiException e) {
                if ("AUTO_USER_EXIST_POSITION_ORDER".equals(e.getErrorLabel())) {
                    log.warn("[TradeExec] 止盈单已存在,清除后重试");
                    try {
                        futuresApi.cancelPriceTriggeredOrderList(SETTLE, contract);
                        TriggerOrderResponse response = futuresApi.createPriceTriggeredOrder(SETTLE, order);
                        log.info("[TradeExec] 止盈单重试成功, tp:{}, id:{}", triggerPrice, response.getId());
                    } catch (Exception retryEx) {
                        log.error("[TradeExec] 止盈单重试失败", retryEx);
                    }
                } else {
                    log.error("[TradeExec] 止盈单创建失败, tp:{}", triggerPrice, e);
                }
            } catch (Exception e) {
                log.error("[TradeExec] 止盈单创建失败, tp:{}", triggerPrice, e);
            }
        });
    }
 
    /**
     * 异步清除指定合约的所有止盈止损条件单。
     */
    public void cancelAllPriceTriggeredOrders() {
        executor.execute(() -> {
            try {
                futuresApi.cancelPriceTriggeredOrderList(SETTLE, contract);
                log.info("[TradeExec] 已清除所有止盈止损单");
            } catch (Exception e) {
                log.error("[TradeExec] 清除止盈止损单失败", e);
            }
        });
    }
 
    /**
     * 构建 FuturesPriceTriggeredOrder 对象。
     * <p>策略=0(价格触发),price_type=0(最新价),expiration=0(永不过期),
     * tif=IOC(立即成交或取消),reduce_only=true(只减仓不开新仓)。
     */
    private FuturesPriceTriggeredOrder buildTriggeredOrder(BigDecimal triggerPrice,
                                                            FuturesPriceTrigger.RuleEnum rule,
                                                            String orderType,
                                                            String autoSize) {
        FuturesPriceTrigger trigger = new FuturesPriceTrigger();
        trigger.setStrategyType(FuturesPriceTrigger.StrategyTypeEnum.NUMBER_0);
        trigger.setPriceType(FuturesPriceTrigger.PriceTypeEnum.NUMBER_0);
        trigger.setPrice(triggerPrice.toString());
        trigger.setRule(rule);
        trigger.setExpiration(0);
 
        FuturesInitialOrder initial = new FuturesInitialOrder();
        initial.setContract(contract);
        initial.setSize(0L);
        initial.setPrice("0");
        initial.setTif(FuturesInitialOrder.TifEnum.IOC);
        initial.setReduceOnly(true);
        initial.setAutoSize(autoSize);
 
        FuturesPriceTriggeredOrder order = new FuturesPriceTriggeredOrder();
        order.setTrigger(trigger);
        order.setInitial(initial);
        order.setOrderType(orderType);
        return order;
    }
}