Administrator
17 hours ago a986f3571c7e18ade4665fe5999b445b5762264d
src/main/java/com/xcong/excoin/modules/gateApi/GateKlineWebSocketClient.java
@@ -18,14 +18,37 @@
/**
 * Gate WebSocket 连接管理器。
 * 负责建立连接、心跳检测、重连,将频道逻辑委托给 {@link GateChannelHandler} 实现类。
 *
 * <h3>频道处理</h3>
 * 每个频道由独立的 Handler 类负责:订阅、取消订阅、消息解析。
 * 运行时将消息按 channel 字段路由到对应 Handler。
 * <h3>职责</h3>
 * 负责 TCP 连接的建立、维持和恢复。频道逻辑(订阅/解析)全部委托给 {@link GateChannelHandler} 实现类。
 *
 * <h3>生命周期</h3>
 * <pre>
 *   init()        → connect() → startHeartbeat()
 *   destroy()     → unsubscribe 所有 handler → closeBlocking() → shutdown 线程池
 *   onClose()     → reconnectWithBackoff() (最多 3 次,指数退避)
 * </pre>
 *
 * <h3>消息路由</h3>
 * <pre>
 *   onMessage → handleMessage:
 *     1. futures.pong         → cancelPongTimeout
 *     2. subscribe/unsubscribe → 日志
 *     3. error                → 错误日志
 *     4. update/all           → 遍历 channelHandlers → handler.handleMessage(response)
 * </pre>
 *
 * <h3>心跳机制</h3>
 * 采用双重检测:TCP 层的 WebSocket ping/pong + 应用层 futures.ping/futures.pong。
 * 10 秒未收到任何消息 → 发送 futures.ping;25 秒周期检查。
 *
 * <h3>线程安全</h3>
 * 连接状态用 AtomicBoolean(isConnected, isConnecting, isInitialized)。
 * 消息时间戳用 AtomicReference。心跳任务用 synchronized 保护。
 *
 * @author Administrator
 */
