package com.xcong.excoin.modules.gateApi; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.gateApi.wsHandler.GateChannelHandler; import com.xcong.excoin.modules.okxNewPrice.utils.SSLConfig; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** * Gate WebSocket 连接管理器。 * *
 * init() → connect() → startHeartbeat()
 * destroy() → unsubscribe all handlers → closeBlocking() → shut down thread pools
 * onClose() → reconnectWithBackoff() (at most 3 attempts, exponential backoff)
 *
 * onMessage → handleMessage:
 *   1. futures.pong → cancelPongTimeout
 *   2. subscribe/unsubscribe → log
 *   3. error → error log
 *   4. update/all → iterate channelHandlers → handler.handleMessage(response)
 *
 *
每个 handler 内部通过 channel 名称做二次匹配,匹配成功返回 true 则停止遍历。 */ private void handleMessage(String message) { try { JSONObject response = JSON.parseObject(message); String channel = response.getString("channel"); String event = response.getString("event"); if (FUTURES_PONG.equals(channel)) { log.debug("[WS] 收到 pong 响应"); cancelPongTimeout(); return; } if ("subscribe".equals(event)) { log.info("[WS] {} 订阅成功: {}", channel, response.getJSONObject("result")); return; } if ("unsubscribe".equals(event)) { log.info("[WS] {} 取消订阅成功", channel); return; } if ("error".equals(event)) { JSONObject error = response.getJSONObject("error"); log.error("[WS] {} 错误, code:{}, msg:{}", channel, error != null ? error.getInteger("code") : "N/A", error != null ? error.getString("message") : response.getString("msg")); return; } if ("update".equals(event) || "all".equals(event)) { for (GateChannelHandler handler : channelHandlers) { if (handler.handleMessage(response)) return; } } } catch (Exception e) { log.error("[WS] 处理消息失败: {}", message, e); } } // ---- heartbeat ---- /** * 启动心跳检测器。 * 使用单线程 ScheduledExecutor,每 25 秒检查一次心跳超时。 */ private void startHeartbeat() { if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) heartbeatExecutor.shutdownNow(); heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "gate-ws-heartbeat"); t.setDaemon(true); return t; }); heartbeatExecutor.scheduleWithFixedDelay(this::checkHeartbeatTimeout, 25, 25, TimeUnit.SECONDS); } /** * 重置心跳计时器:取消旧超时任务,提交新的 10 秒超时检测。 */ private synchronized void resetHeartbeatTimer() { cancelPongTimeout(); if (heartbeatExecutor != null && !heartbeatExecutor.isShutdown()) { pongTimeoutFuture = heartbeatExecutor.schedule(this::checkHeartbeatTimeout, HEARTBEAT_TIMEOUT, TimeUnit.SECONDS); } } /** * 检查心跳超时:如果距离上次收到消息超过 10 秒,主动发送 futures.ping。 */ private void checkHeartbeatTimeout() { if (!isConnected.get()) return; if (System.currentTimeMillis() - lastMessageTime.get() >= HEARTBEAT_TIMEOUT * 
1000L) sendPing(); } /** * 发送应用层 futures.ping 消息。 */ private void sendPing() { try { if (webSocketClient != null && webSocketClient.isOpen()) { JSONObject pingMsg = new JSONObject(); pingMsg.put("time", System.currentTimeMillis() / 1000); pingMsg.put("channel", FUTURES_PING); webSocketClient.send(pingMsg.toJSONString()); log.debug("[WS] 发送 ping 请求"); } } catch (Exception e) { log.warn("[WS] 发送 ping 失败", e); } } /** * 取消心跳超时检测任务。 */ private synchronized void cancelPongTimeout() { if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) pongTimeoutFuture.cancel(true); } // ---- reconnect ---- /** * 指数退避重连。初始延迟 5 秒,每次翻倍,最多重试 3 次。 * * @throws InterruptedException 线程被中断 */ private void reconnectWithBackoff() throws InterruptedException { int attempt = 0, maxAttempts = 3; long delayMs = 5000; while (attempt < maxAttempts) { try { Thread.sleep(delayMs); connect(); return; } catch (Exception e) { log.warn("[WS] 第{}次重连失败", attempt + 1, e); delayMs *= 2; attempt++; } } log.error("[WS] 超过最大重试次数({}),放弃重连", maxAttempts); } /** * 优雅关闭线程池:先 shutdown,等待 5 秒,超时则 shutdownNow 强制中断。 * * @param executor 需要关闭的线程池 */ private void shutdownExecutorGracefully(ExecutorService executor) { if (executor == null || executor.isTerminated()) return; try { executor.shutdown(); if (!executor.awaitTermination(5, TimeUnit.SECONDS)) executor.shutdownNow(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); executor.shutdownNow(); } } }