From 3bec00212427e5e03fc386c2b28a0071359eb003 Mon Sep 17 00:00:00 2001
From: Administrator <15274802129@163.com>
Date: Mon, 29 Dec 2025 16:26:53 +0800
Subject: [PATCH] feat(okxNewPrice): 切换WebSocket数据源为K线频道并优化数据处理逻辑
---
src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxKlineWebSocketClient.java | 264 +++++++++++++++++++---------------------------------
1 files changed, 98 insertions(+), 166 deletions(-)
diff --git a/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxKlineWebSocketClient.java b/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxKlineWebSocketClient.java
index 2d487bc..3b5ff27 100644
--- a/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxKlineWebSocketClient.java
+++ b/src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxKlineWebSocketClient.java
@@ -57,8 +57,8 @@
private final AtomicBoolean isConnecting = new AtomicBoolean(false);
private final AtomicBoolean isInitialized = new AtomicBoolean(false);
- private static final String CHANNEL = "mark-price";
-// private static final String CHANNEL = "candle5m";
+// private static final String CHANNEL = "mark-price";
+ private static final String CHANNEL = "candle1m";
// private static final String CHANNEL = "candle15m";
// 心跳超时时间(秒),小于30秒
@@ -121,8 +121,8 @@
log.info("OkxKlineWebSocketClient销毁完成");
}
- private static final String WS_URL_MONIPAN = "wss://wspap.okx.com:8443/ws/v5/public";
- private static final String WS_URL_SHIPAN = "wss://ws.okx.com:8443/ws/v5/public";
+ private static final String WS_URL_MONIPAN = "wss://wspap.okx.com:8443/ws/v5/business";
+ private static final String WS_URL_SHIPAN = "wss://ws.okx.com:8443/ws/v5/business";
private static final boolean isAccountType = false;
/**
@@ -276,182 +276,114 @@
*/
private void processPushDataV2(JSONObject response) {
try {
- JSONArray dataArray = response.getJSONArray("data");
- if (dataArray != null && !dataArray.isEmpty()) {
- for (int i = 0; i < dataArray.size(); i++) {
- try {
- JSONObject priceData = dataArray.getJSONObject(i);
- String instId = priceData.getString("instId");
- String markPx = priceData.getString("markPx");
- // 保存价格到Redis
- redisUtils.set(CoinEnums.HE_YUE.getCode(), markPx);
+ /**
+ * {
+ * "arg": {
+ * "channel": "candle1D",
+ * "instId": "BTC-USDT"
+ * },
+ * "data": [
+ * [
+ * "1629993600000",
+ * "42500",
+ * "48199.9",
+ * "41006.1",
+ * "41006.1",
+ * "3587.41204591",
+ * "166741046.22583129",
+ * "166741046.22583129",
+ * "0"
+ * ]
+ * ]
+ * }
+ */
+ JSONObject arg = response.getJSONObject("arg");
+ if (arg == null) {
+ log.warn("{}: 无效的推送数据,缺少 'arg' 字段", response);
+ return;
+ }
- log.debug("更新最新价格: {} = {}, 币种: {}", CoinEnums.HE_YUE.getCode(), markPx, instId);
+ String channel = arg.getString("channel");
+ if (channel == null) {
+ log.warn("{}: 无效的推送数据,缺少 'channel' 字段", response);
+ return;
+ }
- // 价格变化时,触发所有账号的量化操作
+ String instId = arg.getString("instId");
+ if (instId == null) {
+ log.warn("{}: 无效的推送数据,缺少 'instId' 字段", response);
+ return;
+ }
- //调用策略
- // 创建策略实例
- MacdMaStrategy strategy = new MacdMaStrategy();
+ if (CHANNEL.equals(channel) && CoinEnums.HE_YUE.getCode().equals(instId)) {
+ JSONArray dataArray = response.getJSONArray("data");
+ if (dataArray == null || dataArray.isEmpty()) {
+ log.warn("K线频道数据为空");
+ return;
+ }
+ JSONArray data = dataArray.getJSONArray(0);
+ BigDecimal openPx = new BigDecimal(data.getString(1));
+ BigDecimal highPx = new BigDecimal(data.getString(2));
+ BigDecimal lowPx = new BigDecimal(data.getString(3));
+ BigDecimal closePx = new BigDecimal(data.getString(4));
+ BigDecimal vol = new BigDecimal(data.getString(5));
+ /**
+ * K线状态
+ * 0:K线未完结
+ * 1:K线已完结
+ */
+ String confirm = data.getString(8);
+ if ("1".equals(confirm)){
+ //调用策略
+ // 创建策略实例
+ MacdMaStrategy strategy = new MacdMaStrategy();
- // 生成100个15分钟价格数据点
- List<Kline> kline15MinuteData = getKlineDataByInstIdAndBar(instId, "1m");
- List<BigDecimal> historicalPrices = kline15MinuteData.stream()
- .map(Kline::getC)
- .collect(Collectors.toList());
- log.info("生成100个15分钟价格数据点成功!");
- // 使用策略分析最新价格数据
- MacdMaStrategy.TradingOrder tradingOrderOpen = strategy.generateTradingOrder(historicalPrices,MacdMaStrategy.OperationType.open.name());
- if (tradingOrderOpen == null ){
- return;
- }
- Collection<OkxQuantWebSocketClient> allClients = clientManager.getAllClients();
- //如果为空,则直接返回
- if (allClients.isEmpty()) {
- return;
- }
- // 获取所有OkxQuantWebSocketClient实例
- for (OkxQuantWebSocketClient client : clientManager.getAllClients()) {
- String accountName = client.getAccountName();
- if (accountName != null) {
- if (ObjectUtil.isNotEmpty(tradingOrderOpen)){
- // 根据信号执行交易操作
- TradeRequestParam tradeRequestParam = new TradeRequestParam();
+ // 生成100个15分钟价格数据点
+ List<Kline> kline15MinuteData = getKlineDataByInstIdAndBar(instId, "1m");
+ List<BigDecimal> historicalPrices = kline15MinuteData.stream()
+ .map(Kline::getC)
+ .collect(Collectors.toList());
+ log.info("生成100个1分钟价格数据点成功!");
+ // 使用策略分析最新价格数据
+ MacdMaStrategy.TradingOrder tradingOrderOpen = strategy.generateTradingOrder(historicalPrices,MacdMaStrategy.OperationType.open.name());
+ if (tradingOrderOpen == null ){
+ return;
+ }
+ Collection<OkxQuantWebSocketClient> allClients = clientManager.getAllClients();
+ //如果为空,则直接返回
+ if (allClients.isEmpty()) {
+ return;
+ }
+ // 获取所有OkxQuantWebSocketClient实例
+ for (OkxQuantWebSocketClient client : clientManager.getAllClients()) {
+ String accountName = client.getAccountName();
+ if (accountName != null) {
+ if (ObjectUtil.isNotEmpty(tradingOrderOpen)){
+ // 根据信号执行交易操作
+ TradeRequestParam tradeRequestParam = new TradeRequestParam();
- String posSide = tradingOrderOpen.getPosSide();
- tradeRequestParam.setPosSide(posSide);
- String currentPrice = String.valueOf(markPx);
- tradeRequestParam = caoZuoService.caoZuoStrategy(accountName, currentPrice, posSide);
+ String posSide = tradingOrderOpen.getPosSide();
+ tradeRequestParam.setPosSide(posSide);
+ String currentPrice = String.valueOf(closePx);
+ tradeRequestParam = caoZuoService.caoZuoStrategy(accountName, currentPrice, posSide);
- String side = tradingOrderOpen.getSide();
- tradeRequestParam.setSide(side);
+ String side = tradingOrderOpen.getSide();
+ tradeRequestParam.setSide(side);
- String clOrdId = WsParamBuild.getOrderNum(side);
- tradeRequestParam.setClOrdId(clOrdId);
+ String clOrdId = WsParamBuild.getOrderNum(side);
+ tradeRequestParam.setClOrdId(clOrdId);
- String sz = InstrumentsWs.getAccountMap(accountName).get(CoinEnums.BUY_CNT_INIT.name());
- tradeRequestParam.setSz(sz);
- TradeOrderWs.orderEvent(client.getWebSocketClient(), tradeRequestParam);
- }
+ String sz = InstrumentsWs.getAccountMap(accountName).get(CoinEnums.BUY_CNT_INIT.name());
+ tradeRequestParam.setSz(sz);
+ TradeOrderWs.orderEvent(client.getWebSocketClient(), tradeRequestParam);
}
}
- } catch (Exception innerEx) {
- log.warn("处理单条价格数据失败", innerEx);
}
}
}
} catch (Exception e) {
- log.error("处理价格推送数据失败", e);
+ log.error("处理 K线频道推送数据失败", e);
}
-// try {
-// /**
-// * {
-// * "arg": {
-// * "channel": "candle1D",
-// * "instId": "BTC-USDT"
-// * },
-// * "data": [
-// * [
-// * "1629993600000",
-// * "42500",
-// * "48199.9",
-// * "41006.1",
-// * "41006.1",
-// * "3587.41204591",
-// * "166741046.22583129",
-// * "166741046.22583129",
-// * "0"
-// * ]
-// * ]
-// * }
-// */
-// JSONObject arg = response.getJSONObject("arg");
-// if (arg == null) {
-// log.warn("{}: 无效的推送数据,缺少 'arg' 字段", response);
-// return;
-// }
-//
-// String channel = arg.getString("channel");
-// if (channel == null) {
-// log.warn("{}: 无效的推送数据,缺少 'channel' 字段", response);
-// return;
-// }
-//
-// String instId = arg.getString("instId");
-// if (instId == null) {
-// log.warn("{}: 无效的推送数据,缺少 'instId' 字段", response);
-// return;
-// }
-//
-// if (CHANNEL.equals(channel) && CoinEnums.HE_YUE.getCode().equals(instId)) {
-// JSONArray dataArray = response.getJSONArray("data");
-// if (dataArray == null || dataArray.isEmpty()) {
-// log.warn("K线频道数据为空");
-// return;
-// }
-// JSONArray data = dataArray.getJSONArray(0);
-// BigDecimal openPx = new BigDecimal(data.getString(1));
-// BigDecimal highPx = new BigDecimal(data.getString(2));
-// BigDecimal lowPx = new BigDecimal(data.getString(3));
-// BigDecimal closePx = new BigDecimal(data.getString(4));
-// BigDecimal vol = new BigDecimal(data.getString(5));
-// /**
-// * K线状态
-// * 0:K线未完结
-// * 1:K线已完结
-// */
-// String confirm = data.getString(8);
-// if ("1".equals(confirm)){
-// //调用策略
-// // 创建策略实例
-// MacdMaStrategy strategy = new MacdMaStrategy();
-//
-// // 生成100个15分钟价格数据点
-// List<Kline> kline15MinuteData = getKlineDataByInstIdAndBar(instId, "1m");
-// List<BigDecimal> historicalPrices = kline15MinuteData.stream()
-// .map(Kline::getC)
-// .collect(Collectors.toList());
-// log.info("生成100个15分钟价格数据点成功!");
-// // 使用策略分析最新价格数据
-// MacdMaStrategy.TradingOrder tradingOrderOpen = strategy.generateTradingOrder(historicalPrices,MacdMaStrategy.OperationType.open.name());
-// if (tradingOrderOpen == null ){
-// return;
-// }
-// Collection<OkxQuantWebSocketClient> allClients = clientManager.getAllClients();
-// //如果为空,则直接返回
-// if (allClients.isEmpty()) {
-// return;
-// }
-// // 获取所有OkxQuantWebSocketClient实例
-// for (OkxQuantWebSocketClient client : clientManager.getAllClients()) {
-// String accountName = client.getAccountName();
-// if (accountName != null) {
-// if (ObjectUtil.isNotEmpty(tradingOrderOpen)){
-// // 根据信号执行交易操作
-// TradeRequestParam tradeRequestParam = new TradeRequestParam();
-//
-// String posSide = tradingOrderOpen.getPosSide();
-// tradeRequestParam.setPosSide(posSide);
-// String currentPrice = String.valueOf(closePx);
-// tradeRequestParam = caoZuoService.caoZuoStrategy(accountName, currentPrice, posSide);
-//
-// String side = tradingOrderOpen.getSide();
-// tradeRequestParam.setSide(side);
-//
-// String clOrdId = WsParamBuild.getOrderNum(side);
-// tradeRequestParam.setClOrdId(clOrdId);
-//
-// String sz = InstrumentsWs.getAccountMap(accountName).get(CoinEnums.BUY_CNT_INIT.name());
-// tradeRequestParam.setSz(sz);
-// TradeOrderWs.orderEvent(client.getWebSocketClient(), tradeRequestParam);
-// }
-// }
-// }
-// }
-// }
-// } catch (Exception e) {
-// log.error("处理 K线频道推送数据失败", e);
-// }
}
/**
@@ -526,7 +458,7 @@
TradingStrategy tradingStrategy = new TradingStrategy();
// 生成100个15分钟价格数据点
- List<Kline> kline15MinuteData = getKlineDataByInstIdAndBar(instId, "15m");
+ List<Kline> kline15MinuteData = getKlineDataByInstIdAndBar(instId, "1m");
//stream流获取kline15MinuteData中的o数据的集合
List<BigDecimal> prices = kline15MinuteData.stream()
.map(Kline::getC)
--
Gitblit v1.9.1