Administrator
5 days ago 3f2a5a15d15052833f1eb864d9f04bc02ecd6cc0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package com.xcong.excoin.modules.okxApi.wsHandler.handler;
 
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.okxApi.OkxGridTradeService;
import com.xcong.excoin.modules.okxApi.wsHandler.OkxChannelHandler;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
 
import java.math.BigDecimal;
 
/**
 * OKX 标记价格频道处理器(mark-price)— 策略的价格驱动源。
 *
 * <h3>定位</h3>
 * 这是一个<b>公开频道</b>,连接到 OKX 公开 WebSocket 端点,无需登录认证。
 * 替代 candle1m K 线频道,作为策略唯一的实时价格来源。
 * 标记价格变化时每 200ms 推送一次,无变化时每 10s 推送一次。
 *
 * <h3>双回调</h3>
 * <ul>
 *   <li>{@link OkxGridTradeService#onKline(BigDecimal)} — 驱动网格策略(处理开仓/止盈逻辑)</li>
 *   <li>{@link OkxGridTradeService#setMarkPrice(BigDecimal)} — PnLPriceMode.MARK_PRICE 计算未实现盈亏</li>
 * </ul>
 *
 * <h3>订阅格式</h3>
 * <pre>
 * {"op":"subscribe","args":[{"channel":"mark-price","instId":"ETH-USDT-SWAP"}]}
 * </pre>
 *
 * <h3>数据推送格式</h3>
 * <pre>
 * {
 *   "arg": {"channel":"mark-price","instId":"ETH-USDT-SWAP"},
 *   "data": [{
 *     "instType": "SWAP",
 *     "instId": "ETH-USDT-SWAP",
 *     "markPx": "42310.6",
 *     "ts": "1630049139746"
 *   }]
 * }
 * </pre>
 *
 * <h3>推送频率</h3>
 * <ul>
 *   <li>标记价格变化 → 每 200ms 推送一次</li>
 *   <li>标记价格无变化 → 每 10s 推送一次</li>
 * </ul>
 *
 * @author Administrator
 */
@Slf4j
public class MarkPriceOkxChannelHandler implements OkxChannelHandler {
 
    /** 频道名称 */
    private static final String CHANNEL_NAME = "mark-price";
 
    /** 交易对标识,如 "ETH-USDT-SWAP" */
    private final String instId;
 
    /** 网格交易服务,接收标记价格回调 */
    private final OkxGridTradeService gridTradeService;
 
    /** 订阅确认状态 */
    private volatile boolean subscribed = false;
 
    /**
     * 构造标记价格频道处理器。
     *
     * @param instId           交易对标识(如 "ETH-USDT-SWAP")
     * @param gridTradeService OKX 网格交易策略服务实例
     */
    public MarkPriceOkxChannelHandler(String instId, OkxGridTradeService gridTradeService) {
        this.instId = instId;
        this.gridTradeService = gridTradeService;
    }
 
    /**
     * @return 频道名称 "mark-price"
     */
    @Override
    public String getChannelName() {
        return CHANNEL_NAME;
    }
 
    /**
     * @return 交易对标识(如 "ETH-USDT-SWAP")
     */
    @Override
    public String getInstId() {
        return instId;
    }
 
    /**
     * 发送标记价格频道订阅请求(公开频道,无需签名)。
     *
     * @param ws OKX 公开 WebSocket 客户端
     */
    @Override
    public void subscribe(WebSocketClient ws) {
        JSONObject msg = new JSONObject();
        msg.put("op", "subscribe");
        JSONArray args = new JSONArray();
        JSONObject arg = new JSONObject();
        arg.put("channel", CHANNEL_NAME);
        arg.put("instId", instId);
        args.add(arg);
        msg.put("args", args);
        ws.send(msg.toJSONString());
        log.info("[OKX-WS] 订阅标记价格频道, instId: {}", instId);
    }
 
    /**
     * 发送标记价格频道取消订阅请求。
     *
     * @param ws OKX 公开 WebSocket 客户端
     */
    @Override
    public void unsubscribe(WebSocketClient ws) {
        JSONObject msg = new JSONObject();
        msg.put("op", "unsubscribe");
        JSONArray args = new JSONArray();
        JSONObject arg = new JSONObject();
        arg.put("channel", CHANNEL_NAME);
        arg.put("instId", instId);
        args.add(arg);
        msg.put("args", args);
        ws.send(msg.toJSONString());
        log.info("[OKX-WS] 取消订阅标记价格频道, instId: {}", instId);
    }
 
    /**
     * 处理标记价格推送消息。
     *
     * <h3>数据提取</h3>
     * 从 {@code data[0].markPx} 提取标记价格,同时回调:
     * <ol>
     *   <li>{@code gridTradeService.onKline(markPrice)} — 驱动网格策略</li>
     *   <li>{@code gridTradeService.setMarkPrice(markPrice)} — PnL 计算用</li>
     * </ol>
     *
     * @param response WebSocket 推送的完整 JSON
     * @return true 表示已处理(匹配成功)
     */
    @Override
    public boolean handleMessage(JSONObject response) {
        JSONObject arg = response.getJSONObject("arg");
        if (arg == null || !CHANNEL_NAME.equals(arg.getString("channel"))) {
            return false;
        }
        try {
            JSONArray dataArray = response.getJSONArray("data");
            if (dataArray == null || dataArray.isEmpty()) {
                return true;
            }
            // data[0] 是一个 JSONObject: {instType, instId, markPx, ts}
            JSONObject markData = dataArray.getJSONObject(0);
            if (markData == null) {
                return true;
            }
            String markPxStr = markData.getString("markPx");
            if (markPxStr == null || markPxStr.isEmpty()) {
                return true;
            }
            BigDecimal markPrice = new BigDecimal(markPxStr);
 
            if (gridTradeService != null) {
                gridTradeService.setMarkPrice(markPrice);
                gridTradeService.onKline(markPrice);
            }
        } catch (Exception e) {
            log.error("[OKX-WS] 处理 mark-price 数据失败", e);
        }
        return true;
    }
 
    // ==================== 订阅状态 ====================
 
    @Override
    public boolean isSubscribed() {
        return subscribed;
    }
 
    @Override
    public void setSubscribed(boolean subscribed) {
        this.subscribed = subscribed;
    }
}