Administrator
2025-12-09 249ddecd3afeb12537edfef4d7a127ffe32f7b9f
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package com.xcong.excoin.modules.okxNewPrice.celue;
 
import com.xcong.excoin.modules.okxNewPrice.okxWs.InstrumentsWs;
import com.xcong.excoin.modules.okxNewPrice.okxWs.PositionsWs;
import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.CoinEnums;
import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.OrderParamEnums;
import com.xcong.excoin.modules.okxNewPrice.wangge.WangGeQueue;
import com.xcong.excoin.modules.okxNewPrice.wangge.WangGeService;
import com.xcong.excoin.rabbit.pricequeue.AscBigDecimal;
import com.xcong.excoin.rabbit.pricequeue.DescBigDecimal;
import com.xcong.excoin.utils.RedisUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
 
import java.math.BigDecimal;
import java.util.concurrent.PriorityBlockingQueue;
 
/**
 * 操作服务实现类,用于处理与交易相关的逻辑操作。
 * 包括根据市场行情判断是否进行加仓或减仓,并维护相关价格队列。
 *
 * @author Administrator
 */
@Slf4j
@Service
@RequiredArgsConstructor
public class CaoZuoServiceImpl implements CaoZuoService {
 
    private final RedisUtils redisUtils;
    private final WangGeService wangGeService;
 
    /**
     * 执行主要的操作逻辑,包括读取合约状态、获取市场价格信息,
     * 并根据当前持仓均价和标记价格决定是否执行买卖操作。
     *
     * @return 返回操作类型字符串(如买入BUY、卖出SELL等)
     */
    @Override
    public String caoZuo() {
        // 构造Redis键名
        final String coinCode = CoinEnums.HE_YUE.getCode();
        final String instrumentsKey = InstrumentsWs.INSTRUMENTSWS_CHANNEL + ":" + coinCode + ":state";
        final String positionsMarkPxKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":markPx";
        final String positionsAvgPxKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":avgPx";
        final String positionsOrderPriceKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":orderPrice";
 
        // 获取合约状态
        String state = (String) redisUtils.get(instrumentsKey);
        if (state == null || !OrderParamEnums.STATE_1.getValue().equals(state)) {
            return OrderParamEnums.HOLDING.getValue();
        }
        if (OrderParamEnums.STATE_4.getValue().equals(state)) {
            return OrderParamEnums.ORDERING.getValue();
        }
 
        log.info(OrderParamEnums.getNameByValue(state));
 
        // 获取标记价格和平均持仓价格
        Object markPxObj = redisUtils.get(positionsMarkPxKey);
        Object avgPxObj = redisUtils.get(positionsAvgPxKey);
 
        if (markPxObj == null || avgPxObj == null) {
            return OrderParamEnums.INIT.getValue();
        }
 
        try {
            BigDecimal markPx = new BigDecimal((String) markPxObj);
            BigDecimal avgPx = new BigDecimal((String) avgPxObj);
 
            log.info("开仓价格: {}, 当前价格:{},匹配队列中......", avgPx, markPx);
 
            // 初始化网格队列
            PriorityBlockingQueue<AscBigDecimal> queueAsc = WangGeQueue.getQueueAsc();
            PriorityBlockingQueue<DescBigDecimal> queueKaiCang = wangGeService.initKaiCang(avgPx, queueAsc);
            PriorityBlockingQueue<AscBigDecimal> queuePingCang = wangGeService.initPingCang(avgPx, queueAsc);
 
            // 处理订单价格在队列中的情况
            String orderPrice = (String) redisUtils.get(positionsOrderPriceKey);
            handleOrderPriceInQueues(orderPrice, queueKaiCang, queuePingCang);
 
            String side = OrderParamEnums.HOLDING.getValue();
 
            // 判断是加仓还是减仓
            if (avgPx.compareTo(markPx) > 0) {
                DescBigDecimal kaiCang = queueKaiCang.peek();
                if (kaiCang != null && kaiCang.getValue().compareTo(markPx) >= 0) {
                    log.info("开始加仓...开仓队列价格大于当前价格{}>{}", kaiCang.getValue(), markPx);
                    side = OrderParamEnums.BUY.getValue();
                    redisUtils.set(positionsOrderPriceKey, String.valueOf(kaiCang.getValue()), 0);
                } else {
                    log.info("未触发加仓......,等待");
                }
            } else if (avgPx.compareTo(markPx) < 0) {
                AscBigDecimal pingCang = queuePingCang.peek();
                if (pingCang != null && pingCang.getValue().compareTo(markPx) <= 0) {
                    log.info("开始减仓...平仓队列价格小于当前价格{}<={}", pingCang.getValue(), markPx);
                    side = OrderParamEnums.SELL.getValue();
                    redisUtils.set(positionsOrderPriceKey, String.valueOf(pingCang.getValue()), 0);
                } else {
                    log.info("未触发减仓......,等待");
                }
            } else {
                log.info("价格波动较小......,等待");
            }
 
            return side;
        } catch (NumberFormatException e) {
            log.error("解析价格失败,请检查Redis中的值是否合法", e);
            return OrderParamEnums.HOLDING.getValue();
        }
    }
 
