Administrator
2 days ago 6dcaa1d6d256cd7de744a04b2576111c1a7902c7
src/main/java/com/xcong/excoin/modules/newPrice/OkxNewPriceWebSocketClient.java
@@ -41,10 +41,16 @@
    // Boxed millisecond timestamp, initialised to the construction time.
    // NOTE(review): presumably refreshed on every received message for heartbeat checks — confirm against the message handler.
    private final AtomicReference<Long> lastMessageTime = new AtomicReference<>(System.currentTimeMillis());
    // WebSocket endpoint this client connects to.
    private static final String WS_URL = "wss://ws.okx.com:8443/ws/v5/public";
    // Channel name for mark-price pushes.
    // NOTE(review): holds the same literal as CHANNEL_MARK_PRICE below — looks redundant; confirm whether it is still referenced.
    private static final String CHANNEL = "mark-price";
    // Channel names placed in the "channel" field of subscribe requests and matched against incoming pushes.
    private static final String CHANNEL_MARK_PRICE = "mark-price";
    private static final String CHANNEL_INDEX_TICKERS = "index-tickers";
    private static final String CHANNEL_OPEN_INTEREST = "open-interest";
    // Instrument ids sent with the mark-price and index-tickers subscriptions.
    private static final String[] INST_IDS = {
            "BTC-USDT", "ETH-USDT", "XRP-USDT", "LTC-USDT", "BCH-USDT", "ETC-USDT"
    };
    // SWAP-suffixed instrument ids (e.g. BTC-USDT-SWAP) sent with the open-interest subscription.
    private static final String[] INST_IDS_INTEREST = {
            "BTC-USDT-SWAP", "ETH-USDT-SWAP", "XRP-USDT-SWAP", "LTC-USDT-SWAP", "BCH-USDT-SWAP", "ETC-USDT-SWAP"
    };
    // 心跳超时时间(秒),小于30秒
@@ -98,6 +104,8 @@
                    log.info("OKX New Price WebSocket连接成功");
                    resetHeartbeatTimer();
                    subscribeChannels();
                    subscribeIndexChannels();
                    subscribeOpenInterestChannels();
                }
                @Override
