Administrator
2025-12-15 a8ac553af7ed67e69f1466f8e97f4bfda00ab349
src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxQuantWebSocketClient.java
@@ -1,14 +1,10 @@
package com.xcong.excoin.modules.okxNewPrice;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.okxNewPrice.celue.CaoZuoService;
import com.xcong.excoin.modules.okxNewPrice.okxWs.*;
import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.CoinEnums;
import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.ExchangeInfoEnum;
import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.OrderParamEnums;
import com.xcong.excoin.modules.okxNewPrice.utils.SSLConfig;
import com.xcong.excoin.modules.okxNewPrice.wangge.WangGeService;
import com.xcong.excoin.utils.RedisUtils;
@@ -21,7 +17,6 @@
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.math.BigDecimal;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.*;
@@ -57,10 +52,6 @@
    private static final String WS_URL_MONIPAN = "wss://wspap.okx.com:8443/ws/v5/private";
    private static final String WS_URL_SHIPAN = "wss://ws.okx.com:8443/ws/v5/private";
    private ScheduledExecutorService reconnectScheduler;
    private final AtomicReference<Long> lastReconnectTime = new AtomicReference<>(System.currentTimeMillis());
    /**
     * 订阅频道指令
     */
@@ -86,8 +77,7 @@
        connect();
        startHeartbeat();
        
        // 添加每小时重连的定时任务
        schedulePeriodicReconnect();
        // 移除了每小时重连的定时任务
    }
    /**
@@ -108,9 +98,7 @@
        }
        shutdownExecutorGracefully(sharedExecutor);
        if (reconnectScheduler != null) {
            reconnectScheduler.shutdownNow();
        }
        // 移除了 reconnectScheduler 的关闭操作
    }
    private void shutdownExecutorGracefully(ExecutorService executor) {
@@ -273,7 +261,7 @@
                    log.error("WebSocket登录失败, code: {}, msg: {}", code, response.getString("msg"));
                }
            } else if ("subscribe".equals(event)) {
                log.info("订阅成功: {}", response.getJSONObject("arg"));
                subscribeEvent(response);
            } else if ("error".equals(event)) {
                log.error("订阅错误: code={}, msg={}",
                         response.getString("code"), response.getString("msg"));
@@ -285,6 +273,29 @@
            }
        } catch (Exception e) {
            log.error("处理WebSocket消息失败: {}", message, e);
        }
    }
    private void subscribeEvent(JSONObject response) {
        JSONObject arg = response.getJSONObject("arg");
        if (arg == null) {
            log.warn("无效的推送数据,缺少 'arg' 字段 :{}",response);
            return;
        }
        String channel = arg.getString("channel");
        if (channel == null) {
            log.warn("无效的推送数据,缺少 'channel' 字段{}",response);
            return;
        }
        if (OrderInfoWs.ORDERINFOWS_CHANNEL.equals(channel)) {
            OrderInfoWs.initEvent(response);
        }
        if (AccountWs.ACCOUNTWS_CHANNEL.equals(channel)) {
            AccountWs.initEvent(response);
        }
        if (PositionsWs.POSITIONSWS_CHANNEL.equals(channel)) {
            PositionsWs.initEvent(response);
        }
    }
@@ -320,13 +331,13 @@
            OrderInfoWs.handleEvent(response, redisUtils);
        }else if (AccountWs.ACCOUNTWS_CHANNEL.equals(channel)) {
            AccountWs.handleEvent(response);
            String side = caoZuoService.caoZuo();
            TradeOrderWs.orderEvent(webSocketClient, side);
        } else if (PositionsWs.POSITIONSWS_CHANNEL.equals(channel)) {
            PositionsWs.handleEvent(response);
        } else if (BalanceAndPositionWs.CHANNEL_NAME.equals(channel)) {
            BalanceAndPositionWs.handleEvent(response);
        }
        String side = caoZuoService.caoZuo();
        TradeOrderWs.orderEvent(webSocketClient, side);
    }
    /**
@@ -348,24 +359,7 @@
                HEARTBEAT_TIMEOUT, HEARTBEAT_TIMEOUT, TimeUnit.SECONDS);
    }
    /**
     * 安排定期重连任务
     * 每小时执行一次重连以保持连接新鲜度
     */
    private void schedulePeriodicReconnect() {
        if (reconnectScheduler != null && !reconnectScheduler.isTerminated()) {
            reconnectScheduler.shutdownNow();
        }
        reconnectScheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "okx-scheduled-reconnect");
            t.setDaemon(true);
            return t;
        });
        // 每小时执行一次重连
        reconnectScheduler.scheduleWithFixedDelay(this::performScheduledReconnect, 60, 60, TimeUnit.MINUTES);
    }
    // 移除了 schedulePeriodicReconnect 方法
    /**
     * 重置心跳计时器。
@@ -381,19 +375,7 @@
        }
    }
    /**
     * 执行定时重连任务
     * 每小时强制重连一次以确保连接的新鲜度
     */
    private void performScheduledReconnect() {
        log.info("执行定时重连任务");
        if (webSocketClient != null && webSocketClient.isOpen()) {
            log.info("关闭当前连接准备重连");
            webSocketClient.close();
        }
        // 更新最后重连时间
        lastReconnectTime.set(System.currentTimeMillis());
    }
    // 移除了 performScheduledReconnect 方法
    /**
     * 检查心跳超时情况。