package com.xcong.excoin.modules.gateApi.wsHandler.handler;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.blackchain.service.DateUtil;
import com.xcong.excoin.modules.gateApi.GateGridTradeService;
import com.xcong.excoin.modules.gateApi.wsHandler.GateChannelHandler;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import java.math.BigDecimal;
/**
* K 线频道处理器(futures.candlesticks)— 策略的唯一价格时间驱动源。
*
*
定位
* 订阅 1 分钟 K 线实时推送,每收到一根 K 线(不等待完结)即触发
* {@link GateGridTradeService#onKline(BigDecimal)},由策略引擎决定是否开仓/止盈。
*
* 订阅格式
* 公开频道,无需认证签名。payload: {@code ["1m", contract]}。
*
* @author Administrator
*/
@Slf4j
public class CandlestickChannelHandler implements GateChannelHandler {
private static final String CHANNEL_NAME = "futures.candlesticks";
/** K 线周期,固定 1 分钟 */
private static final String INTERVAL = "1m";
/** 合约名称 */
private final String contract;
/** 网格交易服务,接收 K 线回调 */
private final GateGridTradeService gridTradeService;
/**
* @param contract 合约名称(如 ETH_USDT)
* @param gridTradeService 网格交易策略服务实例
*/
public CandlestickChannelHandler(String contract, GateGridTradeService gridTradeService) {
this.contract = contract;
this.gridTradeService = gridTradeService;
}
/** @return 频道名称 "futures.candlesticks" */
@Override
public String getChannelName() { return CHANNEL_NAME; }
/**
* 发送 K 线频道订阅请求(公开频道,无需签名)。
*
* 订阅格式
*
* {
* "time": <unix时间戳(秒)>,
* "channel": "futures.candlesticks",
* "event": "subscribe",
* "payload": ["1m", "{contract}"]
* }
*
*/
@Override
public void subscribe(WebSocketClient ws) {
JSONObject msg = new JSONObject();
msg.put("time", System.currentTimeMillis() / 1000);
msg.put("channel", CHANNEL_NAME);
msg.put("event", "subscribe");
JSONArray payload = new JSONArray();
payload.add(INTERVAL);
payload.add(contract);
msg.put("payload", payload);
ws.send(msg.toJSONString());
log.info("[{}] 订阅成功, 合约:{}, 周期:{}", CHANNEL_NAME, contract, INTERVAL);
}
/**
* 发送 K 线频道取消订阅请求。
*/
@Override
public void unsubscribe(WebSocketClient ws) {
JSONObject msg = new JSONObject();
msg.put("time", System.currentTimeMillis() / 1000);
msg.put("channel", CHANNEL_NAME);
msg.put("event", "unsubscribe");
JSONArray payload = new JSONArray();
payload.add(INTERVAL);
payload.add(contract);
msg.put("payload", payload);
ws.send(msg.toJSONString());
log.info("[{}] 取消订阅成功", CHANNEL_NAME);
}
/**
* 处理 K 线推送消息。
*
* 数据提取
* result[0] 中提取:
*
* - c(close):收盘价 → 传给 gridTradeService.onKline()
* - n(name):烛线名称(如 "1m_ETH_USDT")
* - t(time):烛线起始时间戳
* - w(window_close):烛线是否完结(仅日志输出,不做门控)
*
*
* 注意
* 不判断 w(已完结)——策略需要 tick 级实时响应价格变动,
* 而非等 1 分钟烛线完结后才行动。
*
* @param response WebSocket 推送的完整 JSON
* @return true 表示已处理(匹配成功)
*/
@Override
public boolean handleMessage(JSONObject response) {
if (!CHANNEL_NAME.equals(response.getString("channel"))) {
return false;
}
try {
JSONArray resultArray = response.getJSONArray("result");
if (resultArray == null || resultArray.isEmpty()) { log.warn("[{}] 数据为空", CHANNEL_NAME); return true; }
JSONObject data = resultArray.getJSONObject(0);
BigDecimal closePx = new BigDecimal(data.getString("c"));
// log.info("========== Gate K线数据 ==========");
// log.info("名称: {} 时间: {}", data.getString("n"), DateUtil.TimeStampToDateTime(data.getLong("t")));
// log.info("收盘: {} 已完结: {}",data.getString("c"),data.getBooleanValue("w"));
// log.info("==================================");
if (gridTradeService != null) {
gridTradeService.onKline(closePx);
}
} catch (Exception e) { log.error("[{}] 处理数据失败", CHANNEL_NAME, e); }
return true;
}
}