Administrator
5 days ago 83017f78860526483a24e89052534222fd2e6466
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package com.xcong.excoin.modules.okxApi.wsHandler.handler;
 
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.okxApi.OkxGridTradeService;
import com.xcong.excoin.modules.okxApi.wsHandler.OkxChannelHandler;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
 
import java.math.BigDecimal;
 
/**
 * OKX K线频道处理器(candle1m)— 策略的唯一价格时间驱动源。
 *
 * <h3>定位</h3>
 * 这是一个<b>公开频道</b>,连接到 OKX 公开 WebSocket 端点,
 * 无需登录认证。订阅 1 分钟 K 线实时推送,每收到一根 K 线即触发
 * {@link OkxGridTradeService#onKline(BigDecimal)},由策略引擎决定是否开仓/止盈。
 *
 * <h3>订阅格式</h3>
 * <pre>
 * {"op":"subscribe","args":[{"channel":"candle1m","instId":"ETH-USDT-SWAP"}]}
 * </pre>
 *
 * <h3>数据推送格式</h3>
 * <pre>
 * {
 *   "arg": {"channel":"candle1m","instId":"ETH-USDT-SWAP"},
 *   "data": [
 *     ["timestamp","open","high","low","close","vol","volCcy","volCcyQuote","confirm"]
 *   ]
 * }
 * </pre>
 *
 * <h3>字段说明</h3>
 * <table>
 *   <tr><th>索引</th><th>字段</th><th>含义</th></tr>
 *   <tr><td>0</td><td>ts</td><td>K线起始时间(Unix ms)</td></tr>
 *   <tr><td>1</td><td>o</td><td>开盘价</td></tr>
 *   <tr><td>2</td><td>h</td><td>最高价</td></tr>
 *   <tr><td>3</td><td>l</td><td>最低价</td></tr>
 *   <tr><td>4</td><td>c</td><td><b>收盘价</b>(用于驱动策略)</td></tr>
 *   <tr><td>5</td><td>vol</td><td>成交量(张)</td></tr>
 *   <tr><td>6</td><td>volCcy</td><td>成交量(币)</td></tr>
 *   <tr><td>7</td><td>volCcyQuote</td><td>成交量(USDT)</td></tr>
 *   <tr><td>8</td><td>confirm</td><td>K线状态("0"=未完结,"1"=已完结)</td></tr>
 * </table>
 *
 * <h3>注意</h3>
 * 不判断 confirm 字段(K线是否完结)——策略需要 tick 级实时响应价格变动,
 * 而非等 1 分钟烛线完结后才行动。
 *
 * @author Administrator
 */
@Slf4j
public class CandlestickOkxChannelHandler implements OkxChannelHandler {
 
    /** 频道名称 */
    private static final String CHANNEL_NAME = "candle1m";
 
    /** 交易对标识,如 "ETH-USDT-SWAP" */
    private final String instId;
 
    /** 网格交易服务,接收 K 线回调 */
    private final OkxGridTradeService gridTradeService;
 
    /** 订阅确认状态 */
    private volatile boolean subscribed = false;
 
    /**
     * 构造 K 线频道处理器。
     *
     * @param instId           交易对标识(如 "ETH-USDT-SWAP")
     * @param gridTradeService OKX 网格交易策略服务实例
     */
    public CandlestickOkxChannelHandler(String instId, OkxGridTradeService gridTradeService) {
        this.instId = instId;
        this.gridTradeService = gridTradeService;
    }
 
    /**
     * @return 频道名称 "candle1m"
     */
    @Override
    public String getChannelName() {
        return CHANNEL_NAME;
    }
 
    /**
     * @return 交易对标识(如 "ETH-USDT-SWAP")
     */
    @Override
    public String getInstId() {
        return instId;
    }
 
    /**
     * 发送 K 线频道订阅请求(公开频道,无需签名)。
     *
     * <h3>订阅格式</h3>
     * <pre>
     * {"op":"subscribe","args":[{"channel":"candle1m","instId":"ETH-USDT-SWAP"}]}
     * </pre>
     *
     * @param ws OKX 公开 WebSocket 客户端
     */
    @Override
    public void subscribe(WebSocketClient ws) {
        JSONObject msg = new JSONObject();
        msg.put("op", "subscribe");
        JSONArray args = new JSONArray();
        JSONObject arg = new JSONObject();
        arg.put("channel", CHANNEL_NAME);
        arg.put("instId", instId);
        args.add(arg);
        msg.put("args", args);
        ws.send(msg.toJSONString());
        log.info("[OKX-WS] 订阅K线频道, instId: {}", instId);
    }
 
    /**
     * 发送 K 线频道取消订阅请求。
     *
     * @param ws OKX 公开 WebSocket 客户端
     */
    @Override
    public void unsubscribe(WebSocketClient ws) {
        JSONObject msg = new JSONObject();
        msg.put("op", "unsubscribe");
        JSONArray args = new JSONArray();
        JSONObject arg = new JSONObject();
        arg.put("channel", CHANNEL_NAME);
        arg.put("instId", instId);
        args.add(arg);
        msg.put("args", args);
        ws.send(msg.toJSONString());
        log.info("[OKX-WS] 取消订阅K线频道, instId: {}", instId);
    }
 
    /**
     * 处理 K 线推送消息。
     *
     * <h3>数据提取</h3>
     * 从 {@code data[0]} 数组中提取索引 4(收盘价 close),
     * 传给 {@code gridTradeService.onKline(closePrice)}。
     *
     * <h3>OKX 数据格式</h3>
     * data 是一个二维数组,第一层是 K 线条数(通常 1 条),
     * 第二层是各字段值。data[0][4] = 收盘价。
     *
     * @param response WebSocket 推送的完整 JSON
     * @return true 表示已处理(匹配成功)
     */
    @Override
    public boolean handleMessage(JSONObject response) {
        JSONObject arg = response.getJSONObject("arg");
        if (arg == null || !CHANNEL_NAME.equals(arg.getString("channel"))) {
            return false;
        }
        try {
            JSONArray dataArray = response.getJSONArray("data");
            if (dataArray == null || dataArray.isEmpty()) {
                log.debug("[OKX-WS] candle1m 数据为空");
                return true;
            }
            // data[0] 是一个数组: [ts, o, h, l, c, vol, volCcy, volCcyQuote, confirm]
            JSONArray candle = dataArray.getJSONArray(0);
            if (candle == null || candle.size() < 5) {
                log.warn("[OKX-WS] candle1m 数据格式异常: {}", dataArray);
                return true;
            }
            // 索引 4 = 收盘价 close
            BigDecimal closePrice = candle.getBigDecimal(4);
 
            if (gridTradeService != null && closePrice != null) {
                gridTradeService.onKline(closePrice);
            }
        } catch (Exception e) {
            log.error("[OKX-WS] 处理 candle1m 数据失败", e);
        }
        return true;
    }
 
    // ==================== 订阅状态 ====================
 
    @Override
    public boolean isSubscribed() {
        return subscribed;
    }
 
    @Override
    public void setSubscribed(boolean subscribed) {
        this.subscribed = subscribed;
    }
}