Administrator
2026-06-08 b12289898dcc3b61769420df7c43c4eb073c5d57
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package com.xcong.excoin.modules.gateApi.wsHandler.handler;
 
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.blackchain.service.DateUtil;
import com.xcong.excoin.modules.gateApi.GateGridTradeService;
import com.xcong.excoin.modules.gateApi.wsHandler.GateChannelHandler;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
 
import java.math.BigDecimal;
 
/**
 * K 线频道处理器(futures.candlesticks)— 策略的唯一价格时间驱动源。
 *
 * <h3>定位</h3>
 * 订阅 1 分钟 K 线实时推送,每收到一根 K 线(不等待完结)即触发
 * {@link GateGridTradeService#onKline(BigDecimal)},由策略引擎决定是否开仓/止盈。
 *
 * <h3>订阅格式</h3>
 * 公开频道,无需认证签名。payload: {@code ["1m", contract]}。
 *
 * @author Administrator
 */
@Slf4j
public class CandlestickChannelHandler implements GateChannelHandler {
 
    private static final String CHANNEL_NAME = "futures.candlesticks";
    /** K 线周期,固定 1 分钟 */
    private static final String INTERVAL = "1m";
 
    /** 合约名称 */
    private final String contract;
    /** 网格交易服务,接收 K 线回调 */
    private final GateGridTradeService gridTradeService;
 
    private volatile boolean subscribed = false;
 
    /**
      * @param contract         合约名称(如 ETH_USDT)
      * @param gridTradeService 网格交易策略服务实例
      */
    public CandlestickChannelHandler(String contract, GateGridTradeService gridTradeService) {
        this.contract = contract;
        this.gridTradeService = gridTradeService;
    }
 
    /** @return 频道名称 "futures.candlesticks" */
    @Override
    public String getChannelName() { return CHANNEL_NAME; }
 
    /**
     * 发送 K 线频道订阅请求(公开频道,无需签名)。
     *
     * <h3>订阅格式</h3>
     * <pre>
     * {
     *   "time":    &lt;unix时间戳(秒)&gt;,
     *   "channel": "futures.candlesticks",
     *   "event":   "subscribe",
     *   "payload": ["1m", "{contract}"]
     * }
     * </pre>
     */
    @Override
    public void subscribe(WebSocketClient ws) {
        JSONObject msg = new JSONObject();
        msg.put("time", System.currentTimeMillis() / 1000);
        msg.put("channel", CHANNEL_NAME);
        msg.put("event", "subscribe");
        JSONArray payload = new JSONArray();
        payload.add(INTERVAL);
        payload.add(contract);
        msg.put("payload", payload);
        ws.send(msg.toJSONString());
        log.info("[{}] 订阅成功, 合约:{}, 周期:{}", CHANNEL_NAME, contract, INTERVAL);
    }
 
    /**
     * 发送 K 线频道取消订阅请求。
     */
    @Override
    public void unsubscribe(WebSocketClient ws) {
        JSONObject msg = new JSONObject();
        msg.put("time", System.currentTimeMillis() / 1000);
        msg.put("channel", CHANNEL_NAME);
        msg.put("event", "unsubscribe");
        JSONArray payload = new JSONArray();
        payload.add(INTERVAL);
        payload.add(contract);
        msg.put("payload", payload);
        ws.send(msg.toJSONString());
        log.info("[{}] 取消订阅成功", CHANNEL_NAME);
    }
 
    /**
     * 处理 K 线推送消息。
     *
     * <h3>数据提取</h3>
     * result[0] 中提取:
     * <ul>
     *   <li>c(close):收盘价 → 传给 gridTradeService.onKline()</li>
     *   <li>n(name):烛线名称(如 "1m_ETH_USDT")</li>
     *   <li>t(time):烛线起始时间戳</li>
     *   <li>w(window_close):烛线是否完结(仅日志输出,不做门控)</li>
     * </ul>
     *
     * <h3>注意</h3>
     * 不判断 w(已完结)——策略需要 tick 级实时响应价格变动,
     * 而非等 1 分钟烛线完结后才行动。
     *
     * @param response WebSocket 推送的完整 JSON
     * @return true 表示已处理(匹配成功)
     */
    @Override
    public boolean handleMessage(JSONObject response) {
        if (!CHANNEL_NAME.equals(response.getString("channel"))) {
            return false;
        }
        try {
            JSONArray resultArray = response.getJSONArray("result");
            if (resultArray == null || resultArray.isEmpty()) { log.warn("[{}] 数据为空", CHANNEL_NAME); return true; }
            JSONObject data = resultArray.getJSONObject(0);
            BigDecimal closePx = new BigDecimal(data.getString("c"));
 
//            log.info("========== Gate K线数据 ==========");
//            log.info("名称: {} 时间: {}", data.getString("n"), DateUtil.TimeStampToDateTime(data.getLong("t")));
//            log.info("收盘: {} 已完结: {}",data.getString("c"),data.getBooleanValue("w"));
//            log.info("==================================");
 
            if (gridTradeService != null) {
                gridTradeService.onKline(closePx);
            }
        } catch (Exception e) { log.error("[{}] 处理数据失败", CHANNEL_NAME, e); }
        return true;
    }
 
    @Override
    public boolean isSubscribed() { return subscribed; }
 
    @Override
    public void setSubscribed(boolean subscribed) { this.subscribed = subscribed; }
}