@SuppressWarnings("ALL")
@Slf4j
public class GateKlineWebSocketClient {
@@ -33,45 +56,70 @@
    private static final String FUTURES_PONG = "futures.pong";
    private static final int HEARTBEAT_TIMEOUT = 10;
    private static final String WS_URL_MONIPAN = "wss://ws-testnet.gate.com/v4/ws/futures/usdt";
    private static final String WS_URL_SHIPAN = "wss://fx-ws.gateio.ws/v4/ws/usdt";
    private static final boolean isAccountType = false;
    /** WebSocket 地址,由 GateConfig 提供 */
    private final String wsUrl;
    /** Java-WebSocket 客户端实例 */
    private WebSocketClient webSocketClient;
    /** 心跳检测调度器 */
    private ScheduledExecutorService heartbeatExecutor;
    /** 心跳超时 Future */
    private volatile ScheduledFuture<?> pongTimeoutFuture;
    /** 最后收到消息的时间戳(毫秒) */
    private final AtomicReference<Long> lastMessageTime = new AtomicReference<>(System.currentTimeMillis());
    /** 连接状态 */
    private final AtomicBoolean isConnected = new AtomicBoolean(false);
    /** 连接中标记,防重入 */
    private final AtomicBoolean isConnecting = new AtomicBoolean(false);
    /** 初始化标记,防重复 init */
    private final AtomicBoolean isInitialized = new AtomicBoolean(false);
    /** 频道处理器列表,通过 addChannelHandler 注册 */
    private final List<GateChannelHandler> channelHandlers = new ArrayList<>();
    /** 重连等异步任务的缓存线程池(daemon 线程) */
    private final ExecutorService sharedExecutor = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "gate-ws-worker");
        t.setDaemon(true);
        return t;
    });
    public GateKlineWebSocketClient() {
    public GateKlineWebSocketClient(String wsUrl) {
        this.wsUrl = wsUrl;
    }
    /**
     * 注册频道处理器。需在 init() 前调用。
     *
     * @param handler 实现了 {@link GateChannelHandler} 接口的频道处理器
     */
    public void addChannelHandler(GateChannelHandler handler) {
        channelHandlers.add(handler);
    }
    /**
     * 初始化:建立 WebSocket 连接 → 启动心跳检测。
     * 使用 {@code AtomicBoolean} 防重入,同一实例只允许初始化一次。
     */
    public void init() {
        if (!isInitialized.compareAndSet(false, true)) {
            log.warn("GateKlineWebSocketClient 已经初始化过,跳过重复初始化");
            log.warn("[WS] 已初始化过,跳过重复初始化");
            return;
        }
        connect();
        startHeartbeat();
    }
    /**
     * 销毁:取消所有频道订阅 → 关闭 WebSocket 连接 → 关闭线程池。
     *
     * <h3>执行顺序</h3>
     * 先取消订阅(等待 500ms 确保发送完成),再 closeBlocking 关闭连接,
     * 最后 shutdown 线程池。先关连接再关线程池,避免 onClose 回调中的重连任务访问已关闭的线程池。
     */
    public void destroy() {
        log.info("开始销毁GateKlineWebSocketClient");
        log.info("[WS] 开始销毁...");
        if (webSocketClient != null && webSocketClient.isOpen()) {
            for (GateChannelHandler handler : channelHandlers) {
@@ -81,7 +129,7 @@
                Thread.sleep(500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn("取消订阅等待被中断");
                log.warn("[WS] 取消订阅等待被中断");
            }
        }
@@ -90,7 +138,7 @@
                webSocketClient.closeBlocking();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.warn("关闭WebSocket连接时被中断");
                log.warn("[WS] 关闭连接时被中断");
            }
        }
@@ -104,31 +152,42 @@
        }
        shutdownExecutorGracefully(sharedExecutor);
        log.info("GateKlineWebSocketClient销毁完成");
        log.info("[WS] 销毁完成");
    }
    /**
     * 建立 WebSocket 连接。使用 SSLContext 配置 TLS 协议。
     *
     * <h3>连接成功回调</h3>
     * <ol>
     *   <li>设置 isConnected=true,isConnecting=false</li>
     *   <li>重置心跳计时器</li>
     *   <li>依次订阅所有已注册的频道处理器</li>
     *   <li>发送首次 ping</li>
     * </ol>
     *
     * <h3>连接关闭回调</h3>
     * 设置断连状态 → 取消心跳超时 → 异步触发指数退避重连(最多3次)。
     *
     * <h3>线程安全</h3>
     * 使用 {@code AtomicBoolean.isConnecting} 防止并发重连。
     */
    private void connect() {
        if (isConnecting.get() || !isConnecting.compareAndSet(false, true)) {
            log.info("连接已在进行中,跳过重复连接请求");
            log.info("[WS] 连接进行中,跳过重复请求");
            return;
        }
        try {
            SSLConfig.configureSSL();
            System.setProperty("https.protocols", "TLSv1.2,TLSv1.3");
            String WS_URL = isAccountType ? WS_URL_SHIPAN : WS_URL_MONIPAN;
            URI uri = new URI(WS_URL);
            URI uri = new URI(wsUrl);
            if (webSocketClient != null) {
                try {
                    webSocketClient.closeBlocking();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    log.warn("关闭之前连接时被中断");
                }
                try { webSocketClient.closeBlocking(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
            }
            webSocketClient = new WebSocketClient(uri) {
                @Override
                public void onOpen(ServerHandshake handshake) {
                    log.info("Gate WebSocket连接成功");
                    log.info("[WS] 连接成功");
                    isConnected.set(true);
                    isConnecting.set(false);
                    if (sharedExecutor != null && !sharedExecutor.isShutdown()) {
@@ -138,7 +197,7 @@
                        }
                        sendPing();
                    } else {
                        log.warn("应用正在关闭,忽略WebSocket连接成功回调");
                        log.warn("[WS] 应用正在关闭,忽略连接成功回调");
                    }
                }
@@ -151,39 +210,37 @@
                @Override
                public void onClose(int code, String reason, boolean remote) {
                    log.warn("Gate WebSocket连接关闭: code={}, reason={}", code, reason);
                    log.warn("[WS] 连接关闭, code:{}, reason:{}", code, reason);
                    isConnected.set(false);
                    isConnecting.set(false);
                    cancelPongTimeout();
                    if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) {
                        sharedExecutor.execute(() -> {
                            try {
                                reconnectWithBackoff();
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                log.error("重连线程被中断", e);
                            } catch (Exception e) {
                                log.error("重连失败", e);
                            }
                            try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.error("[WS] 重连失败", e); }
                        });
                    } else {
                        log.warn("共享线程池已关闭,无法执行重连任务");
                        log.warn("[WS] 线程池已关闭,不执行重连");
                    }
                }
                @Override
                public void onError(Exception ex) {
                    log.error("Gate WebSocket发生错误", ex);
                    log.error("[WS] 发生错误", ex);
                    isConnected.set(false);
                }
            };
            webSocketClient.connect();
        } catch (URISyntaxException e) {
            log.error("WebSocket URI格式错误", e);
            log.error("[WS] URI格式错误", e);
            isConnecting.set(false);
        }
    }
    /**
     * 消息分发:先处理系统事件(pong/subscribe/error),
     * 再把 update/all 事件路由到各 channelHandler。
     * <p>每个 handler 内部通过 channel 名称做二次匹配,匹配成功返回 true 则停止遍历。
     */
    private void handleMessage(String message) {
        try {
            JSONObject response = JSON.parseObject(message);
@@ -191,69 +248,69 @@
            String event = response.getString("event");
            if (FUTURES_PONG.equals(channel)) {
                log.debug("收到futures.pong响应");
                log.debug("[WS] 收到 pong 响应");
                cancelPongTimeout();
                return;
            }
            if ("subscribe".equals(event)) {
                log.info("{} 频道订阅成功: {}", channel, response.getJSONObject("result"));
                log.info("[WS] {} 订阅成功: {}", channel, response.getJSONObject("result"));
                return;
            }
            if ("unsubscribe".equals(event)) {
                log.info("{} 频道取消订阅成功", channel);
                log.info("[WS] {} 取消订阅成功", channel);
                return;
            }
            if ("error".equals(event)) {
                JSONObject error = response.getJSONObject("error");
                log.error("{} 频道错误: code={}, msg={}",
                log.error("[WS] {} 错误, code:{}, msg:{}",
                        channel,
                        error != null ? error.getInteger("code") : "N/A",
                        error != null ? error.getString("message") : response.getString("msg"));
                return;
            }
            if ("update".equals(event) || "all".equals(event)) {
                for (GateChannelHandler handler : channelHandlers) {
                    if (handler.handleMessage(response)) {
                        return;
                    }
                    if (handler.handleMessage(response)) return;
                }
            }
        } catch (Exception e) {
            log.error("处理WebSocket消息失败: {}", message, e);
            log.error("[WS] 处理消息失败: {}", message, e);
        }
    }
    // ---- heartbeat ----
    /**
     * 启动心跳检测器。
     * 使用单线程 ScheduledExecutor,每 25 秒检查一次心跳超时。
     */
    private void startHeartbeat() {
        if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) {
            heartbeatExecutor.shutdownNow();
        }
        heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "gate-ws-heartbeat");
            t.setDaemon(true);
            return t;
        });
        if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) heartbeatExecutor.shutdownNow();
        heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "gate-ws-heartbeat"); t.setDaemon(true); return t; });
        heartbeatExecutor.scheduleWithFixedDelay(this::checkHeartbeatTimeout, 25, 25, TimeUnit.SECONDS);
    }
    /**
     * 重置心跳计时器:取消旧超时任务,提交新的 10 秒超时检测。
     */
    private synchronized void resetHeartbeatTimer() {
        cancelPongTimeout();
        if (heartbeatExecutor != null && !heartbeatExecutor.isShutdown()) {
            pongTimeoutFuture = heartbeatExecutor.schedule(this::checkHeartbeatTimeout,
                    HEARTBEAT_TIMEOUT, TimeUnit.SECONDS);
            pongTimeoutFuture = heartbeatExecutor.schedule(this::checkHeartbeatTimeout, HEARTBEAT_TIMEOUT, TimeUnit.SECONDS);
        }
    }
    /**
     * 检查心跳超时:如果距离上次收到消息超过 10 秒,主动发送 futures.ping。
     */
    private void checkHeartbeatTimeout() {
        if (!isConnected.get()) {
            return;
        }
        if (System.currentTimeMillis() - lastMessageTime.get() >= HEARTBEAT_TIMEOUT * 1000L) {
            sendPing();
        }
        if (!isConnected.get()) return;
        if (System.currentTimeMillis() - lastMessageTime.get() >= HEARTBEAT_TIMEOUT * 1000L) sendPing();
    }
    /**
     * 发送应用层 futures.ping 消息。
     */
    private void sendPing() {
        try {
            if (webSocketClient != null && webSocketClient.isOpen()) {
@@ -261,49 +318,42 @@
                pingMsg.put("time", System.currentTimeMillis() / 1000);
                pingMsg.put("channel", FUTURES_PING);
                webSocketClient.send(pingMsg.toJSONString());
                log.debug("发送futures.ping请求");
                log.debug("[WS] 发送 ping 请求");
            }
        } catch (Exception e) {
            log.warn("发送ping失败", e);
        }
        } catch (Exception e) { log.warn("[WS] 发送 ping 失败", e); }
    }
    /**
     * 取消心跳超时检测任务。
     */
    private synchronized void cancelPongTimeout() {
        if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) {
            pongTimeoutFuture.cancel(true);
        }
        if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) pongTimeoutFuture.cancel(true);
    }
    // ---- reconnect ----
    /**
     * 指数退避重连。初始延迟 5 秒,每次翻倍,最多重试 3 次。
     *
     * @throws InterruptedException 线程被中断
     */
    private void reconnectWithBackoff() throws InterruptedException {
        int attempt = 0;
        int maxAttempts = 3;
        int attempt = 0, maxAttempts = 3;
        long delayMs = 5000;
        while (attempt < maxAttempts) {
            try {
                Thread.sleep(delayMs);
                connect();
                return;
            } catch (Exception e) {
                log.warn("第{}次重连失败", attempt + 1, e);
                delayMs *= 2;
                attempt++;
            }
            try { Thread.sleep(delayMs); connect(); return; } catch (Exception e) { log.warn("[WS] 第{}次重连失败", attempt + 1, e); delayMs *= 2; attempt++; }
        }
        log.error("超过最大重试次数({})仍未连接成功", maxAttempts);
        log.error("[WS] 超过最大重试次数({}),放弃重连", maxAttempts);
    }
    /**
     * 优雅关闭线程池:先 shutdown,等待 5 秒,超时则 shutdownNow 强制中断。
     *
     * @param executor 需要关闭的线程池
     */
    private void shutdownExecutorGracefully(ExecutorService executor) {
        if (executor == null || executor.isTerminated()) {
            return;
        }
        try {
            executor.shutdown();
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            executor.shutdownNow();
        }
        if (executor == null || executor.isTerminated()) return;
        try { executor.shutdown(); if (!executor.awaitTermination(5, TimeUnit.SECONDS)) executor.shutdownNow(); }
        catch (InterruptedException e) { Thread.currentThread().interrupt(); executor.shutdownNow(); }
    }
}