package com.xcong.excoin.modules.gateApi; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.gateApi.wsHandler.GateChannelHandler; import com.xcong.excoin.modules.okxNewPrice.utils.SSLConfig; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** * Gate WebSocket 连接管理器。 * *
* init() → connect() → startHeartbeat() * destroy() → unsubscribe 所有 handler → closeBlocking() → shutdown 线程池 * onClose() → reconnectWithBackoff() (最多 3 次,指数退避) ** *
* onMessage → handleMessage: * 1. futures.pong → cancelPongTimeout * 2. subscribe/unsubscribe → 日志 * 3. error → 错误日志 * 4. update/all → 遍历 channelHandlers → handler.handleMessage(response) ** *
注意:先 closeBlocking 再 shutdown sharedExecutor, * 避免 onClose 回调中的 reconnectWithBackoff 访问已关闭的线程池。 */ public void destroy() { log.info("[WS] destroy..."); if (webSocketClient != null && webSocketClient.isOpen()) { for (GateChannelHandler handler : channelHandlers) { handler.unsubscribe(webSocketClient); } try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("[WS] unsubscribe wait interrupted"); } } if (webSocketClient != null && webSocketClient.isOpen()) { try { webSocketClient.closeBlocking(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("[WS] close interrupted"); } } if (sharedExecutor != null && !sharedExecutor.isShutdown()) { sharedExecutor.shutdown(); } shutdownExecutorGracefully(heartbeatExecutor); if (pongTimeoutFuture != null) { pongTimeoutFuture.cancel(true); } shutdownExecutorGracefully(sharedExecutor); log.info("[WS] destroyed"); } /** * 建立 WebSocket 连接。使用 SSLContext 配置 TLS 协议。 * 连接成功后依次订阅所有已注册的频道处理器。 */ private void connect() { if (isConnecting.get() || !isConnecting.compareAndSet(false, true)) { log.info("[WS] already connecting"); return; } try { SSLConfig.configureSSL(); System.setProperty("https.protocols", "TLSv1.2,TLSv1.3"); URI uri = new URI(wsUrl); if (webSocketClient != null) { try { webSocketClient.closeBlocking(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } webSocketClient = new WebSocketClient(uri) { @Override public void onOpen(ServerHandshake handshake) { log.info("[WS] connected"); isConnected.set(true); isConnecting.set(false); if (sharedExecutor != null && !sharedExecutor.isShutdown()) { resetHeartbeatTimer(); for (GateChannelHandler handler : channelHandlers) { handler.subscribe(webSocketClient); } sendPing(); } else { log.warn("[WS] shutting down, ignore onOpen"); } } @Override public void onMessage(String message) { lastMessageTime.set(System.currentTimeMillis()); handleMessage(message); resetHeartbeatTimer(); } @Override public void onClose(int code, String reason, boolean remote) { log.warn("[WS] closed, code:{}, reason:{}", code, reason); isConnected.set(false); isConnecting.set(false); cancelPongTimeout(); if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) { sharedExecutor.execute(() -> { try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.error("[WS] reconnect fail", e); } }); } else { log.warn("[WS] executor closed, no reconnect"); } } @Override public void onError(Exception ex) { log.error("[WS] error", ex); isConnected.set(false); } }; webSocketClient.connect(); } catch (URISyntaxException e) { log.error("[WS] bad uri", e); isConnecting.set(false); } } /** * 消息分发:先处理系统事件(pong/subscribe/error), * 再把 update/all 事件路由到各 channelHandler。 *
每个 handler 内部通过 channel 名称做二次匹配,匹配成功返回 true 则停止遍历。 */ private void handleMessage(String message) { try { JSONObject response = JSON.parseObject(message); String channel = response.getString("channel"); String event = response.getString("event"); if (FUTURES_PONG.equals(channel)) { log.debug("[WS] pong received"); cancelPongTimeout(); return; } if ("subscribe".equals(event)) { log.info("[WS] {} subscribed: {}", channel, response.getJSONObject("result")); return; } if ("unsubscribe".equals(event)) { log.info("[WS] {} unsubscribed", channel); return; } if ("error".equals(event)) { JSONObject error = response.getJSONObject("error"); log.error("[WS] {} error, code:{}, msg:{}", channel, error != null ? error.getInteger("code") : "N/A", error != null ? error.getString("message") : response.getString("msg")); return; } if ("update".equals(event) || "all".equals(event)) { for (GateChannelHandler handler : channelHandlers) { if (handler.handleMessage(response)) return; } } } catch (Exception e) { log.error("[WS] handle msg fail: {}", message, e); } } // ---- heartbeat ---- private void startHeartbeat() { if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) heartbeatExecutor.shutdownNow(); heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "gate-ws-heartbeat"); t.setDaemon(true); return t; }); heartbeatExecutor.scheduleWithFixedDelay(this::checkHeartbeatTimeout, 25, 25, TimeUnit.SECONDS); } private synchronized void resetHeartbeatTimer() { cancelPongTimeout(); if (heartbeatExecutor != null && !heartbeatExecutor.isShutdown()) { pongTimeoutFuture = heartbeatExecutor.schedule(this::checkHeartbeatTimeout, HEARTBEAT_TIMEOUT, TimeUnit.SECONDS); } } private void checkHeartbeatTimeout() { if (!isConnected.get()) return; if (System.currentTimeMillis() - lastMessageTime.get() >= HEARTBEAT_TIMEOUT * 1000L) sendPing(); } private void sendPing() { try { if (webSocketClient != null && webSocketClient.isOpen()) { JSONObject pingMsg = new JSONObject(); pingMsg.put("time", System.currentTimeMillis() / 1000); pingMsg.put("channel", FUTURES_PING); webSocketClient.send(pingMsg.toJSONString()); log.debug("[WS] ping sent"); } } catch (Exception e) { log.warn("[WS] ping fail", e); } } private synchronized void cancelPongTimeout() { if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) pongTimeoutFuture.cancel(true); } // ---- reconnect ---- private void reconnectWithBackoff() throws InterruptedException { int attempt = 0, maxAttempts = 3; long delayMs = 5000; while (attempt < maxAttempts) { try { Thread.sleep(delayMs); connect(); return; } catch (Exception e) { log.warn("[WS] reconnect attempt {} fail", attempt + 1, e); delayMs *= 2; attempt++; } } log.error("[WS] reconnect exhausted after {} attempts", maxAttempts); } private void shutdownExecutorGracefully(ExecutorService executor) { if (executor == null || executor.isTerminated()) return; try { executor.shutdown(); if (!executor.awaitTermination(5, TimeUnit.SECONDS)) executor.shutdownNow(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); executor.shutdownNow(); } } }