Administrator
4 days ago 75836d48785da412552e67050e2332a74da2a435
src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxQuantWebSocketClient.java
@@ -1,10 +1,13 @@
package com.xcong.excoin.modules.okxNewPrice;
import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.okxNewPrice.celue.CaoZuoService;
import com.xcong.excoin.modules.okxNewPrice.okxWs.*;
import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.ExchangeInfoEnum;
import com.xcong.excoin.modules.okxNewPrice.okxWs.param.TradeRequestParam;
import com.xcong.excoin.modules.okxNewPrice.okxWs.wanggeList.WangGeListService;
import com.xcong.excoin.modules.okxNewPrice.utils.SSLConfig;
import com.xcong.excoin.modules.okxNewPrice.wangge.WangGeService;
import com.xcong.excoin.utils.RedisUtils;
@@ -19,6 +22,7 @@
import javax.annotation.PreDestroy;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -31,10 +35,11 @@
 */
@Slf4j
public class OkxQuantWebSocketClient {
    private final WangGeService wangGeService;
    private final CaoZuoService caoZuoService;
    private final RedisUtils redisUtils;
    private final ExchangeInfoEnum account;
    private final CaoZuoService caoZuoService;
    private final WangGeListService wangGeListService;
    private WebSocketClient webSocketClient;
    private ScheduledExecutorService heartbeatExecutor;
@@ -45,12 +50,32 @@
    private final AtomicBoolean isConnected = new AtomicBoolean(false);
    private final AtomicBoolean isConnecting = new AtomicBoolean(false);
    
    public OkxQuantWebSocketClient(ExchangeInfoEnum account, WangGeService wangGeService,
                                   CaoZuoService caoZuoService, RedisUtils redisUtils) {
    /**
     * 获取WebSocketClient实例
     * @return WebSocketClient实例
     */
    public WebSocketClient getWebSocketClient() {
        return webSocketClient;
    }
    /**
     * 获取账号名称
     * @return 账号名称
     */
    public String getAccountName() {
        return account.name();
    }
    public OkxQuantWebSocketClient(
            ExchangeInfoEnum account,
            RedisUtils redisUtils,
            CaoZuoService caoZuoService,
            WangGeListService wangGeListService
    ) {
        this.account = account;
        this.wangGeService = wangGeService;
        this.caoZuoService = caoZuoService;
        this.redisUtils = redisUtils;
        this.caoZuoService = caoZuoService;
        this.wangGeListService = wangGeListService;
    }
    private static final String WS_URL_MONIPAN = "wss://wspap.okx.com:8443/ws/v5/private";
@@ -95,22 +120,6 @@
     * 销毁方法,在 Spring Bean 销毁前执行。
     * 关闭 WebSocket 连接、停止心跳定时器及相关的线程资源。
     */
//    @PreDestroy
//    public void destroy() {
//        if (webSocketClient != null && webSocketClient.isOpen()) {
//            subscribeAccountChannel(UNSUBSCRIBE);
//            subscribePositionChannel(UNSUBSCRIBE);
//            subscribeOrderInfoChannel(UNSUBSCRIBE);
//            webSocketClient.close();
//        }
//        shutdownExecutorGracefully(heartbeatExecutor);
//        if (pongTimeoutFuture != null) {
//            pongTimeoutFuture.cancel(true);
//        }
//        shutdownExecutorGracefully(sharedExecutor);
//
//        // 移除了 reconnectScheduler 的关闭操作
//    }
    @PreDestroy
    public void destroy() {
        log.info("开始销毁OkxQuantWebSocketClient");
@@ -174,7 +183,6 @@
        
        try {
            InstrumentsWs.handleEvent(account.name());
            wangGeService.initWangGe();
            SSLConfig.configureSSL();
            System.setProperty("https.protocols", "TLSv1.2,TLSv1.3");
            String WS_URL = WS_URL_MONIPAN;
@@ -370,14 +378,17 @@
        // 注意:当前实现中,OrderInfoWs等类使用静态Map存储数据
        // 这会导致多账号之间的数据冲突。需要进一步修改这些类的设计,让数据存储与特定账号关联
        if (OrderInfoWs.ORDERINFOWS_CHANNEL.equals(channel)) {
            OrderInfoWs.handleEvent(response, redisUtils, account.name());
            TradeRequestParam tradeRequestParam = OrderInfoWs.handleEvent(response, redisUtils, account.name());
            TradeOrderWs.orderZhiYingEvent(webSocketClient, tradeRequestParam);
        }else if (AccountWs.ACCOUNTWS_CHANNEL.equals(channel)) {
            AccountWs.handleEvent(response, account.name());
//            String side = caoZuoService.caoZuo(account.name());
//            TradeOrderWs.orderEvent(webSocketClient, side, account.name());
        } else if (PositionsWs.POSITIONSWS_CHANNEL.equals(channel)) {
            PositionsWs.handleEvent(response, account.name());
        } else if (BalanceAndPositionWs.CHANNEL_NAME.equals(channel)) {
            BalanceAndPositionWs.handleEvent(response);
        }
    }
@@ -474,7 +485,7 @@
        }
        
        int attempt = 0;
        int maxAttempts = 5;
        int maxAttempts = 3;
        long delayMs = 1000;
        while (attempt < maxAttempts && !isConnected.get()) {