Administrator
119 mins ago e9a397babbbfa9cff8a5ed026447d585e739c37f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package com.xcong.excoin.modules.gateApi.wsHandler.handler;
 
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.blackchain.service.DateUtil;
import com.xcong.excoin.modules.gateApi.GateGridTradeService;
import com.xcong.excoin.modules.gateApi.wsHandler.GateChannelHandler;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
 
import java.math.BigDecimal;
 
/**
 * K 线(Candlestick)频道处理器。
 *
 * <h3>特点</h3>
 * 公开频道,无需 HMAC-SHA512 认证签名。订阅 1m 周期的 K 线数据。
 *
 * <h3>数据流</h3>
 * <pre>
 *   WebSocket 推送 update event
 *     → handleMessage() → 解析 OHLCV → log 打印 → gridTradeService.onKline(closePx)
 *       → WAITING_KLINE: 首次 K 线触发基底双开
 *       → ACTIVE: 驱动 processShortGrid + processLongGrid 网格触发
 * </pre>
 *
 * <h3>订阅格式</h3>
 * payload: {@code ["1m", contract]}
 *
 * @author Administrator
 */
@Slf4j
public class CandlestickChannelHandler implements GateChannelHandler {
 
    private static final String CHANNEL_NAME = "futures.candlesticks";
    /** K 线周期,固定 1 分钟 */
    private static final String INTERVAL = "1m";
 
    /** 合约名称 */
    private final String contract;
    /** 网格交易服务,接收 K 线回调 */
    private final GateGridTradeService gridTradeService;
 
    /**
      * @param contract         合约名称(如 ETH_USDT)
      * @param gridTradeService 网格交易策略服务实例
      */
    public CandlestickChannelHandler(String contract, GateGridTradeService gridTradeService) {
        this.contract = contract;
        this.gridTradeService = gridTradeService;
    }
 
    /** @return 频道名称 "futures.candlesticks" */
    @Override
    public String getChannelName() { return CHANNEL_NAME; }
 
    /**
     * 发送 K 线频道订阅请求(公开频道,无需签名)。
     *
     * <h3>订阅格式</h3>
     * <pre>
     * {
     *   "time":    &lt;unix时间戳(秒)&gt;,
     *   "channel": "futures.candlesticks",
     *   "event":   "subscribe",
     *   "payload": ["1m", "{contract}"]
     * }
     * </pre>
     */
    @Override
    public void subscribe(WebSocketClient ws) {
        JSONObject msg = new JSONObject();
        msg.put("time", System.currentTimeMillis() / 1000);
        msg.put("channel", CHANNEL_NAME);
        msg.put("event", "subscribe");
        JSONArray payload = new JSONArray();
        payload.add(INTERVAL);
        payload.add(contract);
        msg.put("payload", payload);
        ws.send(msg.toJSONString());
        log.info("[{}] 订阅成功, 合约:{}, 周期:{}", CHANNEL_NAME, contract, INTERVAL);
    }
 
    /**
     * 发送 K 线频道取消订阅请求。
     */
    @Override
    public void unsubscribe(WebSocketClient ws) {
        JSONObject msg = new JSONObject();
        msg.put("time", System.currentTimeMillis() / 1000);
        msg.put("channel", CHANNEL_NAME);
        msg.put("event", "unsubscribe");
        JSONArray payload = new JSONArray();
        payload.add(INTERVAL);
        payload.add(contract);
        msg.put("payload", payload);
        ws.send(msg.toJSONString());
        log.info("[{}] 取消订阅成功", CHANNEL_NAME);
    }
 
    /**
     * 处理 K 线推送消息。
     *
     * <h3>数据提取</h3>
     * result[0] 中提取:
     * <ul>
     *   <li>c(close):收盘价 → 传给 gridTradeService.onKline()</li>
     *   <li>n(name):烛线名称(如 "1m_ETH_USDT")</li>
     *   <li>t(time):烛线起始时间戳</li>
     *   <li>w(window_close):烛线是否完结(仅日志输出,不做门控)</li>
     * </ul>
     *
     * <h3>注意</h3>
     * 不判断 w(已完结)——策略需要 tick 级实时响应价格变动,
     * 而非等 1 分钟烛线完结后才行动。
     *
     * @param response WebSocket 推送的完整 JSON
     * @return true 表示已处理(匹配成功)
     */
    @Override
    public boolean handleMessage(JSONObject response) {
        if (!CHANNEL_NAME.equals(response.getString("channel"))) {
            return false;
        }
        try {
            JSONArray resultArray = response.getJSONArray("result");
            if (resultArray == null || resultArray.isEmpty()) { log.warn("[{}] 数据为空", CHANNEL_NAME); return true; }
            JSONObject data = resultArray.getJSONObject(0);
            BigDecimal closePx = new BigDecimal(data.getString("c"));
 
            log.info("========== Gate K线数据 ==========");
            log.info("名称: {} 时间: {}", data.getString("n"), DateUtil.TimeStampToDateTime(data.getLong("t")));
            log.info("收盘: {} 已完结: {}",data.getString("c"),data.getBooleanValue("w"));
            log.info("==================================");
 
            if (gridTradeService != null) {
                gridTradeService.onKline(closePx);
            }
        } catch (Exception e) { log.error("[{}] 处理数据失败", CHANNEL_NAME, e); }
        return true;
    }
}