package com.xcong.excoin.modules.okxApi; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.okxApi.wsHandler.OkxChannelHandler; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** * OKX WebSocket 连接管理器。 * *

职责

* 负责 TCP 连接的建立、维持和恢复。频道逻辑(订阅/解析)全部委托给 {@link OkxChannelHandler} 实现类。 * *

生命周期

*
 *   init()        → connect() → startHeartbeat()
 *   destroy()     → unsubscribe 所有 handler → closeBlocking() → shutdown 线程池
 *   onClose()     → reconnectWithBackoff() (最多 3 次,指数退避)
 * 
* *

消息路由

*
 *   onMessage → handleMessage:
 *     1. pong                  → cancelPongTimeout
 *     2. login/subscribe/error → 日志
 *     3. order/batch-orders    → 下单结果日志
 *     4. 数据推送              → 遍历 channelHandlers → handler.handleMessage(response)
 * 
* *

心跳机制

* 10 秒未收到任何消息 → 发送 ping;25 秒周期检查。 * * @author Administrator */ @SuppressWarnings("ALL") @Slf4j public class OkxKlineWebSocketClient { private static final int HEARTBEAT_TIMEOUT = 10; private final String wsUrl; private final boolean isPrivate; private final String apiKey; private final String secretKey; private final String passphrase; private WebSocketClient webSocketClient; private ScheduledExecutorService heartbeatExecutor; private volatile ScheduledFuture pongTimeoutFuture; private final AtomicReference lastMessageTime = new AtomicReference<>(System.currentTimeMillis()); private final AtomicBoolean isConnected = new AtomicBoolean(false); private final AtomicBoolean isConnecting = new AtomicBoolean(false); private final AtomicBoolean isInitialized = new AtomicBoolean(false); private final List channelHandlers = new ArrayList<>(); public WebSocketClient getWebSocketClient() { return webSocketClient; } private final ExecutorService sharedExecutor = Executors.newCachedThreadPool(r -> { Thread t = new Thread(r, "okxApi-ws-worker"); t.setDaemon(true); return t; }); public OkxKlineWebSocketClient(String wsUrl) { this.wsUrl = wsUrl; this.isPrivate = false; this.apiKey = null; this.secretKey = null; this.passphrase = null; } public OkxKlineWebSocketClient(String wsUrl, String apiKey, String secretKey, String passphrase) { this.wsUrl = wsUrl; this.isPrivate = true; this.apiKey = apiKey; this.secretKey = secretKey; this.passphrase = passphrase; } public void addChannelHandler(OkxChannelHandler handler) { channelHandlers.add(handler); } private void websocketLogin() { try { String timestamp = String.valueOf(System.currentTimeMillis() / 1000); String sign = OkxWsUtil.signWebsocket(timestamp, secretKey); JSONArray argsArray = new JSONArray(); JSONObject loginArgs = new JSONObject(); loginArgs.put("apiKey", apiKey); loginArgs.put("passphrase", passphrase); loginArgs.put("timestamp", timestamp); loginArgs.put("sign", sign); argsArray.add(loginArgs); JSONObject login = OkxWsUtil.buildJsonObject(null, "login", argsArray); webSocketClient.send(login.toJSONString()); log.info("[WS] 发送登录请求"); } catch (Exception e) { log.error("[WS] 登录请求构建失败", e); } } public void init() { if (!isInitialized.compareAndSet(false, true)) { log.warn("[WS] 已初始化过,跳过重复初始化"); return; } connect(); startHeartbeat(); } public void destroy() { log.info("[WS] 开始销毁..."); if (webSocketClient != null && webSocketClient.isOpen()) { for (OkxChannelHandler handler : channelHandlers) { handler.unsubscribe(webSocketClient); } try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("[WS] 取消订阅等待被中断"); } } if (webSocketClient != null && webSocketClient.isOpen()) { try { webSocketClient.closeBlocking(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("[WS] 关闭连接时被中断"); } } if (sharedExecutor != null && !sharedExecutor.isShutdown()) { sharedExecutor.shutdown(); } shutdownExecutorGracefully(heartbeatExecutor); if (pongTimeoutFuture != null) { pongTimeoutFuture.cancel(true); } shutdownExecutorGracefully(sharedExecutor); log.info("[WS] 销毁完成"); } private void connect() { if (isConnecting.get() || !isConnecting.compareAndSet(false, true)) { log.info("[WS] 连接进行中,跳过重复请求"); return; } try { OkxWsUtil.configureSSL(); System.setProperty("https.protocols", "TLSv1.2,TLSv1.3"); URI uri = new URI(wsUrl); if (webSocketClient != null) { try { webSocketClient.closeBlocking(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } webSocketClient = new WebSocketClient(uri) { @Override public void onOpen(ServerHandshake handshake) { log.info("[WS] 连接成功, isPrivate:{}", isPrivate); isConnected.set(true); isConnecting.set(false); if (sharedExecutor != null && !sharedExecutor.isShutdown()) { resetHeartbeatTimer(); if (isPrivate) { websocketLogin(); } else { for (OkxChannelHandler handler : channelHandlers) { handler.subscribe(webSocketClient); } sendPing(); } } else { log.warn("[WS] 应用正在关闭,忽略连接成功回调"); } } @Override public void onMessage(String message) { lastMessageTime.set(System.currentTimeMillis()); handleMessage(message); resetHeartbeatTimer(); } @Override public void onClose(int code, String reason, boolean remote) { log.warn("[WS] 连接关闭, code:{}, reason:{}", code, reason); isConnected.set(false); isConnecting.set(false); cancelPongTimeout(); if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) { sharedExecutor.execute(() -> { try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.error("[WS] 重连失败", e); } }); } else { log.warn("[WS] 线程池已关闭,不执行重连"); } } @Override public void onError(Exception ex) { log.error("[WS] 发生错误", ex); isConnected.set(false); } }; webSocketClient.connect(); } catch (URISyntaxException e) { log.error("[WS] URI格式错误", e); isConnecting.set(false); } } private void handleMessage(String message) { try { if ("pong".equals(message)) { log.debug("[WS] 收到心跳响应"); cancelPongTimeout(); return; } JSONObject response = JSON.parseObject(message); String event = response.getString("event"); if ("login".equals(event)) { String code = response.getString("code"); if ("0".equals(code)) { log.info("[WS] WebSocket登录成功"); for (OkxChannelHandler handler : channelHandlers) { handler.subscribe(webSocketClient); } sendPing(); } else { log.error("[WS] WebSocket登录失败, code:{}, msg:{}", code, response.getString("msg")); } return; } if ("subscribe".equals(event)) { log.info("[WS] 订阅成功: {}", response.getJSONObject("arg")); return; } if ("unsubscribe".equals(event)) { log.info("[WS] 取消订阅成功: {}", response.getJSONObject("arg")); return; } if ("error".equals(event)) { log.error("[WS] 错误, code:{}, msg:{}", response.getString("code"), response.getString("msg")); return; } if ("channel-conn-count".equals(event)) { return; } String op = response.getString("op"); if ("order".equals(op) || "batch-orders".equals(op)) { log.info("[WS] 收到下单推送结果: {}", JSON.toJSONString(response.get("data"))); return; } for (OkxChannelHandler handler : channelHandlers) { if (handler.handleMessage(response)) return; } } catch (Exception e) { log.error("[WS] 处理消息失败: {}", message, e); } } private void startHeartbeat() { if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) heartbeatExecutor.shutdownNow(); heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "okxApi-heartbeat"); t.setDaemon(true); return t; }); heartbeatExecutor.scheduleWithFixedDelay(this::checkHeartbeatTimeout, 25, 25, TimeUnit.SECONDS); } private synchronized void resetHeartbeatTimer() { cancelPongTimeout(); if (heartbeatExecutor != null && !heartbeatExecutor.isShutdown()) { pongTimeoutFuture = heartbeatExecutor.schedule(this::checkHeartbeatTimeout, HEARTBEAT_TIMEOUT, TimeUnit.SECONDS); } } private void checkHeartbeatTimeout() { if (!isConnected.get()) return; if (System.currentTimeMillis() - lastMessageTime.get() >= HEARTBEAT_TIMEOUT * 1000L) sendPing(); } private void sendPing() { try { if (webSocketClient != null && webSocketClient.isOpen()) { webSocketClient.send("ping"); log.debug("[WS] 发送 ping 请求"); } } catch (Exception e) { log.warn("[WS] 发送 ping 失败", e); } } private synchronized void cancelPongTimeout() { if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) pongTimeoutFuture.cancel(true); } private void reconnectWithBackoff() throws InterruptedException { int attempt = 0, maxAttempts = 3; long delayMs = 5000; while (attempt < maxAttempts) { try { Thread.sleep(delayMs); connect(); return; } catch (Exception e) { log.warn("[WS] 第{}次重连失败", attempt + 1, e); delayMs *= 2; attempt++; } } log.error("[WS] 超过最大重试次数({}),放弃重连", maxAttempts); } private void shutdownExecutorGracefully(ExecutorService executor) { if (executor == null || executor.isTerminated()) return; try { executor.shutdown(); if (!executor.awaitTermination(5, TimeUnit.SECONDS)) executor.shutdownNow(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); executor.shutdownNow(); } } }