Administrator
2026-06-02 304f66653474ff7684bb3ddbed38ff7f908195ee
src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxGridWsClient.java
@@ -38,12 +38,18 @@
    private static final int HEARTBEAT_TIMEOUT = 10; // NOTE(review): time unit is not visible in this excerpt — confirm at the use site
    /** Demo-trading (simulated) WS endpoint. Same value as WS_PRIVATE_URL_SIM. */
    private static final String WS_URL_SIM = "wss://wspap.okx.com:8443/ws/v5/private";
    /** Live-trading WS endpoint. Same value as WS_PRIVATE_URL_PROD. */
    private static final String WS_URL_PROD = "wss://ws.okx.com:8443/ws/v5/private";
    /** Demo-trading business WS endpoint (candlesticks and other market data). */
    private static final String WS_BUSINESS_URL_SIM = "wss://wspap.okx.com:8443/ws/v5/business";
    /** Live-trading business WS endpoint (candlesticks and other market data). */
    private static final String WS_BUSINESS_URL_PROD = "wss://ws.okx.com:8443/ws/v5/business";
    /** Demo-trading private WS endpoint. */
    private static final String WS_PRIVATE_URL_SIM = "wss://wspap.okx.com:8443/ws/v5/private";
    /** Live-trading private WS endpoint. */
    private static final String WS_PRIVATE_URL_PROD = "wss://ws.okx.com:8443/ws/v5/private";
    /** Account configuration; connect() uses isAccountType() to pick a PROD or SIM endpoint. */
    private final ExchangeInfoEnum account;
    /**
     * Channel mode. When true, connect() targets the business endpoint and subscribes
     * right after the socket opens; when false it targets the private endpoint and
     * sends a login request first.
     */
    private final boolean isPublic;
    /** Tag interpolated into log messages; assigned once in the constructor based on isPublic. */
    private final String logPrefix;
    /** Socket instance; connect() assigns a new one on each connection attempt. */
    private WebSocketClient webSocketClient;
    /** Executor that destroy() shuts down before the shared executor. */
    private ScheduledExecutorService heartbeatExecutor;
    /** Pending pong-timeout task; destroy() cancels it when non-null. */
    private volatile ScheduledFuture<?> pongTimeoutFuture;
