From d09c2a58f85aafc92d6bff7a1131fd0b376eee03 Mon Sep 17 00:00:00 2001
From: Administrator <15274802129@163.com>
Date: Fri, 12 Dec 2025 15:15:39 +0800
Subject: [PATCH] feat(price): 新增开仓价和持仓量数据处理功能

---
 src/main/java/com/xcong/excoin/modules/newPrice/OkxNewPriceWebSocketClient.java |  170 ++++++++++++++++++++++++++++++++++++++++++++++++--------
 1 files changed, 146 insertions(+), 24 deletions(-)

diff --git a/src/main/java/com/xcong/excoin/modules/newPrice/OkxNewPriceWebSocketClient.java b/src/main/java/com/xcong/excoin/modules/newPrice/OkxNewPriceWebSocketClient.java
index 1ff740a..4b1200a 100644
--- a/src/main/java/com/xcong/excoin/modules/newPrice/OkxNewPriceWebSocketClient.java
+++ b/src/main/java/com/xcong/excoin/modules/newPrice/OkxNewPriceWebSocketClient.java
@@ -41,7 +41,9 @@
     private final AtomicReference<Long> lastMessageTime = new AtomicReference<>(System.currentTimeMillis());
 
     private static final String WS_URL = "wss://ws.okx.com:8443/ws/v5/public";
-    private static final String CHANNEL = "mark-price";
+    private static final String CHANNEL_MARK_PRICE = "mark-price";
+    private static final String CHANNEL_INDEX_TICKERS = "index-tickers";
+    private static final String CHANNEL_OPEN_INTEREST = "open-interest";
 
     private static final String[] INST_IDS = {
             "EOS-USDT","BTC-USDT", "ETH-USDT", "XRP-USDT", "LTC-USDT", "BCH-USDT", "ETC-USDT"
@@ -98,6 +100,8 @@
                     log.info("OKX New Price WebSocket连接成功");
                     resetHeartbeatTimer();
                     subscribeChannels();
+                    subscribeIndexChannels();
+                    subscribeOpenInterestChannels();
                 }
 
                 @Override
