package com.xcong.excoin.modules.okxApi; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.okxApi.wsHandler.OkxChannelHandler; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Base64; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** * OKX WebSocket 连接管理器 — 双通道架构。 * *

与 Gate 版本的关键区别

* OKX 使用两条独立的 WebSocket 连接: * * 而 Gate 只有一条 WS 连接,通过签名区分公开/私有频道。 * *

登录认证(私有 WS)

*
 * {
 *   "op": "login",
 *   "args": [{
 *     "apiKey": "...",
 *     "passphrase": "...",
 *     "timestamp": "1734567890",   // Unix 秒级时间戳
 *     "sign": "base64(HMAC-SHA256(timestamp + 'GET' + '/users/self/verify'))"
 *   }]
 * }
 * 
* *

心跳机制

* OKX 标准格式为 JSON {@code {"op":"ping"}} / {@code {"op":"pong"}}, * 同时兼容纯文本 {@code "ping"} / {@code "pong"} 格式。 * *

消息路由

*
 *   onMessage → handleMessage(message, isPrivate):
 *     1. "pong" (纯文本)                            → 日志 + cancelPongTimeout
 *     2. "ping" (纯文本)                            → 回复 "pong"
 *     3. {"op":"pong"} (JSON)                       → 日志 + cancelPongTimeout
 *     4. {"op":"ping"} (JSON)                       → 回复 {"op":"pong"}
 *     5. {"event":"login"}                          → 登录成功 → 订阅所有私有 handlers
 *     6. {"event":"subscribe"}                      → 标记对应 handler subscribed=true
 *     7. {"event":"error"}                          → 错误日志
 *     8. {"arg":{...}, "data":[...]}                → 遍历 handlers 路由
 * 
* *

生命周期

*
 *   init()        → connect(public) + connect(private,true) → startHeartbeat()
 *   destroy()     → unsubscribe 所有 handler → closeBlocking() 两条连接 → shutdown 线程池
 *   onClose()     → reconnectWithBackoff() 重连对应连接(最多 3 次,指数退避)
 * 
* *

线程安全

