Administrator
5 days ago 23ece6103fd890655f0eef79331d3d73921611a2
src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxQuantWebSocketClient.java
@@ -1,10 +1,13 @@
package com.xcong.excoin.modules.okxNewPrice;
import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.okxNewPrice.celue.CaoZuoService;
import com.xcong.excoin.modules.okxNewPrice.okxWs.*;
import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.ExchangeInfoEnum;
import com.xcong.excoin.modules.okxNewPrice.okxWs.param.TradeRequestParam;
import com.xcong.excoin.modules.okxNewPrice.okxWs.wanggeList.WangGeListService;
import com.xcong.excoin.modules.okxNewPrice.utils.SSLConfig;
import com.xcong.excoin.modules.okxNewPrice.wangge.WangGeService;
import com.xcong.excoin.utils.RedisUtils;
@@ -19,6 +22,7 @@
import javax.annotation.PreDestroy;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -30,15 +34,12 @@
 * @author Administrator
 */
@Slf4j
@Component
@ConditionalOnProperty(prefix = "app", name = "quant", havingValue = "true")
public class OkxQuantWebSocketClient {
    @Autowired
    private WangGeService wangGeService;
    @Autowired
    private CaoZuoService caoZuoService;
    @Autowired
    private RedisUtils redisUtils;
    private final RedisUtils redisUtils;
    private final ExchangeInfoEnum account;
    private final CaoZuoService caoZuoService;
    private final WangGeListService wangGeListService;
    private WebSocketClient webSocketClient;
    private ScheduledExecutorService heartbeatExecutor;
@@ -48,6 +49,34 @@
    // 连接状态标志
    private final AtomicBoolean isConnected = new AtomicBoolean(false);
    private final AtomicBoolean isConnecting = new AtomicBoolean(false);
    /**
     * 获取WebSocketClient实例
     * @return WebSocketClient实例
     */
    public WebSocketClient getWebSocketClient() {
        return webSocketClient;
    }
    /**
     * 获取账号名称
     * @return 账号名称
     */
    public String getAccountName() {
        return account.name();
    }
    public OkxQuantWebSocketClient(
            ExchangeInfoEnum account,
            RedisUtils redisUtils,
            CaoZuoService caoZuoService,
            WangGeListService wangGeListService
    ) {
        this.account = account;
        this.redisUtils = redisUtils;
        this.caoZuoService = caoZuoService;
        this.wangGeListService = wangGeListService;
    }
    private static final String WS_URL_MONIPAN = "wss://wspap.okx.com:8443/ws/v5/private";
    private static final String WS_URL_SHIPAN = "wss://ws.okx.com:8443/ws/v5/private";
@@ -169,12 +198,11 @@
        }
        
        try {
            InstrumentsWs.handleEvent();
            wangGeService.initWangGe();
            InstrumentsWs.handleEvent(account.name());
            SSLConfig.configureSSL();
            System.setProperty("https.protocols", "TLSv1.2,TLSv1.3");
            String WS_URL = WS_URL_MONIPAN;
            if (ExchangeInfoEnum.OKX_UAT.isAccountType()){
            if (account.isAccountType()){
                WS_URL = WS_URL_SHIPAN;
            }
            URI uri = new URI(WS_URL);
@@ -199,7 +227,7 @@
                    // 检查应用是否正在关闭
                    if (!sharedExecutor.isShutdown()) {
                        resetHeartbeatTimer();
                        websocketLogin();
                        websocketLogin(account);
                    } else {
                        log.warn("应用正在关闭,忽略WebSocket连接成功回调");
                    }
@@ -249,8 +277,8 @@
        }
    }
    private void websocketLogin() {
        LoginWs.websocketLogin(webSocketClient);
    private void websocketLogin(ExchangeInfoEnum account) {
        LoginWs.websocketLogin(webSocketClient, account);
    }
    private void subscribeBalanceAndPositionChannel(String option) {
@@ -278,7 +306,7 @@
    private void handleWebSocketMessage(String message) {
        try {
            if ("pong".equals(message)) {
                log.debug("收到心跳响应");
                log.debug("{}: 收到心跳响应", account.name());
                cancelPongTimeout();
                return;
            }
@@ -289,26 +317,26 @@
                String code = response.getString("code");
                if ("0".equals(code)) {
                    String connId = response.getString("connId");
                    log.info("WebSocket登录成功, connId: {}", connId);
                    log.info("{}: WebSocket登录成功, connId: {}", account.name(), connId);
                    subscribeAccountChannel(SUBSCRIBE);
                    subscribeOrderInfoChannel(SUBSCRIBE);
                    subscribePositionChannel(SUBSCRIBE);
                } else {
                    log.error("WebSocket登录失败, code: {}, msg: {}", code, response.getString("msg"));
                    log.error("{}: WebSocket登录失败, code: {}, msg: {}", account.name(), code, response.getString("msg"));
                }
            } else if ("subscribe".equals(event)) {
                subscribeEvent(response);
            } else if ("error".equals(event)) {
                log.error("订阅错误: code={}, msg={}",
                         response.getString("code"), response.getString("msg"));
                log.error("{}: 订阅错误: code={}, msg={}",
                         account.name(), response.getString("code"), response.getString("msg"));
            } else if ("channel-conn-count".equals(event)) {
                log.info("连接限制更新: channel={}, connCount={}",
                         response.getString("channel"), response.getString("connCount"));
                log.info("{}: 连接限制更新: channel={}, connCount={}",
                         account.name(), response.getString("channel"), response.getString("connCount"));
            } else {
                processPushData(response);
            }
        } catch (Exception e) {
            log.error("处理WebSocket消息失败: {}", message, e);
            log.error("{}: 处理WebSocket消息失败: {}", account.name(), message, e);
        }
    }
@@ -325,13 +353,13 @@
            return;
        }
        if (OrderInfoWs.ORDERINFOWS_CHANNEL.equals(channel)) {
            OrderInfoWs.initEvent(response);
            OrderInfoWs.initEvent(response, account.name());
        }
        if (AccountWs.ACCOUNTWS_CHANNEL.equals(channel)) {
            AccountWs.initEvent(response);
            AccountWs.initEvent(response, account.name());
        }
        if (PositionsWs.POSITIONSWS_CHANNEL.equals(channel)) {
            PositionsWs.initEvent(response);
            PositionsWs.initEvent(response, account.name());
        }
    }
