package com.xcong.excoin.modules.okxApi.wsHandler.handler;
|
|
import com.alibaba.fastjson.JSONArray;
|
import com.alibaba.fastjson.JSONObject;
|
import com.xcong.excoin.modules.okxApi.OkxGridTradeService;
|
import com.xcong.excoin.modules.okxApi.wsHandler.OkxChannelHandler;
|
import lombok.extern.slf4j.Slf4j;
|
import org.java_websocket.client.WebSocketClient;
|
|
import java.math.BigDecimal;
|
|
/**
|
* OKX K线频道处理器(candle1m)— 策略的唯一价格时间驱动源。
|
*
|
* <h3>定位</h3>
|
* 这是一个<b>公开频道</b>,连接到 OKX 公开 WebSocket 端点,
|
* 无需登录认证。订阅 1 分钟 K 线实时推送,每收到一根 K 线即触发
|
* {@link OkxGridTradeService#onKline(BigDecimal)},由策略引擎决定是否开仓/止盈。
|
*
|
* <h3>订阅格式</h3>
|
* <pre>
|
* {"op":"subscribe","args":[{"channel":"candle1m","instId":"ETH-USDT-SWAP"}]}
|
* </pre>
|
*
|
* <h3>数据推送格式</h3>
|
* <pre>
|
* {
|
* "arg": {"channel":"candle1m","instId":"ETH-USDT-SWAP"},
|
* "data": [
|
* ["timestamp","open","high","low","close","vol","volCcy","volCcyQuote","confirm"]
|
* ]
|
* }
|
* </pre>
|
*
|
* <h3>字段说明</h3>
|
* <table>
|
* <tr><th>索引</th><th>字段</th><th>含义</th></tr>
|
* <tr><td>0</td><td>ts</td><td>K线起始时间(Unix ms)</td></tr>
|
* <tr><td>1</td><td>o</td><td>开盘价</td></tr>
|
* <tr><td>2</td><td>h</td><td>最高价</td></tr>
|
* <tr><td>3</td><td>l</td><td>最低价</td></tr>
|
* <tr><td>4</td><td>c</td><td><b>收盘价</b>(用于驱动策略)</td></tr>
|
* <tr><td>5</td><td>vol</td><td>成交量(张)</td></tr>
|
* <tr><td>6</td><td>volCcy</td><td>成交量(币)</td></tr>
|
* <tr><td>7</td><td>volCcyQuote</td><td>成交量(USDT)</td></tr>
|
* <tr><td>8</td><td>confirm</td><td>K线状态("0"=未完结,"1"=已完结)</td></tr>
|
* </table>
|
*
|
* <h3>注意</h3>
|
* 不判断 confirm 字段(K线是否完结)——策略需要 tick 级实时响应价格变动,
|
* 而非等 1 分钟烛线完结后才行动。
|
*
|
* @author Administrator
|
*/
|
@Slf4j
|
public class CandlestickOkxChannelHandler implements OkxChannelHandler {
|
|
/** 频道名称 */
|
private static final String CHANNEL_NAME = "candle1m";
|
|
/** 交易对标识,如 "ETH-USDT-SWAP" */
|
private final String instId;
|
|
/** 网格交易服务,接收 K 线回调 */
|
private final OkxGridTradeService gridTradeService;
|
|
/** 订阅确认状态 */
|
private volatile boolean subscribed = false;
|
|
/**
|
* 构造 K 线频道处理器。
|
*
|
* @param instId 交易对标识(如 "ETH-USDT-SWAP")
|
* @param gridTradeService OKX 网格交易策略服务实例
|
*/
|
public CandlestickOkxChannelHandler(String instId, OkxGridTradeService gridTradeService) {
|
this.instId = instId;
|
this.gridTradeService = gridTradeService;
|
}
|
|
/**
|
* @return 频道名称 "candle1m"
|
*/
|
@Override
|
public String getChannelName() {
|
return CHANNEL_NAME;
|
}
|
|
/**
|
* @return 交易对标识(如 "ETH-USDT-SWAP")
|
*/
|
@Override
|
public String getInstId() {
|
return instId;
|
}
|
|
/**
|
* 发送 K 线频道订阅请求(公开频道,无需签名)。
|
*
|
* <h3>订阅格式</h3>
|
* <pre>
|
* {"op":"subscribe","args":[{"channel":"candle1m","instId":"ETH-USDT-SWAP"}]}
|
* </pre>
|
*
|
* @param ws OKX 公开 WebSocket 客户端
|
*/
|
@Override
|
public void subscribe(WebSocketClient ws) {
|
JSONObject msg = new JSONObject();
|
msg.put("op", "subscribe");
|
JSONArray args = new JSONArray();
|
JSONObject arg = new JSONObject();
|
arg.put("channel", CHANNEL_NAME);
|
arg.put("instId", instId);
|
args.add(arg);
|
msg.put("args", args);
|
ws.send(msg.toJSONString());
|
log.info("[OKX-WS] 订阅K线频道, instId: {}", instId);
|
}
|
|
/**
|
* 发送 K 线频道取消订阅请求。
|
*
|
* @param ws OKX 公开 WebSocket 客户端
|
*/
|
@Override
|
public void unsubscribe(WebSocketClient ws) {
|
JSONObject msg = new JSONObject();
|
msg.put("op", "unsubscribe");
|
JSONArray args = new JSONArray();
|
JSONObject arg = new JSONObject();
|
arg.put("channel", CHANNEL_NAME);
|
arg.put("instId", instId);
|
args.add(arg);
|
msg.put("args", args);
|
ws.send(msg.toJSONString());
|
log.info("[OKX-WS] 取消订阅K线频道, instId: {}", instId);
|
}
|
|
/**
|
* 处理 K 线推送消息。
|
*
|
* <h3>数据提取</h3>
|
* 从 {@code data[0]} 数组中提取索引 4(收盘价 close),
|
* 传给 {@code gridTradeService.onKline(closePrice)}。
|
*
|
* <h3>OKX 数据格式</h3>
|
* data 是一个二维数组,第一层是 K 线条数(通常 1 条),
|
* 第二层是各字段值。data[0][4] = 收盘价。
|
*
|
* @param response WebSocket 推送的完整 JSON
|
* @return true 表示已处理(匹配成功)
|
*/
|
@Override
|
public boolean handleMessage(JSONObject response) {
|
JSONObject arg = response.getJSONObject("arg");
|
if (arg == null || !CHANNEL_NAME.equals(arg.getString("channel"))) {
|
return false;
|
}
|
try {
|
JSONArray dataArray = response.getJSONArray("data");
|
if (dataArray == null || dataArray.isEmpty()) {
|
log.debug("[OKX-WS] candle1m 数据为空");
|
return true;
|
}
|
// data[0] 是一个数组: [ts, o, h, l, c, vol, volCcy, volCcyQuote, confirm]
|
JSONArray candle = dataArray.getJSONArray(0);
|
if (candle == null || candle.size() < 5) {
|
log.warn("[OKX-WS] candle1m 数据格式异常: {}", dataArray);
|
return true;
|
}
|
// 索引 4 = 收盘价 close
|
BigDecimal closePrice = candle.getBigDecimal(4);
|
|
if (gridTradeService != null && closePrice != null) {
|
gridTradeService.onKline(closePrice);
|
}
|
} catch (Exception e) {
|
log.error("[OKX-WS] 处理 candle1m 数据失败", e);
|
}
|
return true;
|
}
|
|
// ==================== 订阅状态 ====================
|
|
@Override
|
public boolean isSubscribed() {
|
return subscribed;
|
}
|
|
@Override
|
public void setSubscribed(boolean subscribed) {
|
this.subscribed = subscribed;
|
}
|
}
|