From e5e3bc5b8f863ce8f1069a12350432ea8cfbf965 Mon Sep 17 00:00:00 2001
From: Helius <wangdoubleone@gmail.com>
Date: Fri, 26 Feb 2021 15:31:18 +0800
Subject: [PATCH] modify

---
 src/main/java/com/xcong/excoin/netty/common/ChannelManager.java                           |   22 +++
 src/main/java/com/xcong/excoin/configurations/WebMvcConfig.java                           |   10 ++
 src/main/java/com/xcong/excoin/configurations/listener/RedisKeyUpdateListener.java        |   22 ++++
 src/main/java/com/xcong/excoin/configurations/listener/RedisReceiver.java                 |   47 +++++++++
 src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java                               |   39 ++++++-
 src/main/java/com/xcong/excoin/configurations/RedisConfig.java                            |   18 +++
 src/main/java/com/xcong/excoin/configurations/listener/KeyUpdateEventMessageListener.java |   47 +++++++++
 src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java     |    2 
 src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java                       |   32 +++--
 src/main/java/com/xcong/excoin/quartz/job/DayLineDataUpdateJob.java                       |   12 +
 10 files changed, 223 insertions(+), 28 deletions(-)

diff --git a/src/main/java/com/xcong/excoin/configurations/RedisConfig.java b/src/main/java/com/xcong/excoin/configurations/RedisConfig.java
index a9a61fa..fafc19c 100644
--- a/src/main/java/com/xcong/excoin/configurations/RedisConfig.java
+++ b/src/main/java/com/xcong/excoin/configurations/RedisConfig.java
@@ -3,11 +3,15 @@
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.PropertyAccessor;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.xcong.excoin.configurations.listener.RedisReceiver;
 import org.springframework.cache.annotation.EnableCaching;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.data.redis.connection.RedisConnectionFactory;
 import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.listener.PatternTopic;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
 import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
 import org.springframework.data.redis.serializer.StringRedisSerializer;
 
@@ -44,4 +48,18 @@
         return template;
     }
 
+    @Bean
+    MessageListenerAdapter listenerAdapter(RedisReceiver receiver) {
+        return new MessageListenerAdapter(receiver, "onMessage");
+    }
+
+    @Bean
+    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
+        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
+        container.setConnectionFactory(connectionFactory);
+        container.addMessageListener(listenerAdapter, new PatternTopic("channel:newprice"));
+        return container;
+    }
+
+
 }
diff --git a/src/main/java/com/xcong/excoin/configurations/WebMvcConfig.java b/src/main/java/com/xcong/excoin/configurations/WebMvcConfig.java
index 06994e0..29afb65 100644
--- a/src/main/java/com/xcong/excoin/configurations/WebMvcConfig.java
+++ b/src/main/java/com/xcong/excoin/configurations/WebMvcConfig.java
@@ -2,6 +2,8 @@
 
 import com.aliyun.oss.OSS;
 import com.aliyun.oss.OSSClientBuilder;
+import com.huobi.client.SubscriptionClient;
+import com.huobi.client.SubscriptionOptions;
 import com.xcong.excoin.configurations.properties.AliOssProperties;
 import com.xcong.excoin.configurations.security.UserAuthenticationArgumentResolver;
 import com.xcong.excoin.utils.SpringContextHolder;
@@ -51,6 +53,14 @@
         return new OSSClientBuilder().build(aliOssProperties.getEndPoint(), aliOssProperties.getAccessKeyId(), aliOssProperties.getAccessKeySecret());
     }
 
+    @Bean
+    public SubscriptionClient subscriptionClient() {
+        SubscriptionOptions subscriptionOptions = new SubscriptionOptions();
+        subscriptionOptions.setConnectionDelayOnFailure(3);
+        subscriptionOptions.setUri("wss://api.hadax.com/ws");
+        return SubscriptionClient.create("", "", subscriptionOptions);
+    }
+
 //    @Bean
 //    public SpringContextHolder springContextHolder() {
 //        return new SpringContextHolder();
