From afc858458cbdde1d5f3f2cfce6d056656bf75c16 Mon Sep 17 00:00:00 2001
From: Administrator <15274802129@163.com>
Date: Wed, 13 May 2026 21:39:39 +0800
Subject: [PATCH] refactor(okxNewPrice): 账户配置

---
 src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java |   78 +++++++++++++++++++++++++++++----------
 1 files changed, 58 insertions(+), 20 deletions(-)

diff --git a/src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java b/src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java
index b043658..19a423f 100644
--- a/src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java
+++ b/src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java
@@ -63,6 +63,8 @@
     private final AtomicBoolean isConnected = new AtomicBoolean(false);
     private final AtomicBoolean isConnecting = new AtomicBoolean(false);
     private final AtomicBoolean isInitialized = new AtomicBoolean(false);
+    private final AtomicBoolean isDestroyed = new AtomicBoolean(false);
+    private final AtomicBoolean reconnecting = new AtomicBoolean(false);
 
     private final List<OkxChannelHandler> channelHandlers = new ArrayList<>();
 
@@ -117,7 +119,6 @@
 
             JSONObject login = OkxWsUtil.buildJsonObject(null, "login", argsArray);
             webSocketClient.send(login.toJSONString());
-            log.info("[WS] 发送登录请求");
         } catch (Exception e) {
             log.error("[WS] 登录请求构建失败", e);
         }
@@ -133,6 +134,7 @@
     }
 
     public void destroy() {
+        isDestroyed.set(true);
         log.info("[WS] 开始销毁...");
 
         if (webSocketClient != null && webSocketClient.isOpen()) {
@@ -170,8 +172,7 @@
     }
 
     private void connect() {
-        if (isConnecting.get() || !isConnecting.compareAndSet(false, true)) {
-            log.info("[WS] 连接进行中,跳过重复请求");
+        if (!isConnecting.compareAndSet(false, true)) {
             return;
         }
         try {
@@ -215,13 +216,12 @@
                     isConnected.set(false);
                     isConnecting.set(false);
                     cancelPongTimeout();
-                    if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) {
-                        sharedExecutor.execute(() -> {
-                            try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.error("[WS] 重连失败", e); }
-                        });
-                    } else {
-                        log.warn("[WS] 线程池已关闭,不执行重连");
+                    if (isDestroyed.get() || sharedExecutor == null || sharedExecutor.isShutdown() || sharedExecutor.isTerminated()) {
+                        return;
                     }
+                    sharedExecutor.execute(() -> {
+                        try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.error("[WS] 重连失败", e); }
+                    });
                 }
 
                 @Override
@@ -264,11 +264,9 @@
                 return;
             }
             if ("subscribe".equals(event)) {
-                log.info("[WS] 订阅成功: {}", response.getJSONObject("arg"));
                 return;
             }
             if ("unsubscribe".equals(event)) {
-                log.info("[WS] 取消订阅成功: {}", response.getJSONObject("arg"));
                 return;
             }
             if ("error".equals(event)) {
@@ -281,7 +279,14 @@
             }
             String op = response.getString("op");
             if ("order".equals(op) || "batch-orders".equals(op)) {
-                log.info("[WS] 收到下单推送结果: {}", JSON.toJSONString(response.get("data")));
+                JSONArray dataArr = response.getJSONArray("data");
+                if (dataArr != null && !dataArr.isEmpty()) {
+                    JSONObject first = dataArr.getJSONObject(0);
+                    String sCode = first.getString("sCode");
+                    if (sCode != null && !"0".equals(sCode)) {
+                        log.error("[WS] 下单失败, sCode:{}, sMsg:{}", sCode, first.getString("sMsg"));
+                    }
+                }
                 return;
             }
             for (OkxChannelHandler handler : channelHandlers) {
@@ -307,29 +312,62 @@
 
     private void checkHeartbeatTimeout() {
         if (!isConnected.get()) return;
-        if (System.currentTimeMillis() - lastMessageTime.get() >= HEARTBEAT_TIMEOUT * 1000L) sendPing();
+        if (System.currentTimeMillis() - lastMessageTime.get() >= HEARTBEAT_TIMEOUT * 1000L) {
+            log.info("[WS] 心跳超时,发送ping");
+            sendPing();
+        }
     }
 
     private void sendPing() {
         try {
             if (webSocketClient != null && webSocketClient.isOpen()) {
                 webSocketClient.send("ping");
-                log.debug("[WS] 发送 ping 请求");
+                log.info("[WS] 发送ping请求");
             }
-        } catch (Exception e) { log.warn("[WS] 发送 ping 失败", e); }
+        } catch (Exception e) { log.warn("[WS] 发送ping失败", e); }
     }
 
     private synchronized void cancelPongTimeout() {
         if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) pongTimeoutFuture.cancel(true);
     }
 
+    private int reconnectAttempt = 0;
+
     private void reconnectWithBackoff() throws InterruptedException {
-        int attempt = 0, maxAttempts = 3;
-        long delayMs = 5000;
-        while (attempt < maxAttempts) {
-            try { Thread.sleep(delayMs); connect(); return; } catch (Exception e) { log.warn("[WS] 第{}次重连失败", attempt + 1, e); delayMs *= 2; attempt++; }
+        if (!reconnecting.compareAndSet(false, true)) {
+            return;
         }
-        log.error("[WS] 超过最大重试次数({}),放弃重连", maxAttempts);
+        try {
+            long delayMs = 5000L;
+            while (!isDestroyed.get() && !isConnected.get()) {
+                try {
+                    connect();
+                    for (int i = 0; i < 20 && !isDestroyed.get() && !isConnected.get(); i++) {
+                        Thread.sleep(1000);
+                    }
+                    if (isConnected.get()) {
+                        reconnectAttempt = 0;
+                        log.info("[WS] 重连成功");
+                        return;
+                    }
+                    if (isDestroyed.get()) {
+                        return;
+                    }
+                } catch (Exception e) {
+                    log.warn("[WS] 第{}次重连失败: {}", reconnectAttempt + 1, e.getMessage());
+                }
+                reconnectAttempt++;
+                log.info("[WS] 第{}次重连等待{}秒...", reconnectAttempt, delayMs / 1000);
+                for (long waited = 0; waited < delayMs && !isDestroyed.get(); waited += 1000) {
+                    Thread.sleep(1000);
+                    if (isConnected.get()) return;
+                }
+                long jitter = (long) (Math.random() * 3000);
+                delayMs = Math.min(delayMs * 2, 60000L) + jitter;
+            }
+        } finally {
+            reconnecting.set(false);
+        }
     }
 
     private void shutdownExecutorGracefully(ExecutorService executor) {

--
Gitblit v1.9.1