@@ -146,7 +154,48 @@
        JSONArray argsArray = new JSONArray();
        for (String instId : INST_IDS) {
            JSONObject arg = new JSONObject();
            arg.put("channel", CHANNEL);
            arg.put("channel", CHANNEL_MARK_PRICE);
            arg.put("instId", instId);
            argsArray.add(arg);
        }
        subscribeMsg.put("args", argsArray);
        webSocketClient.send(subscribeMsg.toJSONString());
        log.info("已发送价格订阅请求,订阅通道数: {}", argsArray.size());
    }
    /**
     * 订阅指定交易对的价格通道。
     * 构造订阅请求并发送给服务端。
     */
    private void subscribeIndexChannels() {
        JSONObject subscribeMsg = new JSONObject();
        subscribeMsg.put("op", "subscribe");
        JSONArray argsArray = new JSONArray();
        for (String instId : INST_IDS) {
            JSONObject arg = new JSONObject();
            arg.put("channel", CHANNEL_INDEX_TICKERS);
            arg.put("instId", instId);
            argsArray.add(arg);
        }
        subscribeMsg.put("args", argsArray);
        webSocketClient.send(subscribeMsg.toJSONString());
        log.info("已发送价格订阅请求,订阅通道数: {}", argsArray.size());
    }
    /**
     * 订阅指定交易对的价格通道。
     * 构造订阅请求并发送给服务端。
     */
    private void subscribeOpenInterestChannels() {
        JSONObject subscribeMsg = new JSONObject();
        subscribeMsg.put("op", "subscribe");
        JSONArray argsArray = new JSONArray();
        for (String instId : INST_IDS_INTEREST) {
            JSONObject arg = new JSONObject();
            arg.put("channel", CHANNEL_OPEN_INTEREST);
            arg.put("instId", instId);
            argsArray.add(arg);
        }
@@ -190,35 +239,107 @@
     * @param response 包含价格数据的 JSON 对象
     */
    private void processPushData(JSONObject response) {
        try {
            JSONArray dataArray = response.getJSONArray("data");
            if (dataArray != null && !dataArray.isEmpty()) {
                for (int i = 0; i < dataArray.size(); i++) {
                    try {
                        JSONObject priceData = dataArray.getJSONObject(i);
                        String instId = priceData.getString("instId");
                        String markPx = priceData.getString("markPx");
                        String ts = priceData.getString("ts");
        JSONObject arg = response.getJSONObject("arg");
        if (arg == null) {
            log.warn("无效的推送数据,缺少 'arg' 字段 :{}",response);
            return;
        }
                        String redisKey = buildRedisKey(instId);
                        redisUtils.set(redisKey, markPx);
        String channel = arg.getString("channel");
        if (channel == null) {
            log.warn("无效的推送数据,缺少 'channel' 字段{}",response);
            return;
        }
                        String symbol = CoinTypeConvert.okxConvert(instId);
                        if (symbol != null) {
                            redisUtils.set(CoinTypeConvert.convertToKey(symbol), markPx);
                            websocketPriceService.comparePriceAsc(symbol, markPx);
                            websocketPriceService.comparePriceDesc(symbol, markPx);
        if (CHANNEL_MARK_PRICE.equals(channel)) {
            try {
                JSONArray dataArray = response.getJSONArray("data");
                if (dataArray != null && !dataArray.isEmpty()) {
                    for (int i = 0; i < dataArray.size(); i++) {
                        try {
                            JSONObject priceData = dataArray.getJSONObject(i);
                            String instId = priceData.getString("instId");
                            String markPx = priceData.getString("markPx");
                            String ts = priceData.getString("ts");
                            String redisKey = buildRedisKey(instId);
                            redisUtils.set(redisKey, markPx);
                            String symbol = CoinTypeConvert.okxConvert(instId);
                            if (symbol != null) {
                                redisUtils.set(CoinTypeConvert.convertToKey(symbol), markPx);
                                websocketPriceService.comparePriceAsc(symbol, markPx);
                                websocketPriceService.comparePriceDesc(symbol, markPx);
                            }
                            log.debug("更新最新价格: {} = {}, 时间: {}", redisKey, markPx, ts);
                        } catch (Exception innerEx) {
                            log.warn("处理单条价格数据失败", innerEx);
                        }
                        log.debug("更新最新价格: {} = {}, 时间: {}", redisKey, markPx, ts);
                    } catch (Exception innerEx) {
                        log.warn("处理单条价格数据失败", innerEx);
                    }
                }
            } catch (Exception e) {
                log.error("处理价格推送数据失败", e);
            }
        } catch (Exception e) {
            log.error("处理价格推送数据失败", e);
        }else if (CHANNEL_INDEX_TICKERS.equals(channel)) {
            try {
                JSONArray dataArray = response.getJSONArray("data");
                if (dataArray != null && !dataArray.isEmpty()) {
                    for (int i = 0; i < dataArray.size(); i++) {
                        try {
                            JSONObject priceData = dataArray.getJSONObject(i);
                            String instId = priceData.getString("instId");
                            String open24h = priceData.getString("open24h");
                            String high24h = priceData.getString("high24h");
                            String low24h = priceData.getString("low24h");
                            String sodUtc0 = priceData.getString("sodUtc0");
                            String sodUtc8 = priceData.getString("sodUtc8");
                            String ts = priceData.getString("ts");
                            String redisKey = "open:" + buildRedisKey(instId);
                            redisUtils.set(redisKey, open24h);
                            String symbol = CoinTypeConvert.okxConvert(instId);
                            if (symbol != null) {
                                redisUtils.set(CoinTypeConvert.convertToOpenKey(symbol), open24h);
                            }
                            log.debug("更新开仓价格: {} = {}, 时间: {}", redisKey, open24h, ts);
                        } catch (Exception innerEx) {
                            log.warn("处理单条价格数据失败", innerEx);
                        }
                    }
                }
            } catch (Exception e) {
                log.error("处理价格推送数据失败", e);
            }
        }else if (CHANNEL_OPEN_INTEREST.equals(channel)) {
            try {
                JSONArray dataArray = response.getJSONArray("data");
                if (dataArray != null && !dataArray.isEmpty()) {
                    for (int i = 0; i < dataArray.size(); i++) {
                        try {
                            JSONObject priceData = dataArray.getJSONObject(i);
                            String instId = priceData.getString("instId");
                            String oiUsd = priceData.getString("oiUsd");
                            String ts = priceData.getString("ts");
                            String redisKey = "volume:" + buildRedisKey(instId);
                            redisUtils.set(redisKey, oiUsd);
                            log.debug("更新持仓量: {} = {}, 时间: {}", redisKey, oiUsd, ts);
                        } catch (Exception innerEx) {
                            log.warn("处理单条价格数据失败", innerEx);
                        }
                    }
                }
            } catch (Exception e) {
                log.error("处理价格推送数据失败", e);
            }
        }
    }
    /**