package com.xcong.excoin.modules.gateApi.wsHandler.handler; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.blackchain.service.DateUtil; import com.xcong.excoin.modules.gateApi.GateGridTradeService; import com.xcong.excoin.modules.gateApi.wsHandler.GateChannelHandler; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import java.math.BigDecimal; /** * K 线(Candlestick)频道处理器。 * *

特点

* 公开频道,无需 HMAC-SHA512 认证签名。订阅 1m 周期的 K 线数据。 * *

数据流

*
 *   WebSocket 推送 update event
 *     → handleMessage() → 解析 OHLCV → log 打印 → gridTradeService.onKline(closePx)
 *       → WAITING_KLINE: 首次 K 线触发基底双开
 *       → ACTIVE: 驱动 processShortGrid + processLongGrid 网格触发
 * 
* *

订阅格式

* payload: {@code ["1m", contract]} * * @author Administrator */ @Slf4j public class CandlestickChannelHandler implements GateChannelHandler { private static final String CHANNEL_NAME = "futures.candlesticks"; /** K 线周期,固定 1 分钟 */ private static final String INTERVAL = "1m"; /** 合约名称 */ private final String contract; /** 网格交易服务,接收 K 线回调 */ private final GateGridTradeService gridTradeService; /** * @param contract 合约名称(如 ETH_USDT) * @param gridTradeService 网格交易策略服务实例 */ public CandlestickChannelHandler(String contract, GateGridTradeService gridTradeService) { this.contract = contract; this.gridTradeService = gridTradeService; } /** @return 频道名称 "futures.candlesticks" */ @Override public String getChannelName() { return CHANNEL_NAME; } /** * 发送 K 线频道订阅请求(公开频道,无需签名)。 * *

订阅格式

*
     * {
     *   "time":    <unix时间戳(秒)>,
     *   "channel": "futures.candlesticks",
     *   "event":   "subscribe",
     *   "payload": ["1m", "{contract}"]
     * }
     * 
*/ @Override public void subscribe(WebSocketClient ws) { JSONObject msg = new JSONObject(); msg.put("time", System.currentTimeMillis() / 1000); msg.put("channel", CHANNEL_NAME); msg.put("event", "subscribe"); JSONArray payload = new JSONArray(); payload.add(INTERVAL); payload.add(contract); msg.put("payload", payload); ws.send(msg.toJSONString()); log.info("[{}] 订阅成功, 合约:{}, 周期:{}", CHANNEL_NAME, contract, INTERVAL); } /** * 发送 K 线频道取消订阅请求。 */ @Override public void unsubscribe(WebSocketClient ws) { JSONObject msg = new JSONObject(); msg.put("time", System.currentTimeMillis() / 1000); msg.put("channel", CHANNEL_NAME); msg.put("event", "unsubscribe"); JSONArray payload = new JSONArray(); payload.add(INTERVAL); payload.add(contract); msg.put("payload", payload); ws.send(msg.toJSONString()); log.info("[{}] 取消订阅成功", CHANNEL_NAME); } /** * 处理 K 线推送消息。 * *

数据提取

* result[0] 中提取: * * *

注意

* 不判断 w(已完结)——策略需要 tick 级实时响应价格变动, * 而非等 1 分钟烛线完结后才行动。 * * @param response WebSocket 推送的完整 JSON * @return true 表示已处理(匹配成功) */ @Override public boolean handleMessage(JSONObject response) { if (!CHANNEL_NAME.equals(response.getString("channel"))) { return false; } try { JSONArray resultArray = response.getJSONArray("result"); if (resultArray == null || resultArray.isEmpty()) { log.warn("[{}] 数据为空", CHANNEL_NAME); return true; } JSONObject data = resultArray.getJSONObject(0); BigDecimal closePx = new BigDecimal(data.getString("c")); log.info("========== Gate K线数据 =========="); log.info("名称: {} 时间: {}", data.getString("n"), DateUtil.TimeStampToDateTime(data.getLong("t"))); log.info("收盘: {} 已完结: {}",data.getString("c"),data.getBooleanValue("w")); log.info("=================================="); if (gridTradeService != null) { gridTradeService.onKline(closePx); } } catch (Exception e) { log.error("[{}] 处理数据失败", CHANNEL_NAME, e); } return true; } }