Administrator
2025-12-16 6ee948c0a97f2d92814af3cce5d794e7b9d6989b
src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxQuantWebSocketClient.java
@@ -30,15 +30,11 @@
 * @author Administrator
 */
@Slf4j
@Component
@ConditionalOnProperty(prefix = "app", name = "quant", havingValue = "true")
public class OkxQuantWebSocketClient {
    @Autowired
    private WangGeService wangGeService;
    @Autowired
    private CaoZuoService caoZuoService;
    @Autowired
    private RedisUtils redisUtils;
    private final WangGeService wangGeService;
    private final CaoZuoService caoZuoService;
    private final RedisUtils redisUtils;
    private final ExchangeInfoEnum account;
    private WebSocketClient webSocketClient;
    private ScheduledExecutorService heartbeatExecutor;
@@ -48,6 +44,14 @@
    // 连接状态标志
    private final AtomicBoolean isConnected = new AtomicBoolean(false);
    private final AtomicBoolean isConnecting = new AtomicBoolean(false);
    public OkxQuantWebSocketClient(ExchangeInfoEnum account, WangGeService wangGeService,
                                   CaoZuoService caoZuoService, RedisUtils redisUtils) {
        this.account = account;
        this.wangGeService = wangGeService;
        this.caoZuoService = caoZuoService;
        this.redisUtils = redisUtils;
    }
    private static final String WS_URL_MONIPAN = "wss://wspap.okx.com:8443/ws/v5/private";
    private static final String WS_URL_SHIPAN = "wss://ws.okx.com:8443/ws/v5/private";
@@ -169,12 +173,11 @@
        }
        
        try {
            InstrumentsWs.handleEvent();
            wangGeService.initWangGe();
            InstrumentsWs.handleEvent(account.name());
            SSLConfig.configureSSL();
            System.setProperty("https.protocols", "TLSv1.2,TLSv1.3");
            String WS_URL = WS_URL_MONIPAN;
            if (ExchangeInfoEnum.OKX_UAT.isAccountType()){
            if (account.isAccountType()){
                WS_URL = WS_URL_SHIPAN;
            }
            URI uri = new URI(WS_URL);
@@ -199,7 +202,7 @@
                    // 棜查应用是否正在关闭
                    if (!sharedExecutor.isShutdown()) {
                        resetHeartbeatTimer();
                        websocketLogin();
                        websocketLogin(account);
                    } else {
                        log.warn("应用正在关闭,忽略WebSocket连接成功回调");
                    }
@@ -249,8 +252,8 @@
        }
    }
    private void websocketLogin() {
        LoginWs.websocketLogin(webSocketClient);
    private void websocketLogin(ExchangeInfoEnum account) {
        LoginWs.websocketLogin(webSocketClient, account);
    }
    private void subscribeBalanceAndPositionChannel(String option) {
@@ -278,7 +281,7 @@
    private void handleWebSocketMessage(String message) {
        try {
            if ("pong".equals(message)) {
                log.debug("收到心跳响应");
                log.debug("{}: 收到心跳响应", account.name());
                cancelPongTimeout();
                return;
            }
@@ -289,26 +292,26 @@
                String code = response.getString("code");
                if ("0".equals(code)) {
                    String connId = response.getString("connId");
                    log.info("WebSocket登录成功, connId: {}", connId);
                    log.info("{}: WebSocket登录成功, connId: {}", account.name(), connId);
                    subscribeAccountChannel(SUBSCRIBE);
                    subscribeOrderInfoChannel(SUBSCRIBE);
                    subscribePositionChannel(SUBSCRIBE);
                } else {
                    log.error("WebSocket登录失败, code: {}, msg: {}", code, response.getString("msg"));
                    log.error("{}: WebSocket登录失败, code: {}, msg: {}", account.name(), code, response.getString("msg"));
                }
            } else if ("subscribe".equals(event)) {
                subscribeEvent(response);
            } else if ("error".equals(event)) {
                log.error("订阅错误: code={}, msg={}",
                         response.getString("code"), response.getString("msg"));
                log.error("{}: 订阅错误: code={}, msg={}",
                         account.name(), response.getString("code"), response.getString("msg"));
            } else if ("channel-conn-count".equals(event)) {
                log.info("连接限制更新: channel={}, connCount={}",
                         response.getString("channel"), response.getString("connCount"));
                log.info("{}: 连接限制更新: channel={}, connCount={}",
                         account.name(), response.getString("channel"), response.getString("connCount"));
            } else {
                processPushData(response);
            }
        } catch (Exception e) {
            log.error("处理WebSocket消息失败: {}", message, e);
            log.error("{}: 处理WebSocket消息失败: {}", account.name(), message, e);
        }
    }
@@ -325,13 +328,13 @@
            return;
        }
        if (OrderInfoWs.ORDERINFOWS_CHANNEL.equals(channel)) {
            OrderInfoWs.initEvent(response);
            OrderInfoWs.initEvent(response, account.name());
        }
        if (AccountWs.ACCOUNTWS_CHANNEL.equals(channel)) {
            AccountWs.initEvent(response);
            AccountWs.initEvent(response, account.name());
        }
        if (PositionsWs.POSITIONSWS_CHANNEL.equals(channel)) {
            PositionsWs.initEvent(response);
            PositionsWs.initEvent(response, account.name());
        }
    }
@@ -347,30 +350,32 @@
            if (TradeOrderWs.ORDERWS_CHANNEL.equals(op)) {
                // 直接使用Object类型接收,避免强制类型转换
                Object data = response.get("data");
                log.info("收到下单推送结果: {}", JSON.toJSONString(data));
                log.info("{}: 收到下单推送结果: {}", account.name(), JSON.toJSONString(data));
                return;
            }
        }
        JSONObject arg = response.getJSONObject("arg");
        if (arg == null) {
            log.warn("无效的推送数据,缺少 'arg' 字段 :{}",response);
            log.warn("{}: 无效的推送数据,缺少 'arg' 字段 :{}", account.name(), response);
            return;
        }
        String channel = arg.getString("channel");
        if (channel == null) {
            log.warn("无效的推送数据,缺少 'channel' 字段{}",response);
            log.warn("{}: 无效的推送数据,缺少 'channel' 字段{}", account.name(), response);
            return;
        }
        // 注意:当前实现中,OrderInfoWs等类使用静态Map存储数据
        // 这会导致多账号之间的数据冲突。需要进一步修改这些类的设计,让数据存储与特定账号关联
        if (OrderInfoWs.ORDERINFOWS_CHANNEL.equals(channel)) {
            OrderInfoWs.handleEvent(response, redisUtils);
            OrderInfoWs.handleEvent(response, redisUtils, account.name());
        }else if (AccountWs.ACCOUNTWS_CHANNEL.equals(channel)) {
            AccountWs.handleEvent(response);
            String side = caoZuoService.caoZuo();
            TradeOrderWs.orderEvent(webSocketClient, side);
            AccountWs.handleEvent(response, account.name());
            String side = caoZuoService.caoZuo(account.name());
            TradeOrderWs.orderEvent(webSocketClient, side, account.name());
        } else if (PositionsWs.POSITIONSWS_CHANNEL.equals(channel)) {
            PositionsWs.handleEvent(response);
            PositionsWs.handleEvent(response, account.name());
        } else if (BalanceAndPositionWs.CHANNEL_NAME.equals(channel)) {
            BalanceAndPositionWs.handleEvent(response);
        }