Administrator
4 days ago 981a87bd0b06a17064f7b0d2435e96045cc03987
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package com.xcong.excoin.modules.okxApi.wsHandler.handler;
 
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.okxApi.OkxConfig;
import com.xcong.excoin.modules.okxApi.OkxGridTradeService;
import com.xcong.excoin.modules.okxApi.wsHandler.AbstractOkxPrivateChannelHandler;
import com.xcong.excoin.modules.okxApi.TraderParam;
import lombok.extern.slf4j.Slf4j;
 
import java.math.BigDecimal;
 
/**
 * OKX 仓位频道处理器(positions),接收仓位更新推送并回调
 * {@link OkxGridTradeService#onPositionUpdate(String, TraderParam.Direction, BigDecimal, BigDecimal)}。
 *
 * <h3>订阅格式</h3>
 * 私有频道,需要先登录认证。订阅 arg 使用 instType: "SWAP"(不指定具体 instId,
 * 会在 handleMessage 中通过 config.getContract() 过滤)。
 * <pre>
 * {"op":"subscribe","args":[{"channel":"positions","instType":"SWAP"}]}
 * </pre>
 *
 * <h3>数据推送格式</h3>
 * <pre>
 * {
 *   "arg": {"channel":"positions","instType":"SWAP"},
 *   "data": [{
 *     "instId": "ETH-USDT-SWAP",
 *     "posSide": "long",        // "long" 或 "short"
 *     "pos": "1",               // 持仓张数
 *     "avgPx": "3000",          // 开仓均价
 *     ...
 *   }]
 * }
 * </pre>
 *
 * <h3>数据映射</h3>
 * <ul>
 *   <li>posSide "long" → {@link TraderParam.Direction#LONG},映射为 DUAL_LONG</li>
 *   <li>posSide "short" → {@link TraderParam.Direction#SHORT},映射为 DUAL_SHORT</li>
 *   <li>pos → 持仓张数(绝对值)</li>
 *   <li>avgPx → 开仓均价</li>
 * </ul>
 *
 * @author Administrator
 */
@Slf4j
public class PositionsOkxChannelHandler extends AbstractOkxPrivateChannelHandler {
 
    /** OKX 仓位频道名称 */
    private static final String CHANNEL_NAME = "positions";
 
    /** OKX 配置,用于获取合约名称进行过滤 */
    private final OkxConfig config;
 
    /**
     * 构造仓位频道处理器。
     *
     * @param config           OKX 配置实例(提供合约名称等)
     * @param gridTradeService OKX 网格交易策略服务实例
     */
    public PositionsOkxChannelHandler(OkxConfig config, OkxGridTradeService gridTradeService) {
        super(CHANNEL_NAME,
                config.getApiKey(), config.getApiSecret(), config.getPassphrase(),
                config.getContract(),
                gridTradeService);
        this.config = config;
    }
 
    /**
     * 处理仓位推送消息。
     *
     * <h3>处理流程</h3>
     * <ol>
     *   <li>检查 arg.channel 是否匹配 "positions"</li>
     *   <li>遍历 data 数组,按 instId 过滤出 config.getContract() 对应的仓位</li>
     *   <li>映射 posSide → Direction(long=DualLong, short=DualShort)</li>
     *   <li>提取 pos(张数)、avgPx(均价)</li>
     *   <li>调用 gridTradeService.onPositionUpdate(contract, direction, size, entryPrice)</li>
     * </ol>
     *
     * @param response WebSocket 推送的完整 JSON
     * @return true 表示已处理(匹配成功)
     */
    @Override
    public boolean handleMessage(JSONObject response) {
        JSONObject arg = response.getJSONObject("arg");
        if (arg == null || !CHANNEL_NAME.equals(arg.getString("channel"))) {
            return false;
        }
        try {
            JSONArray dataArray = response.getJSONArray("data");
            if (dataArray == null || dataArray.isEmpty()) {
                return true;
            }
 
            String contract = config.getContract(); // e.g., "ETH-USDT"
            for (int i = 0; i < dataArray.size(); i++) {
                JSONObject posData = dataArray.getJSONObject(i);
 
                // 按 instId 精确过滤,只处理当前合约的仓位(避免误匹配交割合约)
                String dataInstId = posData.getString("instId");
                if (dataInstId == null || !dataInstId.equals(contract)) {
                    continue;
                }
 
                // 解析持仓方向:OKX 的 posSide 可以是 "long" 或 "short"
                String posSide = posData.getString("posSide");
                TraderParam.Direction direction;
                if ("long".equals(posSide)) {
                    direction = TraderParam.Direction.LONG;
                } else if ("short".equals(posSide)) {
                    direction = TraderParam.Direction.SHORT;
                } else {
                    log.debug("[OKX-WS] positions 忽略 net 方向: {}", posSide);
                    continue;
                }
 
                String posStr = posData.getString("pos");
                String avgPxStr = posData.getString("avgPx");
                // 仓位归零时 OKX 推送 avgPx: ""(空串),需做防护
                BigDecimal size = (posStr != null && !posStr.isEmpty())
                        ? new BigDecimal(posStr) : BigDecimal.ZERO;
                BigDecimal entryPrice = (avgPxStr != null && !avgPxStr.isEmpty())
                        ? new BigDecimal(avgPxStr) : BigDecimal.ZERO;
 
//                log.info("[OKX-WS] positions 持仓更新, instId:{}, posSide:{}, pos:{}, avgPx:{}",
//                        dataInstId, posSide, size, entryPrice);
 
                if (getGridTradeService() != null) {
                    getGridTradeService().onPositionUpdate(contract, direction, size, entryPrice);
                }
            }
        } catch (Exception e) {
            log.error("[OKX-WS] 处理 positions 数据失败", e);
        }
        return true;
    }
}