Administrator
5 days ago 672f2cf6d8d87dffb5713067b3545e74f544cca7
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
package com.xcong.excoin.modules.okxApi;
 
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.okxApi.wsHandler.OkxChannelHandler;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
 
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
 
/**
 * OKX WebSocket 连接管理器 — 双通道架构。
 *
 * <h3>与 Gate 版本的关键区别</h3>
 * OKX 使用<b>两条独立的 WebSocket 连接</b>:
 * <ul>
 *   <li><b>公开 WS</b> ({@code wss://ws.okx.com:8443/ws/v5/public}):
 *       无需认证,订阅 K 线等公开数据。</li>
 *   <li><b>私有 WS</b> ({@code wss://ws.okx.com:8443/ws/v5/private}):
 *       需要登录认证(login 消息),订阅仓位、条件订单等私有数据。</li>
 * </ul>
 * 而 Gate 只有一条 WS 连接,通过签名区分公开/私有频道。
 *
 * <h3>登录认证(私有 WS)</h3>
 * <pre>
 * {
 *   "op": "login",
 *   "args": [{
 *     "apiKey": "...",
 *     "passphrase": "...",
 *     "timestamp": "1734567890",   // Unix 秒级时间戳
 *     "sign": "base64(HMAC-SHA256(timestamp + 'GET' + '/users/self/verify'))"
 *   }]
 * }
 * </pre>
 *
 * <h3>心跳机制</h3>
 * OKX 标准格式为 JSON {@code {"op":"ping"}} / {@code {"op":"pong"}},
 * 同时兼容纯文本 {@code "ping"} / {@code "pong"} 格式。
 *
 * <h3>消息路由</h3>
 * <pre>
 *   onMessage → handleMessage(message, isPrivate):
 *     1. "pong" (纯文本)                            → 日志 + cancelPongTimeout
 *     2. "ping" (纯文本)                            → 回复 "pong"
 *     3. {"op":"pong"} (JSON)                       → 日志 + cancelPongTimeout
 *     4. {"op":"ping"} (JSON)                       → 回复 {"op":"pong"}
 *     5. {"event":"login"}                          → 登录成功 → 订阅所有私有 handlers
 *     6. {"event":"subscribe"}                      → 标记对应 handler subscribed=true
 *     7. {"event":"error"}                          → 错误日志
 *     8. {"arg":{...}, "data":[...]}                → 遍历 handlers 路由
 * </pre>
 *
 * <h3>生命周期</h3>
 * <pre>
 *   init()        → connect(public) + connect(private,true) → startHeartbeat()
 *   destroy()     → unsubscribe 所有 handler → closeBlocking() 两条连接 → shutdown 线程池
 *   onClose()     → reconnectWithBackoff() 重连对应连接(最多 3 次,指数退避)
 * </pre>
 *
 * <h3>线程安全</h3>
 * 连接状态用 AtomicBoolean(isPublicConnected, isPrivateConnected, isConnecting, isInitialized)。
 * 消息时间戳用 AtomicReference。心跳任务用 synchronized 保护。
 *
 * @author Administrator
 */
@SuppressWarnings("ALL")
@Slf4j
public class OkxKlineWebSocketClient {
 
    // ==================== 常量 ====================
 
    /** 心跳超时时间(秒) */
    private static final int HEARTBEAT_TIMEOUT = 10;
 
    /** ISO 8601 时间格式化器(毫秒精度,UTC 时区) */
    private static final DateTimeFormatter ISO_8601_FORMATTER =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
                    .withZone(ZoneId.of("UTC"));
 
    // ==================== 配置 ====================
 
    /** OKX 配置(提供 WS URL、API 密钥等) */
    private final OkxConfig config;
 
    /** OKX API Key */
    private final String apiKey;
 
    /** OKX API Secret */
    private final String apiSecret;
 
    /** OKX API Passphrase */
    private final String passphrase;
 
    // ==================== WebSocket 客户端 ====================
 
    /** 公开频道 WebSocket 客户端(K线等) */
    private WebSocketClient publicWsClient;
 
    /** 私有频道 WebSocket 客户端(仓位、条件单等) */
    private WebSocketClient privateWsClient;
 
    /** 心跳检测调度器 */
    private ScheduledExecutorService heartbeatExecutor;
 
