From e5e3bc5b8f863ce8f1069a12350432ea8cfbf965 Mon Sep 17 00:00:00 2001
From: Helius <wangdoubleone@gmail.com>
Date: Fri, 26 Feb 2021 15:31:18 +0800
Subject: [PATCH] modify
---
src/main/java/com/xcong/excoin/netty/common/ChannelManager.java | 22 +++
src/main/java/com/xcong/excoin/configurations/WebMvcConfig.java | 10 ++
src/main/java/com/xcong/excoin/configurations/listener/RedisKeyUpdateListener.java | 22 ++++
src/main/java/com/xcong/excoin/configurations/listener/RedisReceiver.java | 47 +++++++++
src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java | 39 ++++++-
src/main/java/com/xcong/excoin/configurations/RedisConfig.java | 18 +++
src/main/java/com/xcong/excoin/configurations/listener/KeyUpdateEventMessageListener.java | 47 +++++++++
src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java | 2
src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java | 32 +++--
src/main/java/com/xcong/excoin/quartz/job/DayLineDataUpdateJob.java | 12 +
10 files changed, 223 insertions(+), 28 deletions(-)
diff --git a/src/main/java/com/xcong/excoin/configurations/RedisConfig.java b/src/main/java/com/xcong/excoin/configurations/RedisConfig.java
index a9a61fa..fafc19c 100644
--- a/src/main/java/com/xcong/excoin/configurations/RedisConfig.java
+++ b/src/main/java/com/xcong/excoin/configurations/RedisConfig.java
@@ -3,11 +3,15 @@
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.xcong.excoin.configurations.listener.RedisReceiver;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.listener.PatternTopic;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@@ -44,4 +48,18 @@
return template;
}
+ @Bean
+ MessageListenerAdapter listenerAdapter(RedisReceiver receiver) {
+ return new MessageListenerAdapter(receiver, "onMessage");
+ }
+
+ @Bean
+ RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
+ RedisMessageListenerContainer container = new RedisMessageListenerContainer();
+ container.setConnectionFactory(connectionFactory);
+ container.addMessageListener(listenerAdapter, new PatternTopic("channel:newprice"));
+ return container;
+ }
+
+
}
diff --git a/src/main/java/com/xcong/excoin/configurations/WebMvcConfig.java b/src/main/java/com/xcong/excoin/configurations/WebMvcConfig.java
index 06994e0..29afb65 100644
--- a/src/main/java/com/xcong/excoin/configurations/WebMvcConfig.java
+++ b/src/main/java/com/xcong/excoin/configurations/WebMvcConfig.java
@@ -2,6 +2,8 @@
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
+import com.huobi.client.SubscriptionClient;
+import com.huobi.client.SubscriptionOptions;
import com.xcong.excoin.configurations.properties.AliOssProperties;
import com.xcong.excoin.configurations.security.UserAuthenticationArgumentResolver;
import com.xcong.excoin.utils.SpringContextHolder;
@@ -51,6 +53,14 @@
return new OSSClientBuilder().build(aliOssProperties.getEndPoint(), aliOssProperties.getAccessKeyId(), aliOssProperties.getAccessKeySecret());
}
+ @Bean
+ public SubscriptionClient subscriptionClient() {
+ SubscriptionOptions subscriptionOptions = new SubscriptionOptions();
+ subscriptionOptions.setConnectionDelayOnFailure(3);
+ subscriptionOptions.setUri("wss://api.hadax.com/ws");
+ return SubscriptionClient.create("", "", subscriptionOptions);
+ }
+
// @Bean
// public SpringContextHolder springContextHolder() {
// return new SpringContextHolder();
diff --git a/src/main/java/com/xcong/excoin/configurations/listener/KeyUpdateEventMessageListener.java b/src/main/java/com/xcong/excoin/configurations/listener/KeyUpdateEventMessageListener.java
new file mode 100644
index 0000000..75a5730
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/configurations/listener/KeyUpdateEventMessageListener.java
@@ -0,0 +1,47 @@
+package com.xcong.excoin.configurations.listener;
+
+import org.springframework.context.ApplicationEventPublisher;
+import org.springframework.context.ApplicationEventPublisherAware;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.core.RedisKeyExpiredEvent;
+import org.springframework.data.redis.listener.KeyspaceEventMessageListener;
+import org.springframework.data.redis.listener.PatternTopic;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.data.redis.listener.Topic;
+import org.springframework.lang.Nullable;
+
+/**
+ * @author wzy
+ * @date 2021-02-25
+ **/
+public class KeyUpdateEventMessageListener extends KeyspaceEventMessageListener implements ApplicationEventPublisherAware {
+ private static final Topic KEYEVENT_DELETE_TOPIC = new PatternTopic("__keyevent@*__:set");
+
+ @Nullable
+ private ApplicationEventPublisher publisher;
+
+ public KeyUpdateEventMessageListener(RedisMessageListenerContainer listenerContainer) {
+ super(listenerContainer);
+ }
+
+ @Override
+ protected void doRegister(RedisMessageListenerContainer listenerContainer) {
+ listenerContainer.addMessageListener(this, KEYEVENT_DELETE_TOPIC);
+ }
+
+ @Override
+ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
+ this.publisher = applicationEventPublisher;
+ }
+
+ @Override
+ protected void doHandleMessage(Message message) {
+ this.publishEvent(new RedisKeyExpiredEvent(message.getBody()));
+ }
+
+ protected void publishEvent(RedisKeyExpiredEvent event) {
+ if (this.publisher != null) {
+ this.publisher.publishEvent(event);
+ }
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/configurations/listener/RedisKeyUpdateListener.java b/src/main/java/com/xcong/excoin/configurations/listener/RedisKeyUpdateListener.java
new file mode 100644
index 0000000..8be8810
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/configurations/listener/RedisKeyUpdateListener.java
@@ -0,0 +1,22 @@
+package com.xcong.excoin.configurations.listener;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.listener.RedisMessageListenerContainer;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author wzy
+ * @date 2021-02-25
+ **/
+@Slf4j
+public class RedisKeyUpdateListener extends KeyUpdateEventMessageListener{
+ public RedisKeyUpdateListener(RedisMessageListenerContainer listenerContainer) {
+ super(listenerContainer);
+ }
+
+ @Override
+ public void onMessage(Message message, byte[] pattern) {
+ log.info("{}", message.toString());
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/configurations/listener/RedisReceiver.java b/src/main/java/com/xcong/excoin/configurations/listener/RedisReceiver.java
new file mode 100644
index 0000000..f8a4bba
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/configurations/listener/RedisReceiver.java
@@ -0,0 +1,47 @@
+package com.xcong.excoin.configurations.listener;
+
+import com.alibaba.fastjson.JSONObject;
+import com.huobi.client.model.event.TradeEvent;
+import com.xcong.excoin.modules.symbols.service.SymbolsService;
+import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService;
+import com.xcong.excoin.utils.CoinTypeConvert;
+import com.xcong.excoin.utils.RedisUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.data.redis.connection.Message;
+import org.springframework.data.redis.connection.MessageListener;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+/**
+ * @author wzy
+ * @date 2021-02-25
+ **/
+@Slf4j
+@Component
+public class RedisReceiver implements MessageListener {
+
+ @Resource
+ private RedisUtils redisUtils;
+
+ @Resource
+ private SymbolsService symbolsService;
+
+ @Resource
+ private WebsocketPriceService websocketPriceService;
+
+ @Override
+ public void onMessage(Message message, byte[] bytes) {
+ String data = message.toString().replaceAll("\"", "");
+ String[] dataArr = data.split("_");
+
+ String symbol = dataArr[0];
+ String price = dataArr[1];
+
+ redisUtils.set(CoinTypeConvert.convertToKey(symbol), price);
+ // 比较
+ websocketPriceService.comparePriceAsc(symbol, price);
+ websocketPriceService.comparePriceDesc(symbol, price);
+ websocketPriceService.wholeBomb();
+ }
+}
diff --git a/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java b/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java
index a7e99ec..396da2f 100644
--- a/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java
+++ b/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java
@@ -85,7 +85,7 @@
// 获取最新区块
String string = redisUtils.getString(ETH_USDT_BLOCK_NUM);
if(string==null){
- string = "11892420";
+ string = "11925303";
}
BigInteger blockNum = new BigInteger(string);
Credentials credentials = Credentials.create(privateKey);
diff --git a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
index 3937137..e6f6336 100644
--- a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
+++ b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -61,6 +61,12 @@
}
public static void putSymbolSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
+ String t = "";
+ if (type.contains("depth")) {
+ String[] s = type.split("_");
+ type = s[0];
+ t = s[1];
+ }
switch (type) {
case "kline" :
ChannelGroup kline = KLINE_MAP.get(symbol);
@@ -71,12 +77,13 @@
KLINE_MAP.put(symbol, kline);
break;
case "depth" :
- ChannelGroup depth = DEPTH_MAP.get(symbol);
+ String key = symbol + "_" + t;
+ ChannelGroup depth = DEPTH_MAP.get(key);
if (depth == null) {
depth = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
depth.add(channel);
- DEPTH_MAP.put(symbol, depth);
+ DEPTH_MAP.put(key, depth);
break;
case "trade" :
ChannelGroup trade = TRADE_MAP.get(symbol);
@@ -92,6 +99,12 @@
}
public static void removeSymbolUnSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
+ String t = "";
+ if (type.contains("depth")) {
+ String[] s = type.split("_");
+ type = s[0];
+ t = s[1];
+ }
switch (type) {
case "kline" :
ChannelGroup kline = KLINE_MAP.get(symbol);
@@ -102,12 +115,13 @@
KLINE_MAP.put(symbol, kline);
break;
case "depth" :
- ChannelGroup depth = DEPTH_MAP.get(symbol);
+ String key = symbol + "_" + t;
+ ChannelGroup depth = DEPTH_MAP.get(key);
if (depth == null) {
return;
}
depth.remove(channel);
- DEPTH_MAP.put(symbol, depth);
+ DEPTH_MAP.put(key, depth);
break;
case "trade" :
ChannelGroup trade = TRADE_MAP.get(symbol);
diff --git a/src/main/java/com/xcong/excoin/quartz/job/DayLineDataUpdateJob.java b/src/main/java/com/xcong/excoin/quartz/job/DayLineDataUpdateJob.java
index 6d6438c..364f75d 100644
--- a/src/main/java/com/xcong/excoin/quartz/job/DayLineDataUpdateJob.java
+++ b/src/main/java/com/xcong/excoin/quartz/job/DayLineDataUpdateJob.java
@@ -9,6 +9,7 @@
import com.xcong.excoin.utils.CoinTypeConvert;
import com.xcong.excoin.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
@@ -35,13 +36,16 @@
@Resource
private WebsocketPriceService websocketPriceService;
+ @Autowired
+ private SubscriptionClient subscriptionClient;
+
@PostConstruct
public void initNewestPrice() {
log.info("#=======价格更新开启=======#");
- SubscriptionOptions subscriptionOptions = new SubscriptionOptions();
- subscriptionOptions.setConnectionDelayOnFailure(5);
- subscriptionOptions.setUri("wss://api.hadax.com/ws");
- SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions);
+// SubscriptionOptions subscriptionOptions = new SubscriptionOptions();
+// subscriptionOptions.setConnectionDelayOnFailure(5);
+// subscriptionOptions.setUri("wss://api.hadax.com/ws");
+// SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions);
subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> {
Candlestick data = candlestickEvent.getData();
diff --git a/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java b/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java
index cd6cae0..2463a60 100644
--- a/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java
+++ b/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java
@@ -26,22 +26,49 @@
@Autowired
WebSocketServer webSocketServer;
- @PostConstruct
+ @Autowired
+ private SubscriptionClient subscriptionClient;
+
+// @PostConstruct
public void data() throws Exception {
webSocketServer.start();
log.info("==================");
- SubscriptionOptions subscriptionOptions = new SubscriptionOptions();
- subscriptionOptions.setConnectionDelayOnFailure(5);
- subscriptionOptions.setUri("wss://api.hadax.com/ws");
- SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions);
subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> {
Candlestick data = candlestickEvent.getData();
});
- subscriptionClient.subscribePriceDepthEvent("btcusdt", priceDepthEvent -> {
+// subscriptionClient.subscribePriceDepthEvent("btcusdt", priceDepthEvent -> {
// log.info("bids:{}", JSONObject.toJSONString(priceDepthEvent.getData().getBids()));
// log.info("asks:{}", JSONObject.toJSONString(priceDepthEvent.getData().getAsks()));
+// });
+
+ subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.MIN1, (candlestickEvent) -> {
+ Candlestick data = candlestickEvent.getData();
+ });
+
+ subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.MIN5, (candlestickEvent) -> {
+ Candlestick data = candlestickEvent.getData();
+ });
+
+ subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.MIN15, (candlestickEvent) -> {
+ Candlestick data = candlestickEvent.getData();
+ });
+
+ subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.MIN60, (candlestickEvent) -> {
+ Candlestick data = candlestickEvent.getData();
+ });
+
+ subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.HOUR4, (candlestickEvent) -> {
+ Candlestick data = candlestickEvent.getData();
+ });
+
+ subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> {
+ Candlestick data = candlestickEvent.getData();
+ });
+
+ subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.WEEK1, (candlestickEvent) -> {
+ Candlestick data = candlestickEvent.getData();
});
}
}
diff --git a/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java b/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java
index 99ec8a9..32131b6 100644
--- a/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java
+++ b/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java
@@ -1,5 +1,6 @@
package com.xcong.excoin.quartz.job;
+import com.alibaba.fastjson.JSONObject;
import com.huobi.client.SubscriptionClient;
import com.huobi.client.SubscriptionOptions;
import com.huobi.client.model.Candlestick;
@@ -11,6 +12,7 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@@ -36,27 +38,31 @@
@Resource
private WebsocketPriceService websocketPriceService;
+ @Autowired
+ private RedisTemplate redisTemplate;
+
+ @Autowired
+ private SubscriptionClient subscriptionClient;
+
@PostConstruct
public void initNewestPrice() {
log.info("#=======价格更新开启=======#");
- SubscriptionOptions subscriptionOptions = new SubscriptionOptions();
- subscriptionOptions.setConnectionDelayOnFailure(5);
- subscriptionOptions.setUri("wss://api.hadax.com/ws");
- SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions);
+
subscriptionClient.subscribeTradeEvent("btcusdt,ethusdt,xrpusdt,ltcusdt,bchusdt,eosusdt,etcusdt", tradeEvent -> {
String symbol = tradeEvent.getSymbol();
- // 根据symbol判断做什么操作
+// // 根据symbol判断做什么操作
symbol = CoinTypeConvert.convert(symbol);
if (null != symbol) {
String price = tradeEvent.getTradeList().get(0).getPrice().toPlainString();
- // TODO 测试环境关闭这个插入redis
- redisUtils.set(CoinTypeConvert.convertToKey(symbol), price);
- // 比较
- websocketPriceService.comparePriceAsc(symbol, price);
- websocketPriceService.comparePriceDesc(symbol, price);
- websocketPriceService.wholeBomb();
- //System.out.println("比较完毕:"+symbol+"-"+price);
-
+ redisTemplate.convertAndSend("channel:newprice", symbol + "_" + price);
+// // TODO 测试环境关闭这个插入redis
+// redisUtils.set(CoinTypeConvert.convertToKey(symbol), price);
+// // 比较
+// websocketPriceService.comparePriceAsc(symbol, price);
+// websocketPriceService.comparePriceDesc(symbol, price);
+// websocketPriceService.wholeBomb();
+// //System.out.println("比较完毕:"+symbol+"-"+price);
+//
}
});
--
Gitblit v1.9.1