* 连接状态用 AtomicBoolean(isPublicConnected, isPrivateConnected, isConnecting, isInitialized)。 * 消息时间戳用 AtomicReference。心跳任务用 synchronized 保护。 * * @author Administrator */ @SuppressWarnings("ALL") @Slf4j public class OkxKlineWebSocketClient { // ==================== 常量 ==================== /** 心跳超时时间(秒) */ private static final int HEARTBEAT_TIMEOUT = 10; /** ISO 8601 时间格式化器(毫秒精度,UTC 时区) */ private static final DateTimeFormatter ISO_8601_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") .withZone(ZoneId.of("UTC")); // ==================== 配置 ==================== /** OKX 配置(提供 WS URL、API 密钥等) */ private final OkxConfig config; /** OKX API Key */ private final String apiKey; /** OKX API Secret */ private final String apiSecret; /** OKX API Passphrase */ private final String passphrase; // ==================== WebSocket 客户端 ==================== /** 公开频道 WebSocket 客户端(K线等) */ private WebSocketClient publicWsClient; /** 私有频道 WebSocket 客户端(仓位、条件单等) */ private WebSocketClient privateWsClient; /** 心跳检测调度器 */ private ScheduledExecutorService heartbeatExecutor; /** 心跳超时 Future */ private volatile ScheduledFuture pongTimeoutFuture; /** 最后收到消息的时间戳(毫秒) */ private final AtomicReference lastMessageTime = new AtomicReference<>(System.currentTimeMillis()); // ==================== 连接状态 ==================== /** 公开频道连接状态 */ private final AtomicBoolean isPublicConnected = new AtomicBoolean(false); /** 私有频道连接状态 */ private final AtomicBoolean isPrivateConnected = new AtomicBoolean(false); /** 连接中标记,防重入 */ private final AtomicBoolean isConnecting = new AtomicBoolean(false); /** 初始化标记,防重复 init */ private final AtomicBoolean isInitialized = new AtomicBoolean(false); /** 私有频道登录成功标记 */ private final AtomicBoolean isPrivateLoggedIn = new AtomicBoolean(false); // ==================== 频道处理器 ==================== /** 公开频道处理器列表(如 K线) */ private final List publicHandlers = new ArrayList<>(); /** 私有频道处理器列表(如 仓位、条件单) */ private final List privateHandlers = new ArrayList<>(); // ==================== 异步线程池 ==================== /** 重连等异步任务的缓存线程池(daemon 线程) */ private final ExecutorService sharedExecutor = Executors.newCachedThreadPool(r -> { Thread t = new Thread(r, "okx-ws-worker"); t.setDaemon(true); return t; }); // ==================== 重连配置 ==================== /** 重连最大次数 */ private static final int MAX_RECONNECT_ATTEMPTS = 3; /** 重连初始延迟(毫秒) */ private static final long INITIAL_RECONNECT_DELAY_MS = 5000; // ==================== 构造器 ==================== /** * 构造 OKX WebSocket 客户端。 * * @param config OKX 配置(提供 WS URL、API 密钥等) */ public OkxKlineWebSocketClient(OkxConfig config) { this.config = config; this.apiKey = config.getApiKey(); this.apiSecret = config.getApiSecret(); this.passphrase = config.getPassphrase(); } // ==================== Handler 注册 ==================== /** * 注册公开频道处理器(如 K线)。需在 init() 前调用。 * * @param handler 实现了 {@link OkxChannelHandler} 接口的公开频道处理器 */ public void addPublicHandler(OkxChannelHandler handler) { publicHandlers.add(handler); log.info("[OKX-WS] 注册公开频道处理器: {}", handler.getChannelName()); } /** * 注册私有频道处理器(如 仓位、条件单)。需在 init() 前调用。 * * @param handler 实现了 {@link OkxChannelHandler} 接口的私有频道处理器 */ public void addPrivateHandler(OkxChannelHandler handler) { privateHandlers.add(handler); log.info("[OKX-WS] 注册私有频道处理器: {}", handler.getChannelName()); } // ==================== 生命周期 ==================== /** * 初始化:建立公开 WS 连接 + 私有 WS 连接 → 启动心跳检测。 * 使用 {@code AtomicBoolean} 防重入,同一实例只允许初始化一次。 */ public void init() { if (!isInitialized.compareAndSet(false, true)) { log.warn("[OKX-WS] 已初始化过,跳过重复初始化"); return; } connect(false); // 公开 WS connect(true); // 私有 WS startHeartbeat(); } /** * 销毁:取消所有频道订阅 → 关闭两条 WebSocket 连接 → 关闭线程池。 * *

执行顺序

* 先取消订阅(等待 500ms 确保发送完成),再 closeBlocking 关闭连接, * 最后 shutdown 线程池。先关连接再关线程池,避免 onClose 回调中的重连任务访问已关闭的线程池。 */ public void destroy() { log.info("[OKX-WS] 开始销毁..."); // 取消公开频道订阅 if (publicWsClient != null && publicWsClient.isOpen()) { for (OkxChannelHandler handler : publicHandlers) { handler.unsubscribe(publicWsClient); } } // 取消私有频道订阅 if (privateWsClient != null && privateWsClient.isOpen()) { for (OkxChannelHandler handler : privateHandlers) { handler.unsubscribe(privateWsClient); } } // 等待取消订阅消息发出 try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("[OKX-WS] 取消订阅等待被中断"); } // 关闭公开 WS closeWebSocket(publicWsClient); publicWsClient = null; // 关闭私有 WS closeWebSocket(privateWsClient); privateWsClient = null; // 关闭心跳 shutdownExecutorGracefully(heartbeatExecutor); if (pongTimeoutFuture != null) { pongTimeoutFuture.cancel(true); } // 关闭共享线程池 shutdownExecutorGracefully(sharedExecutor); log.info("[OKX-WS] 销毁完成"); } /** * 安全关闭 WebSocket 连接。 */ private void closeWebSocket(WebSocketClient ws) { if (ws != null && ws.isOpen()) { try { ws.closeBlocking(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("[OKX-WS] 关闭连接时被中断"); } } } // ==================== 连接管理 ==================== /** * 建立 WebSocket 连接。 * *

公开 WS 连接成功回调

* 订阅所有公开 handlers(K线等)。 * *

私有 WS 连接成功回调

* 先发送 login 认证消息,登录成功后再订阅所有私有 handlers。 * *

连接关闭回调

* 设置断连状态 → 异步触发指数退避重连(最多3次)。 * * @param isPrivate true=私有 WS(需登录),false=公开 WS */ private void connect(boolean isPrivate) { String wsUrl = isPrivate ? config.getWsPrivateUrl() : config.getWsPublicUrl(); String label = isPrivate ? "私有" : "公开"; if (isConnecting.get() || !isConnecting.compareAndSet(false, true)) { log.info("[OKX-WS] 连接进行中,跳过重复{} WS请求", label); return; } try { SSLConfig.configureSSL(); System.setProperty("https.protocols", "TLSv1.2,TLSv1.3"); URI uri = new URI(wsUrl); WebSocketClient client = new WebSocketClient(uri) { @Override public void onOpen(ServerHandshake handshake) { log.info("[OKX-WS] {} WS连接成功", label); isConnecting.set(false); if (isPrivate) { isPrivateConnected.set(true); // 私有 WS 需要先登录 sendLogin(this); } else { isPublicConnected.set(true); // 公开 WS 直接订阅 if (sharedExecutor != null && !sharedExecutor.isShutdown()) { resetHeartbeatTimer(); for (OkxChannelHandler handler : publicHandlers) { handler.subscribe(this); } } else { log.warn("[OKX-WS] 应用正在关闭,忽略{} WS连接成功回调", label); } } } @Override public void onMessage(String message) { lastMessageTime.set(System.currentTimeMillis()); handleMessage(message, isPrivate, this); resetHeartbeatTimer(); } @Override public void onClose(int code, String reason, boolean remote) { log.warn("[OKX-WS] {} WS连接关闭, code:{}, reason:{}, remote:{}", label, code, reason, remote); if (isPrivate) { isPrivateConnected.set(false); isPrivateLoggedIn.set(false); } else { isPublicConnected.set(false); } isConnecting.set(false); cancelPongTimeout(); if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) { sharedExecutor.execute(() -> { try { reconnectWithBackoff(isPrivate); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.error("[OKX-WS] {} WS重连失败", label, e); } }); } else { log.warn("[OKX-WS] 线程池已关闭,不执行{} WS重连", label); } } @Override public void onError(Exception ex) { log.error("[OKX-WS] {} WS发生错误", label, ex); if (isPrivate) { isPrivateConnected.set(false); } else { isPublicConnected.set(false); } } }; client.setConnectionLostTimeout(0); client.connect(); if (isPrivate) { this.privateWsClient = client; } else { this.publicWsClient = client; } } catch (URISyntaxException e) { log.error("[OKX-WS] URI格式错误: {}", wsUrl, e); isConnecting.set(false); } } // ==================== 登录认证 ==================== /** * 发送 OKX 私有 WS 登录消息。 * *

签名算法

*
     *   timestamp = ISO 8601 当前时间 (UTC, 毫秒精度)
     *   message   = timestamp + "GET" + "/users/self/verify" + ""
     *   sign      = Base64(HMAC-SHA256(apiSecret, message))
     * 
* *

登录消息格式

*
     * {
     *   "op": "login",
     *   "args": [{
     *     "apiKey": "...",
     *     "passphrase": "...",
     *     "timestamp": "2023-01-01T00:00:00.000Z",
     *     "sign": "..."
     *   }]
     * }
     * 
* * @param ws 私有频道 WebSocket 客户端 */ private void sendLogin(WebSocketClient ws) { try { // OKX WS 登录必须使用 Unix 秒级时间戳(非 ISO 8601!) String timestamp = String.valueOf(System.currentTimeMillis() / 1000); String message = timestamp + "GET" + "/users/self/verify"; String sign = hmacSha256Base64(apiSecret, message); JSONObject loginMsg = new JSONObject(); loginMsg.put("op", "login"); JSONArray args = new JSONArray(); JSONObject arg = new JSONObject(); arg.put("apiKey", apiKey); arg.put("passphrase", passphrase); arg.put("timestamp", timestamp); arg.put("sign", sign); args.add(arg); loginMsg.put("args", args); ws.send(loginMsg.toJSONString()); log.info("[OKX-WS] 发送登录消息, timestamp: {}", timestamp); } catch (Exception e) { log.error("[OKX-WS] 发送登录消息失败", e); } } /** * HMAC-SHA256 签名并 Base64 编码。 * * @param secret 密钥 * @param message 待签名消息 * @return Base64 编码的签名字符串 */ private String hmacSha256Base64(String secret, String message) { try { Mac mac = Mac.getInstance("HmacSHA256"); SecretKeySpec spec = new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"); mac.init(spec); byte[] hash = mac.doFinal(message.getBytes(StandardCharsets.UTF_8)); return Base64.getEncoder().encodeToString(hash); } catch (Exception e) { log.error("[OKX-WS] HMAC-SHA256签名失败", e); return ""; } } // ==================== 消息路由 ==================== /** * 消息分发:先处理系统事件(ping/pong/login/subscribe/error), * 再把数据推送路由到对应的 channelHandler。 * *

路由规则

*
    *
  1. "pong" → 日志(忽略)
  2. *
  3. "ping" → 回复 "pong"
  4. *
  5. {"event":"login"} → 登录成功 → 订阅所有私有 handlers
  6. *
  7. {"event":"subscribe"} → 标记对应 handler subscribed=true
  8. *
  9. {"event":"error"} → 错误日志
  10. *
  11. {"arg":{...}, "data":[...]} → 遍历 handlers 路由
  12. *
* * @param message 原始消息文本 * @param isPrivate true=私有频道消息,false=公开频道消息 * @param ws 接收消息的 WebSocket 客户端 */ private void handleMessage(String message, boolean isPrivate, WebSocketClient ws) { try { // OKX ping/pong 混合格式兼容:JSON {"op":"ping"} 与纯文本 "ping" 均支持 if ("pong".equals(message)) { log.debug("[OKX-WS] 收到 pong 响应(纯文本)"); cancelPongTimeout(); return; } if ("ping".equals(message)) { log.debug("[OKX-WS] 收到 ping(纯文本),回复 pong"); if (ws != null && ws.isOpen()) { ws.send("pong"); } return; } JSONObject response = JSON.parseObject(message); // JSON 格式的 ping/pong (OKX 文档标准格式) String op = response.getString("op"); if ("pong".equals(op)) { log.debug("[OKX-WS] 收到 pong 响应"); cancelPongTimeout(); return; } if ("ping".equals(op)) { log.debug("[OKX-WS] 收到 ping,回复 pong"); if (ws != null && ws.isOpen()) { ws.send("{\"op\":\"pong\"}"); } return; } // 登录响应 String event = response.getString("event"); if ("login".equals(event)) { String code = response.getString("code"); if ("0".equals(code)) { log.info("[OKX-WS] 私有频道登录成功"); isPrivateLoggedIn.set(true); // 登录成功后订阅所有私有频道 for (OkxChannelHandler handler : privateHandlers) { handler.subscribe(ws); } } else { log.error("[OKX-WS] 私有频道登录失败, code:{}, msg:{}", code, response.getString("msg")); } return; } // 订阅确认 if ("subscribe".equals(event)) { JSONObject arg = response.getJSONObject("arg"); if (arg != null) { String channel = arg.getString("channel"); log.info("[OKX-WS] 订阅成功: {}", channel); List handlers = isPrivate ? privateHandlers : publicHandlers; for (OkxChannelHandler handler : handlers) { if (channel.equals(handler.getChannelName())) { handler.setSubscribed(true); break; } } } return; } // 取消订阅确认 if ("unsubscribe".equals(event)) { JSONObject arg = response.getJSONObject("arg"); log.info("[OKX-WS] 取消订阅成功: {}", arg != null ? arg.getString("channel") : "unknown"); return; } // 错误 if ("error".equals(event)) { log.error("[OKX-WS] 错误, code:{}, msg:{}", response.getString("code"), response.getString("msg")); return; } // 数据推送: {"arg":{"channel":"positions",...}, "data":[...]} JSONObject arg = response.getJSONObject("arg"); if (arg != null && response.getJSONArray("data") != null) { String channel = arg.getString("channel"); if (channel == null) { return; } List handlers = isPrivate ? privateHandlers : publicHandlers; for (OkxChannelHandler handler : handlers) { if (handler.handleMessage(response)) { return; } } } } catch (Exception e) { log.error("[OKX-WS] 处理消息失败: {}", message, e); } } // ==================== 订阅状态检查 ==================== /** * 检查所有已注册的频道是否都已收到订阅成功确认。 * 同时检查公开和私有频道的 handlers。 * * @return true 如果所有 handlers 都已订阅确认 */ public boolean areAllSubscribed() { List allHandlers = new ArrayList<>(); allHandlers.addAll(publicHandlers); allHandlers.addAll(privateHandlers); if (allHandlers.isEmpty()) { return false; } for (OkxChannelHandler h : allHandlers) { if (!h.isSubscribed()) { return false; } } return true; } // ==================== 心跳机制 ==================== /** * 启动心跳检测器。 * 使用单线程 ScheduledExecutor,每 25 秒检查一次心跳超时。 */ private void startHeartbeat() { if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) { heartbeatExecutor.shutdownNow(); } heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "okx-ws-heartbeat"); t.setDaemon(true); return t; }); heartbeatExecutor.scheduleWithFixedDelay( this::checkHeartbeatTimeout, 25, 25, TimeUnit.SECONDS); } /** * 重置心跳计时器:取消旧超时任务,提交新的 10 秒超时检测。 */ private synchronized void resetHeartbeatTimer() { cancelPongTimeout(); if (heartbeatExecutor != null && !heartbeatExecutor.isShutdown()) { pongTimeoutFuture = heartbeatExecutor.schedule( this::checkHeartbeatTimeout, HEARTBEAT_TIMEOUT, TimeUnit.SECONDS); } } /** * 检查心跳超时:如果距离上次收到消息超过 10 秒,主动发送 ping。 * OKX 服务端收到 ping 后会回复 pong。 */ private void checkHeartbeatTimeout() { boolean isAnyConnected = isPublicConnected.get() || isPrivateConnected.get(); if (!isAnyConnected) { return; } long elapsed = System.currentTimeMillis() - lastMessageTime.get(); if (elapsed >= HEARTBEAT_TIMEOUT * 1000L) { log.debug("[OKX-WS] 心跳超时 {}ms, 主动发送ping", elapsed); sendPing(); } } /** * 向两条 WS 连接主动发送 ping(OKX 文档标准 JSON 格式)。 */ private void sendPing() { try { String pingMsg = "{\"op\":\"ping\"}"; if (publicWsClient != null && publicWsClient.isOpen()) { publicWsClient.send(pingMsg); } if (privateWsClient != null && privateWsClient.isOpen()) { privateWsClient.send(pingMsg); } } catch (Exception e) { log.warn("[OKX-WS] 发送ping失败", e); } } /** * 取消心跳超时检测任务。 */ private synchronized void cancelPongTimeout() { if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) { pongTimeoutFuture.cancel(true); } } // ==================== 重连机制 ==================== /** * 指数退避重连。初始延迟 5 秒,每次翻倍,最多重试 3 次。 * * @param isPrivate true=重连私有 WS,false=重连公开 WS * @throws InterruptedException 线程被中断 */ private void reconnectWithBackoff(boolean isPrivate) throws InterruptedException { String label = isPrivate ? "私有" : "公开"; int attempt = 0; long delayMs = INITIAL_RECONNECT_DELAY_MS; while (attempt < MAX_RECONNECT_ATTEMPTS) { try { Thread.sleep(delayMs); connect(isPrivate); log.info("[OKX-WS] {} WS第{}次重连成功", label, attempt + 1); return; } catch (Exception e) { log.warn("[OKX-WS] {} WS第{}次重连失败", label, attempt + 1, e); delayMs *= 2; attempt++; } } log.error("[OKX-WS] {} WS超过最大重试次数({}),放弃重连", label, MAX_RECONNECT_ATTEMPTS); } // ==================== 工具方法 ==================== /** * 优雅关闭线程池:先 shutdown,等待 5 秒,超时则 shutdownNow 强制中断。 * * @param executor 需要关闭的线程池 */ private void shutdownExecutorGracefully(ExecutorService executor) { if (executor == null || executor.isTerminated()) { return; } try { executor.shutdown(); if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { executor.shutdownNow(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); executor.shutdownNow(); } } // ==================== 状态查询 ==================== /** @return 公开 WS 是否已连接 */ public boolean isPublicConnected() { return isPublicConnected.get(); } /** @return 私有 WS 是否已连接并登录成功 */ public boolean isPrivateConnected() { return isPrivateConnected.get() && isPrivateLoggedIn.get(); } }