From e45e705c22df5bc979e72db6014dd1ff9637be42 Mon Sep 17 00:00:00 2001
From: Administrator <15274802129@163.com>
Date: Wed, 24 Jun 2026 22:21:58 +0800
Subject: [PATCH] fix(okx): 修复网格交易成交日志记录问题
---
src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java | 753 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 753 insertions(+), 0 deletions(-)
diff --git a/src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java b/src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java
new file mode 100644
index 0000000..1bd14b6
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java
@@ -0,0 +1,753 @@
+package com.xcong.excoin.modules.okxApi;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.xcong.excoin.modules.okxApi.wsHandler.OkxChannelHandler;
+import lombok.extern.slf4j.Slf4j;
+import org.java_websocket.client.WebSocketClient;
+import org.java_websocket.handshake.ServerHandshake;
+
+import javax.crypto.Mac;
+import javax.crypto.spec.SecretKeySpec;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.charset.StandardCharsets;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * OKX WebSocket 连接管理器 — 双通道架构。
+ *
+ * <h3>与 Gate 版本的关键区别</h3>
+ * OKX 使用<b>两条独立的 WebSocket 连接</b>:
+ * <ul>
+ * <li><b>公开 WS</b> ({@code wss://ws.okx.com:8443/ws/v5/public}):
+ * 无需认证,订阅 K 线等公开数据。</li>
+ * <li><b>私有 WS</b> ({@code wss://ws.okx.com:8443/ws/v5/private}):
+ * 需要登录认证(login 消息),订阅仓位、条件订单等私有数据。</li>
+ * </ul>
+ * 而 Gate 只有一条 WS 连接,通过签名区分公开/私有频道。
+ *
+ * <h3>登录认证(私有 WS)</h3>
+ * <pre>
+ * {
+ * "op": "login",
+ * "args": [{
+ * "apiKey": "...",
+ * "passphrase": "...",
+ * "timestamp": "1734567890", // Unix 秒级时间戳
+ * "sign": "base64(HMAC-SHA256(timestamp + 'GET' + '/users/self/verify'))"
+ * }]
+ * }
+ * </pre>
+ *
+ * <h3>心跳机制</h3>
+ * OKX 标准格式为 JSON {@code {"op":"ping"}} / {@code {"op":"pong"}},
+ * 同时兼容纯文本 {@code "ping"} / {@code "pong"} 格式。
+ *
+ * <h3>消息路由</h3>
+ * <pre>
+ * onMessage → handleMessage(message, isPrivate):
+ * 1. "pong" (纯文本) → 日志 + cancelPongTimeout
+ * 2. "ping" (纯文本) → 回复 "pong"
+ * 3. {"op":"pong"} (JSON) → 日志 + cancelPongTimeout
+ * 4. {"op":"ping"} (JSON) → 回复 {"op":"pong"}
+ * 5. {"event":"login"} → 登录成功 → 订阅所有私有 handlers
+ * 6. {"event":"subscribe"} → 标记对应 handler subscribed=true
+ * 7. {"event":"error"} → 错误日志
+ * 8. {"arg":{...}, "data":[...]} → 遍历 handlers 路由
+ * </pre>
+ *
+ * <h3>生命周期</h3>
+ * <pre>
+ * init() → connect(public) + connect(private,true) → startHeartbeat()
+ * destroy() → unsubscribe 所有 handler → closeBlocking() 两条连接 → shutdown 线程池
+ * onClose() → reconnectWithBackoff() 重连对应连接(最多 3 次,指数退避)
+ * </pre>
+ *
+ * <h3>线程安全</h3>
+ * 连接状态用 AtomicBoolean(isPublicConnected, isPrivateConnected, isConnecting, isInitialized)。
+ * 消息时间戳用 AtomicReference。心跳任务用 synchronized 保护。
+ *
+ * @author Administrator
+ */
+@SuppressWarnings("ALL")
+@Slf4j
+public class OkxKlineWebSocketClient {
+
+ // ==================== 常量 ====================
+
+ /** 心跳超时时间(秒) */
+ private static final int HEARTBEAT_TIMEOUT = 10;
+
+ /** ISO 8601 时间格式化器(毫秒精度,UTC 时区) */
+ private static final DateTimeFormatter ISO_8601_FORMATTER =
+ DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
+ .withZone(ZoneId.of("UTC"));
+
+ // ==================== 配置 ====================
+
+ /** OKX 配置(提供 WS URL、API 密钥等) */
+ private final OkxConfig config;
+
+ /** OKX API Key */
+ private final String apiKey;
+
+ /** OKX API Secret */
+ private final String apiSecret;
+
+ /** OKX API Passphrase */
+ private final String passphrase;
+
+ // ==================== WebSocket 客户端 ====================
+
+ /** 公开频道 WebSocket 客户端(K线等) */
+ private WebSocketClient publicWsClient;
+
+ /** 私有频道 WebSocket 客户端(仓位、条件单等) */
+ private WebSocketClient privateWsClient;
+
+ /** 心跳检测调度器 */
+ private ScheduledExecutorService heartbeatExecutor;
+
+ /** 心跳超时 Future */
+ private volatile ScheduledFuture<?> pongTimeoutFuture;
+
+ /** 最后收到消息的时间戳(毫秒) */
+ private final AtomicReference<Long> lastMessageTime = new AtomicReference<>(System.currentTimeMillis());
+
+ // ==================== 连接状态 ====================
+
+ /** 公开频道连接状态 */
+ private final AtomicBoolean isPublicConnected = new AtomicBoolean(false);
+
+ /** 私有频道连接状态 */
+ private final AtomicBoolean isPrivateConnected = new AtomicBoolean(false);
+
+ /** 公开频道连接中标记,防重入 */
+ private final AtomicBoolean isPublicConnecting = new AtomicBoolean(false);
+
+ /** 私有频道连接中标记,防重入 */
+ private final AtomicBoolean isPrivateConnecting = new AtomicBoolean(false);
+
+ /** 初始化标记,防重复 init */
+ private final AtomicBoolean isInitialized = new AtomicBoolean(false);
+
+ /** 私有频道登录成功标记 */
+ private final AtomicBoolean isPrivateLoggedIn = new AtomicBoolean(false);
+
+ // ==================== 频道处理器 ====================
+
+ /** 公开频道处理器列表(如 K线) */
+ private final List<OkxChannelHandler> publicHandlers = new ArrayList<>();
+
+ /** 私有频道处理器列表(如 仓位、条件单) */
+ private final List<OkxChannelHandler> privateHandlers = new ArrayList<>();
+
+ // ==================== 异步线程池 ====================
+
+ /** 重连等异步任务的缓存线程池(daemon 线程) */
+ private final ExecutorService sharedExecutor = Executors.newCachedThreadPool(r -> {
+ Thread t = new Thread(r, "okx-ws-worker");
+ t.setDaemon(true);
+ return t;
+ });
+
+ // ==================== 重连配置 ====================
+
+ /** 重连最大次数 */
+ private static final int MAX_RECONNECT_ATTEMPTS = 3;
+
+ /** 重连初始延迟(毫秒) */
+ private static final long INITIAL_RECONNECT_DELAY_MS = 5000;
+
+ // ==================== 构造器 ====================
+
+ /**
+ * 构造 OKX WebSocket 客户端。
+ *
+ * @param config OKX 配置(提供 WS URL、API 密钥等)
+ */
+ public OkxKlineWebSocketClient(OkxConfig config) {
+ this.config = config;
+ this.apiKey = config.getApiKey();
+ this.apiSecret = config.getApiSecret();
+ this.passphrase = config.getPassphrase();
+ }
+
+ // ==================== Handler 注册 ====================
+
+ /**
+ * 注册公开频道处理器(如 K线)。需在 init() 前调用。
+ *
+ * @param handler 实现了 {@link OkxChannelHandler} 接口的公开频道处理器
+ */
+ public void addPublicHandler(OkxChannelHandler handler) {
+ publicHandlers.add(handler);
+ log.info("[OKX-WS] 注册公开频道处理器: {}", handler.getChannelName());
+ }
+
+ /**
+ * 注册私有频道处理器(如 仓位、条件单)。需在 init() 前调用。
+ *
+ * @param handler 实现了 {@link OkxChannelHandler} 接口的私有频道处理器
+ */
+ public void addPrivateHandler(OkxChannelHandler handler) {
+ privateHandlers.add(handler);
+ log.info("[OKX-WS] 注册私有频道处理器: {}", handler.getChannelName());
+ }
+
+ // ==================== 生命周期 ====================
+
+ /**
+ * 初始化:建立公开 WS 连接 + 私有 WS 连接 → 启动心跳检测。
+ * 使用 {@code AtomicBoolean} 防重入,同一实例只允许初始化一次。
+ */
+ public void init() {
+ if (!isInitialized.compareAndSet(false, true)) {
+ log.warn("[OKX-WS] 已初始化过,跳过重复初始化");
+ return;
+ }
+ connect(false); // 公开 WS
+ connect(true); // 私有 WS
+ startHeartbeat();
+ }
+
+ /**
+ * 销毁:取消所有频道订阅 → 关闭两条 WebSocket 连接 → 关闭线程池。
+ *
+ * <h3>执行顺序</h3>
+ * 先取消订阅(等待 500ms 确保发送完成),再 closeBlocking 关闭连接,
+ * 最后 shutdown 线程池。先关连接再关线程池,避免 onClose 回调中的重连任务访问已关闭的线程池。
+ */
+ public void destroy() {
+ log.info("[OKX-WS] 开始销毁...");
+
+ // 取消公开频道订阅
+ if (publicWsClient != null && publicWsClient.isOpen()) {
+ for (OkxChannelHandler handler : publicHandlers) {
+ handler.unsubscribe(publicWsClient);
+ }
+ }
+ // 取消私有频道订阅
+ if (privateWsClient != null && privateWsClient.isOpen()) {
+ for (OkxChannelHandler handler : privateHandlers) {
+ handler.unsubscribe(privateWsClient);
+ }
+ }
+
+ // 等待取消订阅消息发出
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn("[OKX-WS] 取消订阅等待被中断");
+ }
+
+ // 关闭公开 WS
+ closeWebSocket(publicWsClient);
+ publicWsClient = null;
+
+ // 关闭私有 WS
+ closeWebSocket(privateWsClient);
+ privateWsClient = null;
+
+ // 关闭心跳
+ shutdownExecutorGracefully(heartbeatExecutor);
+ if (pongTimeoutFuture != null) {
+ pongTimeoutFuture.cancel(true);
+ }
+
+ // 关闭共享线程池
+ shutdownExecutorGracefully(sharedExecutor);
+
+ log.info("[OKX-WS] 销毁完成");
+ }
+
+ /**
+ * 安全关闭 WebSocket 连接。
+ */
+ private void closeWebSocket(WebSocketClient ws) {
+ if (ws != null && ws.isOpen()) {
+ try {
+ ws.closeBlocking();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.warn("[OKX-WS] 关闭连接时被中断");
+ }
+ }
+ }
+
+ // ==================== 连接管理 ====================
+
+ /**
+ * 建立 WebSocket 连接。
+ *
+ * <h3>公开 WS 连接成功回调</h3>
+ * 订阅所有公开 handlers(K线等)。
+ *
+ * <h3>私有 WS 连接成功回调</h3>
+ * 先发送 login 认证消息,登录成功后再订阅所有私有 handlers。
+ *
+ * <h3>连接关闭回调</h3>
+ * 设置断连状态 → 异步触发指数退避重连(最多3次)。
+ *
+ * @param isPrivate true=私有 WS(需登录),false=公开 WS
+ */
+ private void connect(boolean isPrivate) {
+ String wsUrl = isPrivate ? config.getWsPrivateUrl() : config.getWsPublicUrl();
+ String label = isPrivate ? "私有" : "公开";
+
+ AtomicBoolean connectingFlag = isPrivate ? isPrivateConnecting : isPublicConnecting;
+ if (connectingFlag.get() || !connectingFlag.compareAndSet(false, true)) {
+ log.info("[OKX-WS] 连接进行中,跳过重复{} WS请求", label);
+ return;
+ }
+ try {
+ SSLConfig.configureSSL();
+ System.setProperty("https.protocols", "TLSv1.2,TLSv1.3");
+ URI uri = new URI(wsUrl);
+
+ WebSocketClient client = new WebSocketClient(uri) {
+ @Override
+ public void onOpen(ServerHandshake handshake) {
+ log.info("[OKX-WS] {} WS连接成功", label);
+ connectingFlag.set(false);
+
+ if (isPrivate) {
+ isPrivateConnected.set(true);
+ // 私有 WS 需要先登录
+ sendLogin(this);
+ } else {
+ isPublicConnected.set(true);
+ // 公开 WS 直接订阅
+ if (sharedExecutor != null && !sharedExecutor.isShutdown()) {
+ resetHeartbeatTimer();
+ for (OkxChannelHandler handler : publicHandlers) {
+ handler.subscribe(this);
+ }
+ } else {
+ log.warn("[OKX-WS] 应用正在关闭,忽略{} WS连接成功回调", label);
+ }
+ }
+ }
+
+ @Override
+ public void onMessage(String message) {
+ lastMessageTime.set(System.currentTimeMillis());
+ handleMessage(message, isPrivate, this);
+ resetHeartbeatTimer();
+ }
+
+ @Override
+ public void onClose(int code, String reason, boolean remote) {
+ log.warn("[OKX-WS] {} WS连接关闭, code:{}, reason:{}, remote:{}", label, code, reason, remote);
+ if (isPrivate) {
+ isPrivateConnected.set(false);
+ isPrivateLoggedIn.set(false);
+ } else {
+ isPublicConnected.set(false);
+ }
+ connectingFlag.set(false);
+ cancelPongTimeout();
+
+ if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) {
+ sharedExecutor.execute(() -> {
+ try {
+ reconnectWithBackoff(isPrivate);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ log.error("[OKX-WS] {} WS重连失败", label, e);
+ }
+ });
+ } else {
+ log.warn("[OKX-WS] 线程池已关闭,不执行{} WS重连", label);
+ }
+ }
+
+ @Override
+ public void onError(Exception ex) {
+ log.error("[OKX-WS] {} WS发生错误", label, ex);
+ connectingFlag.set(false);
+ if (isPrivate) {
+ isPrivateConnected.set(false);
+ } else {
+ isPublicConnected.set(false);
+ }
+ }
+ };
+ client.setConnectionLostTimeout(0);
+ client.connect();
+
+ if (isPrivate) {
+ this.privateWsClient = client;
+ } else {
+ this.publicWsClient = client;
+ }
+ } catch (URISyntaxException e) {
+ log.error("[OKX-WS] URI格式错误: {}", wsUrl, e);
+ connectingFlag.set(false);
+ }
+ }
+
+ // ==================== 登录认证 ====================
+
+ /**
+ * 发送 OKX 私有 WS 登录消息。
+ *
+ * <h3>签名算法</h3>
+ * <pre>
+ * timestamp = ISO 8601 当前时间 (UTC, 毫秒精度)
+ * message = timestamp + "GET" + "/users/self/verify" + ""
+ * sign = Base64(HMAC-SHA256(apiSecret, message))
+ * </pre>
+ *
+ * <h3>登录消息格式</h3>
+ * <pre>
+ * {
+ * "op": "login",
+ * "args": [{
+ * "apiKey": "...",
+ * "passphrase": "...",
+ * "timestamp": "2023-01-01T00:00:00.000Z",
+ * "sign": "..."
+ * }]
+ * }
+ * </pre>
+ *
+ * @param ws 私有频道 WebSocket 客户端
+ */
+ private void sendLogin(WebSocketClient ws) {
+ try {
+ // OKX WS 登录必须使用 Unix 秒级时间戳(非 ISO 8601!)
+ String timestamp = String.valueOf(System.currentTimeMillis() / 1000);
+ String message = timestamp + "GET" + "/users/self/verify";
+ String sign = hmacSha256Base64(apiSecret, message);
+
+ JSONObject loginMsg = new JSONObject();
+ loginMsg.put("op", "login");
+
+ JSONArray args = new JSONArray();
+ JSONObject arg = new JSONObject();
+ arg.put("apiKey", apiKey);
+ arg.put("passphrase", passphrase);
+ arg.put("timestamp", timestamp);
+ arg.put("sign", sign);
+ args.add(arg);
+ loginMsg.put("args", args);
+
+ ws.send(loginMsg.toJSONString());
+ log.info("[OKX-WS] 发送登录消息, timestamp: {}", timestamp);
+ } catch (Exception e) {
+ log.error("[OKX-WS] 发送登录消息失败", e);
+ }
+ }
+
+ /**
+ * HMAC-SHA256 签名并 Base64 编码。
+ *
+ * @param secret 密钥
+ * @param message 待签名消息
+ * @return Base64 编码的签名字符串
+ */
+ private String hmacSha256Base64(String secret, String message) {
+ try {
+ Mac mac = Mac.getInstance("HmacSHA256");
+ SecretKeySpec spec = new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256");
+ mac.init(spec);
+ byte[] hash = mac.doFinal(message.getBytes(StandardCharsets.UTF_8));
+ return Base64.getEncoder().encodeToString(hash);
+ } catch (Exception e) {
+ log.error("[OKX-WS] HMAC-SHA256签名失败", e);
+ return "";
+ }
+ }
+
+ // ==================== 消息路由 ====================
+
+ /**
+ * 消息分发:先处理系统事件(ping/pong/login/subscribe/error),
+ * 再把数据推送路由到对应的 channelHandler。
+ *
+ * <h3>路由规则</h3>
+ * <ol>
+ * <li>"pong" → 日志(忽略)</li>
+ * <li>"ping" → 回复 "pong"</li>
+ * <li>{"event":"login"} → 登录成功 → 订阅所有私有 handlers</li>
+ * <li>{"event":"subscribe"} → 标记对应 handler subscribed=true</li>
+ * <li>{"event":"error"} → 错误日志</li>
+ * <li>{"arg":{...}, "data":[...]} → 遍历 handlers 路由</li>
+ * </ol>
+ *
+ * @param message 原始消息文本
+ * @param isPrivate true=私有频道消息,false=公开频道消息
+ * @param ws 接收消息的 WebSocket 客户端
+ */
+ private void handleMessage(String message, boolean isPrivate, WebSocketClient ws) {
+ try {
+ // OKX ping/pong 混合格式兼容:JSON {"op":"ping"} 与纯文本 "ping" 均支持
+ if ("pong".equals(message)) {
+ log.debug("[OKX-WS] 收到 pong 响应(纯文本)");
+ cancelPongTimeout();
+ return;
+ }
+ if ("ping".equals(message)) {
+ log.debug("[OKX-WS] 收到 ping(纯文本),回复 pong");
+ if (ws != null && ws.isOpen()) {
+ ws.send("pong");
+ }
+ return;
+ }
+
+ JSONObject response = JSON.parseObject(message);
+
+ // JSON 格式的 ping/pong (OKX 文档标准格式)
+ String op = response.getString("op");
+ if ("pong".equals(op)) {
+ log.debug("[OKX-WS] 收到 pong 响应");
+ cancelPongTimeout();
+ return;
+ }
+ if ("ping".equals(op)) {
+ log.debug("[OKX-WS] 收到 ping,回复 pong");
+ if (ws != null && ws.isOpen()) {
+ ws.send("{\"op\":\"pong\"}");
+ }
+ return;
+ }
+
+ // 登录响应
+ String event = response.getString("event");
+ if ("login".equals(event)) {
+ String code = response.getString("code");
+ if ("0".equals(code)) {
+ log.info("[OKX-WS] 私有频道登录成功");
+ isPrivateLoggedIn.set(true);
+ // 登录成功后订阅所有私有频道
+ for (OkxChannelHandler handler : privateHandlers) {
+ handler.subscribe(ws);
+ }
+ } else {
+ log.error("[OKX-WS] 私有频道登录失败, code:{}, msg:{}",
+ code, response.getString("msg"));
+ }
+ return;
+ }
+
+ // 订阅确认
+ if ("subscribe".equals(event)) {
+ JSONObject arg = response.getJSONObject("arg");
+ if (arg != null) {
+ String channel = arg.getString("channel");
+ log.info("[OKX-WS] 订阅成功: {}", channel);
+ List<OkxChannelHandler> handlers = isPrivate ? privateHandlers : publicHandlers;
+ for (OkxChannelHandler handler : handlers) {
+ if (channel.equals(handler.getChannelName())) {
+ handler.setSubscribed(true);
+ break;
+ }
+ }
+ }
+ return;
+ }
+
+ // 取消订阅确认
+ if ("unsubscribe".equals(event)) {
+ JSONObject arg = response.getJSONObject("arg");
+ log.info("[OKX-WS] 取消订阅成功: {}",
+ arg != null ? arg.getString("channel") : "unknown");
+ return;
+ }
+
+ // 错误
+ if ("error".equals(event)) {
+ log.error("[OKX-WS] 错误, code:{}, msg:{}",
+ response.getString("code"), response.getString("msg"));
+ return;
+ }
+
+ // 数据推送: {"arg":{"channel":"positions",...}, "data":[...]}
+ JSONObject arg = response.getJSONObject("arg");
+ if (arg != null && response.getJSONArray("data") != null) {
+ String channel = arg.getString("channel");
+ if (channel == null) {
+ return;
+ }
+ List<OkxChannelHandler> handlers = isPrivate ? privateHandlers : publicHandlers;
+ for (OkxChannelHandler handler : handlers) {
+ if (handler.handleMessage(response)) {
+ return;
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("[OKX-WS] 处理消息失败: {}", message, e);
+ }
+ }
+
+ // ==================== 订阅状态检查 ====================
+
+ /**
+ * 检查所有已注册的频道是否都已收到订阅成功确认。
+ * 同时检查公开和私有频道的 handlers。
+ *
+ * @return true 如果所有 handlers 都已订阅确认
+ */
+ public boolean areAllSubscribed() {
+ List<OkxChannelHandler> allHandlers = new ArrayList<>();
+ allHandlers.addAll(publicHandlers);
+ allHandlers.addAll(privateHandlers);
+
+ if (allHandlers.isEmpty()) {
+ return false;
+ }
+ for (OkxChannelHandler h : allHandlers) {
+ if (!h.isSubscribed()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // ==================== 心跳机制 ====================
+
+ /**
+ * 启动心跳检测器。
+ * 使用单线程 ScheduledExecutor,每 25 秒检查一次心跳超时。
+ */
+ private void startHeartbeat() {
+ if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) {
+ heartbeatExecutor.shutdownNow();
+ }
+ heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "okx-ws-heartbeat");
+ t.setDaemon(true);
+ return t;
+ });
+ heartbeatExecutor.scheduleWithFixedDelay(
+ this::checkHeartbeatTimeout, 25, 25, TimeUnit.SECONDS);
+ }
+
+ /**
+ * 重置心跳计时器:取消旧超时任务,提交新的 10 秒超时检测。
+ */
+ private synchronized void resetHeartbeatTimer() {
+ cancelPongTimeout();
+ if (heartbeatExecutor != null && !heartbeatExecutor.isShutdown()) {
+ pongTimeoutFuture = heartbeatExecutor.schedule(
+ this::checkHeartbeatTimeout, HEARTBEAT_TIMEOUT, TimeUnit.SECONDS);
+ }
+ }
+
+ /**
+ * 检查心跳超时:如果距离上次收到消息超过 10 秒,主动发送 ping。
+ * OKX 服务端收到 ping 后会回复 pong。
+ */
+ private void checkHeartbeatTimeout() {
+ boolean isAnyConnected = isPublicConnected.get() || isPrivateConnected.get();
+ if (!isAnyConnected) {
+ return;
+ }
+ long elapsed = System.currentTimeMillis() - lastMessageTime.get();
+ if (elapsed >= HEARTBEAT_TIMEOUT * 1000L) {
+ log.debug("[OKX-WS] 心跳超时 {}ms, 主动发送ping", elapsed);
+ sendPing();
+ }
+ }
+
+ /**
+ * 向两条 WS 连接主动发送 ping(OKX 文档标准 JSON 格式)。
+ */
+ private void sendPing() {
+ try {
+ String pingMsg = "{\"op\":\"ping\"}";
+ if (publicWsClient != null && publicWsClient.isOpen()) {
+ publicWsClient.send(pingMsg);
+ }
+ if (privateWsClient != null && privateWsClient.isOpen()) {
+ privateWsClient.send(pingMsg);
+ }
+ } catch (Exception e) {
+ log.warn("[OKX-WS] 发送ping失败", e);
+ }
+ }
+
+ /**
+ * 取消心跳超时检测任务。
+ */
+ private synchronized void cancelPongTimeout() {
+ if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) {
+ pongTimeoutFuture.cancel(true);
+ }
+ }
+
+ // ==================== 重连机制 ====================
+
+ /**
+ * 指数退避重连。初始延迟 5 秒,每次翻倍,最多重试 3 次。
+ *
+ * @param isPrivate true=重连私有 WS,false=重连公开 WS
+ * @throws InterruptedException 线程被中断
+ */
+ private void reconnectWithBackoff(boolean isPrivate) throws InterruptedException {
+ String label = isPrivate ? "私有" : "公开";
+ int attempt = 0;
+ long delayMs = INITIAL_RECONNECT_DELAY_MS;
+
+ while (attempt < MAX_RECONNECT_ATTEMPTS) {
+ try {
+ Thread.sleep(delayMs);
+ connect(isPrivate);
+ log.info("[OKX-WS] {} WS第{}次重连成功", label, attempt + 1);
+ return;
+ } catch (Exception e) {
+ log.warn("[OKX-WS] {} WS第{}次重连失败", label, attempt + 1, e);
+ delayMs *= 2;
+ attempt++;
+ }
+ }
+ log.error("[OKX-WS] {} WS超过最大重试次数({}),放弃重连", label, MAX_RECONNECT_ATTEMPTS);
+ }
+
+ // ==================== 工具方法 ====================
+
+ /**
+ * 优雅关闭线程池:先 shutdown,等待 5 秒,超时则 shutdownNow 强制中断。
+ *
+ * @param executor 需要关闭的线程池
+ */
+ private void shutdownExecutorGracefully(ExecutorService executor) {
+ if (executor == null || executor.isTerminated()) {
+ return;
+ }
+ try {
+ executor.shutdown();
+ if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
+ executor.shutdownNow();
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ executor.shutdownNow();
+ }
+ }
+
+ // ==================== 状态查询 ====================
+
+ /** @return 公开 WS 是否已连接 */
+ public boolean isPublicConnected() {
+ return isPublicConnected.get();
+ }
+
+ /** @return 私有 WS 是否已连接并登录成功 */
+ public boolean isPrivateConnected() {
+ return isPrivateConnected.get() && isPrivateLoggedIn.get();
+ }
+}
--
Gitblit v1.9.1