package com.xcong.excoin.modules.okxNewPrice; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.okxNewPrice.gridWs.OkxGridChannelHandler; import com.xcong.excoin.modules.okxNewPrice.okxpi.config.utils.SignUtils; import com.xcong.excoin.modules.okxNewPrice.okxpi.config.utils.SSLConfig; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** * OKX 网格交易专用 WebSocket 客户端,对齐 Gate 的 GateKlineWebSocketClient 模式。 * *

职责

* 负责 TCP 连接的建立、维持和恢复。频道逻辑(订阅/解析)全部委托给 * {@link OkxGridChannelHandler} 实现类。 * *

生命周期

*
 *   init()        → connect() → login() → subscribe handlers → startHeartbeat()
 *   destroy()     → unsubscribe 所有 handler → closeBlocking() → shutdown 线程池
 *   onClose()     → reconnectWithBackoff() (最多 3 次,指数退避)
 * 
* * @author Administrator */ @Slf4j public class OkxGridWsClient { private static final int HEARTBEAT_TIMEOUT = 10; /** 模拟盘 WS 地址 */ private static final String WS_URL_SIM = "wss://wspap.okx.com:8443/ws/v5/private"; /** 实盘 WS 地址 */ private static final String WS_URL_PROD = "wss://ws.okx.com:8443/ws/v5/private"; private final ExchangeInfoEnum account; private WebSocketClient webSocketClient; private ScheduledExecutorService heartbeatExecutor; private volatile ScheduledFuture pongTimeoutFuture; private final AtomicReference lastMessageTime = new AtomicReference<>(System.currentTimeMillis()); private final AtomicBoolean isConnected = new AtomicBoolean(false); private final AtomicBoolean isConnecting = new AtomicBoolean(false); private final AtomicBoolean isInitialized = new AtomicBoolean(false); /** 频道处理器列表 */ private final List channelHandlers = new ArrayList<>(); /** 共享线程池 */ private final ExecutorService sharedExecutor = Executors.newCachedThreadPool(r -> { Thread t = new Thread(r, "okx-grid-ws-worker"); t.setDaemon(true); return t; }); public OkxGridWsClient(ExchangeInfoEnum account) { this.account = account; } public void addChannelHandler(OkxGridChannelHandler handler) { channelHandlers.add(handler); } public void init() { if (!isInitialized.compareAndSet(false, true)) { log.warn("[OKX-Grid-WS] 已初始化过,跳过重复初始化"); return; } connect(); startHeartbeat(); } public void destroy() { log.info("[OKX-Grid-WS] 开始销毁..."); if (webSocketClient != null && webSocketClient.isOpen()) { for (OkxGridChannelHandler handler : channelHandlers) { handler.unsubscribe(webSocketClient); } try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } if (webSocketClient != null && webSocketClient.isOpen()) { try { webSocketClient.closeBlocking(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } shutdownExecutorGracefully(heartbeatExecutor); if (pongTimeoutFuture != null) pongTimeoutFuture.cancel(true); shutdownExecutorGracefully(sharedExecutor); log.info("[OKX-Grid-WS] 销毁完成"); } private void connect() { if (isConnecting.get() || !isConnecting.compareAndSet(false, true)) { log.info("[OKX-Grid-WS] 连接进行中,跳过重复请求"); return; } try { SSLConfig.configureSSL(); System.setProperty("https.protocols", "TLSv1.2,TLSv1.3"); String wsUrl = account.isAccountType() ? WS_URL_PROD : WS_URL_SIM; URI uri = new URI(wsUrl); if (webSocketClient != null) { try { webSocketClient.closeBlocking(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } webSocketClient = new WebSocketClient(uri) { @Override public void onOpen(ServerHandshake handshake) { log.info("[OKX-Grid-WS] 连接成功"); isConnected.set(true); isConnecting.set(false); if (sharedExecutor != null && !sharedExecutor.isShutdown()) { resetHeartbeatTimer(); wsLogin(); } } @Override public void onMessage(String message) { lastMessageTime.set(System.currentTimeMillis()); handleMessage(message); resetHeartbeatTimer(); } @Override public void onClose(int code, String reason, boolean remote) { log.warn("[OKX-Grid-WS] 连接关闭, code:{}, reason:{}", code, reason); isConnected.set(false); isConnecting.set(false); cancelPongTimeout(); if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) { sharedExecutor.execute(() -> { try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.error("[OKX-Grid-WS] 重连失败", e); } }); } } @Override public void onError(Exception ex) { log.error("[OKX-Grid-WS] 发生错误", ex); isConnected.set(false); } }; webSocketClient.connect(); } catch (URISyntaxException e) { log.error("[OKX-Grid-WS] URI格式错误", e); isConnecting.set(false); } } /** * WebSocket 登录认证 */ private void wsLogin() { try { String timestamp = String.valueOf(System.currentTimeMillis() / 1000); String sign = SignUtils.signWebsocket(timestamp, account.getSecretKey()); JSONObject msg = new JSONObject(); msg.put("op", "login"); com.alibaba.fastjson.JSONArray args = new com.alibaba.fastjson.JSONArray(); JSONObject loginArgs = new JSONObject(); loginArgs.put("apiKey", account.getApiKey()); loginArgs.put("passphrase", account.getPassphrase()); loginArgs.put("timestamp", timestamp); loginArgs.put("sign", sign); args.add(loginArgs); msg.put("args", args); webSocketClient.send(msg.toJSONString()); log.info("[OKX-Grid-WS] 发送登录请求"); } catch (Exception e) { log.error("[OKX-Grid-WS] 登录请求构建失败", e); } } private void handleMessage(String message) { try { JSONObject response = JSON.parseObject(message); String event = response.getString("event"); String op = response.getString("op"); // 登录成功 → 订阅所有频道 if ("login".equals(event) || ("login".equals(op))) { log.info("[OKX-Grid-WS] 登录成功, 开始订阅频道"); for (OkxGridChannelHandler handler : channelHandlers) { handler.subscribe(webSocketClient); } return; } // 订阅确认 if ("subscribe".equals(event) || "unsubscribe".equals(event)) { log.info("[OKX-Grid-WS] {}事件: {}", event, response.getString("arg")); return; } // 错误 if ("error".equals(event)) { log.error("[OKX-Grid-WS] 错误: {}", message); return; } // 数据推送 → 路由到 handler for (OkxGridChannelHandler handler : channelHandlers) { if (handler.handleMessage(response)) return; } } catch (Exception e) { log.error("[OKX-Grid-WS] 处理消息失败: {}", message, e); } } // ---- heartbeat ---- private void startHeartbeat() { if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) heartbeatExecutor.shutdownNow(); heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "okx-grid-ws-heartbeat"); t.setDaemon(true); return t; }); heartbeatExecutor.scheduleWithFixedDelay(this::checkHeartbeatTimeout, 25, 25, TimeUnit.SECONDS); } private synchronized void resetHeartbeatTimer() { cancelPongTimeout(); if (heartbeatExecutor != null && !heartbeatExecutor.isShutdown()) { pongTimeoutFuture = heartbeatExecutor.schedule(this::sendPing, HEARTBEAT_TIMEOUT, TimeUnit.SECONDS); } } private void checkHeartbeatTimeout() { if (!isConnected.get()) return; if (System.currentTimeMillis() - lastMessageTime.get() >= HEARTBEAT_TIMEOUT * 1000L) sendPing(); } private void sendPing() { try { if (webSocketClient != null && webSocketClient.isOpen()) { webSocketClient.send("ping"); log.debug("[OKX-Grid-WS] 发送 ping"); } } catch (Exception e) { log.warn("[OKX-Grid-WS] 发送 ping 失败", e); } } private synchronized void cancelPongTimeout() { if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) pongTimeoutFuture.cancel(true); } private void reconnectWithBackoff() throws InterruptedException { int attempt = 0, maxAttempts = 3; long delayMs = 5000; while (attempt < maxAttempts) { try { Thread.sleep(delayMs); connect(); return; } catch (Exception e) { log.warn("[OKX-Grid-WS] 第{}次重连失败", attempt + 1, e); delayMs *= 2; attempt++; } } log.error("[OKX-Grid-WS] 超过最大重试次数({}),放弃重连", maxAttempts); } private void shutdownExecutorGracefully(ExecutorService executor) { if (executor == null || executor.isTerminated()) return; try { executor.shutdown(); if (!executor.awaitTermination(5, TimeUnit.SECONDS)) executor.shutdownNow(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); executor.shutdownNow(); } } }