package com.xcong.excoin.modules.gateApi.wsHandler.handler; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.blackchain.service.DateUtil; import com.xcong.excoin.modules.gateApi.GateGridTradeService; import com.xcong.excoin.modules.gateApi.wsHandler.GateChannelHandler; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import java.math.BigDecimal; /** * K 线频道处理器(futures.candlesticks)— 策略的唯一价格时间驱动源。 * *

定位

* 订阅 1 分钟 K 线实时推送,每收到一根 K 线(不等待完结)即触发 * {@link GateGridTradeService#onKline(BigDecimal)},由策略引擎决定是否开仓/止盈。 * *

订阅格式

* 公开频道,无需认证签名。payload: {@code ["1m", contract]}。 * * @author Administrator */ @Slf4j public class CandlestickChannelHandler implements GateChannelHandler { private static final String CHANNEL_NAME = "futures.candlesticks"; /** K 线周期,固定 1 分钟 */ private static final String INTERVAL = "1m"; /** 合约名称 */ private final String contract; /** 网格交易服务,接收 K 线回调 */ private final GateGridTradeService gridTradeService; /** * @param contract 合约名称(如 ETH_USDT) * @param gridTradeService 网格交易策略服务实例 */ public CandlestickChannelHandler(String contract, GateGridTradeService gridTradeService) { this.contract = contract; this.gridTradeService = gridTradeService; } /** @return 频道名称 "futures.candlesticks" */ @Override public String getChannelName() { return CHANNEL_NAME; } /** * 发送 K 线频道订阅请求(公开频道,无需签名)。 * *

订阅格式

*
     * {
     *   "time":    <unix时间戳(秒)>,
     *   "channel": "futures.candlesticks",
     *   "event":   "subscribe",
     *   "payload": ["1m", "{contract}"]
     * }
     * 
*/ @Override public void subscribe(WebSocketClient ws) { JSONObject msg = new JSONObject(); msg.put("time", System.currentTimeMillis() / 1000); msg.put("channel", CHANNEL_NAME); msg.put("event", "subscribe"); JSONArray payload = new JSONArray(); payload.add(INTERVAL); payload.add(contract); msg.put("payload", payload); ws.send(msg.toJSONString()); log.info("[{}] 订阅成功, 合约:{}, 周期:{}", CHANNEL_NAME, contract, INTERVAL); } /** * 发送 K 线频道取消订阅请求。 */ @Override public void unsubscribe(WebSocketClient ws) { JSONObject msg = new JSONObject(); msg.put("time", System.currentTimeMillis() / 1000); msg.put("channel", CHANNEL_NAME); msg.put("event", "unsubscribe"); JSONArray payload = new JSONArray(); payload.add(INTERVAL); payload.add(contract); msg.put("payload", payload); ws.send(msg.toJSONString()); log.info("[{}] 取消订阅成功", CHANNEL_NAME); } /** * 处理 K 线推送消息。 * *

数据提取

* result[0] 中提取: * * *

注意

* 不判断 w(已完结)——策略需要 tick 级实时响应价格变动, * 而非等 1 分钟烛线完结后才行动。 * * @param response WebSocket 推送的完整 JSON * @return true 表示已处理(匹配成功) */ @Override public boolean handleMessage(JSONObject response) { if (!CHANNEL_NAME.equals(response.getString("channel"))) { return false; } try { JSONArray resultArray = response.getJSONArray("result"); if (resultArray == null || resultArray.isEmpty()) { log.warn("[{}] 数据为空", CHANNEL_NAME); return true; } JSONObject data = resultArray.getJSONObject(0); BigDecimal closePx = new BigDecimal(data.getString("c")); // log.info("========== Gate K线数据 =========="); // log.info("名称: {} 时间: {}", data.getString("n"), DateUtil.TimeStampToDateTime(data.getLong("t"))); // log.info("收盘: {} 已完结: {}",data.getString("c"),data.getBooleanValue("w")); // log.info("=================================="); if (gridTradeService != null) { gridTradeService.onKline(closePx); } } catch (Exception e) { log.error("[{}] 处理数据失败", CHANNEL_NAME, e); } return true; } }