    /** 心跳超时 Future */
    private volatile ScheduledFuture<?> pongTimeoutFuture;
 
    /** 最后收到消息的时间戳(毫秒) */
    private final AtomicReference<Long> lastMessageTime = new AtomicReference<>(System.currentTimeMillis());
 
    // ==================== 连接状态 ====================
 
    /** 公开频道连接状态 */
    private final AtomicBoolean isPublicConnected = new AtomicBoolean(false);
 
    /** 私有频道连接状态 */
    private final AtomicBoolean isPrivateConnected = new AtomicBoolean(false);
 
    /** 公开频道连接中标记,防重入 */
    private final AtomicBoolean isPublicConnecting = new AtomicBoolean(false);
 
    /** 私有频道连接中标记,防重入 */
    private final AtomicBoolean isPrivateConnecting = new AtomicBoolean(false);
 
    /** 初始化标记,防重复 init */
    private final AtomicBoolean isInitialized = new AtomicBoolean(false);
 
    /** 私有频道登录成功标记 */
    private final AtomicBoolean isPrivateLoggedIn = new AtomicBoolean(false);
 
    // ==================== 频道处理器 ====================
 
    /** 公开频道处理器列表(如 K线) */
    private final List<OkxChannelHandler> publicHandlers = new ArrayList<>();
 
    /** 私有频道处理器列表(如 仓位、条件单) */
    private final List<OkxChannelHandler> privateHandlers = new ArrayList<>();
 
    // ==================== 异步线程池 ====================
 
