From d09c2a58f85aafc92d6bff7a1131fd0b376eee03 Mon Sep 17 00:00:00 2001
From: Administrator <15274802129@163.com>
Date: Fri, 12 Dec 2025 15:15:39 +0800
Subject: [PATCH] feat(price): 新增开仓价和持仓量数据处理功能
---
src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java | 42 ++++++++++
src/main/java/com/xcong/excoin/modules/newPrice/OkxNewPriceWebSocketClient.java | 170 ++++++++++++++++++++++++++++++++++++------
src/main/java/com/xcong/excoin/modules/symbols/service/impl/SymbolsServiceImpl.java | 13 +--
3 files changed, 193 insertions(+), 32 deletions(-)
diff --git a/src/main/java/com/xcong/excoin/modules/newPrice/OkxNewPriceWebSocketClient.java b/src/main/java/com/xcong/excoin/modules/newPrice/OkxNewPriceWebSocketClient.java
index 1ff740a..4b1200a 100644
--- a/src/main/java/com/xcong/excoin/modules/newPrice/OkxNewPriceWebSocketClient.java
+++ b/src/main/java/com/xcong/excoin/modules/newPrice/OkxNewPriceWebSocketClient.java
@@ -41,7 +41,9 @@
private final AtomicReference<Long> lastMessageTime = new AtomicReference<>(System.currentTimeMillis());
private static final String WS_URL = "wss://ws.okx.com:8443/ws/v5/public";
- private static final String CHANNEL = "mark-price";
+ private static final String CHANNEL_MARK_PRICE = "mark-price";
+ private static final String CHANNEL_INDEX_TICKERS = "index-tickers";
+ private static final String CHANNEL_OPEN_INTEREST = "open-interest";
private static final String[] INST_IDS = {
"EOS-USDT","BTC-USDT", "ETH-USDT", "XRP-USDT", "LTC-USDT", "BCH-USDT", "ETC-USDT"
@@ -98,6 +100,8 @@
log.info("OKX New Price WebSocket连接成功");
resetHeartbeatTimer();
subscribeChannels();
+ subscribeIndexChannels();
+ subscribeOpenInterestChannels();
}
@Override
@@ -146,7 +150,48 @@
JSONArray argsArray = new JSONArray();
for (String instId : INST_IDS) {
JSONObject arg = new JSONObject();
- arg.put("channel", CHANNEL);
+ arg.put("channel", CHANNEL_MARK_PRICE);
+ arg.put("instId", instId);
+ argsArray.add(arg);
+ }
+
+ subscribeMsg.put("args", argsArray);
+ webSocketClient.send(subscribeMsg.toJSONString());
+ log.info("已发送价格订阅请求,订阅通道数: {}", argsArray.size());
+ }
+
+ /**
+ * 订阅指定交易对的价格通道。
+ * 构造订阅请求并发送给服务端。
+ */
+ private void subscribeIndexChannels() {
+ JSONObject subscribeMsg = new JSONObject();
+ subscribeMsg.put("op", "subscribe");
+
+ JSONArray argsArray = new JSONArray();
+ for (String instId : INST_IDS) {
+ JSONObject arg = new JSONObject();
+ arg.put("channel", CHANNEL_INDEX_TICKERS);
+ arg.put("instId", instId);
+ argsArray.add(arg);
+ }
+
+ subscribeMsg.put("args", argsArray);
+ webSocketClient.send(subscribeMsg.toJSONString());
+ log.info("已发送价格订阅请求,订阅通道数: {}", argsArray.size());
+ }
+ /**
+ * 订阅指定交易对的价格通道。
+ * 构造订阅请求并发送给服务端。
+ */
+ private void subscribeOpenInterestChannels() {
+ JSONObject subscribeMsg = new JSONObject();
+ subscribeMsg.put("op", "subscribe");
+
+ JSONArray argsArray = new JSONArray();
+ for (String instId : INST_IDS) {
+ JSONObject arg = new JSONObject();
+ arg.put("channel", CHANNEL_OPEN_INTEREST);
arg.put("instId", instId);
argsArray.add(arg);
}
@@ -190,35 +235,112 @@
* @param response 包含价格数据的 JSON 对象
*/
private void processPushData(JSONObject response) {
- try {
- JSONArray dataArray = response.getJSONArray("data");
- if (dataArray != null && !dataArray.isEmpty()) {
- for (int i = 0; i < dataArray.size(); i++) {
- try {
- JSONObject priceData = dataArray.getJSONObject(i);
- String instId = priceData.getString("instId");
- String markPx = priceData.getString("markPx");
- String ts = priceData.getString("ts");
+ JSONObject arg = response.getJSONObject("arg");
+ if (arg == null) {
+ log.warn("无效的推送数据,缺少 'arg' 字段 :{}",response);
+ return;
+ }
- String redisKey = buildRedisKey(instId);
- redisUtils.set(redisKey, markPx);
+ String channel = arg.getString("channel");
+ if (channel == null) {
+ log.warn("无效的推送数据,缺少 'channel' 字段{}",response);
+ return;
+ }
- String symbol = CoinTypeConvert.okxConvert(instId);
- if (symbol != null) {
- redisUtils.set(CoinTypeConvert.convertToKey(symbol), markPx);
- websocketPriceService.comparePriceAsc(symbol, markPx);
- websocketPriceService.comparePriceDesc(symbol, markPx);
+ if (CHANNEL_MARK_PRICE.equals(channel)) {
+ try {
+ JSONArray dataArray = response.getJSONArray("data");
+ if (dataArray != null && !dataArray.isEmpty()) {
+ for (int i = 0; i < dataArray.size(); i++) {
+ try {
+ JSONObject priceData = dataArray.getJSONObject(i);
+ String instId = priceData.getString("instId");
+ String markPx = priceData.getString("markPx");
+ String ts = priceData.getString("ts");
+
+ String redisKey = buildRedisKey(instId);
+ redisUtils.set(redisKey, markPx);
+
+ String symbol = CoinTypeConvert.okxConvert(instId);
+ if (symbol != null) {
+ redisUtils.set(CoinTypeConvert.convertToKey(symbol), markPx);
+ websocketPriceService.comparePriceAsc(symbol, markPx);
+ websocketPriceService.comparePriceDesc(symbol, markPx);
+ }
+
+ log.debug("更新最新价格: {} = {}, 时间: {}", redisKey, markPx, ts);
+ } catch (Exception innerEx) {
+ log.warn("处理单条价格数据失败", innerEx);
}
-
- log.debug("更新最新价格: {} = {}, 时间: {}", redisKey, markPx, ts);
- } catch (Exception innerEx) {
- log.warn("处理单条价格数据失败", innerEx);
}
}
+ } catch (Exception e) {
+ log.error("处理价格推送数据失败", e);
}
- } catch (Exception e) {
- log.error("处理价格推送数据失败", e);
+ }else if (CHANNEL_INDEX_TICKERS.equals(channel)) {
+ try {
+ JSONArray dataArray = response.getJSONArray("data");
+ if (dataArray != null && !dataArray.isEmpty()) {
+ for (int i = 0; i < dataArray.size(); i++) {
+ try {
+ JSONObject priceData = dataArray.getJSONObject(i);
+ String instId = priceData.getString("instId");
+ String open24h = priceData.getString("open24h");
+ String high24h = priceData.getString("high24h");
+ String low24h = priceData.getString("low24h");
+ String sodUtc0 = priceData.getString("sodUtc0");
+ String sodUtc8 = priceData.getString("sodUtc8");
+ String ts = priceData.getString("ts");
+
+ String redisKey = "open:" + buildRedisKey(instId);
+ redisUtils.set(redisKey, open24h);
+
+ String symbol = CoinTypeConvert.okxConvert(instId);
+ if (symbol != null) {
+ redisUtils.set(CoinTypeConvert.convertToOpenKey(symbol), open24h);
+ }
+
+ log.debug("更新开仓价格: {} = {}, 时间: {}", redisKey, open24h, ts);
+ } catch (Exception innerEx) {
+ log.warn("处理单条价格数据失败", innerEx);
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("处理价格推送数据失败", e);
+ }
+ }else if (CHANNEL_OPEN_INTEREST.equals(channel)) {
+ try {
+ JSONArray dataArray = response.getJSONArray("data");
+ if (dataArray != null && !dataArray.isEmpty()) {
+ for (int i = 0; i < dataArray.size(); i++) {
+ try {
+ JSONObject priceData = dataArray.getJSONObject(i);
+ String instId = priceData.getString("instId");
+ String oiUsd = priceData.getString("oiUsd");
+ String ts = priceData.getString("ts");
+
+ String redisKey = "volume:" + buildRedisKey(instId);
+ redisUtils.set(redisKey, oiUsd);
+
+ String symbol = CoinTypeConvert.okxConvert(instId);
+ if (symbol != null) {
+ redisUtils.set(CoinTypeConvert.convertToVolumeKey(symbol), oiUsd);
+ }
+
+ log.debug("更新持仓量: {} = {}, 时间: {}", redisKey, oiUsd, ts);
+ } catch (Exception innerEx) {
+ log.warn("处理单条价格数据失败", innerEx);
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("处理价格推送数据失败", e);
+ }
}
+
+
+
}
/**
diff --git a/src/main/java/com/xcong/excoin/modules/symbols/service/impl/SymbolsServiceImpl.java b/src/main/java/com/xcong/excoin/modules/symbols/service/impl/SymbolsServiceImpl.java
index df1632d..98af864 100644
--- a/src/main/java/com/xcong/excoin/modules/symbols/service/impl/SymbolsServiceImpl.java
+++ b/src/main/java/com/xcong/excoin/modules/symbols/service/impl/SymbolsServiceImpl.java
@@ -126,20 +126,17 @@
public HomeSymbolsVo getSymbolReturnData(String symbol) {
PlatformCnyUsdtExchangeEntity cnyUsdtExchange = platformCnyUsdtExchangeDao.getCNYAndUSDTOne();
- // 获取当日k线数据
-// Candlestick symbolObject = (Candlestick) redisUtils.get(symbol);
// 获取当前币种最新价
BigDecimal newestPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(symbol)));
- // 获取当日k线的开盘价
-// BigDecimal openPrice = symbolObject.getOpen();
-
-// BigDecimal upOrDown = newestPrice.subtract(openPrice).divide(openPrice, 8, BigDecimal.ROUND_HALF_UP);
+ BigDecimal openPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToOpenKey(symbol)));
+ BigDecimal volume = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToVolumeKey(symbol)));
+ BigDecimal upOrDown = newestPrice.subtract(openPrice).divide(openPrice, 8, BigDecimal.ROUND_HALF_UP);
HomeSymbolsVo homeSymbolsVo = new HomeSymbolsVo();
homeSymbolsVo.setSymbol(symbol);
homeSymbolsVo.setCurrentPrice(newestPrice);
-// homeSymbolsVo.setUpOrDown(upOrDown);
-// homeSymbolsVo.setVolume(symbolObject.getAmount());
+ homeSymbolsVo.setUpOrDown(upOrDown);
+ homeSymbolsVo.setVolume(volume);
if (cnyUsdtExchange != null) {
BigDecimal cnyPrice = newestPrice.multiply(cnyUsdtExchange.getValue()).setScale(2, BigDecimal.ROUND_HALF_UP);
homeSymbolsVo.setCnyPrice(cnyPrice);
diff --git a/src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java b/src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java
index 84cbf91..586fa07 100644
--- a/src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java
+++ b/src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java
@@ -53,6 +53,48 @@
}
}
+
+
+ public static String convertToOpenKey(String symbol) {
+ switch (symbol) {
+ case "BTC/USDT":
+ return "open:BTC_NEW_PRICE";
+ case "ETH/USDT":
+ return "open:ETH_NEW_PRICE";
+ case "XRP/USDT":
+ return "open:XRP_NEW_PRICE";
+ case "LTC/USDT":
+ return "open:LTC_NEW_PRICE";
+ case "BCH/USDT":
+ return "open:BCH_NEW_PRICE";
+ case "ETC/USDT":
+ return "open:ETC_NEW_PRICE";
+ default:
+ return null;
+ }
+ }
+
+
+
+ public static String convertToVolumeKey(String symbol) {
+ switch (symbol) {
+ case "BTC/USDT":
+ return "volume:BTC_NEW_PRICE";
+ case "ETH/USDT":
+ return "volume:ETH_NEW_PRICE";
+ case "XRP/USDT":
+ return "volume:XRP_NEW_PRICE";
+ case "LTC/USDT":
+ return "volume:LTC_NEW_PRICE";
+ case "BCH/USDT":
+ return "volume:BCH_NEW_PRICE";
+ case "ETC/USDT":
+ return "volume:ETC_NEW_PRICE";
+ default:
+ return null;
+ }
+ }
+
public static String convertContractTypeToCoin(String symbol) {
if (symbol.indexOf("/") > 0) {
return symbol.substring(0, symbol.indexOf("/"));
--
Gitblit v1.9.1