package com.xcong.excoin.modules.okxApi.wsHandler.handler; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.okxApi.IOkxStrategy; import com.xcong.excoin.modules.okxApi.wsHandler.OkxChannelHandler; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import java.math.BigDecimal; /** * OKX 标记价格频道处理器(mark-price)— 策略的价格驱动源。 * *

定位

* 这是一个公开频道,连接到 OKX 公开 WebSocket 端点,无需登录认证。 * 替代 candle1m K 线频道,作为策略唯一的实时价格来源。 * 标记价格变化时每 200ms 推送一次,无变化时每 10s 推送一次。 * *

双回调

* * *

订阅格式

*
 * {"op":"subscribe","args":[{"channel":"mark-price","instId":"ETH-USDT-SWAP"}]}
 * 
* *

数据推送格式

*
 * {
 *   "arg": {"channel":"mark-price","instId":"ETH-USDT-SWAP"},
 *   "data": [{
 *     "instType": "SWAP",
 *     "instId": "ETH-USDT-SWAP",
 *     "markPx": "42310.6",
 *     "ts": "1630049139746"
 *   }]
 * }
 * 
* *

推送频率

* * * @author Administrator */ @Slf4j public class MarkPriceOkxChannelHandler implements OkxChannelHandler { /** 频道名称 */ private static final String CHANNEL_NAME = "mark-price"; /** 交易对标识,如 "ETH-USDT-SWAP" */ private final String instId; /** 策略服务实例,接收标记价格回调 */ private final IOkxStrategy strategy; /** 订阅确认状态 */ private volatile boolean subscribed = false; /** * 构造标记价格频道处理器。 * * @param instId 交易对标识(如 "ETH-USDT-SWAP") * @param strategy OKX 交易策略服务实例 */ public MarkPriceOkxChannelHandler(String instId, IOkxStrategy strategy) { this.instId = instId; this.strategy = strategy; } /** * @return 频道名称 "mark-price" */ @Override public String getChannelName() { return CHANNEL_NAME; } /** * @return 交易对标识(如 "ETH-USDT-SWAP") */ @Override public String getInstId() { return instId; } /** * 发送标记价格频道订阅请求(公开频道,无需签名)。 * * @param ws OKX 公开 WebSocket 客户端 */ @Override public void subscribe(WebSocketClient ws) { JSONObject msg = new JSONObject(); msg.put("op", "subscribe"); JSONArray args = new JSONArray(); JSONObject arg = new JSONObject(); arg.put("channel", CHANNEL_NAME); arg.put("instId", instId); args.add(arg); msg.put("args", args); ws.send(msg.toJSONString()); log.info("[OKX-WS] 订阅标记价格频道, instId: {}", instId); } /** * 发送标记价格频道取消订阅请求。 * * @param ws OKX 公开 WebSocket 客户端 */ @Override public void unsubscribe(WebSocketClient ws) { JSONObject msg = new JSONObject(); msg.put("op", "unsubscribe"); JSONArray args = new JSONArray(); JSONObject arg = new JSONObject(); arg.put("channel", CHANNEL_NAME); arg.put("instId", instId); args.add(arg); msg.put("args", args); ws.send(msg.toJSONString()); log.info("[OKX-WS] 取消订阅标记价格频道, instId: {}", instId); } /** * 处理标记价格推送消息。 * *

数据提取

* 从 {@code data[0].markPx} 提取标记价格,同时回调: *
    *
  1. {@code gridTradeService.onKline(markPrice)} — 驱动网格策略
  2. *
  3. {@code gridTradeService.setMarkPrice(markPrice)} — PnL 计算用
  4. *
* * @param response WebSocket 推送的完整 JSON * @return true 表示已处理(匹配成功) */ @Override public boolean handleMessage(JSONObject response) { JSONObject arg = response.getJSONObject("arg"); if (arg == null || !CHANNEL_NAME.equals(arg.getString("channel"))) { return false; } try { JSONArray dataArray = response.getJSONArray("data"); if (dataArray == null || dataArray.isEmpty()) { return true; } // data[0] 是一个 JSONObject: {instType, instId, markPx, ts} JSONObject markData = dataArray.getJSONObject(0); if (markData == null) { return true; } String markPxStr = markData.getString("markPx"); if (markPxStr == null || markPxStr.isEmpty()) { return true; } BigDecimal markPrice = new BigDecimal(markPxStr); if (strategy != null) { strategy.setMarkPrice(markPrice); strategy.onKline(markPrice); } } catch (Exception e) { log.error("[OKX-WS] 处理 mark-price 数据失败", e); } return true; } // ==================== 订阅状态 ==================== @Override public boolean isSubscribed() { return subscribed; } @Override public void setSubscribed(boolean subscribed) { this.subscribed = subscribed; } }