Helius
2021-06-15 557e3ba49b7171b860512d1fddbe980876cee503
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package com.xcong.excoin.rabbit.consumer;
 
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.huobi.client.model.Candlestick;
import com.xcong.excoin.configurations.RabbitMqConfig;
import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
import com.xcong.excoin.modules.coin.service.OrderCoinService;
import com.xcong.excoin.modules.exchange.service.HandleKlineService;
import com.xcong.excoin.trade.ExchangeTrade;
import com.xcong.excoin.utils.CoinTypeConvert;
import com.xcong.excoin.utils.RedisUtils;
import com.xcong.excoin.websocket.CandlestickModel;
import com.xcong.excoin.websocket.NewCandlestick;
import com.xcong.excoin.websocket.TradePlateSendWebSocket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
 
import javax.annotation.Resource;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
 
/**
 *  websocket 只能后台撮合交易那台开启
 */
@Slf4j
@Component
@ConditionalOnProperty(prefix = "app", name = "exchange-trade", havingValue = "true")
public class ExchangeConsumer {
 
    @Resource
    private TradePlateSendWebSocket tradePlateSendWebSocket;
 
    @Resource
    private RedisUtils redisUtils;
 
    @Resource
    private HandleKlineService handleKlineService;
 
    @Resource
    private OrderCoinService orderCoinService;
 
    /**
     *  发送盘口信息
     * @param content
     */
    @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE)
    public void tradePlate(String content) {
        //log.info("--发送盘口消息--");
        tradePlateSendWebSocket.sendMessagePlate("BZZ/USDT",content,null);
    }
 
    /**
     *  处理订单
     * @param content
     */
    @RabbitListener(queues = RabbitMqConfig.QUEUE_HANDLE_TRADE)
    public void handleTradeExchange(String content) {
       // log.info("#处理订单---->{}#", content);
        List<ExchangeTrade> exchangeTrades = JSONObject.parseArray(content, ExchangeTrade.class);
        if(CollectionUtils.isEmpty(exchangeTrades)){
            return;
        }
        // 去掉空的  暂时这样
        Iterator<ExchangeTrade> iterator = exchangeTrades.iterator();
        while (iterator.hasNext()){
            if(iterator.next()==null){
                iterator.remove();
            }
        }
        if(CollectionUtils.isEmpty(exchangeTrades)){
            return;
        }
        // 先处理处理用户订单
        orderCoinService.handleOrder(exchangeTrades);
        try{
            // 处理K线 并更新最新价
            handleKlineService.handleExchangeOrderToKline(exchangeTrades);
            // 推送最新K线
            String symbol = exchangeTrades.get(0).getSymbol();
            String symbolUsdt = symbol;
            if(!symbol.contains("USDT")){
                symbolUsdt = symbol+"/USDT";
            }
            String key = "NEW_KINE_{}";
            key = StrUtil.format(key, symbolUsdt);
            Object o = redisUtils.get(key);
            Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick> )o;
            Set<Map.Entry<String, Candlestick>> entries = currentKlineMap.entrySet();
 
 
            for(Map.Entry<String, Candlestick> map : entries){
                String ch = "market.{}.kline.{}";
                Candlestick value = map.getValue();
                String key1 = map.getKey();
                String chKey = key1;
                if(key1.equals("1hour")){
                    chKey = "60min";
                }
                // 转换
                NewCandlestick newCandlestick= new NewCandlestick();
                String nekkusdt = CoinTypeConvert.convertReverse(symbolUsdt);
                ch = StrUtil.format(ch, nekkusdt,chKey);
                newCandlestick.setCh(ch);
                CandlestickModel model = new CandlestickModel();
                model.setVol(value.getVolume());
                model.setLow(value.getLow());
                model.setOpen(value.getOpen());
                model.setHigh(value.getHigh());
                model.setCount(value.getCount());
                model.setAmount(value.getAmount());
                model.setId(value.getTimestamp()/1000);
                model.setTimestamp(value.getTimestamp()/1000);
                model.setClose(value.getClose());
                newCandlestick.setTick(model);
                tradePlateSendWebSocket.sendMessageKline(symbolUsdt,key1,JSONObject.toJSONString(newCandlestick),null);
            }
 
        }catch (Exception e){
            e.printStackTrace();
        }
 
    }
 
    /**
     *  撮合交易订单全部完成
     * @param content
     */
    @RabbitListener(queues = RabbitMqConfig.QUEUE_ROC_ORDER_COMPLETE)
    public void doComplete(String content) {
        log.debug("#完成的订单---->{}#", content);
        List<OrderCoinsEntity> exchangeTrades = JSONObject.parseArray(content, OrderCoinsEntity.class);
        if(CollectionUtils.isEmpty(exchangeTrades)){
            return;
        }
        orderCoinService.completeOrder(exchangeTrades);
    }
 
}