    /** 重连等异步任务的缓存线程池(daemon 线程) */
    private final ExecutorService sharedExecutor = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "okx-ws-worker");
        t.setDaemon(true);
        return t;
    });
 
    // ==================== 重连配置 ====================
 
    /** 重连最大次数 */
    private static final int MAX_RECONNECT_ATTEMPTS = 3;
 
    /** 重连初始延迟(毫秒) */
    private static final long INITIAL_RECONNECT_DELAY_MS = 5000;
 
    // ==================== 构造器 ====================
 
    /**
     * 构造 OKX WebSocket 客户端。
     *
     * @param config OKX 配置(提供 WS URL、API 密钥等)
     */
    public OkxKlineWebSocketClient(OkxConfig config) {
        this.config = config;
        this.apiKey = config.getApiKey();
        this.apiSecret = config.getApiSecret();
        this.passphrase = config.getPassphrase();
    }
 
    // ==================== Handler 注册 ====================
 
    /**
     * 注册公开频道处理器(如 K线)。需在 init() 前调用。
     *
     * @param handler 实现了 {@link OkxChannelHandler} 接口的公开频道处理器
     */
    public void addPublicHandler(OkxChannelHandler handler) {
        publicHandlers.add(handler);
        log.info("[OKX-WS] 注册公开频道处理器: {}", handler.getChannelName());
    }
 
    /**
     * 注册私有频道处理器(如 仓位、条件单)。需在 init() 前调用。
     *
     * @param handler 实现了 {@link OkxChannelHandler} 接口的私有频道处理器
     */
    public void addPrivateHandler(OkxChannelHandler handler) {
        privateHandlers.add(handler);
        log.info("[OKX-WS] 注册私有频道处理器: {}", handler.getChannelName());
    }
 
    // ==================== 生命周期 ====================
 
    /**
     * 初始化:建立公开 WS 连接 + 私有 WS 连接 → 启动心跳检测。
     * 使用 {@code AtomicBoolean} 防重入,同一实例只允许初始化一次。
     */
    public void init() {
        if (!isInitialized.compareAndSet(false, true)) {
            log.warn("[OKX-WS] 已初始化过,跳过重复初始化");
            return;
        }
        connect(false);   // 公开 WS
        connect(true);    // 私有 WS
        startHeartbeat();
    }
 
    /**
     * 销毁:取消所有频道订阅 → 关闭两条 WebSocket 连接 → 关闭线程池。
     *
     * <h3>执行顺序</h3>
     * 先取消订阅(等待 500ms 确保发送完成),再 closeBlocking 关闭连接,
     * 最后 shutdown 线程池。先关连接再关线程池,避免 onClose 回调中的重连任务访问已关闭的线程池。
     */
    public void destroy() {
        log.info("[OKX-WS] 开始销毁...");
 
        // 取消公开频道订阅
        if (publicWsClient != null && publicWsClient.isOpen()) {
            for (OkxChannelHandler handler : publicHandlers) {
                handler.unsubscribe(publicWsClient);
            }
        }
        // 取消私有频道订阅
        if (privateWsClient != null && privateWsClient.isOpen()) {
            for (OkxChannelHandler handler : privateHandlers) {
                handler.unsubscribe(privateWsClient);
            }
        }
 
        // 等待取消订阅消息发出
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("[OKX-WS] 取消订阅等待被中断");
        }
 
        // 关闭公开 WS
        closeWebSocket(publicWsClient);
        publicWsClient = null;
 
        // 关闭私有 WS
        closeWebSocket(privateWsClient);
        privateWsClient = null;
 
        // 关闭心跳
        shutdownExecutorGracefully(heartbeatExecutor);
        if (pongTimeoutFuture != null) {
            pongTimeoutFuture.cancel(true);
        }
 
        // 关闭共享线程池
        shutdownExecutorGracefully(sharedExecutor);
 
        log.info("[OKX-WS] 销毁完成");
    }
 
    /**
     * 安全关闭 WebSocket 连接。
     */
    private void closeWebSocket(WebSocketClient ws) {
        if (ws != null && ws.isOpen()) {
            try {
                ws.closeBlocking();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn("[OKX-WS] 关闭连接时被中断");
            }
        }
    }
 
    // ==================== 连接管理 ====================
 
    /**
     * 建立 WebSocket 连接。
     *
     * <h3>公开 WS 连接成功回调</h3>
     * 订阅所有公开 handlers(K线等)。
     *
     * <h3>私有 WS 连接成功回调</h3>
     * 先发送 login 认证消息,登录成功后再订阅所有私有 handlers。
     *
     * <h3>连接关闭回调</h3>
     * 设置断连状态 → 异步触发指数退避重连(最多3次)。
     *
     * @param isPrivate true=私有 WS(需登录),false=公开 WS
     */
    private void connect(boolean isPrivate) {
        String wsUrl = isPrivate ? config.getWsPrivateUrl() : config.getWsPublicUrl();
        String label = isPrivate ? "私有" : "公开";
 
        AtomicBoolean connectingFlag = isPrivate ? isPrivateConnecting : isPublicConnecting;
        if (connectingFlag.get() || !connectingFlag.compareAndSet(false, true)) {
            log.info("[OKX-WS] 连接进行中,跳过重复{} WS请求", label);
            return;
        }
        try {
            SSLConfig.configureSSL();
            System.setProperty("https.protocols", "TLSv1.2,TLSv1.3");
            URI uri = new URI(wsUrl);
 
            WebSocketClient client = new WebSocketClient(uri) {
                @Override
                public void onOpen(ServerHandshake handshake) {
                    log.info("[OKX-WS] {} WS连接成功", label);
                    connectingFlag.set(false);
 
                    if (isPrivate) {
                        isPrivateConnected.set(true);
                        // 私有 WS 需要先登录
                        sendLogin(this);
                    } else {
                        isPublicConnected.set(true);
                        // 公开 WS 直接订阅
                        if (sharedExecutor != null && !sharedExecutor.isShutdown()) {
                            resetHeartbeatTimer();
                            for (OkxChannelHandler handler : publicHandlers) {
                                handler.subscribe(this);
                            }
                        } else {
                            log.warn("[OKX-WS] 应用正在关闭,忽略{} WS连接成功回调", label);
                        }
                    }
                }
 
                @Override
                public void onMessage(String message) {
                    lastMessageTime.set(System.currentTimeMillis());
                    handleMessage(message, isPrivate, this);
                    resetHeartbeatTimer();
                }
 
                @Override
                public void onClose(int code, String reason, boolean remote) {
                    log.warn("[OKX-WS] {} WS连接关闭, code:{}, reason:{}, remote:{}", label, code, reason, remote);
                    if (isPrivate) {
                        isPrivateConnected.set(false);
                        isPrivateLoggedIn.set(false);
                    } else {
                        isPublicConnected.set(false);
                    }
                    connectingFlag.set(false);
                    cancelPongTimeout();
 
                    if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) {
                        sharedExecutor.execute(() -> {
                            try {
                                reconnectWithBackoff(isPrivate);
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                            } catch (Exception e) {
                                log.error("[OKX-WS] {} WS重连失败", label, e);
                            }
                        });
                    } else {
                        log.warn("[OKX-WS] 线程池已关闭,不执行{} WS重连", label);
                    }
                }
 
                @Override
                public void onError(Exception ex) {
                    log.error("[OKX-WS] {} WS发生错误", label, ex);
                    connectingFlag.set(false);
                    if (isPrivate) {
                        isPrivateConnected.set(false);
                    } else {
                        isPublicConnected.set(false);
                    }
                }
            };
            client.setConnectionLostTimeout(0);
            client.connect();
 
            if (isPrivate) {
                this.privateWsClient = client;
            } else {
                this.publicWsClient = client;
            }
        } catch (URISyntaxException e) {
            log.error("[OKX-WS] URI格式错误: {}", wsUrl, e);
            connectingFlag.set(false);
        }
    }
 
    // ==================== 登录认证 ====================
 
    /**
     * 发送 OKX 私有 WS 登录消息。
     *
     * <h3>签名算法</h3>
     * <pre>
     *   timestamp = ISO 8601 当前时间 (UTC, 毫秒精度)
     *   message   = timestamp + "GET" + "/users/self/verify" + ""
     *   sign      = Base64(HMAC-SHA256(apiSecret, message))
     * </pre>
     *
     * <h3>登录消息格式</h3>
     * <pre>
     * {
     *   "op": "login",
     *   "args": [{
     *     "apiKey": "...",
     *     "passphrase": "...",
     *     "timestamp": "2023-01-01T00:00:00.000Z",
     *     "sign": "..."
     *   }]
     * }
     * </pre>
     *
     * @param ws 私有频道 WebSocket 客户端
     */
    private void sendLogin(WebSocketClient ws) {
        try {
            // OKX WS 登录必须使用 Unix 秒级时间戳(非 ISO 8601!)
            String timestamp = String.valueOf(System.currentTimeMillis() / 1000);
            String message = timestamp + "GET" + "/users/self/verify";
            String sign = hmacSha256Base64(apiSecret, message);
 
            JSONObject loginMsg = new JSONObject();
            loginMsg.put("op", "login");
 
            JSONArray args = new JSONArray();
            JSONObject arg = new JSONObject();
            arg.put("apiKey", apiKey);
            arg.put("passphrase", passphrase);
            arg.put("timestamp", timestamp);
            arg.put("sign", sign);
            args.add(arg);
            loginMsg.put("args", args);
 
            ws.send(loginMsg.toJSONString());
            log.info("[OKX-WS] 发送登录消息, timestamp: {}", timestamp);
        } catch (Exception e) {
            log.error("[OKX-WS] 发送登录消息失败", e);
        }
    }
 
    /**
     * HMAC-SHA256 签名并 Base64 编码。
     *
     * @param secret  密钥
     * @param message 待签名消息
     * @return Base64 编码的签名字符串
     */
    private String hmacSha256Base64(String secret, String message) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            SecretKeySpec spec = new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256");
            mac.init(spec);
            byte[] hash = mac.doFinal(message.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(hash);
        } catch (Exception e) {
            log.error("[OKX-WS] HMAC-SHA256签名失败", e);
            return "";
        }
    }
 
    // ==================== 消息路由 ====================
 
    /**
     * 消息分发:先处理系统事件(ping/pong/login/subscribe/error),
     * 再把数据推送路由到对应的 channelHandler。
     *
     * <h3>路由规则</h3>
     * <ol>
     *   <li>"pong" → 日志(忽略)</li>
     *   <li>"ping" → 回复 "pong"</li>
     *   <li>{"event":"login"} → 登录成功 → 订阅所有私有 handlers</li>
     *   <li>{"event":"subscribe"} → 标记对应 handler subscribed=true</li>
     *   <li>{"event":"error"} → 错误日志</li>
     *   <li>{"arg":{...}, "data":[...]} → 遍历 handlers 路由</li>
     * </ol>
     *
     * @param message   原始消息文本
     * @param isPrivate true=私有频道消息,false=公开频道消息
     * @param ws        接收消息的 WebSocket 客户端
     */
    private void handleMessage(String message, boolean isPrivate, WebSocketClient ws) {
        try {
            // OKX ping/pong 混合格式兼容:JSON {"op":"ping"} 与纯文本 "ping" 均支持
            if ("pong".equals(message)) {
                log.debug("[OKX-WS] 收到 pong 响应(纯文本)");
                cancelPongTimeout();
                return;
            }
            if ("ping".equals(message)) {
                log.debug("[OKX-WS] 收到 ping(纯文本),回复 pong");
                if (ws != null && ws.isOpen()) {
                    ws.send("pong");
                }
                return;
            }
 
            JSONObject response = JSON.parseObject(message);
 
            // JSON 格式的 ping/pong (OKX 文档标准格式)
            String op = response.getString("op");
            if ("pong".equals(op)) {
                log.debug("[OKX-WS] 收到 pong 响应");
                cancelPongTimeout();
                return;
            }
            if ("ping".equals(op)) {
                log.debug("[OKX-WS] 收到 ping,回复 pong");
                if (ws != null && ws.isOpen()) {
                    ws.send("{\"op\":\"pong\"}");
                }
                return;
            }
 
            // 登录响应
            String event = response.getString("event");
            if ("login".equals(event)) {
                String code = response.getString("code");
                if ("0".equals(code)) {
                    log.info("[OKX-WS] 私有频道登录成功");
                    isPrivateLoggedIn.set(true);
                    // 登录成功后订阅所有私有频道
                    for (OkxChannelHandler handler : privateHandlers) {
                        handler.subscribe(ws);
                    }
                } else {
                    log.error("[OKX-WS] 私有频道登录失败, code:{}, msg:{}",
                            code, response.getString("msg"));
                }
                return;
            }
 
            // 订阅确认
            if ("subscribe".equals(event)) {
                JSONObject arg = response.getJSONObject("arg");
                if (arg != null) {
                    String channel = arg.getString("channel");
                    log.info("[OKX-WS] 订阅成功: {}", channel);
                    List<OkxChannelHandler> handlers = isPrivate ? privateHandlers : publicHandlers;
                    for (OkxChannelHandler handler : handlers) {
                        if (channel.equals(handler.getChannelName())) {
                            handler.setSubscribed(true);
                            break;
                        }
                    }
                }
                return;
            }
 
            // 取消订阅确认
            if ("unsubscribe".equals(event)) {
                JSONObject arg = response.getJSONObject("arg");
                log.info("[OKX-WS] 取消订阅成功: {}",
                        arg != null ? arg.getString("channel") : "unknown");
                return;
            }
 
            // 错误
            if ("error".equals(event)) {
                log.error("[OKX-WS] 错误, code:{}, msg:{}",
                        response.getString("code"), response.getString("msg"));
                return;
            }
 
            // 数据推送: {"arg":{"channel":"positions",...}, "data":[...]}
            JSONObject arg = response.getJSONObject("arg");
            if (arg != null && response.getJSONArray("data") != null) {
                String channel = arg.getString("channel");
                if (channel == null) {
                    return;
                }
                List<OkxChannelHandler> handlers = isPrivate ? privateHandlers : publicHandlers;
                for (OkxChannelHandler handler : handlers) {
                    if (handler.handleMessage(response)) {
                        return;
                    }
                }
            }
        } catch (Exception e) {
            log.error("[OKX-WS] 处理消息失败: {}", message, e);
        }
    }
 
    // ==================== 订阅状态检查 ====================
 
    /**
     * 检查所有已注册的频道是否都已收到订阅成功确认。
     * 同时检查公开和私有频道的 handlers。
     *
     * @return true 如果所有 handlers 都已订阅确认
     */
    public boolean areAllSubscribed() {
        List<OkxChannelHandler> allHandlers = new ArrayList<>();
        allHandlers.addAll(publicHandlers);
        allHandlers.addAll(privateHandlers);
 
        if (allHandlers.isEmpty()) {
            return false;
        }
        for (OkxChannelHandler h : allHandlers) {
            if (!h.isSubscribed()) {
                return false;
            }
        }
        return true;
    }
 
    // ==================== 心跳机制 ====================
 
    /**
     * 启动心跳检测器。
     * 使用单线程 ScheduledExecutor,每 25 秒检查一次心跳超时。
     */
    private void startHeartbeat() {
        if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) {
            heartbeatExecutor.shutdownNow();
        }
        heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "okx-ws-heartbeat");
            t.setDaemon(true);
            return t;
        });
        heartbeatExecutor.scheduleWithFixedDelay(
                this::checkHeartbeatTimeout, 25, 25, TimeUnit.SECONDS);
    }
 
    /**
     * 重置心跳计时器:取消旧超时任务,提交新的 10 秒超时检测。
     */
    private synchronized void resetHeartbeatTimer() {
        cancelPongTimeout();
        if (heartbeatExecutor != null && !heartbeatExecutor.isShutdown()) {
            pongTimeoutFuture = heartbeatExecutor.schedule(
                    this::checkHeartbeatTimeout, HEARTBEAT_TIMEOUT, TimeUnit.SECONDS);
        }
    }
 
    /**
     * 检查心跳超时:如果距离上次收到消息超过 10 秒,主动发送 ping。
     * OKX 服务端收到 ping 后会回复 pong。
     */
    private void checkHeartbeatTimeout() {
        boolean isAnyConnected = isPublicConnected.get() || isPrivateConnected.get();
        if (!isAnyConnected) {
            return;
        }
        long elapsed = System.currentTimeMillis() - lastMessageTime.get();
        if (elapsed >= HEARTBEAT_TIMEOUT * 1000L) {
            log.debug("[OKX-WS] 心跳超时 {}ms, 主动发送ping", elapsed);
            sendPing();
        }
    }
 
    /**
     * 向两条 WS 连接主动发送 ping(OKX 文档标准 JSON 格式)。
     */
    private void sendPing() {
        try {
            String pingMsg = "{\"op\":\"ping\"}";
            if (publicWsClient != null && publicWsClient.isOpen()) {
                publicWsClient.send(pingMsg);
            }
            if (privateWsClient != null && privateWsClient.isOpen()) {
                privateWsClient.send(pingMsg);
            }
        } catch (Exception e) {
            log.warn("[OKX-WS] 发送ping失败", e);
        }
    }
 
    /**
     * 取消心跳超时检测任务。
     */
    private synchronized void cancelPongTimeout() {
        if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) {
            pongTimeoutFuture.cancel(true);
        }
    }
 
    // ==================== 重连机制 ====================
 
    /**
     * 指数退避重连。初始延迟 5 秒,每次翻倍,最多重试 3 次。
     *
     * @param isPrivate true=重连私有 WS,false=重连公开 WS
     * @throws InterruptedException 线程被中断
     */
    private void reconnectWithBackoff(boolean isPrivate) throws InterruptedException {
        String label = isPrivate ? "私有" : "公开";
        int attempt = 0;
        long delayMs = INITIAL_RECONNECT_DELAY_MS;
 
        while (attempt < MAX_RECONNECT_ATTEMPTS) {
            try {
                Thread.sleep(delayMs);
                connect(isPrivate);
                log.info("[OKX-WS] {} WS第{}次重连成功", label, attempt + 1);
                return;
            } catch (Exception e) {
                log.warn("[OKX-WS] {} WS第{}次重连失败", label, attempt + 1, e);
                delayMs *= 2;
                attempt++;
            }
        }
        log.error("[OKX-WS] {} WS超过最大重试次数({}),放弃重连", label, MAX_RECONNECT_ATTEMPTS);
    }
 
    // ==================== 工具方法 ====================
 
    /**
     * 优雅关闭线程池:先 shutdown,等待 5 秒,超时则 shutdownNow 强制中断。
     *
     * @param executor 需要关闭的线程池
     */
    private void shutdownExecutorGracefully(ExecutorService executor) {
        if (executor == null || executor.isTerminated()) {
            return;
        }
        try {
            executor.shutdown();
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            executor.shutdownNow();
        }
    }
 
    // ==================== 状态查询 ====================
 
    /** @return 公开 WS 是否已连接 */
    public boolean isPublicConnected() {
        return isPublicConnected.get();
    }
 
    /** @return 私有 WS 是否已连接并登录成功 */
    public boolean isPrivateConnected() {
        return isPrivateConnected.get() && isPrivateLoggedIn.get();
    }
}