diff --git a/src/main/java/com/xcong/excoin/configurations/listener/KeyUpdateEventMessageListener.java b/src/main/java/com/xcong/excoin/configurations/listener/KeyUpdateEventMessageListener.java
new file mode 100644
index 0000000..75a5730
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/configurations/listener/KeyUpdateEventMessageListener.java
@@ -0,0 +1,47 @@
+package com.xcong.excoin.configurations.listener;
+
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.context.ApplicationEventPublisherAware;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.core.RedisKeyExpiredEvent;
+import org.springframework.data.redis.listener.KeyspaceEventMessageListener;
+import org.springframework.data.redis.listener.PatternTopic;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.data.redis.listener.Topic;
+import org.springframework.lang.Nullable;
+
+/**
+ * @author wzy
+ * @date 2021-02-25
+ **/
+public class KeyUpdateEventMessageListener extends KeyspaceEventMessageListener implements ApplicationEventPublisherAware {
+    private static final Topic KEYEVENT_DELETE_TOPIC = new PatternTopic("__keyevent@*__:set");
+
+    @Nullable
+    private ApplicationEventPublisher publisher;
+
+    public KeyUpdateEventMessageListener(RedisMessageListenerContainer listenerContainer) {
+        super(listenerContainer);
+    }
+
+    @Override
+    protected void doRegister(RedisMessageListenerContainer listenerContainer) {
+        listenerContainer.addMessageListener(this, KEYEVENT_DELETE_TOPIC);
+    }
+
+    @Override
+    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
+        this.publisher = applicationEventPublisher;
+    }
+
+    @Override
+    protected void doHandleMessage(Message message) {
+        this.publishEvent(new RedisKeyExpiredEvent(message.getBody()));
+    }
+
+    protected void publishEvent(RedisKeyExpiredEvent event) {
+        if (this.publisher != null) {
+            this.publisher.publishEvent(event);
+        }
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/configurations/listener/RedisKeyUpdateListener.java b/src/main/java/com/xcong/excoin/configurations/listener/RedisKeyUpdateListener.java
new file mode 100644
index 0000000..8be8810
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/configurations/listener/RedisKeyUpdateListener.java
@@ -0,0 +1,22 @@
+package com.xcong.excoin.configurations.listener;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author wzy
+ * @date 2021-02-25
+ **/
+@Slf4j
+public class RedisKeyUpdateListener extends KeyUpdateEventMessageListener{
+    public RedisKeyUpdateListener(RedisMessageListenerContainer listenerContainer) {
+        super(listenerContainer);
+    }
+
+    @Override
+    public void onMessage(Message message, byte[] pattern) {
+        log.info("{}", message.toString());
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/configurations/listener/RedisReceiver.java b/src/main/java/com/xcong/excoin/configurations/listener/RedisReceiver.java
new file mode 100644
index 0000000..f8a4bba
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/configurations/listener/RedisReceiver.java
@@ -0,0 +1,47 @@
+package com.xcong.excoin.configurations.listener;
+
+import com.alibaba.fastjson.JSONObject;
+import com.huobi.client.model.event.TradeEvent;
+import com.xcong.excoin.modules.symbols.service.SymbolsService;
+import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService;
+import com.xcong.excoin.utils.CoinTypeConvert;
+import com.xcong.excoin.utils.RedisUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+/**
+ * @author wzy
+ * @date 2021-02-25
+ **/
+@Slf4j
+@Component
+public class RedisReceiver implements MessageListener {
+
+    @Resource
+    private RedisUtils redisUtils;
+
+    @Resource
+    private SymbolsService symbolsService;
+
+    @Resource
+    private WebsocketPriceService websocketPriceService;
+
+    @Override
+    public void onMessage(Message message, byte[] bytes) {
+        String data = message.toString().replaceAll("\"", "");
+        String[] dataArr = data.split("_");
+
+        String symbol = dataArr[0];
+        String price = dataArr[1];
+
+        redisUtils.set(CoinTypeConvert.convertToKey(symbol), price);
+        // 比较
+        websocketPriceService.comparePriceAsc(symbol, price);
+        websocketPriceService.comparePriceDesc(symbol, price);
+        websocketPriceService.wholeBomb();
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java b/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java
index a7e99ec..396da2f 100644
--- a/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java
+++ b/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java
@@ -85,7 +85,7 @@
         // 获取最新区块
         String string = redisUtils.getString(ETH_USDT_BLOCK_NUM);
         if(string==null){
-            string = "11892420";
+            string = "11925303";
         }
         BigInteger blockNum = new BigInteger(string);
         Credentials credentials = Credentials.create(privateKey);
diff --git a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
index 3937137..e6f6336 100644
--- a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
+++ b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -61,6 +61,12 @@
     }
 
     public static void putSymbolSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
+        String t = "";
+        if (type.contains("depth")) {
+            String[] s = type.split("_");
+            type = s[0];
+            t = s[1];
+        }
         switch (type) {
             case "kline" :
                 ChannelGroup kline = KLINE_MAP.get(symbol);
@@ -71,12 +77,13 @@
                 KLINE_MAP.put(symbol, kline);
                 break;
             case "depth" :
-                ChannelGroup depth = DEPTH_MAP.get(symbol);
+                String key = symbol + "_" + t;
+                ChannelGroup depth = DEPTH_MAP.get(key);
                 if (depth == null) {
                     depth = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                 }
                 depth.add(channel);
-                DEPTH_MAP.put(symbol, depth);
+                DEPTH_MAP.put(key, depth);
                 break;
             case "trade" :
                 ChannelGroup trade = TRADE_MAP.get(symbol);
@@ -92,6 +99,12 @@
     }
 
     public static void removeSymbolUnSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
+        String t = "";
+        if (type.contains("depth")) {
+            String[] s = type.split("_");
+            type = s[0];
+            t = s[1];
+        }
         switch (type) {
             case "kline" :
                 ChannelGroup kline = KLINE_MAP.get(symbol);
@@ -102,12 +115,13 @@
                 KLINE_MAP.put(symbol, kline);
                 break;
             case "depth" :
-                ChannelGroup depth = DEPTH_MAP.get(symbol);
+                String key = symbol + "_" + t;
+                ChannelGroup depth = DEPTH_MAP.get(key);
                 if (depth == null) {
                     return;
                 }
                 depth.remove(channel);
-                DEPTH_MAP.put(symbol, depth);
+                DEPTH_MAP.put(key, depth);
                 break;
             case "trade" :
                 ChannelGroup trade = TRADE_MAP.get(symbol);
diff --git a/src/main/java/com/xcong/excoin/quartz/job/DayLineDataUpdateJob.java b/src/main/java/com/xcong/excoin/quartz/job/DayLineDataUpdateJob.java
index 6d6438c..364f75d 100644
--- a/src/main/java/com/xcong/excoin/quartz/job/DayLineDataUpdateJob.java
+++ b/src/main/java/com/xcong/excoin/quartz/job/DayLineDataUpdateJob.java
@@ -9,6 +9,7 @@
 import com.xcong.excoin.utils.CoinTypeConvert;
 import com.xcong.excoin.utils.RedisUtils;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
 import org.springframework.stereotype.Component;
 
@@ -35,13 +36,16 @@
     @Resource
     private WebsocketPriceService websocketPriceService;
 
+    @Autowired
+    private SubscriptionClient subscriptionClient;
+
     @PostConstruct
     public void initNewestPrice() {
         log.info("#=======价格更新开启=======#");
-        SubscriptionOptions subscriptionOptions = new SubscriptionOptions();
-        subscriptionOptions.setConnectionDelayOnFailure(5);
-        subscriptionOptions.setUri("wss://api.hadax.com/ws");
-        SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions);
+//        SubscriptionOptions subscriptionOptions = new SubscriptionOptions();
+//        subscriptionOptions.setConnectionDelayOnFailure(5);
+//        subscriptionOptions.setUri("wss://api.hadax.com/ws");
+//        SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions);
 
         subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> {
             Candlestick data = candlestickEvent.getData();
diff --git a/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java b/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java
index cd6cae0..2463a60 100644
--- a/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java
+++ b/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java
@@ -26,22 +26,49 @@
     @Autowired
     WebSocketServer webSocketServer;
 
-    @PostConstruct
+    @Autowired
+    private SubscriptionClient subscriptionClient;
+
+//    @PostConstruct
     public void data() throws Exception {
         webSocketServer.start();
         log.info("==================");
-        SubscriptionOptions subscriptionOptions = new SubscriptionOptions();
-        subscriptionOptions.setConnectionDelayOnFailure(5);
-        subscriptionOptions.setUri("wss://api.hadax.com/ws");
-        SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions);
 
         subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> {
             Candlestick data = candlestickEvent.getData();
         });
 
-        subscriptionClient.subscribePriceDepthEvent("btcusdt", priceDepthEvent -> {
+//        subscriptionClient.subscribePriceDepthEvent("btcusdt", priceDepthEvent -> {
 //            log.info("bids:{}", JSONObject.toJSONString(priceDepthEvent.getData().getBids()));
 //            log.info("asks:{}", JSONObject.toJSONString(priceDepthEvent.getData().getAsks()));
+//        });
+
+        subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.MIN1, (candlestickEvent) -> {
+            Candlestick data = candlestickEvent.getData();
+        });
+
+        subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.MIN5, (candlestickEvent) -> {
+            Candlestick data = candlestickEvent.getData();
+        });
+
+        subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.MIN15, (candlestickEvent) -> {
+            Candlestick data = candlestickEvent.getData();
+        });
+
+        subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.MIN60, (candlestickEvent) -> {
+            Candlestick data = candlestickEvent.getData();
+        });
+
+        subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.HOUR4, (candlestickEvent) -> {
+            Candlestick data = candlestickEvent.getData();
+        });
+
+        subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> {
+            Candlestick data = candlestickEvent.getData();
+        });
+
+        subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.WEEK1, (candlestickEvent) -> {
+            Candlestick data = candlestickEvent.getData();
         });
     }
 }
