From e45e705c22df5bc979e72db6014dd1ff9637be42 Mon Sep 17 00:00:00 2001
From: Administrator <15274802129@163.com>
Date: Wed, 24 Jun 2026 22:21:58 +0800
Subject: [PATCH] fix(okx): 修复网格交易成交日志记录问题

---
 src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java |  753 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 753 insertions(+), 0 deletions(-)

diff --git a/src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java b/src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java
new file mode 100644
index 0000000..1bd14b6
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java
@@ -0,0 +1,753 @@
+package com.xcong.excoin.modules.okxApi;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.xcong.excoin.modules.okxApi.wsHandler.OkxChannelHandler;
+import lombok.extern.slf4j.Slf4j;
+import org.java_websocket.client.WebSocketClient;
+import org.java_websocket.handshake.ServerHandshake;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * OKX WebSocket 连接管理器 — 双通道架构。
+ *
+ * <h3>与 Gate 版本的关键区别</h3>
+ * OKX 使用<b>两条独立的 WebSocket 连接</b>:
+ * <ul>
+ *   <li><b>公开 WS</b> ({@code wss://ws.okx.com:8443/ws/v5/public}):
+ *       无需认证,订阅 K 线等公开数据。</li>
+ *   <li><b>私有 WS</b> ({@code wss://ws.okx.com:8443/ws/v5/private}):
+ *       需要登录认证(login 消息),订阅仓位、条件订单等私有数据。</li>
+ * </ul>
+ * 而 Gate 只有一条 WS 连接,通过签名区分公开/私有频道。
+ *
+ * <h3>登录认证(私有 WS)</h3>
+ * <pre>
+ * {
+ *   "op": "login",
+ *   "args": [{
+ *     "apiKey": "...",
+ *     "passphrase": "...",
+ *     "timestamp": "1734567890",   // Unix 秒级时间戳
+ *     "sign": "base64(HMAC-SHA256(timestamp + 'GET' + '/users/self/verify'))"
+ *   }]
+ * }
+ * </pre>
+ *
+ * <h3>心跳机制</h3>
+ * OKX 标准格式为 JSON {@code {"op":"ping"}} / {@code {"op":"pong"}},
+ * 同时兼容纯文本 {@code "ping"} / {@code "pong"} 格式。
+ *
+ * <h3>消息路由</h3>
+ * <pre>
+ *   onMessage → handleMessage(message, isPrivate):
+ *     1. "pong" (纯文本)                            → 日志 + cancelPongTimeout
+ *     2. "ping" (纯文本)                            → 回复 "pong"
+ *     3. {"op":"pong"} (JSON)                       → 日志 + cancelPongTimeout
+ *     4. {"op":"ping"} (JSON)                       → 回复 {"op":"pong"}
+ *     5. {"event":"login"}                          → 登录成功 → 订阅所有私有 handlers
+ *     6. {"event":"subscribe"}                      → 标记对应 handler subscribed=true
+ *     7. {"event":"error"}                          → 错误日志
+ *     8. {"arg":{...}, "data":[...]}                → 遍历 handlers 路由
+ * </pre>
+ *
+ * <h3>生命周期</h3>
+ * <pre>
+ *   init()        → connect(public) + connect(private,true) → startHeartbeat()
+ *   destroy()     → unsubscribe 所有 handler → closeBlocking() 两条连接 → shutdown 线程池
+ *   onClose()     → reconnectWithBackoff() 重连对应连接(最多 3 次,指数退避)
+ * </pre>
+ *
+ * <h3>线程安全</h3>
+ * 连接状态用 AtomicBoolean(isPublicConnected, isPrivateConnected, isConnecting, isInitialized)。
+ * 消息时间戳用 AtomicReference。心跳任务用 synchronized 保护。
+ *
+ * @author Administrator
+ */
+@SuppressWarnings("ALL")
+@Slf4j
+public class OkxKlineWebSocketClient {
+
+    // ==================== 常量 ====================
+
+    /** 心跳超时时间(秒) */
+    private static final int HEARTBEAT_TIMEOUT = 10;
+
+    /** ISO 8601 时间格式化器(毫秒精度,UTC 时区) */
+    private static final DateTimeFormatter ISO_8601_FORMATTER =
+            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+                    .withZone(ZoneId.of("UTC"));
+
+    // ==================== 配置 ====================
+
+    /** OKX 配置(提供 WS URL、API 密钥等) */
+    private final OkxConfig config;
+
+    /** OKX API Key */
+    private final String apiKey;
+
+    /** OKX API Secret */
+    private final String apiSecret;
+
+    /** OKX API Passphrase */
+    private final String passphrase;
+
+    // ==================== WebSocket 客户端 ====================
+
+    /** 公开频道 WebSocket 客户端(K线等) */
+    private WebSocketClient publicWsClient;
+
+    /** 私有频道 WebSocket 客户端(仓位、条件单等) */
+    private WebSocketClient privateWsClient;
+
+    /** 心跳检测调度器 */
+    private ScheduledExecutorService heartbeatExecutor;
+
+    /** 心跳超时 Future */
+    private volatile ScheduledFuture<?> pongTimeoutFuture;
+
+    /** 最后收到消息的时间戳(毫秒) */
+    private final AtomicReference<Long> lastMessageTime = new AtomicReference<>(System.currentTimeMillis());
+
+    // ==================== 连接状态 ====================
+
+    /** 公开频道连接状态 */
+    private final AtomicBoolean isPublicConnected = new AtomicBoolean(false);
+
+    /** 私有频道连接状态 */
+    private final AtomicBoolean isPrivateConnected = new AtomicBoolean(false);
+
+    /** 公开频道连接中标记,防重入 */
+    private final AtomicBoolean isPublicConnecting = new AtomicBoolean(false);
+
+    /** 私有频道连接中标记,防重入 */
+    private final AtomicBoolean isPrivateConnecting = new AtomicBoolean(false);
+
+    /** 初始化标记,防重复 init */
+    private final AtomicBoolean isInitialized = new AtomicBoolean(false);
+
+    /** 私有频道登录成功标记 */
+    private final AtomicBoolean isPrivateLoggedIn = new AtomicBoolean(false);
+
+    // ==================== 频道处理器 ====================
+
+    /** 公开频道处理器列表(如 K线) */
+    private final List<OkxChannelHandler> publicHandlers = new ArrayList<>();
+
+    /** 私有频道处理器列表(如 仓位、条件单) */
+    private final List<OkxChannelHandler> privateHandlers = new ArrayList<>();
+
+    // ==================== 异步线程池 ====================
+
+    /** 重连等异步任务的缓存线程池(daemon 线程) */
+    private final ExecutorService sharedExecutor = Executors.newCachedThreadPool(r -> {
+        Thread t = new Thread(r, "okx-ws-worker");
+        t.setDaemon(true);
+        return t;
+    });
+
+    // ==================== 重连配置 ====================
+
+    /** 重连最大次数 */
+    private static final int MAX_RECONNECT_ATTEMPTS = 3;
+
+    /** 重连初始延迟(毫秒) */
+    private static final long INITIAL_RECONNECT_DELAY_MS = 5000;
+
+    // ==================== 构造器 ====================
+
+    /**
+     * 构造 OKX WebSocket 客户端。
+     *
+     * @param config OKX 配置(提供 WS URL、API 密钥等)
+     */
+    public OkxKlineWebSocketClient(OkxConfig config) {
+        this.config = config;
+        this.apiKey = config.getApiKey();
+        this.apiSecret = config.getApiSecret();
+        this.passphrase = config.getPassphrase();
+    }
+
+    // ==================== Handler 注册 ====================
+
+    /**
+     * 注册公开频道处理器(如 K线)。需在 init() 前调用。
+     *
+     * @param handler 实现了 {@link OkxChannelHandler} 接口的公开频道处理器
+     */
+    public void addPublicHandler(OkxChannelHandler handler) {
+        publicHandlers.add(handler);
+        log.info("[OKX-WS] 注册公开频道处理器: {}", handler.getChannelName());
+    }
+
+    /**
+     * 注册私有频道处理器(如 仓位、条件单)。需在 init() 前调用。
+     *
+     * @param handler 实现了 {@link OkxChannelHandler} 接口的私有频道处理器
+     */
+    public void addPrivateHandler(OkxChannelHandler handler) {
+        privateHandlers.add(handler);
+        log.info("[OKX-WS] 注册私有频道处理器: {}", handler.getChannelName());
+    }
+
+    // ==================== 生命周期 ====================
+
+    /**
+     * 初始化:建立公开 WS 连接 + 私有 WS 连接 → 启动心跳检测。
+     * 使用 {@code AtomicBoolean} 防重入,同一实例只允许初始化一次。
+     */
+    public void init() {
+        if (!isInitialized.compareAndSet(false, true)) {
+            log.warn("[OKX-WS] 已初始化过,跳过重复初始化");
+            return;
+        }
+        connect(false);   // 公开 WS
+        connect(true);    // 私有 WS
+        startHeartbeat();
+    }
+
+    /**
+     * 销毁:取消所有频道订阅 → 关闭两条 WebSocket 连接 → 关闭线程池。
+     *
+     * <h3>执行顺序</h3>
+     * 先取消订阅(等待 500ms 确保发送完成),再 closeBlocking 关闭连接,
+     * 最后 shutdown 线程池。先关连接再关线程池,避免 onClose 回调中的重连任务访问已关闭的线程池。
+     */
+    public void destroy() {
+        log.info("[OKX-WS] 开始销毁...");
+
+        // 取消公开频道订阅
+        if (publicWsClient != null && publicWsClient.isOpen()) {
+            for (OkxChannelHandler handler : publicHandlers) {
+                handler.unsubscribe(publicWsClient);
+            }
+        }
+        // 取消私有频道订阅
+        if (privateWsClient != null && privateWsClient.isOpen()) {
+            for (OkxChannelHandler handler : privateHandlers) {
+                handler.unsubscribe(privateWsClient);
+            }
+        }
+
+        // 等待取消订阅消息发出
+        try {
+            Thread.sleep(500);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.warn("[OKX-WS] 取消订阅等待被中断");
+        }
+
+        // 关闭公开 WS
+        closeWebSocket(publicWsClient);
+        publicWsClient = null;
+
+        // 关闭私有 WS
+        closeWebSocket(privateWsClient);
+        privateWsClient = null;
+
+        // 关闭心跳
+        shutdownExecutorGracefully(heartbeatExecutor);
+        if (pongTimeoutFuture != null) {
+            pongTimeoutFuture.cancel(true);
+        }
+
+        // 关闭共享线程池
+        shutdownExecutorGracefully(sharedExecutor);
+
+        log.info("[OKX-WS] 销毁完成");
+    }
+
+    /**
+     * 安全关闭 WebSocket 连接。
+     */
+    private void closeWebSocket(WebSocketClient ws) {
+        if (ws != null && ws.isOpen()) {
+            try {
+                ws.closeBlocking();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                log.warn("[OKX-WS] 关闭连接时被中断");
+            }
+        }
+    }
+
+    // ==================== 连接管理 ====================
+
+    /**
+     * 建立 WebSocket 连接。
+     *
+     * <h3>公开 WS 连接成功回调</h3>
+     * 订阅所有公开 handlers(K线等)。
+     *
+     * <h3>私有 WS 连接成功回调</h3>
+     * 先发送 login 认证消息,登录成功后再订阅所有私有 handlers。
+     *
+     * <h3>连接关闭回调</h3>
+     * 设置断连状态 → 异步触发指数退避重连(最多3次)。
+     *
+     * @param isPrivate true=私有 WS(需登录),false=公开 WS
+     */
+    private void connect(boolean isPrivate) {
+        String wsUrl = isPrivate ? config.getWsPrivateUrl() : config.getWsPublicUrl();
+        String label = isPrivate ? "私有" : "公开";
+
+        AtomicBoolean connectingFlag = isPrivate ? isPrivateConnecting : isPublicConnecting;
+        if (connectingFlag.get() || !connectingFlag.compareAndSet(false, true)) {
+            log.info("[OKX-WS] 连接进行中,跳过重复{} WS请求", label);
+            return;
+        }
+        try {
+            SSLConfig.configureSSL();
+            System.setProperty("https.protocols", "TLSv1.2,TLSv1.3");
+            URI uri = new URI(wsUrl);
+
+            WebSocketClient client = new WebSocketClient(uri) {
+                @Override
+                public void onOpen(ServerHandshake handshake) {
+                    log.info("[OKX-WS] {} WS连接成功", label);
+                    connectingFlag.set(false);
+
+                    if (isPrivate) {
+                        isPrivateConnected.set(true);
+                        // 私有 WS 需要先登录
+                        sendLogin(this);
+                    } else {
+                        isPublicConnected.set(true);
+                        // 公开 WS 直接订阅
+                        if (sharedExecutor != null && !sharedExecutor.isShutdown()) {
+                            resetHeartbeatTimer();
+                            for (OkxChannelHandler handler : publicHandlers) {
+                                handler.subscribe(this);
+                            }
+                        } else {
+                            log.warn("[OKX-WS] 应用正在关闭,忽略{} WS连接成功回调", label);
+                        }
+                    }
+                }
+
+                @Override
+                public void onMessage(String message) {
+                    lastMessageTime.set(System.currentTimeMillis());
+                    handleMessage(message, isPrivate, this);
+                    resetHeartbeatTimer();
+                }
+
+                @Override
+                public void onClose(int code, String reason, boolean remote) {
+                    log.warn("[OKX-WS] {} WS连接关闭, code:{}, reason:{}, remote:{}", label, code, reason, remote);
+                    if (isPrivate) {
+                        isPrivateConnected.set(false);
+                        isPrivateLoggedIn.set(false);
+                    } else {
+                        isPublicConnected.set(false);
+                    }
+                    connectingFlag.set(false);
+                    cancelPongTimeout();
+
+                    if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) {
+                        sharedExecutor.execute(() -> {
+                            try {
+                                reconnectWithBackoff(isPrivate);
+                            } catch (InterruptedException e) {
+                                Thread.currentThread().interrupt();
+                            } catch (Exception e) {
+                                log.error("[OKX-WS] {} WS重连失败", label, e);
+                            }
+                        });
+                    } else {
+                        log.warn("[OKX-WS] 线程池已关闭,不执行{} WS重连", label);
+                    }
+                }
+
+                @Override
+                public void onError(Exception ex) {
+                    log.error("[OKX-WS] {} WS发生错误", label, ex);
+                    connectingFlag.set(false);
+                    if (isPrivate) {
+                        isPrivateConnected.set(false);
+                    } else {
+                        isPublicConnected.set(false);
+                    }
+                }
+            };
+            client.setConnectionLostTimeout(0);
+            client.connect();
+
+            if (isPrivate) {
+                this.privateWsClient = client;
+            } else {
+                this.publicWsClient = client;
+            }
+        } catch (URISyntaxException e) {
+            log.error("[OKX-WS] URI格式错误: {}", wsUrl, e);
+            connectingFlag.set(false);
+        }
+    }
+
+    // ==================== 登录认证 ====================
+
+    /**
+     * 发送 OKX 私有 WS 登录消息。
+     *
+     * <h3>签名算法</h3>
+     * <pre>
+     *   timestamp = ISO 8601 当前时间 (UTC, 毫秒精度)
+     *   message   = timestamp + "GET" + "/users/self/verify" + ""
+     *   sign      = Base64(HMAC-SHA256(apiSecret, message))
+     * </pre>
+     *
+     * <h3>登录消息格式</h3>
+     * <pre>
+     * {
+     *   "op": "login",
+     *   "args": [{
+     *     "apiKey": "...",
+     *     "passphrase": "...",
+     *     "timestamp": "2023-01-01T00:00:00.000Z",
+     *     "sign": "..."
+     *   }]
+     * }
+     * </pre>
+     *
+     * @param ws 私有频道 WebSocket 客户端
+     */
+    private void sendLogin(WebSocketClient ws) {
+        try {
+            // OKX WS 登录必须使用 Unix 秒级时间戳(非 ISO 8601!)
+            String timestamp = String.valueOf(System.currentTimeMillis() / 1000);
+            String message = timestamp + "GET" + "/users/self/verify";
+            String sign = hmacSha256Base64(apiSecret, message);
+
+            JSONObject loginMsg = new JSONObject();
+            loginMsg.put("op", "login");
+
+            JSONArray args = new JSONArray();
+            JSONObject arg = new JSONObject();
+            arg.put("apiKey", apiKey);
+            arg.put("passphrase", passphrase);
+            arg.put("timestamp", timestamp);
+            arg.put("sign", sign);
+            args.add(arg);
+            loginMsg.put("args", args);
+
+            ws.send(loginMsg.toJSONString());
+            log.info("[OKX-WS] 发送登录消息, timestamp: {}", timestamp);
+        } catch (Exception e) {
+            log.error("[OKX-WS] 发送登录消息失败", e);
+        }
+    }
+
+    /**
+     * HMAC-SHA256 签名并 Base64 编码。
+     *
+     * @param secret  密钥
+     * @param message 待签名消息
+     * @return Base64 编码的签名字符串
+     */
+    private String hmacSha256Base64(String secret, String message) {
+        try {
+            Mac mac = Mac.getInstance("HmacSHA256");
+            SecretKeySpec spec = new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256");
+            mac.init(spec);
+            byte[] hash = mac.doFinal(message.getBytes(StandardCharsets.UTF_8));
+            return Base64.getEncoder().encodeToString(hash);
+        } catch (Exception e) {
+            log.error("[OKX-WS] HMAC-SHA256签名失败", e);
+            return "";
+        }
+    }
+
+    // ==================== 消息路由 ====================
+
+    /**
+     * 消息分发:先处理系统事件(ping/pong/login/subscribe/error),
+     * 再把数据推送路由到对应的 channelHandler。
+     *
+     * <h3>路由规则</h3>
+     * <ol>
+     *   <li>"pong" → 日志(忽略)</li>
+     *   <li>"ping" → 回复 "pong"</li>
+     *   <li>{"event":"login"} → 登录成功 → 订阅所有私有 handlers</li>
+     *   <li>{"event":"subscribe"} → 标记对应 handler subscribed=true</li>
+     *   <li>{"event":"error"} → 错误日志</li>
+     *   <li>{"arg":{...}, "data":[...]} → 遍历 handlers 路由</li>
+     * </ol>
+     *
+     * @param message   原始消息文本
+     * @param isPrivate true=私有频道消息,false=公开频道消息
+     * @param ws        接收消息的 WebSocket 客户端
+     */
+    private void handleMessage(String message, boolean isPrivate, WebSocketClient ws) {
+        try {
+            // OKX ping/pong 混合格式兼容:JSON {"op":"ping"} 与纯文本 "ping" 均支持
+            if ("pong".equals(message)) {
+                log.debug("[OKX-WS] 收到 pong 响应(纯文本)");
+                cancelPongTimeout();
+                return;
+            }
+            if ("ping".equals(message)) {
+                log.debug("[OKX-WS] 收到 ping(纯文本),回复 pong");
+                if (ws != null && ws.isOpen()) {
+                    ws.send("pong");
+                }
+                return;
+            }
+
+            JSONObject response = JSON.parseObject(message);
+
+            // JSON 格式的 ping/pong (OKX 文档标准格式)
+            String op = response.getString("op");
+            if ("pong".equals(op)) {
+                log.debug("[OKX-WS] 收到 pong 响应");
+                cancelPongTimeout();
+                return;
+            }
+            if ("ping".equals(op)) {
+                log.debug("[OKX-WS] 收到 ping,回复 pong");
+                if (ws != null && ws.isOpen()) {
+                    ws.send("{\"op\":\"pong\"}");
+                }
+                return;
+            }
+
+            // 登录响应
+            String event = response.getString("event");
+            if ("login".equals(event)) {
+                String code = response.getString("code");
+                if ("0".equals(code)) {
+                    log.info("[OKX-WS] 私有频道登录成功");
+                    isPrivateLoggedIn.set(true);
+                    // 登录成功后订阅所有私有频道
+                    for (OkxChannelHandler handler : privateHandlers) {
+                        handler.subscribe(ws);
+                    }
+                } else {
+                    log.error("[OKX-WS] 私有频道登录失败, code:{}, msg:{}",
+                            code, response.getString("msg"));
+                }
+                return;
+            }
+
+            // 订阅确认
+            if ("subscribe".equals(event)) {
+                JSONObject arg = response.getJSONObject("arg");
+                if (arg != null) {
+                    String channel = arg.getString("channel");
+                    log.info("[OKX-WS] 订阅成功: {}", channel);
+                    List<OkxChannelHandler> handlers = isPrivate ? privateHandlers : publicHandlers;
+                    for (OkxChannelHandler handler : handlers) {
+                        if (channel.equals(handler.getChannelName())) {
+                            handler.setSubscribed(true);
+                            break;
+                        }
+                    }
+                }
+                return;
+            }
+
+            // 取消订阅确认
+            if ("unsubscribe".equals(event)) {
+                JSONObject arg = response.getJSONObject("arg");
+                log.info("[OKX-WS] 取消订阅成功: {}",
+                        arg != null ? arg.getString("channel") : "unknown");
+                return;
+            }
+
+            // 错误
+            if ("error".equals(event)) {
+                log.error("[OKX-WS] 错误, code:{}, msg:{}",
+                        response.getString("code"), response.getString("msg"));
+                return;
+            }
+
+            // 数据推送: {"arg":{"channel":"positions",...}, "data":[...]}
+            JSONObject arg = response.getJSONObject("arg");
+            if (arg != null && response.getJSONArray("data") != null) {
+                String channel = arg.getString("channel");
+                if (channel == null) {
+                    return;
+                }
+                List<OkxChannelHandler> handlers = isPrivate ? privateHandlers : publicHandlers;
+                for (OkxChannelHandler handler : handlers) {
+                    if (handler.handleMessage(response)) {
+                        return;
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("[OKX-WS] 处理消息失败: {}", message, e);
+        }
+    }
+
+    // ==================== 订阅状态检查 ====================
+
+    /**
+     * 检查所有已注册的频道是否都已收到订阅成功确认。
+     * 同时检查公开和私有频道的 handlers。
+     *
+     * @return true 如果所有 handlers 都已订阅确认
+     */
+    public boolean areAllSubscribed() {
+        List<OkxChannelHandler> allHandlers = new ArrayList<>();
+        allHandlers.addAll(publicHandlers);
+        allHandlers.addAll(privateHandlers);
+
+        if (allHandlers.isEmpty()) {
+            return false;
+        }
+        for (OkxChannelHandler h : allHandlers) {
+            if (!h.isSubscribed()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    // ==================== 心跳机制 ====================
+
+    /**
+     * 启动心跳检测器。
+     * 使用单线程 ScheduledExecutor,每 25 秒检查一次心跳超时。
+     */
+    private void startHeartbeat() {
+        if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) {
+            heartbeatExecutor.shutdownNow();
+        }
+        heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+            Thread t = new Thread(r, "okx-ws-heartbeat");
+            t.setDaemon(true);
+            return t;
+        });
+        heartbeatExecutor.scheduleWithFixedDelay(
+                this::checkHeartbeatTimeout, 25, 25, TimeUnit.SECONDS);
+    }
+
+    /**
+     * 重置心跳计时器:取消旧超时任务,提交新的 10 秒超时检测。
+     */
+    private synchronized void resetHeartbeatTimer() {
+        cancelPongTimeout();
+        if (heartbeatExecutor != null && !heartbeatExecutor.isShutdown()) {
+            pongTimeoutFuture = heartbeatExecutor.schedule(
+                    this::checkHeartbeatTimeout, HEARTBEAT_TIMEOUT, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * 检查心跳超时:如果距离上次收到消息超过 10 秒,主动发送 ping。
+     * OKX 服务端收到 ping 后会回复 pong。
+     */
+    private void checkHeartbeatTimeout() {
+        boolean isAnyConnected = isPublicConnected.get() || isPrivateConnected.get();
+        if (!isAnyConnected) {
+            return;
+        }
+        long elapsed = System.currentTimeMillis() - lastMessageTime.get();
+        if (elapsed >= HEARTBEAT_TIMEOUT * 1000L) {
+            log.debug("[OKX-WS] 心跳超时 {}ms, 主动发送ping", elapsed);
+            sendPing();
+        }
+    }
+
+    /**
+     * 向两条 WS 连接主动发送 ping(OKX 文档标准 JSON 格式)。
+     */
+    private void sendPing() {
+        try {
+            String pingMsg = "{\"op\":\"ping\"}";
+            if (publicWsClient != null && publicWsClient.isOpen()) {
+                publicWsClient.send(pingMsg);
+            }
+            if (privateWsClient != null && privateWsClient.isOpen()) {
+                privateWsClient.send(pingMsg);
+            }
+        } catch (Exception e) {
+            log.warn("[OKX-WS] 发送ping失败", e);
+        }
+    }
+
+    /**
+     * 取消心跳超时检测任务。
+     */
+    private synchronized void cancelPongTimeout() {
+        if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) {
+            pongTimeoutFuture.cancel(true);
+        }
+    }
+
+    // ==================== 重连机制 ====================
+
+    /**
+     * 指数退避重连。初始延迟 5 秒,每次翻倍,最多重试 3 次。
+     *
+     * @param isPrivate true=重连私有 WS,false=重连公开 WS
+     * @throws InterruptedException 线程被中断
+     */
+    private void reconnectWithBackoff(boolean isPrivate) throws InterruptedException {
+        String label = isPrivate ? "私有" : "公开";
+        int attempt = 0;
+        long delayMs = INITIAL_RECONNECT_DELAY_MS;
+
+        while (attempt < MAX_RECONNECT_ATTEMPTS) {
+            try {
+                Thread.sleep(delayMs);
+                connect(isPrivate);
+                log.info("[OKX-WS] {} WS第{}次重连成功", label, attempt + 1);
+                return;
+            } catch (Exception e) {
+                log.warn("[OKX-WS] {} WS第{}次重连失败", label, attempt + 1, e);
+                delayMs *= 2;
+                attempt++;
+            }
+        }
+        log.error("[OKX-WS] {} WS超过最大重试次数({}),放弃重连", label, MAX_RECONNECT_ATTEMPTS);
+    }
+
+    // ==================== 工具方法 ====================
+
+    /**
+     * 优雅关闭线程池:先 shutdown,等待 5 秒,超时则 shutdownNow 强制中断。
+     *
+     * @param executor 需要关闭的线程池
+     */
+    private void shutdownExecutorGracefully(ExecutorService executor) {
+        if (executor == null || executor.isTerminated()) {
+            return;
+        }
+        try {
+            executor.shutdown();
+            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+                executor.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            executor.shutdownNow();
+        }
+    }
+
+    // ==================== 状态查询 ====================
+
+    /** @return 公开 WS 是否已连接 */
+    public boolean isPublicConnected() {
+        return isPublicConnected.get();
+    }
+
+    /** @return 私有 WS 是否已连接并登录成功 */
+    public boolean isPrivateConnected() {
+        return isPrivateConnected.get() && isPrivateLoggedIn.get();
+    }
+}

--
Gitblit v1.9.1