package com.xcong.excoin.modules.gateApi; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.gateApi.wsHandler.GateChannelHandler; import com.xcong.excoin.modules.okxNewPrice.utils.SSLConfig; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** * Gate WebSocket 连接管理器。 * 负责建立连接、心跳检测、重连,将频道逻辑委托给 {@link GateChannelHandler} 实现类。 * *

频道处理

* 每个频道由独立的 Handler 类负责:订阅、取消订阅、消息解析。 * 运行时将消息按 channel 字段路由到对应 Handler。 * * @author Administrator */ @Slf4j public class GateKlineWebSocketClient { private static final String FUTURES_PING = "futures.ping"; private static final String FUTURES_PONG = "futures.pong"; private static final int HEARTBEAT_TIMEOUT = 10; private static final String WS_URL_MONIPAN = "wss://ws-testnet.gate.com/v4/ws/futures/usdt"; private static final String WS_URL_SHIPAN = "wss://fx-ws.gateio.ws/v4/ws/usdt"; private static final boolean isAccountType = false; private WebSocketClient webSocketClient; private ScheduledExecutorService heartbeatExecutor; private volatile ScheduledFuture pongTimeoutFuture; private final AtomicReference lastMessageTime = new AtomicReference<>(System.currentTimeMillis()); private final AtomicBoolean isConnected = new AtomicBoolean(false); private final AtomicBoolean isConnecting = new AtomicBoolean(false); private final AtomicBoolean isInitialized = new AtomicBoolean(false); private final List channelHandlers = new ArrayList<>(); private final ExecutorService sharedExecutor = Executors.newCachedThreadPool(r -> { Thread t = new Thread(r, "gate-ws-worker"); t.setDaemon(true); return t; }); public GateKlineWebSocketClient() { } public void addChannelHandler(GateChannelHandler handler) { channelHandlers.add(handler); } public void init() { if (!isInitialized.compareAndSet(false, true)) { log.warn("GateKlineWebSocketClient 已经初始化过,跳过重复初始化"); return; } connect(); startHeartbeat(); } public void destroy() { log.info("开始销毁GateKlineWebSocketClient"); if (webSocketClient != null && webSocketClient.isOpen()) { for (GateChannelHandler handler : channelHandlers) { handler.unsubscribe(webSocketClient); } try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("取消订阅等待被中断"); } } if (webSocketClient != null && webSocketClient.isOpen()) { try { webSocketClient.closeBlocking(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("关闭WebSocket连接时被中断"); } } if (sharedExecutor != null && !sharedExecutor.isShutdown()) { sharedExecutor.shutdown(); } shutdownExecutorGracefully(heartbeatExecutor); if (pongTimeoutFuture != null) { pongTimeoutFuture.cancel(true); } shutdownExecutorGracefully(sharedExecutor); log.info("GateKlineWebSocketClient销毁完成"); } private void connect() { if (isConnecting.get() || !isConnecting.compareAndSet(false, true)) { log.info("连接已在进行中,跳过重复连接请求"); return; } try { SSLConfig.configureSSL(); System.setProperty("https.protocols", "TLSv1.2,TLSv1.3"); String WS_URL = isAccountType ? WS_URL_SHIPAN : WS_URL_MONIPAN; URI uri = new URI(WS_URL); if (webSocketClient != null) { try { webSocketClient.closeBlocking(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("关闭之前连接时被中断"); } } webSocketClient = new WebSocketClient(uri) { @Override public void onOpen(ServerHandshake handshake) { log.info("Gate WebSocket连接成功"); isConnected.set(true); isConnecting.set(false); if (sharedExecutor != null && !sharedExecutor.isShutdown()) { resetHeartbeatTimer(); for (GateChannelHandler handler : channelHandlers) { handler.subscribe(webSocketClient); } sendPing(); } else { log.warn("应用正在关闭,忽略WebSocket连接成功回调"); } } @Override public void onMessage(String message) { lastMessageTime.set(System.currentTimeMillis()); handleMessage(message); resetHeartbeatTimer(); } @Override public void onClose(int code, String reason, boolean remote) { log.warn("Gate WebSocket连接关闭: code={}, reason={}", code, reason); isConnected.set(false); isConnecting.set(false); cancelPongTimeout(); if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) { sharedExecutor.execute(() -> { try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("重连线程被中断", e); } catch (Exception e) { log.error("重连失败", e); } }); } else { log.warn("共享线程池已关闭,无法执行重连任务"); } } @Override public void onError(Exception ex) { log.error("Gate WebSocket发生错误", ex); isConnected.set(false); } }; webSocketClient.connect(); } catch (URISyntaxException e) { log.error("WebSocket URI格式错误", e); isConnecting.set(false); } } private void handleMessage(String message) { try { JSONObject response = JSON.parseObject(message); String channel = response.getString("channel"); String event = response.getString("event"); if (FUTURES_PONG.equals(channel)) { log.debug("收到futures.pong响应"); cancelPongTimeout(); return; } if ("subscribe".equals(event)) { log.info("{} 频道订阅成功: {}", channel, response.getJSONObject("result")); return; } if ("unsubscribe".equals(event)) { log.info("{} 频道取消订阅成功", channel); return; } if ("error".equals(event)) { JSONObject error = response.getJSONObject("error"); log.error("{} 频道错误: code={}, msg={}", channel, error != null ? error.getInteger("code") : "N/A", error != null ? error.getString("message") : response.getString("msg")); return; } if ("update".equals(event) || "all".equals(event)) { for (GateChannelHandler handler : channelHandlers) { if (handler.handleMessage(response)) { return; } } } } catch (Exception e) { log.error("处理WebSocket消息失败: {}", message, e); } } private void startHeartbeat() { if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) { heartbeatExecutor.shutdownNow(); } heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "gate-ws-heartbeat"); t.setDaemon(true); return t; }); heartbeatExecutor.scheduleWithFixedDelay(this::checkHeartbeatTimeout, 25, 25, TimeUnit.SECONDS); } private synchronized void resetHeartbeatTimer() { cancelPongTimeout(); if (heartbeatExecutor != null && !heartbeatExecutor.isShutdown()) { pongTimeoutFuture = heartbeatExecutor.schedule(this::checkHeartbeatTimeout, HEARTBEAT_TIMEOUT, TimeUnit.SECONDS); } } private void checkHeartbeatTimeout() { if (!isConnected.get()) { return; } if (System.currentTimeMillis() - lastMessageTime.get() >= HEARTBEAT_TIMEOUT * 1000L) { sendPing(); } } private void sendPing() { try { if (webSocketClient != null && webSocketClient.isOpen()) { JSONObject pingMsg = new JSONObject(); pingMsg.put("time", System.currentTimeMillis() / 1000); pingMsg.put("channel", FUTURES_PING); webSocketClient.send(pingMsg.toJSONString()); log.debug("发送futures.ping请求"); } } catch (Exception e) { log.warn("发送ping失败", e); } } private synchronized void cancelPongTimeout() { if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) { pongTimeoutFuture.cancel(true); } } private void reconnectWithBackoff() throws InterruptedException { int attempt = 0; int maxAttempts = 3; long delayMs = 5000; while (attempt < maxAttempts) { try { Thread.sleep(delayMs); connect(); return; } catch (Exception e) { log.warn("第{}次重连失败", attempt + 1, e); delayMs *= 2; attempt++; } } log.error("超过最大重试次数({})仍未连接成功", maxAttempts); } private void shutdownExecutorGracefully(ExecutorService executor) { if (executor == null || executor.isTerminated()) { return; } try { executor.shutdown(); if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { executor.shutdownNow(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); executor.shutdownNow(); } } }