@@ -146,7 +150,48 @@
         JSONArray argsArray = new JSONArray();
         for (String instId : INST_IDS) {
             JSONObject arg = new JSONObject();
-            arg.put("channel", CHANNEL);
+            arg.put("channel", CHANNEL_MARK_PRICE);
+            arg.put("instId", instId);
+            argsArray.add(arg);
+        }
+
+        subscribeMsg.put("args", argsArray);
+        webSocketClient.send(subscribeMsg.toJSONString());
+        log.info("已发送价格订阅请求,订阅通道数: {}", argsArray.size());
+    }
+
+    /**
+     * 订阅指定交易对的价格通道。
+     * 构造订阅请求并发送给服务端。
+     */
+    private void subscribeIndexChannels() {
+        JSONObject subscribeMsg = new JSONObject();
+        subscribeMsg.put("op", "subscribe");
+
+        JSONArray argsArray = new JSONArray();
+        for (String instId : INST_IDS) {
+            JSONObject arg = new JSONObject();
+            arg.put("channel", CHANNEL_INDEX_TICKERS);
+            arg.put("instId", instId);
+            argsArray.add(arg);
+        }
+
+        subscribeMsg.put("args", argsArray);
+        webSocketClient.send(subscribeMsg.toJSONString());
+        log.info("已发送价格订阅请求,订阅通道数: {}", argsArray.size());
+    }
+    /**
+     * 订阅指定交易对的价格通道。
+     * 构造订阅请求并发送给服务端。
+     */
+    private void subscribeOpenInterestChannels() {
+        JSONObject subscribeMsg = new JSONObject();
+        subscribeMsg.put("op", "subscribe");
+
+        JSONArray argsArray = new JSONArray();
+        for (String instId : INST_IDS) {
+            JSONObject arg = new JSONObject();
+            arg.put("channel", CHANNEL_OPEN_INTEREST);
             arg.put("instId", instId);
             argsArray.add(arg);
         }
@@ -190,35 +235,112 @@
      * @param response 包含价格数据的 JSON 对象
      */
     private void processPushData(JSONObject response) {
-        try {
-            JSONArray dataArray = response.getJSONArray("data");
-            if (dataArray != null && !dataArray.isEmpty()) {
-                for (int i = 0; i < dataArray.size(); i++) {
-                    try {
-                        JSONObject priceData = dataArray.getJSONObject(i);
-                        String instId = priceData.getString("instId");
-                        String markPx = priceData.getString("markPx");
-                        String ts = priceData.getString("ts");
+        JSONObject arg = response.getJSONObject("arg");
+        if (arg == null) {
+            log.warn("无效的推送数据,缺少 'arg' 字段 :{}",response);
+            return;
+        }
 
-                        String redisKey = buildRedisKey(instId);
-                        redisUtils.set(redisKey, markPx);
+        String channel = arg.getString("channel");
+        if (channel == null) {
+            log.warn("无效的推送数据,缺少 'channel' 字段{}",response);
+            return;
+        }
 
-                        String symbol = CoinTypeConvert.okxConvert(instId);
-                        if (symbol != null) {
-                            redisUtils.set(CoinTypeConvert.convertToKey(symbol), markPx);
-                            websocketPriceService.comparePriceAsc(symbol, markPx);
-                            websocketPriceService.comparePriceDesc(symbol, markPx);
+        if (CHANNEL_MARK_PRICE.equals(channel)) {
+            try {
+                JSONArray dataArray = response.getJSONArray("data");
+                if (dataArray != null && !dataArray.isEmpty()) {
+                    for (int i = 0; i < dataArray.size(); i++) {
+                        try {
+                            JSONObject priceData = dataArray.getJSONObject(i);
+                            String instId = priceData.getString("instId");
+                            String markPx = priceData.getString("markPx");
+                            String ts = priceData.getString("ts");
+
+                            String redisKey = buildRedisKey(instId);
+                            redisUtils.set(redisKey, markPx);
+
+                            String symbol = CoinTypeConvert.okxConvert(instId);
+                            if (symbol != null) {
+                                redisUtils.set(CoinTypeConvert.convertToKey(symbol), markPx);
+                                websocketPriceService.comparePriceAsc(symbol, markPx);
+                                websocketPriceService.comparePriceDesc(symbol, markPx);
+                            }
+
+                            log.debug("更新最新价格: {} = {}, 时间: {}", redisKey, markPx, ts);
+                        } catch (Exception innerEx) {
+                            log.warn("处理单条价格数据失败", innerEx);
                         }
-
-                        log.debug("更新最新价格: {} = {}, 时间: {}", redisKey, markPx, ts);
-                    } catch (Exception innerEx) {
-                        log.warn("处理单条价格数据失败", innerEx);
                     }
                 }
+            } catch (Exception e) {
+                log.error("处理价格推送数据失败", e);
             }
-        } catch (Exception e) {
-            log.error("处理价格推送数据失败", e);
+        }else if (CHANNEL_INDEX_TICKERS.equals(channel)) {
+            try {
+                JSONArray dataArray = response.getJSONArray("data");
+                if (dataArray != null && !dataArray.isEmpty()) {
+                    for (int i = 0; i < dataArray.size(); i++) {
+                        try {
+                            JSONObject priceData = dataArray.getJSONObject(i);
+                            String instId = priceData.getString("instId");
+                            String open24h = priceData.getString("open24h");
+                            String high24h = priceData.getString("high24h");
+                            String low24h = priceData.getString("low24h");
+                            String sodUtc0 = priceData.getString("sodUtc0");
+                            String sodUtc8 = priceData.getString("sodUtc8");
+                            String ts = priceData.getString("ts");
+
+                            String redisKey = "open:" + buildRedisKey(instId);
+                            redisUtils.set(redisKey, open24h);
+
+                            String symbol = CoinTypeConvert.okxConvert(instId);
+                            if (symbol != null) {
+                                redisUtils.set(CoinTypeConvert.convertToOpenKey(symbol), open24h);
+                            }
+
+                            log.debug("更新开仓价格: {} = {}, 时间: {}", redisKey, open24h, ts);
+                        } catch (Exception innerEx) {
+                            log.warn("处理单条价格数据失败", innerEx);
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                log.error("处理价格推送数据失败", e);
+            }
+        }else if (CHANNEL_OPEN_INTEREST.equals(channel)) {
+            try {
+                JSONArray dataArray = response.getJSONArray("data");
+                if (dataArray != null && !dataArray.isEmpty()) {
+                    for (int i = 0; i < dataArray.size(); i++) {
+                        try {
+                            JSONObject priceData = dataArray.getJSONObject(i);
+                            String instId = priceData.getString("instId");
+                            String oiUsd = priceData.getString("oiUsd");
+                            String ts = priceData.getString("ts");
+
+                            String redisKey = "volume:" + buildRedisKey(instId);
+                            redisUtils.set(redisKey, oiUsd);
+
+                            String symbol = CoinTypeConvert.okxConvert(instId);
+                            if (symbol != null) {
+                                redisUtils.set(CoinTypeConvert.convertToVolumeKey(symbol), oiUsd);
+                            }
+
+                            log.debug("更新持仓量: {} = {}, 时间: {}", redisKey, oiUsd, ts);
+                        } catch (Exception innerEx) {
+                            log.warn("处理单条价格数据失败", innerEx);
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                log.error("处理价格推送数据失败", e);
+            }
         }
+
+
+
     }
 
     /**

--
Gitblit v1.9.1