package com.xcong.excoin.modules.okxApi.wsHandler.handler;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.okxApi.IOkxStrategy;
import com.xcong.excoin.modules.okxApi.wsHandler.OkxChannelHandler;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import java.math.BigDecimal;
/**
* OKX 标记价格频道处理器(mark-price)— 策略的价格驱动源。
*
*
定位
* 这是一个公开频道,连接到 OKX 公开 WebSocket 端点,无需登录认证。
* 替代 candle1m K 线频道,作为策略唯一的实时价格来源。
* 标记价格变化时每 200ms 推送一次,无变化时每 10s 推送一次。
*
* 双回调
*
* - {@link IOkxStrategy#onKline(BigDecimal)} — 驱动策略(处理开仓/止盈逻辑)
* - {@link IOkxStrategy#setMarkPrice(BigDecimal)} — PnLPriceMode.MARK_PRICE 计算未实现盈亏
*
*
* 订阅格式
*
* {"op":"subscribe","args":[{"channel":"mark-price","instId":"ETH-USDT-SWAP"}]}
*
*
* 数据推送格式
*
* {
* "arg": {"channel":"mark-price","instId":"ETH-USDT-SWAP"},
* "data": [{
* "instType": "SWAP",
* "instId": "ETH-USDT-SWAP",
* "markPx": "42310.6",
* "ts": "1630049139746"
* }]
* }
*
*
* 推送频率
*
* - 标记价格变化 → 每 200ms 推送一次
* - 标记价格无变化 → 每 10s 推送一次
*
*
* @author Administrator
*/
@Slf4j
public class MarkPriceOkxChannelHandler implements OkxChannelHandler {
/** 频道名称 */
private static final String CHANNEL_NAME = "mark-price";
/** 交易对标识,如 "ETH-USDT-SWAP" */
private final String instId;
/** 策略服务实例,接收标记价格回调 */
private final IOkxStrategy strategy;
/** 订阅确认状态 */
private volatile boolean subscribed = false;
/**
* 构造标记价格频道处理器。
*
* @param instId 交易对标识(如 "ETH-USDT-SWAP")
* @param strategy OKX 交易策略服务实例
*/
public MarkPriceOkxChannelHandler(String instId, IOkxStrategy strategy) {
this.instId = instId;
this.strategy = strategy;
}
/**
* @return 频道名称 "mark-price"
*/
@Override
public String getChannelName() {
return CHANNEL_NAME;
}
/**
* @return 交易对标识(如 "ETH-USDT-SWAP")
*/
@Override
public String getInstId() {
return instId;
}
/**
* 发送标记价格频道订阅请求(公开频道,无需签名)。
*
* @param ws OKX 公开 WebSocket 客户端
*/
@Override
public void subscribe(WebSocketClient ws) {
JSONObject msg = new JSONObject();
msg.put("op", "subscribe");
JSONArray args = new JSONArray();
JSONObject arg = new JSONObject();
arg.put("channel", CHANNEL_NAME);
arg.put("instId", instId);
args.add(arg);
msg.put("args", args);
ws.send(msg.toJSONString());
log.info("[OKX-WS] 订阅标记价格频道, instId: {}", instId);
}
/**
* 发送标记价格频道取消订阅请求。
*
* @param ws OKX 公开 WebSocket 客户端
*/
@Override
public void unsubscribe(WebSocketClient ws) {
JSONObject msg = new JSONObject();
msg.put("op", "unsubscribe");
JSONArray args = new JSONArray();
JSONObject arg = new JSONObject();
arg.put("channel", CHANNEL_NAME);
arg.put("instId", instId);
args.add(arg);
msg.put("args", args);
ws.send(msg.toJSONString());
log.info("[OKX-WS] 取消订阅标记价格频道, instId: {}", instId);
}
/**
* 处理标记价格推送消息。
*
* 数据提取
* 从 {@code data[0].markPx} 提取标记价格,同时回调:
*
* - {@code gridTradeService.onKline(markPrice)} — 驱动网格策略
* - {@code gridTradeService.setMarkPrice(markPrice)} — PnL 计算用
*
*
* @param response WebSocket 推送的完整 JSON
* @return true 表示已处理(匹配成功)
*/
@Override
public boolean handleMessage(JSONObject response) {
JSONObject arg = response.getJSONObject("arg");
if (arg == null || !CHANNEL_NAME.equals(arg.getString("channel"))) {
return false;
}
try {
JSONArray dataArray = response.getJSONArray("data");
if (dataArray == null || dataArray.isEmpty()) {
return true;
}
// data[0] 是一个 JSONObject: {instType, instId, markPx, ts}
JSONObject markData = dataArray.getJSONObject(0);
if (markData == null) {
return true;
}
String markPxStr = markData.getString("markPx");
if (markPxStr == null || markPxStr.isEmpty()) {
return true;
}
BigDecimal markPrice = new BigDecimal(markPxStr);
if (strategy != null) {
strategy.setMarkPrice(markPrice);
strategy.onKline(markPrice);
}
} catch (Exception e) {
log.error("[OKX-WS] 处理 mark-price 数据失败", e);
}
return true;
}
// ==================== 订阅状态 ====================
@Override
public boolean isSubscribed() {
return subscribed;
}
@Override
public void setSubscribed(boolean subscribed) {
this.subscribed = subscribed;
}
}