    /**
     * 根据订单价格更新开仓和平仓队列的内容。
     * 若该价格不在对应队列中则加入;若已存在,则从队列中移除。
     *
     * @param orderPrice 订单价格
     * @param queueKaiCang 开仓价格优先队列(降序)
     * @param queuePingCang 平仓价格优先队列(升序)
     */
    private void handleOrderPriceInQueues(String orderPrice,
                                          PriorityBlockingQueue<DescBigDecimal> queueKaiCang,
                                          PriorityBlockingQueue<AscBigDecimal> queuePingCang) {
        if (orderPrice == null) {
            return;
        }
 
        BigDecimal priceDecimal;
        try {
            priceDecimal = new BigDecimal(orderPrice);
        } catch (NumberFormatException ex) {
            log.warn("无效的价格格式: {}", orderPrice);
            return;
        }
 
        boolean kaiCangExists = queueKaiCang.stream().anyMatch(item -> item.getValue().equals(priceDecimal));
        if (!kaiCangExists) {
            queueKaiCang.add(new DescBigDecimal(orderPrice));
        } else {
            queueKaiCang.removeIf(item -> item.getValue().equals(priceDecimal));
        }
 
        boolean pingCangExists = queuePingCang.stream().anyMatch(item -> item.getValue().equals(priceDecimal));
        if (!pingCangExists) {
            queuePingCang.add(new AscBigDecimal(orderPrice));
        } else {
            queuePingCang.removeIf(item -> item.getValue().equals(priceDecimal));
        }
    }
 
    /**
     * 计算盈亏金额。
     *
     * @param faceValue 面值
     * @param position 持仓数量
     * @param contractMultiplier 合约乘数
     * @param markPrice 标记价格
     * @param openPrice 开仓价格
     * @param isLong 是否为多头仓位
     * @param minTickSz 最小变动单位精度
     * @return 盈亏金额,保留指定精度的小数位
     */
    public BigDecimal profit(BigDecimal faceValue, BigDecimal position, BigDecimal contractMultiplier,
                             BigDecimal markPrice, BigDecimal openPrice, boolean isLong, int minTickSz) {
        BigDecimal profit = BigDecimal.ZERO;
        if (isLong) {
            profit = markPrice.subtract(openPrice)
                    .multiply(faceValue)
                    .multiply(contractMultiplier)
                    .multiply(position);
        } else {
            profit = openPrice.subtract(markPrice)
                    .multiply(faceValue)
                    .multiply(contractMultiplier)
                    .multiply(position);
        }
        return profit.setScale(minTickSz, BigDecimal.ROUND_DOWN);
    }
}