| | |
| | | // True while the socket is considered connected. Cleared on the close/reconnect path and read by |
| | | // the heartbeat check and by the reconnect loop as its exit condition. |
| | | private final AtomicBoolean isConnected = new AtomicBoolean(false); |
| | | // Guard claimed with compareAndSet(false, true) at the start of connect() so overlapping |
| | | // connection attempts are skipped; reset to false on the close/reconnect path. |
| | | private final AtomicBoolean isConnecting = new AtomicBoolean(false); |
| | | // NOTE(review): not referenced in the visible part of the file; presumably marks one-time |
| | | // initialisation - confirm against the rest of the class. |
| | | private final AtomicBoolean isInitialized = new AtomicBoolean(false); |
| | | // Set to true by destroy(); the reconnect logic checks it so it stops once the client is destroyed. |
| | | private final AtomicBoolean isDestroyed = new AtomicBoolean(false); |
| | | // Claimed via compareAndSet in reconnectWithBackoff() and released in its finally block, so at |
| | | // most one reconnect loop runs at a time. |
| | | private final AtomicBoolean reconnecting = new AtomicBoolean(false); |
| | | |
| | | // Channel handlers held by this client. |
| | | // NOTE(review): plain ArrayList and its usage is not visible here - confirm it is only touched |
| | | // from one thread or is otherwise guarded. |
| | | private final List<OkxChannelHandler> channelHandlers = new ArrayList<>(); |
| | | |
| | |
| | | } |
| | | |
| | | public void destroy() { |
| | | isDestroyed.set(true); |
| | | log.info("[WS] 开始销毁..."); |
| | | |
| | | if (webSocketClient != null && webSocketClient.isOpen()) { |
| | |
| | | } |
| | | |
| | | private void connect() { |
| | | if (isConnecting.get() || !isConnecting.compareAndSet(false, true)) { |
| | | log.info("[WS] 连接进行中,跳过重复请求"); |
| | | if (!isConnecting.compareAndSet(false, true)) { |
| | | return; |
| | | } |
| | | try { |
| | |
| | | isConnected.set(false); |
| | | isConnecting.set(false); |
| | | cancelPongTimeout(); |
| | | if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) { |
| | | sharedExecutor.execute(() -> { |
| | | try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.error("[WS] 重连失败", e); } |
| | | }); |
| | | } else { |
| | | log.warn("[WS] 线程池已关闭,不执行重连"); |
| | | if (isDestroyed.get() || sharedExecutor == null || sharedExecutor.isShutdown() || sharedExecutor.isTerminated()) { |
| | | return; |
| | | } |
| | | sharedExecutor.execute(() -> { |
| | | try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.error("[WS] 重连失败", e); } |
| | | }); |
| | | } |
| | | |
| | | @Override |
| | |
| | | |
| | | /** |
| | | * Sends a ping when no message has been received for at least {@code HEARTBEAT_TIMEOUT} seconds. |
| | | * Does nothing while the client is not marked as connected. |
| | | * |
| | | * <p>The timeout is evaluated exactly once per call so a single timeout triggers a single ping. |
| | | */ |
| | | private void checkHeartbeatTimeout() { |
| | | if (!isConnected.get()) { |
| | | return; |
| | | } |
| | | // HEARTBEAT_TIMEOUT is multiplied by 1000 to compare against a millisecond idle time. |
| | | long idleMs = System.currentTimeMillis() - lastMessageTime.get(); |
| | | if (idleMs >= HEARTBEAT_TIMEOUT * 1000L) { |
| | | log.info("[WS] 心跳超时,发送ping"); |
| | | sendPing(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Sends the text frame {@code "ping"} over the WebSocket if a client exists and is open. |
| | | * |
| | | * <p>Best effort: any exception raised while sending is logged as a warning and not propagated, |
| | | * so a failed ping never breaks the caller. |
| | | */ |
| | | private void sendPing() { |
| | | try { |
| | | if (webSocketClient != null && webSocketClient.isOpen()) { |
| | | webSocketClient.send("ping"); |
| | | log.info("[WS] 发送ping请求"); |
| | | } |
| | | } catch (Exception e) { |
| | | // Deliberately not rethrown: the ping is a keep-alive probe, failure is only reported. |
| | | log.warn("[WS] 发送ping失败", e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Cancels the pending pong-timeout task, interrupting it if it is already running. |
| | | * No-op when no task has been scheduled or the task has already completed. |
| | | */ |
| | | private synchronized void cancelPongTimeout() { |
| | | if (pongTimeoutFuture == null) { |
| | | return; |
| | | } |
| | | if (pongTimeoutFuture.isDone()) { |
| | | return; |
| | | } |
| | | pongTimeoutFuture.cancel(true); |
| | | } |
| | | |
| | | // Number of consecutive failed reconnect rounds; reset to 0 after a successful reconnect. |
| | | // Only accessed inside reconnectWithBackoff(), which is serialised by the reconnecting flag. |
| | | private int reconnectAttempt = 0; |
| | | |
| | | /** |
| | | * Keeps trying to re-establish the connection until it succeeds or the client is destroyed. |
| | | * |
| | | * <p>Only one reconnect loop may run at a time: a call made while another loop is active returns |
| | | * immediately. Each round calls {@code connect()} and polls for up to 20 seconds for the |
| | | * connected flag. If the round fails, the method waits {@code delayMs} (starting at 5 seconds) |
| | | * before the next round; the delay doubles each round, is capped at 60 seconds, and gets up to |
| | | * 3 seconds of random jitter added. |
| | | * |
| | | * @throws InterruptedException if the thread is interrupted while sleeping; the reconnecting |
| | | *     flag is released before the exception propagates |
| | | */ |
| | | private void reconnectWithBackoff() throws InterruptedException { |
| | | if (!reconnecting.compareAndSet(false, true)) { |
| | | // Another reconnect loop already owns the flag. |
| | | return; |
| | | } |
| | | try { |
| | | long delayMs = 5000L; |
| | | while (!isDestroyed.get() && !isConnected.get()) { |
| | | try { |
| | | connect(); |
| | | // Poll once per second, for at most 20 seconds, for the connected flag to be set. |
| | | for (int i = 0; i < 20 && !isDestroyed.get() && !isConnected.get(); i++) { |
| | | Thread.sleep(1000); |
| | | } |
| | | if (isConnected.get()) { |
| | | reconnectAttempt = 0; |
| | | log.info("[WS] 重连成功"); |
| | | return; |
| | | } |
| | | if (isDestroyed.get()) { |
| | | return; |
| | | } |
| | | } catch (InterruptedException e) { |
| | | // Must not be swallowed by the generic handler below: propagate so the caller can |
| | | // restore the interrupt status and the loop stops. |
| | | throw e; |
| | | } catch (Exception e) { |
| | | log.warn("[WS] 第{}次重连失败: {}", reconnectAttempt + 1, e.getMessage()); |
| | | } |
| | | reconnectAttempt++; |
| | | log.info("[WS] 第{}次重连等待{}秒...", reconnectAttempt, delayMs / 1000); |
| | | // Sleep in one-second slices so destruction or an externally established connection is |
| | | // noticed quickly instead of after the full back-off delay. |
| | | for (long waited = 0; waited < delayMs && !isDestroyed.get(); waited += 1000) { |
| | | Thread.sleep(1000); |
| | | if (isConnected.get()) { |
| | | return; |
| | | } |
| | | } |
| | | long jitter = (long) (Math.random() * 3000); |
| | | delayMs = Math.min(delayMs * 2, 60000L) + jitter; |
| | | } |
| | | } finally { |
| | | reconnecting.set(false); |
| | | } |
| | | } |
| | | |
| | | private void shutdownExecutorGracefully(ExecutorService executor) { |