| | |
| | | |
| | | private static final int HEARTBEAT_TIMEOUT = 10; |
| | | |
| | | /** Demo (simulated) trading WS endpoint; same URL as WS_PRIVATE_URL_SIM. */ |
| | | private static final String WS_URL_SIM = "wss://wspap.okx.com:8443/ws/v5/private"; |
| | | /** Live trading WS endpoint; same URL as WS_PRIVATE_URL_PROD. */ |
| | | private static final String WS_URL_PROD = "wss://ws.okx.com:8443/ws/v5/private"; |
| | | /** Demo (simulated) trading public-channel WS endpoint. */ |
| | | private static final String WS_PUBLIC_URL_SIM = "wss://wspap.okx.com:8443/ws/v5/public"; |
| | | /** Live trading public-channel WS endpoint. */ |
| | | private static final String WS_PUBLIC_URL_PROD = "wss://ws.okx.com:8443/ws/v5/public"; |
| | | /** Demo (simulated) trading private-channel WS endpoint. */ |
| | | private static final String WS_PRIVATE_URL_SIM = "wss://wspap.okx.com:8443/ws/v5/private"; |
| | | /** Live trading private-channel WS endpoint. */ |
| | | private static final String WS_PRIVATE_URL_PROD = "wss://ws.okx.com:8443/ws/v5/private"; |
| | | |
| | | /** Account descriptor; connect() uses isAccountType() to pick the live (true) or demo (false) endpoint. */ |
| | | private final ExchangeInfoEnum account; |
| | | /** True for the public endpoint (subscribes right after open); false for the private endpoint (logs in first). */ |
| | | private final boolean isPublic; |
| | | /** Tag passed to every log statement; chosen from isPublic in the constructor. */ |
| | | private final String logPrefix; |
| | | /** Current WebSocket client; assigned a new instance by connect(). */ |
| | | private WebSocketClient webSocketClient; |
| | | /** Executor shut down in destroy(); presumably drives the periodic ping - confirm against the scheduling code. */ |
| | | private ScheduledExecutorService heartbeatExecutor; |
| | | /** Scheduled task cancelled in destroy(); presumably the pending pong-timeout check - confirm. */ |
| | | private volatile ScheduledFuture<?> pongTimeoutFuture; |
| | |
| | | return t; |
| | | }); |
| | | |
| | | public OkxGridWsClient(ExchangeInfoEnum account) { |
| | | /** |
| | | * Creates a grid WebSocket client bound to one account and one channel kind. |
| | | * |
| | | * @param account account descriptor; its isAccountType() flag later selects the live or demo endpoint |
| | | * @param isPublic true to use the public endpoint, false to use the private (login-required) endpoint |
| | | */ |
| | | public OkxGridWsClient(ExchangeInfoEnum account, boolean isPublic) { |
| | | this.account = account; |
| | | this.isPublic = isPublic; |
| | | // Log statements format the prefix as "[{}] ...", so it is stored without |
| | | // brackets; keeping them here would print "[[OKX-Grid-WS-PUB]] ...". |
| | | this.logPrefix = isPublic ? "OKX-Grid-WS-PUB" : "OKX-Grid-WS-PRI"; |
| | | } |
| | | |
| | | public void addChannelHandler(OkxGridChannelHandler handler) { |
| | |
| | | |
| | | public void init() { |
| | | if (!isInitialized.compareAndSet(false, true)) { |
| | | log.warn("[OKX-Grid-WS] 已初始化过,跳过重复初始化"); |
| | | log.warn("[{}] 已初始化过,跳过重复初始化", logPrefix); |
| | | return; |
| | | } |
| | | connect(); |
| | |
| | | } |
| | | |
| | | public void destroy() { |
| | | log.info("[OKX-Grid-WS] 开始销毁..."); |
| | | log.info("[{}] 开始销毁...", logPrefix); |
| | | if (webSocketClient != null && webSocketClient.isOpen()) { |
| | | for (OkxGridChannelHandler handler : channelHandlers) { |
| | | handler.unsubscribe(webSocketClient); |
| | |
| | | shutdownExecutorGracefully(heartbeatExecutor); |
| | | if (pongTimeoutFuture != null) pongTimeoutFuture.cancel(true); |
| | | shutdownExecutorGracefully(sharedExecutor); |
| | | log.info("[OKX-Grid-WS] 销毁完成"); |
| | | log.info("[{}] 销毁完成", logPrefix); |
| | | } |
| | | |
| | | private void connect() { |
| | | if (isConnecting.get() || !isConnecting.compareAndSet(false, true)) { |
| | | log.info("[OKX-Grid-WS] 连接进行中,跳过重复请求"); |
| | | log.info("[{}] 连接进行中,跳过重复请求", logPrefix); |
| | | return; |
| | | } |
| | | try { |
| | | SSLConfig.configureSSL(); |
| | | System.setProperty("https.protocols", "TLSv1.2,TLSv1.3"); |
| | | String wsUrl = account.isAccountType() ? WS_URL_PROD : WS_URL_SIM; |
| | | String wsUrl; |
| | | if (account.isAccountType()) { |
| | | wsUrl = isPublic ? WS_PUBLIC_URL_PROD : WS_PRIVATE_URL_PROD; |
| | | } else { |
| | | wsUrl = isPublic ? WS_PUBLIC_URL_SIM : WS_PRIVATE_URL_SIM; |
| | | } |
| | | URI uri = new URI(wsUrl); |
| | | |
| | | if (webSocketClient != null) { |
| | |
| | | webSocketClient = new WebSocketClient(uri) { |
| | | @Override |
| | | public void onOpen(ServerHandshake handshake) { |
| | | log.info("[OKX-Grid-WS] 连接成功"); |
| | | log.info("[{}] 连接成功", logPrefix); |
| | | isConnected.set(true); |
| | | isConnecting.set(false); |
| | | if (sharedExecutor != null && !sharedExecutor.isShutdown()) { |
| | | resetHeartbeatTimer(); |
| | | if (isPublic) { |
| | | subscribeAllHandlers(); |
| | | } else { |
| | | wsLogin(); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | |
| | | @Override |
| | | public void onClose(int code, String reason, boolean remote) { |
| | | log.warn("[OKX-Grid-WS] 连接关闭, code:{}, reason:{}", code, reason); |
| | | log.warn("[{}] 连接关闭, code:{}, reason:{}", logPrefix, code, reason); |
| | | isConnected.set(false); |
| | | isConnecting.set(false); |
| | | cancelPongTimeout(); |
| | | if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) { |
| | | sharedExecutor.execute(() -> { |
| | | try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.error("[OKX-Grid-WS] 重连失败", e); } |
| | | try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.error("[{}] 重连失败", logPrefix, e); } |
| | | }); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void onError(Exception ex) { |
| | | log.error("[OKX-Grid-WS] 发生错误", ex); |
| | | log.error("[{}] 发生错误", logPrefix, ex); |
| | | isConnected.set(false); |
| | | } |
| | | }; |
| | | webSocketClient.connect(); |
| | | } catch (URISyntaxException e) { |
| | | log.error("[OKX-Grid-WS] URI格式错误", e); |
| | | log.error("[{}] URI格式错误", logPrefix, e); |
| | | isConnecting.set(false); |
| | | } |
| | | } |
| | |
| | | args.add(loginArgs); |
| | | msg.put("args", args); |
| | | webSocketClient.send(msg.toJSONString()); |
| | | log.info("[OKX-Grid-WS] 发送登录请求"); |
| | | log.info("[{}] 发送登录请求", logPrefix); |
| | | } catch (Exception e) { |
| | | log.error("[OKX-Grid-WS] 登录请求构建失败", e); |
| | | log.error("[{}] 登录请求构建失败", logPrefix, e); |
| | | } |
| | | } |
| | | |
| | | /** Logs the start of subscription, then asks every registered channel handler to subscribe on the current client. */ |
| | | private void subscribeAllHandlers() { |
| | | log.info("[{}] 开始订阅频道", logPrefix); |
| | | channelHandlers.forEach(channelHandler -> channelHandler.subscribe(webSocketClient)); |
| | | } |
| | | |
| | |
| | | String event = response.getString("event"); |
| | | String op = response.getString("op"); |
| | | |
| | | // 登录成功 → 订阅所有频道 |
| | | if ("login".equals(event) || ("login".equals(op))) { |
| | | log.info("[OKX-Grid-WS] 登录成功, 开始订阅频道"); |
| | | for (OkxGridChannelHandler handler : channelHandlers) { |
| | | handler.subscribe(webSocketClient); |
| | | } |
| | | log.info("[{}] 登录成功, 开始订阅频道", logPrefix); |
| | | subscribeAllHandlers(); |
| | | return; |
| | | } |
| | | |
| | | // 订阅确认 |
| | | if ("subscribe".equals(event) || "unsubscribe".equals(event)) { |
| | | log.info("[OKX-Grid-WS] {}事件: {}", event, response.getString("arg")); |
| | | log.info("[{}] {}事件: {}", logPrefix, event, response.getString("arg")); |
| | | return; |
| | | } |
| | | |
| | | // 错误 |
| | | if ("error".equals(event)) { |
| | | log.error("[OKX-Grid-WS] 错误: {}", message); |
| | | log.error("[{}] 错误: {}", logPrefix, message); |
| | | return; |
| | | } |
| | | |
| | | // 数据推送 → 路由到 handler |
| | | for (OkxGridChannelHandler handler : channelHandlers) { |
| | | if (handler.handleMessage(response)) return; |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("[OKX-Grid-WS] 处理消息失败: {}", message, e); |
| | | log.error("[{}] 处理消息失败: {}", logPrefix, message, e); |
| | | } |
| | | } |
| | | |
| | |
| | | try { |
| | | if (webSocketClient != null && webSocketClient.isOpen()) { |
| | | webSocketClient.send("ping"); |
| | | log.debug("[OKX-Grid-WS] 发送 ping"); |
| | | log.debug("[{}] 发送 ping", logPrefix); |
| | | } |
| | | } catch (Exception e) { log.warn("[OKX-Grid-WS] 发送 ping 失败", e); } |
| | | } catch (Exception e) { log.warn("[{}] 发送 ping 失败", logPrefix, e); } |
| | | } |
| | | |
| | | private synchronized void cancelPongTimeout() { |
| | |
| | | long delayMs = 5000; |
| | | while (attempt < maxAttempts) { |
| | | try { Thread.sleep(delayMs); connect(); return; } |
| | | catch (Exception e) { log.warn("[OKX-Grid-WS] 第{}次重连失败", attempt + 1, e); delayMs *= 2; attempt++; } |
| | | catch (Exception e) { log.warn("[{}] 第{}次重连失败", logPrefix, attempt + 1, e); delayMs *= 2; attempt++; } |
| | | } |
| | | log.error("[OKX-Grid-WS] 超过最大重试次数({}),放弃重连", maxAttempts); |
| | | log.error("[{}] 超过最大重试次数({}),放弃重连", logPrefix, maxAttempts); |
| | | } |
| | | |
| | | private void shutdownExecutorGracefully(ExecutorService executor) { |