Helius
2020-08-31 237d0b600b55ecbf3d4f241568862ae7b498e983
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
package com.xcong.excoin.rabbit.pricequeue;
 
import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.common.contants.AppContants;
import com.xcong.excoin.rabbit.producer.OrderProducer;
import com.xcong.excoin.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.PriorityBlockingQueue;
 
@Slf4j
@Component
public class WebsocketPriceService {
 
    @Autowired
    OrderProducer orderProducer;
    @Resource
    private RedisUtils redisUtils;
 
    /**
     * @param symbol
     * @param price
     */
    public void comparePriceAsc(String symbol, String price) {
        // 比较价格 正序的 最小元素在头部 开多止盈 开空止损等
        PriorityBlockingQueue<AscBigDecimal> queue = PricePriorityQueue.getQueueAsc(symbol);
        // 最小的
        AscBigDecimal b = queue.peek();
        // 当前价
        AscBigDecimal now = new AscBigDecimal(price);
        List<AscBigDecimal> list = new ArrayList<AscBigDecimal>();
        // 找到所有比当前价格大的 是需要操作的
        if (b != null && b.compareTo(now) <= 0) {
            // 可以操作
            System.out.println("当前价格:" + price + "---正序---" + "队列价格:" + b.getValue().toPlainString() + " time:" + new Date());
            while (queue.peek() != null && queue.peek().compareTo(now) <= 0) {
                // 可以发送消息操作
                list.add(queue.remove());
            }
        }
 
        if (CollectionUtils.isNotEmpty(list)) {
            dealAscPriceOrderAndSenMq(list, symbol);
        }
 
    }
 
    public void comparePriceDesc(String symbol, String price) {
        // 比较价格 倒叙的 开多止损  开空止盈
        PriorityBlockingQueue<DescBigDecimal> queue = PricePriorityQueue.getQueueDesc(symbol);
        // 最大价格
        DescBigDecimal b = queue.peek();
        // 当前价格
        DescBigDecimal now = new DescBigDecimal(price);
        List<DescBigDecimal> list = new ArrayList<DescBigDecimal>();
        // 找到比当前价格还大的就是需要操作的 开多止损
        // 即最大的币当前价大 那么需要开多止损
        if (b != null && b.compareTo(now) <= 0) {
            // 可以操作
            System.out.println("当前价格:" + price + "---倒序操作---" + "队列:" + b.getValue().toPlainString() + " time:" + new Date());
 
            while (queue.peek() != null && queue.peek().compareTo(now) <= 0) {
                // 可以发送消息操作
                list.add(queue.remove());
                log.info("#{}#", JSONObject.toJSONString(list));
            }
        }
        if (CollectionUtils.isNotEmpty(list)) {
            dealDescPriceOrderAndSenMq(list, symbol);
        }
 
    }
 
    private void addExecType(OrderModel model) {
        List<Object> orderTypes = redisUtils.lGet(AppContants.RABBIT_TYPE + model.getOrderId(), 0 , -1);
        if (CollUtil.isNotEmpty(orderTypes)) {
            orderTypes.add(model.getType());
        } else {
            orderTypes = new ArrayList<>();
            orderTypes.add(model.getType());
        }
 
        redisUtils.lSet(AppContants.RABBIT_TYPE + model.getOrderId(), orderTypes, 10);
        redisUtils.lSet(AppContants.MEMBER_TYPE + model.getMemberId(), orderTypes, 5);
    }
 
    // 处理消息 正序的 包括
    // 1:买入委托2:开多3:开空4:平多5:平空6:爆仓平多7:爆仓平空8:撤单9:止盈平多10:止盈平空11:止损平多12:止损平空
    public void dealAscPriceOrderAndSenMq(List<AscBigDecimal> list, String symbol) {
        if (CollectionUtils.isNotEmpty(list)) {
            // 根据不同类型发送不同消息  1 倒序 2  正序
            List<OrderModel> orderModelList = new ArrayList<OrderModel>();
            // 3 正序
            Map<String, List<OrderModel>> orderMap = PricePriorityQueue.getOrderMap(symbol, 3);
            // 根据价格查询到对应的订单
            for (AscBigDecimal asc : list) {
                String key = asc.getValue().toPlainString();
                assert orderMap != null;
                log.info("----->->{}, --> {}", JSONObject.toJSONString(orderMap), key);
                if (orderMap.containsKey(key)) {
                    orderModelList.addAll(orderMap.get(key));
                    orderMap.remove(key);
                }
 
            }
            log.info("------>{}", JSONObject.toJSONString(orderModelList));
            if (CollectionUtils.isEmpty(orderModelList)) {
                return;
            }
            log.info("本次执行的列表ASC");
            // 根据订单的类型发送消息
            // 3:开空  7:爆仓平空
            // 9:止盈平多 12:止损平空
            for (OrderModel model : orderModelList) {
                /*
                   问题: 1、逐仓: 当行情大时,若设置的止损点与爆仓过于接近,则可能会出现直接爆仓,而不止损的情况
                        2、全仓: 止盈价/止损价 设置的与委托平仓价相同,需优先处理止盈/止损
                   解决: 将订单ID作为Key, 该订单执行的队列类型集合作为value, 用于在执行爆仓、委托平仓时,是否存在止盈/止损,若存在则不执行该爆仓和委托平仓
                 */
                addExecType(model);
 
                // 止损平空
                List<OrderModel> kkzsList = new ArrayList<OrderModel>();
                // 止盈平多
                List<OrderModel> kdzyList = new ArrayList<OrderModel>();
                // 爆仓平空
                List<OrderModel> bcList = new ArrayList<OrderModel>();
                // 开空
                List<OrderModel> wtkkList = new ArrayList<OrderModel>();
                // 委托平多
                List<OrderModel> wtpdList = new ArrayList<>();
                switch (model.getType()) {
                    case 3:
                        wtkkList.add(model);
                        break;
                    case 4:
                        wtpdList.add(model);
                        break;
                    case 7:
                        bcList.add(model);
                        break;
                    case 9:
                        kdzyList.add(model);
                        break;
                    case 12:
                        kkzsList.add(model);
                        break;
                    default:
                        log.info("#price-service unknown type#");
                        break;
                }
 
                // 发送消息
                if (CollectionUtils.isNotEmpty(kkzsList)) {
                    String kkzs = JSONObject.toJSONString(kkzsList);
                    orderProducer.sendLessLoss(kkzs);
                }
                if (CollectionUtils.isNotEmpty(kdzyList)) {
                    String kdzy = JSONObject.toJSONString(kdzyList);
                    orderProducer.sendMorePro(kdzy);
                }
                if (CollectionUtils.isNotEmpty(bcList)) {
                    orderProducer.sendCoinout(JSONObject.toJSONString(bcList));
                }
                if (CollectionUtils.isNotEmpty(wtkkList)) {
                    orderProducer.sendLimit(JSONObject.toJSONString(wtkkList));
                }
                if (CollectionUtils.isNotEmpty(wtpdList)) {
                    orderProducer.sendLimitClose(JSONObject.toJSONString(wtpdList));
                }
            }
        }
    }
 
