Administrator
2025-12-16 d81e77e9639c1f20f0e10c969c3e98410327673d
src/main/java/com/xcong/excoin/modules/newPrice/OkxNewPriceWebSocketClient.java
@@ -3,6 +3,8 @@
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.CoinEnums;
import com.xcong.excoin.modules.okxNewPrice.utils.SSLConfig;
import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService;
import com.xcong.excoin.utils.CoinTypeConvert;
import com.xcong.excoin.utils.RedisUtils;
@@ -28,7 +30,7 @@
 */
@Slf4j
@Component
@ConditionalOnProperty(prefix = "app", name = "websocket", havingValue = "true")
@ConditionalOnProperty(prefix = "app", name = "quant", havingValue = "true")
public class OkxNewPriceWebSocketClient {
    @Resource
    private WebsocketPriceService websocketPriceService;
@@ -40,12 +42,7 @@
    private volatile ScheduledFuture<?> pongTimeoutFuture;
    private final AtomicReference<Long> lastMessageTime = new AtomicReference<>(System.currentTimeMillis());
    private static final String WS_URL = "wss://ws.okx.com:8443/ws/v5/public";
    private static final String CHANNEL = "mark-price";
    private static final String[] INST_IDS = {
            "BTC-USDT", "ETH-USDT", "XRP-USDT", "LTC-USDT", "BCH-USDT", "ETC-USDT"
    };
    // 心跳超时时间(秒),小于30秒
    private static final int HEARTBEAT_TIMEOUT = 10;
@@ -84,6 +81,9 @@
        }
        sharedExecutor.shutdownNow();
    }
    private static final String WS_URL_MONIPAN = "wss://wspap.okx.com:8443/ws/v5/public";
    private static final String WS_URL_SHIPAN = "wss://ws.okx.com:8443/ws/v5/public";
    private static final boolean isAccountType = true;
    /**
     * 建立与 OKX WebSocket 服务器的连接。
@@ -91,6 +91,12 @@
     */
    private void connect() {
        try {
            SSLConfig.configureSSL();
            System.setProperty("https.protocols", "TLSv1.2,TLSv1.3");
            String WS_URL = WS_URL_MONIPAN;
            if (isAccountType){
                WS_URL = WS_URL_SHIPAN;
            }
            URI uri = new URI(WS_URL);
            webSocketClient = new WebSocketClient(uri) {
                @Override
@@ -144,12 +150,10 @@
        subscribeMsg.put("op", "subscribe");
        JSONArray argsArray = new JSONArray();
        for (String instId : INST_IDS) {
            JSONObject arg = new JSONObject();
            arg.put("channel", CHANNEL);
            arg.put("instId", instId);
            argsArray.add(arg);
        }
        JSONObject arg = new JSONObject();
        arg.put("channel", CHANNEL);
        arg.put("instId", CoinEnums.HE_YUE.getCode());
        argsArray.add(arg);
        subscribeMsg.put("args", argsArray);
        webSocketClient.send(subscribeMsg.toJSONString());
@@ -200,17 +204,9 @@
                        String markPx = priceData.getString("markPx");
                        String ts = priceData.getString("ts");
                        String redisKey = buildRedisKey(instId);
                        redisUtils.set(redisKey, markPx);
                        redisUtils.set(CoinEnums.HE_YUE.getCode(), markPx);
                        String symbol = CoinTypeConvert.okxConvert(instId);
                        if (symbol != null) {
                            redisUtils.set(CoinTypeConvert.convertToKey(symbol), markPx);
                            websocketPriceService.comparePriceAsc(symbol, markPx);
                            websocketPriceService.comparePriceDesc(symbol, markPx);
                        }
                        log.debug("更新最新价格: {} = {}, 时间: {}", redisKey, markPx, ts);
                        log.debug("更新最新价格: {} = {}, 币种: {}", CoinEnums.HE_YUE.getCode(), markPx, instId);
                    } catch (Exception innerEx) {
                        log.warn("处理单条价格数据失败", innerEx);
                    }