Administrator
4 days ago c7cb31dcafe3046f925f17e3d05604b35938199e
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package com.xcong.excoin.modules.gateApi.wsHandler.handler;
 
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.blackchain.service.DateUtil;
import com.xcong.excoin.modules.gateApi.GateGridTradeService;
import com.xcong.excoin.modules.gateApi.wsHandler.GateChannelHandler;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
 
import java.math.BigDecimal;
 
/**
 * K 线频道处理器。
 * 公开频道,无需认证。订阅 1m K 线,解析后回调 {@link GateGridTradeService#onKline}。
 *
 * @author Administrator
 */
@Slf4j
public class CandlestickChannelHandler implements GateChannelHandler {
 
    private static final String CHANNEL_NAME = "futures.candlesticks";
    private static final String INTERVAL = "1m";
 
    private final String contract;
    private final GateGridTradeService gridTradeService;
 
    public CandlestickChannelHandler(String contract, GateGridTradeService gridTradeService) {
        this.contract = contract;
        this.gridTradeService = gridTradeService;
    }
 
    @Override
    public String getChannelName() {
        return CHANNEL_NAME;
    }
 
    @Override
    public void subscribe(WebSocketClient ws) {
        JSONObject subscribeMsg = new JSONObject();
        subscribeMsg.put("time", System.currentTimeMillis() / 1000);
        subscribeMsg.put("channel", CHANNEL_NAME);
        subscribeMsg.put("event", "subscribe");
        JSONArray payload = new JSONArray();
        payload.add(INTERVAL);
        payload.add(contract);
        subscribeMsg.put("payload", payload);
        ws.send(subscribeMsg.toJSONString());
        log.info("[{}] 已发送订阅请求,合约: {}, 周期: {}", CHANNEL_NAME, contract, INTERVAL);
    }
 
    @Override
    public void unsubscribe(WebSocketClient ws) {
        JSONObject unsubscribeMsg = new JSONObject();
        unsubscribeMsg.put("time", System.currentTimeMillis() / 1000);
        unsubscribeMsg.put("channel", CHANNEL_NAME);
        unsubscribeMsg.put("event", "unsubscribe");
        JSONArray payload = new JSONArray();
        payload.add(INTERVAL);
        payload.add(contract);
        unsubscribeMsg.put("payload", payload);
        ws.send(unsubscribeMsg.toJSONString());
        log.info("[{}] 已发送取消订阅请求,合约: {}, 周期: {}", CHANNEL_NAME, contract, INTERVAL);
    }
 
    @Override
    public boolean handleMessage(JSONObject response) {
        String channel = response.getString("channel");
        if (!CHANNEL_NAME.equals(channel)) {
            return false;
        }
        try {
            JSONArray resultArray = response.getJSONArray("result");
            if (resultArray == null || resultArray.isEmpty()) {
                log.warn("[{}] 数据为空", CHANNEL_NAME);
                return true;
            }
            JSONObject data = resultArray.getJSONObject(0);
            BigDecimal closePx = new BigDecimal(data.getString("c"));
            long t = data.getLong("t");
            boolean windowClosed = data.getBooleanValue("w");
 
            log.info("========== Gate K线数据 ==========");
            log.info("名称(n): {}", data.getString("n"));
            log.info("时间  : {}", DateUtil.TimeStampToDateTime(t));
            log.info("开盘(o): {}", data.getString("o"));
            log.info("最高(h): {}", data.getString("h"));
            log.info("最低(l): {}", data.getString("l"));
            log.info("收盘(c): {}", data.getString("c"));
            log.info("成交量(v): {}", data.getString("v"));
            log.info("成交额(a): {}", data.getString("a"));
            log.info("K线完结(w): {}", windowClosed);
            log.info("==================================");
 
            if (gridTradeService != null) {
                gridTradeService.onKline(closePx);
            }
        } catch (Exception e) {
            log.error("[{}] 处理数据失败", CHANNEL_NAME, e);
        }
        return true;
    }
}