@@ -347,31 +375,36 @@
            if (TradeOrderWs.ORDERWS_CHANNEL.equals(op)) {
                // 直接使用Object类型接收,避免强制类型转换
                Object data = response.get("data");
                log.info("收到下单推送结果: {}", JSON.toJSONString(data));
                log.info("{}: 收到下单推送结果: {}", account.name(), JSON.toJSONString(data));
                return;
            }
        }
        JSONObject arg = response.getJSONObject("arg");
        if (arg == null) {
            log.warn("无效的推送数据,缺少 'arg' 字段 :{}",response);
            log.warn("{}: 无效的推送数据,缺少 'arg' 字段 :{}", account.name(), response);
            return;
        }
        String channel = arg.getString("channel");
        if (channel == null) {
            log.warn("无效的推送数据,缺少 'channel' 字段{}",response);
            log.warn("{}: 无效的推送数据,缺少 'channel' 字段{}", account.name(), response);
            return;
        }
        // 注意:当前实现中,OrderInfoWs等类使用静态Map存储数据
        // 这会导致多账号之间的数据冲突。需要进一步修改这些类的设计,让数据存储与特定账号关联
        if (OrderInfoWs.ORDERINFOWS_CHANNEL.equals(channel)) {
            OrderInfoWs.handleEvent(response, redisUtils);
            List<TradeRequestParam> tradeRequestParams = OrderInfoWs.handleEvent(response, redisUtils, account.name());
            TradeOrderWs.orderZhiYingEvent(webSocketClient, tradeRequestParams);
        }else if (AccountWs.ACCOUNTWS_CHANNEL.equals(channel)) {
            AccountWs.handleEvent(response);
            String side = caoZuoService.caoZuo();
            TradeOrderWs.orderEvent(webSocketClient, side);
            AccountWs.handleEvent(response, account.name());
        } else if (PositionsWs.POSITIONSWS_CHANNEL.equals(channel)) {
            PositionsWs.handleEvent(response);
            PositionsWs.handleEvent(response, account.name());
        } else if (BalanceAndPositionWs.CHANNEL_NAME.equals(channel)) {
            BalanceAndPositionWs.handleEvent(response);
        }
    }
@@ -468,7 +501,7 @@
        }
        
        int attempt = 0;
        int maxAttempts = 5;
        int maxAttempts = 3;
        long delayMs = 1000;
        while (attempt < maxAttempts && !isConnected.get()) {