package com.xcong.excoin.modules.gateApi; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.gateApi.wsHandler.GateChannelHandler; import com.xcong.excoin.modules.okxNewPrice.utils.SSLConfig; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** * Gate WebSocket 连接管理器。 * *

职责

* 负责 TCP 连接的建立、维持和恢复。频道逻辑(订阅/解析)全部委托给 {@link GateChannelHandler} 实现类。 * *

生命周期

*
 *   init()        → connect() → startHeartbeat()
 *   destroy()     → unsubscribe 所有 handler → closeBlocking() → shutdown 线程池
 *   onClose()     → reconnectWithBackoff() (最多 3 次,指数退避)
 * 
* *

消息路由

*
 *   onMessage → handleMessage:
 *     1. futures.pong         → cancelPongTimeout
 *     2. subscribe/unsubscribe → 日志
 *     3. error                → 错误日志
 *     4. update/all           → 遍历 channelHandlers → handler.handleMessage(response)
 * 
* *

心跳机制

* 采用双重检测:TCP 层的 WebSocket ping/pong + 应用层 futures.ping/futures.pong。 * 10 秒未收到任何消息 → 发送 futures.ping;25 秒周期检查。 * *

线程安全

* 连接状态用 AtomicBoolean(isConnected, isConnecting, isInitialized)。 * 消息时间戳用 AtomicReference。心跳任务用 synchronized 保护。 * * @author Administrator */ @SuppressWarnings("ALL") @Slf4j public class GateKlineWebSocketClient { private static final String FUTURES_PING = "futures.ping"; private static final String FUTURES_PONG = "futures.pong"; private static final int HEARTBEAT_TIMEOUT = 10; /** WebSocket 地址,由 GateConfig 提供 */ private final String wsUrl; /** Java-WebSocket 客户端实例 */ private WebSocketClient webSocketClient; /** 心跳检测调度器 */ private ScheduledExecutorService heartbeatExecutor; /** 心跳超时 Future */ private volatile ScheduledFuture pongTimeoutFuture; /** 最后收到消息的时间戳(毫秒) */ private final AtomicReference lastMessageTime = new AtomicReference<>(System.currentTimeMillis()); /** 连接状态 */ private final AtomicBoolean isConnected = new AtomicBoolean(false); /** 连接中标记,防重入 */ private final AtomicBoolean isConnecting = new AtomicBoolean(false); /** 初始化标记,防重复 init */ private final AtomicBoolean isInitialized = new AtomicBoolean(false); /** 频道处理器列表,通过 addChannelHandler 注册 */ private final List channelHandlers = new ArrayList<>(); /** 重连等异步任务的缓存线程池(daemon 线程) */ private final ExecutorService sharedExecutor = Executors.newCachedThreadPool(r -> { Thread t = new Thread(r, "gate-ws-worker"); t.setDaemon(true); return t; }); public GateKlineWebSocketClient(String wsUrl) { this.wsUrl = wsUrl; } /** * 注册频道处理器。需在 init() 前调用。 * * @param handler 实现了 {@link GateChannelHandler} 接口的频道处理器 */ public void addChannelHandler(GateChannelHandler handler) { channelHandlers.add(handler); } /** * 初始化:建立 WebSocket 连接 → 启动心跳检测。 * 使用 {@code AtomicBoolean} 防重入,同一实例只允许初始化一次。 */ public void init() { if (!isInitialized.compareAndSet(false, true)) { log.warn("[WS] 已初始化过,跳过重复初始化"); return; } connect(); startHeartbeat(); } /** * 销毁:取消所有频道订阅 → 关闭 WebSocket 连接 → 关闭线程池。 * *

执行顺序

* 先取消订阅(等待 500ms 确保发送完成),再 closeBlocking 关闭连接, * 最后 shutdown 线程池。先关连接再关线程池,避免 onClose 回调中的重连任务访问已关闭的线程池。 */ public void destroy() { log.info("[WS] 开始销毁..."); if (webSocketClient != null && webSocketClient.isOpen()) { for (GateChannelHandler handler : channelHandlers) { handler.unsubscribe(webSocketClient); } try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("[WS] 取消订阅等待被中断"); } } if (webSocketClient != null && webSocketClient.isOpen()) { try { webSocketClient.closeBlocking(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("[WS] 关闭连接时被中断"); } } if (sharedExecutor != null && !sharedExecutor.isShutdown()) { sharedExecutor.shutdown(); } shutdownExecutorGracefully(heartbeatExecutor); if (pongTimeoutFuture != null) { pongTimeoutFuture.cancel(true); } shutdownExecutorGracefully(sharedExecutor); log.info("[WS] 销毁完成"); } /** * 建立 WebSocket 连接。使用 SSLContext 配置 TLS 协议。 * *

连接成功回调

*
    *
  1. 设置 isConnected=true,isConnecting=false
  2. *
  3. 重置心跳计时器
  4. *
  5. 依次订阅所有已注册的频道处理器
  6. *
  7. 发送首次 ping
  8. *
* *

连接关闭回调

* 设置断连状态 → 取消心跳超时 → 异步触发指数退避重连(最多3次)。 * *

线程安全

* 使用 {@code AtomicBoolean.isConnecting} 防止并发重连。 */ private void connect() { if (isConnecting.get() || !isConnecting.compareAndSet(false, true)) { log.info("[WS] 连接进行中,跳过重复请求"); return; } try { SSLConfig.configureSSL(); System.setProperty("https.protocols", "TLSv1.2,TLSv1.3"); URI uri = new URI(wsUrl); if (webSocketClient != null) { try { webSocketClient.closeBlocking(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } webSocketClient = new WebSocketClient(uri) { @Override public void onOpen(ServerHandshake handshake) { log.info("[WS] 连接成功"); isConnected.set(true); isConnecting.set(false); if (sharedExecutor != null && !sharedExecutor.isShutdown()) { resetHeartbeatTimer(); for (GateChannelHandler handler : channelHandlers) { handler.subscribe(webSocketClient); } sendPing(); } else { log.warn("[WS] 应用正在关闭,忽略连接成功回调"); } } @Override public void onMessage(String message) { lastMessageTime.set(System.currentTimeMillis()); handleMessage(message); resetHeartbeatTimer(); } @Override public void onClose(int code, String reason, boolean remote) { log.warn("[WS] 连接关闭, code:{}, reason:{}", code, reason); isConnected.set(false); isConnecting.set(false); cancelPongTimeout(); if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) { sharedExecutor.execute(() -> { try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.error("[WS] 重连失败", e); } }); } else { log.warn("[WS] 线程池已关闭,不执行重连"); } } @Override public void onError(Exception ex) { log.error("[WS] 发生错误", ex); isConnected.set(false); } }; webSocketClient.connect(); } catch (URISyntaxException e) { log.error("[WS] URI格式错误", e); isConnecting.set(false); } } /** * 消息分发:先处理系统事件(pong/subscribe/error), * 再把 update/all 事件路由到各 channelHandler。 *

每个 handler 内部通过 channel 名称做二次匹配,匹配成功返回 true 则停止遍历。 */ private void handleMessage(String message) { try { JSONObject response = JSON.parseObject(message); String channel = response.getString("channel"); String event = response.getString("event"); if (FUTURES_PONG.equals(channel)) { log.debug("[WS] 收到 pong 响应"); cancelPongTimeout(); return; } if ("subscribe".equals(event)) { log.info("[WS] {} 订阅成功: {}", channel, response.getJSONObject("result")); return; } if ("unsubscribe".equals(event)) { log.info("[WS] {} 取消订阅成功", channel); return; } if ("error".equals(event)) { JSONObject error = response.getJSONObject("error"); log.error("[WS] {} 错误, code:{}, msg:{}", channel, error != null ? error.getInteger("code") : "N/A", error != null ? error.getString("message") : response.getString("msg")); return; } if ("update".equals(event) || "all".equals(event)) { for (GateChannelHandler handler : channelHandlers) { if (handler.handleMessage(response)) return; } } } catch (Exception e) { log.error("[WS] 处理消息失败: {}", message, e); } } // ---- heartbeat ---- /** * 启动心跳检测器。 * 使用单线程 ScheduledExecutor,每 25 秒检查一次心跳超时。 */ private void startHeartbeat() { if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) heartbeatExecutor.shutdownNow(); heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "gate-ws-heartbeat"); t.setDaemon(true); return t; }); heartbeatExecutor.scheduleWithFixedDelay(this::checkHeartbeatTimeout, 25, 25, TimeUnit.SECONDS); } /** * 重置心跳计时器:取消旧超时任务,提交新的 10 秒超时检测。 */ private synchronized void resetHeartbeatTimer() { cancelPongTimeout(); if (heartbeatExecutor != null && !heartbeatExecutor.isShutdown()) { pongTimeoutFuture = heartbeatExecutor.schedule(this::checkHeartbeatTimeout, HEARTBEAT_TIMEOUT, TimeUnit.SECONDS); } } /** * 检查心跳超时:如果距离上次收到消息超过 10 秒,主动发送 futures.ping。 */ private void checkHeartbeatTimeout() { if (!isConnected.get()) return; if (System.currentTimeMillis() - lastMessageTime.get() >= HEARTBEAT_TIMEOUT * 1000L) sendPing(); } /** * 发送应用层 futures.ping 消息。 */ private void sendPing() { try { if (webSocketClient != null && webSocketClient.isOpen()) { JSONObject pingMsg = new JSONObject(); pingMsg.put("time", System.currentTimeMillis() / 1000); pingMsg.put("channel", FUTURES_PING); webSocketClient.send(pingMsg.toJSONString()); log.debug("[WS] 发送 ping 请求"); } } catch (Exception e) { log.warn("[WS] 发送 ping 失败", e); } } /** * 取消心跳超时检测任务。 */ private synchronized void cancelPongTimeout() { if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) pongTimeoutFuture.cancel(true); } // ---- reconnect ---- /** * 指数退避重连。初始延迟 5 秒,每次翻倍,最多重试 3 次。 * * @throws InterruptedException 线程被中断 */ private void reconnectWithBackoff() throws InterruptedException { int attempt = 0, maxAttempts = 3; long delayMs = 5000; while (attempt < maxAttempts) { try { Thread.sleep(delayMs); connect(); return; } catch (Exception e) { log.warn("[WS] 第{}次重连失败", attempt + 1, e); delayMs *= 2; attempt++; } } log.error("[WS] 超过最大重试次数({}),放弃重连", maxAttempts); } /** * 优雅关闭线程池:先 shutdown,等待 5 秒,超时则 shutdownNow 强制中断。 * * @param executor 需要关闭的线程池 */ private void shutdownExecutorGracefully(ExecutorService executor) { if (executor == null || executor.isTerminated()) return; try { executor.shutdown(); if (!executor.awaitTermination(5, TimeUnit.SECONDS)) executor.shutdownNow(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); executor.shutdownNow(); } } }