diff --git a/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java b/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java
index 99ec8a9..32131b6 100644
--- a/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java
+++ b/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java
@@ -1,5 +1,6 @@
 package com.xcong.excoin.quartz.job;
 
+import com.alibaba.fastjson.JSONObject;
 import com.huobi.client.SubscriptionClient;
 import com.huobi.client.SubscriptionOptions;
 import com.huobi.client.model.Candlestick;
@@ -11,6 +12,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.data.redis.core.RedisTemplate;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
@@ -36,27 +38,31 @@
     @Resource
     private WebsocketPriceService websocketPriceService;
 
+    @Autowired
+    private RedisTemplate redisTemplate;
+
+    @Autowired
+    private SubscriptionClient subscriptionClient;
+
     @PostConstruct
     public void initNewestPrice() {
         log.info("#=======价格更新开启=======#");
-        SubscriptionOptions subscriptionOptions = new SubscriptionOptions();
-        subscriptionOptions.setConnectionDelayOnFailure(5);
-        subscriptionOptions.setUri("wss://api.hadax.com/ws");
-        SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions);
+
         subscriptionClient.subscribeTradeEvent("btcusdt,ethusdt,xrpusdt,ltcusdt,bchusdt,eosusdt,etcusdt", tradeEvent -> {
             String symbol = tradeEvent.getSymbol();
-            // 根据symbol判断做什么操作
+//            // 根据symbol判断做什么操作
             symbol = CoinTypeConvert.convert(symbol);
             if (null != symbol) {
                 String price = tradeEvent.getTradeList().get(0).getPrice().toPlainString();
-                // TODO 测试环境关闭这个插入redis
-                redisUtils.set(CoinTypeConvert.convertToKey(symbol), price);
-                // 比较
-                websocketPriceService.comparePriceAsc(symbol, price);
-                websocketPriceService.comparePriceDesc(symbol, price);
-                websocketPriceService.wholeBomb();
-                //System.out.println("比较完毕:"+symbol+"-"+price);
-
+                redisTemplate.convertAndSend("channel:newprice", symbol + "_" + price);
+//                // TODO 测试环境关闭这个插入redis
+//                redisUtils.set(CoinTypeConvert.convertToKey(symbol), price);
+//                // 比较
+//                websocketPriceService.comparePriceAsc(symbol, price);
+//                websocketPriceService.comparePriceDesc(symbol, price);
+//                websocketPriceService.wholeBomb();
+//                //System.out.println("比较完毕:"+symbol+"-"+price);
+//
             }
 
         });

--
Gitblit v1.9.1