package com.xcong.excoin.modules.okxNewPrice; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.okxNewPrice.gridWs.OkxGridChannelHandler; import com.xcong.excoin.modules.okxNewPrice.okxpi.config.utils.SignUtils; import com.xcong.excoin.modules.okxNewPrice.okxpi.config.utils.SSLConfig; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; /** * OKX 网格交易专用 WebSocket 客户端,对齐 Gate 的 GateKlineWebSocketClient 模式。 * *

职责

* 负责 TCP 连接的建立、维持和恢复。频道逻辑(订阅/解析)全部委托给 * {@link OkxGridChannelHandler} 实现类。 * *

生命周期

*
 *   init()        → connect() → login() → subscribe handlers → startHeartbeat()
 *   destroy()     → unsubscribe 所有 handler → closeBlocking() → shutdown 线程池
 *   onClose()     → reconnectWithBackoff() (最多 3 次,指数退避)
 * 
* * @author Administrator */ @Slf4j public class OkxGridWsClient { private static final int HEARTBEAT_TIMEOUT = 10; /** 模拟盘业务 WS 地址(K线等行情数据) */ private static final String WS_BUSINESS_URL_SIM = "wss://wspap.okx.com:8443/ws/v5/business"; /** 实盘业务 WS 地址(K线等行情数据) */ private static final String WS_BUSINESS_URL_PROD = "wss://ws.okx.com:8443/ws/v5/business"; /** 模拟盘私有 WS 地址 */ private static final String WS_PRIVATE_URL_SIM = "wss://wspap.okx.com:8443/ws/v5/private"; /** 实盘私有 WS 地址 */ private static final String WS_PRIVATE_URL_PROD = "wss://ws.okx.com:8443/ws/v5/private"; private final ExchangeInfoEnum account; private final boolean isPublic; private final String logPrefix; private WebSocketClient webSocketClient; private ScheduledExecutorService heartbeatExecutor; private volatile ScheduledFuture pongTimeoutFuture; private final AtomicReference lastMessageTime = new AtomicReference<>(System.currentTimeMillis()); private final AtomicBoolean isConnected = new AtomicBoolean(false); private final AtomicBoolean isConnecting = new AtomicBoolean(false); private final AtomicBoolean isInitialized = new AtomicBoolean(false); /** 频道处理器列表 */ private final List channelHandlers = new ArrayList<>(); /** 共享线程池 */ private final ExecutorService sharedExecutor = Executors.newCachedThreadPool(r -> { Thread t = new Thread(r, "okx-grid-ws-worker"); t.setDaemon(true); return t; }); public OkxGridWsClient(ExchangeInfoEnum account, boolean isPublic) { this.account = account; this.isPublic = isPublic; this.logPrefix = isPublic ? "[OKX-Grid-WS-PUB]" : "[OKX-Grid-WS-PRI]"; } public void addChannelHandler(OkxGridChannelHandler handler) { channelHandlers.add(handler); } public void init() { if (!isInitialized.compareAndSet(false, true)) { log.warn("[{}] 已初始化过,跳过重复初始化", logPrefix); return; } connect(); startHeartbeat(); } public void destroy() { log.info("[{}] 开始销毁...", logPrefix); if (webSocketClient != null && webSocketClient.isOpen()) { for (OkxGridChannelHandler handler : channelHandlers) { handler.unsubscribe(webSocketClient); } try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } if (webSocketClient != null && webSocketClient.isOpen()) { try { webSocketClient.closeBlocking(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } shutdownExecutorGracefully(heartbeatExecutor); if (pongTimeoutFuture != null) pongTimeoutFuture.cancel(true); shutdownExecutorGracefully(sharedExecutor); log.info("[{}] 销毁完成", logPrefix); } private void connect() { if (isConnecting.get() || !isConnecting.compareAndSet(false, true)) { log.info("[{}] 连接进行中,跳过重复请求", logPrefix); return; } try { SSLConfig.configureSSL(); System.setProperty("https.protocols", "TLSv1.2,TLSv1.3"); String wsUrl; if (account.isAccountType()) { wsUrl = isPublic ? WS_BUSINESS_URL_PROD : WS_PRIVATE_URL_PROD; } else { wsUrl = isPublic ? WS_BUSINESS_URL_SIM : WS_PRIVATE_URL_SIM; } URI uri = new URI(wsUrl); if (webSocketClient != null) { try { webSocketClient.closeBlocking(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } webSocketClient = new WebSocketClient(uri) { @Override public void onOpen(ServerHandshake handshake) { log.info("[{}] 连接成功", logPrefix); isConnected.set(true); isConnecting.set(false); if (sharedExecutor != null && !sharedExecutor.isShutdown()) { resetHeartbeatTimer(); if (isPublic) { subscribeAllHandlers(); } else { wsLogin(); } } } @Override public void onMessage(String message) { lastMessageTime.set(System.currentTimeMillis()); handleMessage(message); resetHeartbeatTimer(); } @Override public void onClose(int code, String reason, boolean remote) { log.warn("[{}] 连接关闭, code:{}, reason:{}", logPrefix, code, reason); isConnected.set(false); isConnecting.set(false); cancelPongTimeout(); if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) { sharedExecutor.execute(() -> { try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.error("[{}] 重连失败", logPrefix, e); } }); } } @Override public void onError(Exception ex) { log.error("[{}] 发生错误", logPrefix, ex); isConnected.set(false); } }; webSocketClient.setConnectionLostTimeout(0); webSocketClient.connect(); } catch (URISyntaxException e) { log.error("[{}] URI格式错误", logPrefix, e); isConnecting.set(false); } } /** * WebSocket 登录认证 */ private void wsLogin() { try { String timestamp = String.valueOf(System.currentTimeMillis() / 1000); String sign = SignUtils.signWebsocket(timestamp, account.getSecretKey()); JSONObject msg = new JSONObject(); msg.put("op", "login"); com.alibaba.fastjson.JSONArray args = new com.alibaba.fastjson.JSONArray(); JSONObject loginArgs = new JSONObject(); loginArgs.put("apiKey", account.getApiKey()); loginArgs.put("passphrase", account.getPassphrase()); loginArgs.put("timestamp", timestamp); loginArgs.put("sign", sign); args.add(loginArgs); msg.put("args", args); webSocketClient.send(msg.toJSONString()); log.info("[{}] 发送登录请求", logPrefix); } catch (Exception e) { log.error("[{}] 登录请求构建失败", logPrefix, e); } } private void subscribeAllHandlers() { log.info("[{}] 开始订阅频道", logPrefix); for (OkxGridChannelHandler handler : channelHandlers) { handler.subscribe(webSocketClient); } } private void handleMessage(String message) { try { if ("pong".equals(message)) { log.debug("[{}] 收到 pong", logPrefix); return; } JSONObject response = JSON.parseObject(message); String event = response.getString("event"); String op = response.getString("op"); if ("login".equals(event) || ("login".equals(op))) { log.info("[{}] 登录成功, 开始订阅频道", logPrefix); subscribeAllHandlers(); return; } if ("subscribe".equals(event) || "unsubscribe".equals(event)) { log.info("[{}] {}事件: {}", logPrefix, event, response.getString("arg")); // 订阅成功确认:解析频道名并通知对应 handler if ("subscribe".equals(event)) { JSONObject argObj = response.getJSONObject("arg"); if (argObj != null) { String channel = argObj.getString("channel"); if (channel != null) { for (OkxGridChannelHandler handler : channelHandlers) { if (channel.equals(handler.getChannelName())) { handler.onSubscribed(); break; } } } } } return; } if ("error".equals(event)) { log.error("[{}] 错误: {}", logPrefix, message); return; } for (OkxGridChannelHandler handler : channelHandlers) { if (handler.handleMessage(response)) return; } } catch (Exception e) { log.error("[{}] 处理消息失败: {}", logPrefix, message, e); } } // ---- heartbeat ---- private void startHeartbeat() { if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) heartbeatExecutor.shutdownNow(); heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "okx-grid-ws-heartbeat"); t.setDaemon(true); return t; }); heartbeatExecutor.scheduleWithFixedDelay(this::checkHeartbeatTimeout, 25, 25, TimeUnit.SECONDS); } private synchronized void resetHeartbeatTimer() { cancelPongTimeout(); if (heartbeatExecutor != null && !heartbeatExecutor.isShutdown()) { pongTimeoutFuture = heartbeatExecutor.schedule(this::sendPing, HEARTBEAT_TIMEOUT, TimeUnit.SECONDS); } } private void checkHeartbeatTimeout() { if (!isConnected.get()) return; if (System.currentTimeMillis() - lastMessageTime.get() >= HEARTBEAT_TIMEOUT * 1000L) sendPing(); } private void sendPing() { try { if (webSocketClient != null && webSocketClient.isOpen()) { webSocketClient.send("ping"); log.debug("[{}] 发送 ping", logPrefix); } } catch (Exception e) { log.warn("[{}] 发送 ping 失败", logPrefix, e); } } private synchronized void cancelPongTimeout() { if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) pongTimeoutFuture.cancel(true); } private void reconnectWithBackoff() throws InterruptedException { int attempt = 0, maxAttempts = 3; long delayMs = 5000; while (attempt < maxAttempts) { try { Thread.sleep(delayMs); connect(); return; } catch (Exception e) { log.warn("[{}] 第{}次重连失败", logPrefix, attempt + 1, e); delayMs *= 2; attempt++; } } log.error("[{}] 超过最大重试次数({}),放弃重连", logPrefix, maxAttempts); } private void shutdownExecutorGracefully(ExecutorService executor) { if (executor == null || executor.isTerminated()) return; try { executor.shutdown(); if (!executor.awaitTermination(5, TimeUnit.SECONDS)) executor.shutdownNow(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); executor.shutdownNow(); } } }