@@ -63,8 +69,10 @@
        return t;
    });
    public OkxGridWsClient(ExchangeInfoEnum account) {
    public OkxGridWsClient(ExchangeInfoEnum account, boolean isPublic) {
        this.account = account;
        this.isPublic = isPublic;
        this.logPrefix = isPublic ? "[OKX-Grid-WS-PUB]" : "[OKX-Grid-WS-PRI]";
    }
    public void addChannelHandler(OkxGridChannelHandler handler) {
@@ -73,7 +81,7 @@
    public void init() {
        if (!isInitialized.compareAndSet(false, true)) {
            log.warn("[OKX-Grid-WS] 已初始化过,跳过重复初始化");
            log.warn("[{}] 已初始化过,跳过重复初始化", logPrefix);
            return;
        }
        connect();
@@ -81,7 +89,7 @@
    }
    public void destroy() {
        log.info("[OKX-Grid-WS] 开始销毁...");
        log.info("[{}] 开始销毁...", logPrefix);
        if (webSocketClient != null && webSocketClient.isOpen()) {
            for (OkxGridChannelHandler handler : channelHandlers) {
                handler.unsubscribe(webSocketClient);
@@ -98,18 +106,23 @@
        shutdownExecutorGracefully(heartbeatExecutor);
        if (pongTimeoutFuture != null) pongTimeoutFuture.cancel(true);
        shutdownExecutorGracefully(sharedExecutor);
        log.info("[OKX-Grid-WS] 销毁完成");
        log.info("[{}] 销毁完成", logPrefix);
    }
    private void connect() {
        if (isConnecting.get() || !isConnecting.compareAndSet(false, true)) {
            log.info("[OKX-Grid-WS] 连接进行中,跳过重复请求");
            log.info("[{}] 连接进行中,跳过重复请求", logPrefix);
            return;
        }
        try {
            SSLConfig.configureSSL();
            System.setProperty("https.protocols", "TLSv1.2,TLSv1.3");
            String wsUrl = account.isAccountType() ? WS_URL_PROD : WS_URL_SIM;
            String wsUrl;
            if (account.isAccountType()) {
                wsUrl = isPublic ? WS_BUSINESS_URL_PROD : WS_PRIVATE_URL_PROD;
            } else {
                wsUrl = isPublic ? WS_BUSINESS_URL_SIM : WS_PRIVATE_URL_SIM;
            }
            URI uri = new URI(wsUrl);
            if (webSocketClient != null) {
@@ -119,12 +132,16 @@
            webSocketClient = new WebSocketClient(uri) {
                @Override
                public void onOpen(ServerHandshake handshake) {
                    log.info("[OKX-Grid-WS] 连接成功");
                    log.info("[{}] 连接成功", logPrefix);
                    isConnected.set(true);
                    isConnecting.set(false);
                    if (sharedExecutor != null && !sharedExecutor.isShutdown()) {
                        resetHeartbeatTimer();
                        wsLogin();
                        if (isPublic) {
                            subscribeAllHandlers();
                        } else {
                            wsLogin();
                        }
                    }
                }
@@ -137,26 +154,27 @@
                @Override
                public void onClose(int code, String reason, boolean remote) {
                    log.warn("[OKX-Grid-WS] 连接关闭, code:{}, reason:{}", code, reason);
                    log.warn("[{}] 连接关闭, code:{}, reason:{}", logPrefix, code, reason);
                    isConnected.set(false);
                    isConnecting.set(false);
                    cancelPongTimeout();
                    if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) {
                        sharedExecutor.execute(() -> {
                            try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.error("[OKX-Grid-WS] 重连失败", e); }
                            try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.error("[{}] 重连失败", logPrefix, e); }
                        });
                    }
                }
                @Override
                public void onError(Exception ex) {
                    log.error("[OKX-Grid-WS] 发生错误", ex);
                    log.error("[{}] 发生错误", logPrefix, ex);
                    isConnected.set(false);
                }
            };
            webSocketClient.setConnectionLostTimeout(0);
            webSocketClient.connect();
        } catch (URISyntaxException e) {
            log.error("[OKX-Grid-WS] URI格式错误", e);
            log.error("[{}] URI格式错误", logPrefix, e);
            isConnecting.set(false);
        }
    }
@@ -180,45 +198,50 @@
            args.add(loginArgs);
            msg.put("args", args);
            webSocketClient.send(msg.toJSONString());
            log.info("[OKX-Grid-WS] 发送登录请求");
            log.info("[{}] 发送登录请求", logPrefix);
        } catch (Exception e) {
            log.error("[OKX-Grid-WS] 登录请求构建失败", e);
            log.error("[{}] 登录请求构建失败", logPrefix, e);
        }
    }
    private void subscribeAllHandlers() {
        log.info("[{}] 开始订阅频道", logPrefix);
        for (OkxGridChannelHandler handler : channelHandlers) {
            handler.subscribe(webSocketClient);
        }
    }
    /**
     * Dispatches one raw text frame received from the socket.
     * <p>
     * Checks run in this order and the first match ends processing: the literal
     * "pong" reply, a login acknowledgement, a subscribe/unsubscribe acknowledgement,
     * an error event, and finally the first channel handler that reports it handled
     * the payload. Any exception thrown while parsing or routing is logged together
     * with the raw message and is not rethrown.
     * <p>
     * NOTE(review): this listing looks like a patch whose +/- markers were lost, so
     * old and new lines appear back to back (each pair of consecutive log calls, and
     * the inline subscribe loop followed by subscribeAllHandlers()). Read literally,
     * a login acknowledgement would log twice and subscribe every handler twice —
     * confirm which lines are live before relying on this block.
     *
     * @param message raw frame text as delivered by the socket
     */
    private void handleMessage(String message) {
        try {
            // Heartbeat reply is plain text, not JSON, so it is handled before parsing.
            if ("pong".equals(message)) {
                log.debug("[{}] 收到 pong", logPrefix);
                return;
            }
            JSONObject response = JSON.parseObject(message);
            String event = response.getString("event");
            String op = response.getString("op");
            // Login acknowledged → subscribe all channels
            if ("login".equals(event) || ("login".equals(op))) {
                log.info("[OKX-Grid-WS] 登录成功, 开始订阅频道");
                for (OkxGridChannelHandler handler : channelHandlers) {
                    handler.subscribe(webSocketClient);
                }
                log.info("[{}] 登录成功, 开始订阅频道", logPrefix);
                subscribeAllHandlers();
                return;
            }
            // Subscribe / unsubscribe acknowledgement: log only
            if ("subscribe".equals(event) || "unsubscribe".equals(event)) {
                log.info("[OKX-Grid-WS] {}事件: {}", event, response.getString("arg"));
                log.info("[{}] {}事件: {}", logPrefix, event, response.getString("arg"));
                return;
            }
            // Error event: log the full raw message
            if ("error".equals(event)) {
                log.error("[OKX-Grid-WS] 错误: {}", message);
                log.error("[{}] 错误: {}", logPrefix, message);
                return;
            }
            // Data push → offer to each handler in turn; stop at the first that returns true
            for (OkxGridChannelHandler handler : channelHandlers) {
                if (handler.handleMessage(response)) return;
            }
        } catch (Exception e) {
            log.error("[OKX-Grid-WS] 处理消息失败: {}", message, e);
            log.error("[{}] 处理消息失败: {}", logPrefix, message, e);
        }
    }
@@ -250,9 +273,9 @@
        try {
            if (webSocketClient != null && webSocketClient.isOpen()) {
                webSocketClient.send("ping");
                log.debug("[OKX-Grid-WS] 发送 ping");
                log.debug("[{}] 发送 ping", logPrefix);
            }
        } catch (Exception e) { log.warn("[OKX-Grid-WS] 发送 ping 失败", e); }
        } catch (Exception e) { log.warn("[{}] 发送 ping 失败", logPrefix, e); }
    }
    private synchronized void cancelPongTimeout() {
@@ -264,9 +287,9 @@
        long delayMs = 5000;
        while (attempt < maxAttempts) {
            try { Thread.sleep(delayMs); connect(); return; }
            catch (Exception e) { log.warn("[OKX-Grid-WS] 第{}次重连失败", attempt + 1, e); delayMs *= 2; attempt++; }
            catch (Exception e) { log.warn("[{}] 第{}次重连失败", logPrefix, attempt + 1, e); delayMs *= 2; attempt++; }
        }
        log.error("[OKX-Grid-WS] 超过最大重试次数({}),放弃重连", maxAttempts);
        log.error("[{}] 超过最大重试次数({}),放弃重连", logPrefix, maxAttempts);
    }
    private void shutdownExecutorGracefully(ExecutorService executor) {