    // 处理消息 正序的 包括
    // 1:买入委托2:开多3:开空4:平多5:平空6:爆仓平多7:爆仓平空8:撤单9:止盈平多10:止盈平空11:止损平多12:止损平空
    public void dealDescPriceOrderAndSenMq(List<DescBigDecimal> list, String symbol) {
        if (CollectionUtils.isNotEmpty(list)) {
            // 根据不同类型发送不同消息  1 倒序 2  正序
            List<OrderModel> orderModelList = new ArrayList<OrderModel>();
            Map<String, List<OrderModel>> orderMap = PricePriorityQueue.getOrderMap(symbol, 2);
            // 根据价格查询到对应的订单
            for (DescBigDecimal desc : list) {
                String key = desc.getValue().toPlainString();
                assert orderMap != null;
                log.info("----->->{}, --> {}", JSONObject.toJSONString(orderMap), key);
                if (orderMap.containsKey(key)) {
                    orderModelList.addAll(orderMap.get(key));
                    orderMap.remove(key);
                }
 
            }
 
            if (CollectionUtils.isEmpty(orderModelList)) {
                return;
            }
            log.info("本次执行的列表Desc");
            // 根据订单的类型发送消息
            // 2:开多6:爆仓平多
            // 10:止盈平空11:止损平多
            for (OrderModel model : orderModelList) {
                /*
                   问题: 1、逐仓: 当行情大时,若设置的止损点与爆仓过于接近,则可能会出现直接爆仓,而不止损的情况
                        2、全仓: 止盈价/止损价 设置的与委托平仓价相同,需优先处理止盈/止损
                   解决: 将订单ID作为Key, 该订单执行的队列类型集合作为value, 用于在执行爆仓、委托平仓时,是否存在止盈/止损,若存在则不执行该爆仓和委托平仓
                 */
                addExecType(model);
 
                // 开空止盈
                List<OrderModel> kkzyList = new ArrayList<OrderModel>();
                // 开多止损
                List<OrderModel> kdzsList = new ArrayList<OrderModel>();
                // 爆仓
                List<OrderModel> bcList = new ArrayList<OrderModel>();
                // 开多委托
                List<OrderModel> wtkdList = new ArrayList<OrderModel>();
                // 委托平空
                List<OrderModel> wtpkList = new ArrayList<>();
                switch (model.getType()) {
                    case 2:
                        wtkdList.add(model);
                        break;
                    case 5:
                        wtpkList.add(model);
                        break;
                    case 6:
                        bcList.add(model);
                        break;
                    case 10:
                        kkzyList.add(model);
                        break;
                    case 11:
                        kdzsList.add(model);
                        break;
                    default:
                        break;
                }
 
                // 发送消息
                if (CollectionUtils.isNotEmpty(kkzyList)) {
                    String kkzy = JSONObject.toJSONString(kkzyList);
                    orderProducer.sendLessPro(kkzy);
                }
                if (CollectionUtils.isNotEmpty(kdzsList)) {
                    String kdzs = JSONObject.toJSONString(kdzsList);
                    orderProducer.sendMoreLoss(kdzs);
                }
                if (CollectionUtils.isNotEmpty(bcList)) {
                    orderProducer.sendCoinout(JSONObject.toJSONString(bcList));
                }
                if (CollectionUtils.isNotEmpty(wtkdList)) {
                    orderProducer.sendLimit(JSONObject.toJSONString(wtkdList));
                }
                if (CollectionUtils.isNotEmpty(wtpkList)) {
                    orderProducer.sendLimitClose(JSONObject.toJSONString(wtpkList));
                }
            }
        }
    }
 
}