From e5e3bc5b8f863ce8f1069a12350432ea8cfbf965 Mon Sep 17 00:00:00 2001 From: Helius <wangdoubleone@gmail.com> Date: Fri, 26 Feb 2021 15:31:18 +0800 Subject: [PATCH] modify --- src/main/java/com/xcong/excoin/netty/common/ChannelManager.java | 22 +++ src/main/java/com/xcong/excoin/configurations/WebMvcConfig.java | 10 ++ src/main/java/com/xcong/excoin/configurations/listener/RedisKeyUpdateListener.java | 22 ++++ src/main/java/com/xcong/excoin/configurations/listener/RedisReceiver.java | 47 +++++++++ src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java | 39 ++++++- src/main/java/com/xcong/excoin/configurations/RedisConfig.java | 18 +++ src/main/java/com/xcong/excoin/configurations/listener/KeyUpdateEventMessageListener.java | 47 +++++++++ src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java | 2 src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java | 32 +++-- src/main/java/com/xcong/excoin/quartz/job/DayLineDataUpdateJob.java | 12 + 10 files changed, 223 insertions(+), 28 deletions(-) diff --git a/src/main/java/com/xcong/excoin/configurations/RedisConfig.java b/src/main/java/com/xcong/excoin/configurations/RedisConfig.java index a9a61fa..fafc19c 100644 --- a/src/main/java/com/xcong/excoin/configurations/RedisConfig.java +++ b/src/main/java/com/xcong/excoin/configurations/RedisConfig.java @@ -3,11 +3,15 @@ import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; +import com.xcong.excoin.configurations.listener.RedisReceiver; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.listener.PatternTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; @@ -44,4 +48,18 @@ return template; } + @Bean + MessageListenerAdapter listenerAdapter(RedisReceiver receiver) { + return new MessageListenerAdapter(receiver, "onMessage"); + } + + @Bean + RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { + RedisMessageListenerContainer container = new RedisMessageListenerContainer(); + container.setConnectionFactory(connectionFactory); + container.addMessageListener(listenerAdapter, new PatternTopic("channel:newprice")); + return container; + } + + } diff --git a/src/main/java/com/xcong/excoin/configurations/WebMvcConfig.java b/src/main/java/com/xcong/excoin/configurations/WebMvcConfig.java index 06994e0..29afb65 100644 --- a/src/main/java/com/xcong/excoin/configurations/WebMvcConfig.java +++ b/src/main/java/com/xcong/excoin/configurations/WebMvcConfig.java @@ -2,6 +2,8 @@ import com.aliyun.oss.OSS; import com.aliyun.oss.OSSClientBuilder; +import com.huobi.client.SubscriptionClient; +import com.huobi.client.SubscriptionOptions; import com.xcong.excoin.configurations.properties.AliOssProperties; import com.xcong.excoin.configurations.security.UserAuthenticationArgumentResolver; import com.xcong.excoin.utils.SpringContextHolder; @@ -51,6 +53,14 @@ return new OSSClientBuilder().build(aliOssProperties.getEndPoint(), aliOssProperties.getAccessKeyId(), aliOssProperties.getAccessKeySecret()); } + @Bean + public SubscriptionClient subscriptionClient() { + SubscriptionOptions subscriptionOptions = new SubscriptionOptions(); + subscriptionOptions.setConnectionDelayOnFailure(3); + subscriptionOptions.setUri("wss://api.hadax.com/ws"); + return SubscriptionClient.create("", "", subscriptionOptions); + } + // @Bean // public SpringContextHolder springContextHolder() { // return new SpringContextHolder(); diff --git a/src/main/java/com/xcong/excoin/configurations/listener/KeyUpdateEventMessageListener.java b/src/main/java/com/xcong/excoin/configurations/listener/KeyUpdateEventMessageListener.java new file mode 100644 index 0000000..75a5730 --- /dev/null +++ b/src/main/java/com/xcong/excoin/configurations/listener/KeyUpdateEventMessageListener.java @@ -0,0 +1,47 @@ +package com.xcong.excoin.configurations.listener; + +import org.springframework.context.ApplicationEventPublisher; +import org.springframework.context.ApplicationEventPublisherAware; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.core.RedisKeyExpiredEvent; +import org.springframework.data.redis.listener.KeyspaceEventMessageListener; +import org.springframework.data.redis.listener.PatternTopic; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.data.redis.listener.Topic; +import org.springframework.lang.Nullable; + +/** + * @author wzy + * @date 2021-02-25 + **/ +public class KeyUpdateEventMessageListener extends KeyspaceEventMessageListener implements ApplicationEventPublisherAware { + private static final Topic KEYEVENT_DELETE_TOPIC = new PatternTopic("__keyevent@*__:set"); + + @Nullable + private ApplicationEventPublisher publisher; + + public KeyUpdateEventMessageListener(RedisMessageListenerContainer listenerContainer) { + super(listenerContainer); + } + + @Override + protected void doRegister(RedisMessageListenerContainer listenerContainer) { + listenerContainer.addMessageListener(this, KEYEVENT_DELETE_TOPIC); + } + + @Override + public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) { + this.publisher = applicationEventPublisher; + } + + @Override + protected void doHandleMessage(Message message) { + this.publishEvent(new RedisKeyExpiredEvent(message.getBody())); + } + + protected void publishEvent(RedisKeyExpiredEvent event) { + if (this.publisher != null) { + this.publisher.publishEvent(event); + } + } +} diff --git a/src/main/java/com/xcong/excoin/configurations/listener/RedisKeyUpdateListener.java b/src/main/java/com/xcong/excoin/configurations/listener/RedisKeyUpdateListener.java new file mode 100644 index 0000000..8be8810 --- /dev/null +++ b/src/main/java/com/xcong/excoin/configurations/listener/RedisKeyUpdateListener.java @@ -0,0 +1,22 @@ +package com.xcong.excoin.configurations.listener; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.listener.RedisMessageListenerContainer; +import org.springframework.stereotype.Component; + +/** + * @author wzy + * @date 2021-02-25 + **/ +@Slf4j +public class RedisKeyUpdateListener extends KeyUpdateEventMessageListener{ + public RedisKeyUpdateListener(RedisMessageListenerContainer listenerContainer) { + super(listenerContainer); + } + + @Override + public void onMessage(Message message, byte[] pattern) { + log.info("{}", message.toString()); + } +} diff --git a/src/main/java/com/xcong/excoin/configurations/listener/RedisReceiver.java b/src/main/java/com/xcong/excoin/configurations/listener/RedisReceiver.java new file mode 100644 index 0000000..f8a4bba --- /dev/null +++ b/src/main/java/com/xcong/excoin/configurations/listener/RedisReceiver.java @@ -0,0 +1,47 @@ +package com.xcong.excoin.configurations.listener; + +import com.alibaba.fastjson.JSONObject; +import com.huobi.client.model.event.TradeEvent; +import com.xcong.excoin.modules.symbols.service.SymbolsService; +import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService; +import com.xcong.excoin.utils.CoinTypeConvert; +import com.xcong.excoin.utils.RedisUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.connection.Message; +import org.springframework.data.redis.connection.MessageListener; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * @author wzy + * @date 2021-02-25 + **/ +@Slf4j +@Component +public class RedisReceiver implements MessageListener { + + @Resource + private RedisUtils redisUtils; + + @Resource + private SymbolsService symbolsService; + + @Resource + private WebsocketPriceService websocketPriceService; + + @Override + public void onMessage(Message message, byte[] bytes) { + String data = message.toString().replaceAll("\"", ""); + String[] dataArr = data.split("_"); + + String symbol = dataArr[0]; + String price = dataArr[1]; + + redisUtils.set(CoinTypeConvert.convertToKey(symbol), price); + // 比较 + websocketPriceService.comparePriceAsc(symbol, price); + websocketPriceService.comparePriceDesc(symbol, price); + websocketPriceService.wholeBomb(); + } +} diff --git a/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java b/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java index a7e99ec..396da2f 100644 --- a/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java +++ b/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java @@ -85,7 +85,7 @@ // 获取最新区块 String string = redisUtils.getString(ETH_USDT_BLOCK_NUM); if(string==null){ - string = "11892420"; + string = "11925303"; } BigInteger blockNum = new BigInteger(string); Credentials credentials = Credentials.create(privateKey); diff --git a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java index 3937137..e6f6336 100644 --- a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java +++ b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java @@ -61,6 +61,12 @@ } public static void putSymbolSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) { + String t = ""; + if (type.contains("depth")) { + String[] s = type.split("_"); + type = s[0]; + t = s[1]; + } switch (type) { case "kline" : ChannelGroup kline = KLINE_MAP.get(symbol); @@ -71,12 +77,13 @@ KLINE_MAP.put(symbol, kline); break; case "depth" : - ChannelGroup depth = DEPTH_MAP.get(symbol); + String key = symbol + "_" + t; + ChannelGroup depth = DEPTH_MAP.get(key); if (depth == null) { depth = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); } depth.add(channel); - DEPTH_MAP.put(symbol, depth); + DEPTH_MAP.put(key, depth); break; case "trade" : ChannelGroup trade = TRADE_MAP.get(symbol); @@ -92,6 +99,12 @@ } public static void removeSymbolUnSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) { + String t = ""; + if (type.contains("depth")) { + String[] s = type.split("_"); + type = s[0]; + t = s[1]; + } switch (type) { case "kline" : ChannelGroup kline = KLINE_MAP.get(symbol); @@ -102,12 +115,13 @@ KLINE_MAP.put(symbol, kline); break; case "depth" : - ChannelGroup depth = DEPTH_MAP.get(symbol); + String key = symbol + "_" + t; + ChannelGroup depth = DEPTH_MAP.get(key); if (depth == null) { return; } depth.remove(channel); - DEPTH_MAP.put(symbol, depth); + DEPTH_MAP.put(key, depth); break; case "trade" : ChannelGroup trade = TRADE_MAP.get(symbol); diff --git a/src/main/java/com/xcong/excoin/quartz/job/DayLineDataUpdateJob.java b/src/main/java/com/xcong/excoin/quartz/job/DayLineDataUpdateJob.java index 6d6438c..364f75d 100644 --- a/src/main/java/com/xcong/excoin/quartz/job/DayLineDataUpdateJob.java +++ b/src/main/java/com/xcong/excoin/quartz/job/DayLineDataUpdateJob.java @@ -9,6 +9,7 @@ import com.xcong.excoin.utils.CoinTypeConvert; import com.xcong.excoin.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; @@ -35,13 +36,16 @@ @Resource private WebsocketPriceService websocketPriceService; + @Autowired + private SubscriptionClient subscriptionClient; + @PostConstruct public void initNewestPrice() { log.info("#=======价格更新开启=======#"); - SubscriptionOptions subscriptionOptions = new SubscriptionOptions(); - subscriptionOptions.setConnectionDelayOnFailure(5); - subscriptionOptions.setUri("wss://api.hadax.com/ws"); - SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions); +// SubscriptionOptions subscriptionOptions = new SubscriptionOptions(); +// subscriptionOptions.setConnectionDelayOnFailure(5); +// subscriptionOptions.setUri("wss://api.hadax.com/ws"); +// SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions); subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> { Candlestick data = candlestickEvent.getData(); diff --git a/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java b/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java index cd6cae0..2463a60 100644 --- a/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java +++ b/src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java @@ -26,22 +26,49 @@ @Autowired WebSocketServer webSocketServer; - @PostConstruct + @Autowired + private SubscriptionClient subscriptionClient; + +// @PostConstruct public void data() throws Exception { webSocketServer.start(); log.info("=================="); - SubscriptionOptions subscriptionOptions = new SubscriptionOptions(); - subscriptionOptions.setConnectionDelayOnFailure(5); - subscriptionOptions.setUri("wss://api.hadax.com/ws"); - SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions); subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> { Candlestick data = candlestickEvent.getData(); }); - subscriptionClient.subscribePriceDepthEvent("btcusdt", priceDepthEvent -> { +// subscriptionClient.subscribePriceDepthEvent("btcusdt", priceDepthEvent -> { // log.info("bids:{}", JSONObject.toJSONString(priceDepthEvent.getData().getBids())); // log.info("asks:{}", JSONObject.toJSONString(priceDepthEvent.getData().getAsks())); +// }); + + subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.MIN1, (candlestickEvent) -> { + Candlestick data = candlestickEvent.getData(); + }); + + subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.MIN5, (candlestickEvent) -> { + Candlestick data = candlestickEvent.getData(); + }); + + subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.MIN15, (candlestickEvent) -> { + Candlestick data = candlestickEvent.getData(); + }); + + subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.MIN60, (candlestickEvent) -> { + Candlestick data = candlestickEvent.getData(); + }); + + subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.HOUR4, (candlestickEvent) -> { + Candlestick data = candlestickEvent.getData(); + }); + + subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> { + Candlestick data = candlestickEvent.getData(); + }); + + subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.WEEK1, (candlestickEvent) -> { + Candlestick data = candlestickEvent.getData(); }); } } diff --git a/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java b/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java index 99ec8a9..32131b6 100644 --- a/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java +++ b/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java @@ -1,5 +1,6 @@ package com.xcong.excoin.quartz.job; +import com.alibaba.fastjson.JSONObject; import com.huobi.client.SubscriptionClient; import com.huobi.client.SubscriptionOptions; import com.huobi.client.model.Candlestick; @@ -11,6 +12,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @@ -36,27 +38,31 @@ @Resource private WebsocketPriceService websocketPriceService; + @Autowired + private RedisTemplate redisTemplate; + + @Autowired + private SubscriptionClient subscriptionClient; + @PostConstruct public void initNewestPrice() { log.info("#=======价格更新开启=======#"); - SubscriptionOptions subscriptionOptions = new SubscriptionOptions(); - subscriptionOptions.setConnectionDelayOnFailure(5); - subscriptionOptions.setUri("wss://api.hadax.com/ws"); - SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions); + subscriptionClient.subscribeTradeEvent("btcusdt,ethusdt,xrpusdt,ltcusdt,bchusdt,eosusdt,etcusdt", tradeEvent -> { String symbol = tradeEvent.getSymbol(); - // 根据symbol判断做什么操作 +// // 根据symbol判断做什么操作 symbol = CoinTypeConvert.convert(symbol); if (null != symbol) { String price = tradeEvent.getTradeList().get(0).getPrice().toPlainString(); - // TODO 测试环境关闭这个插入redis - redisUtils.set(CoinTypeConvert.convertToKey(symbol), price); - // 比较 - websocketPriceService.comparePriceAsc(symbol, price); - websocketPriceService.comparePriceDesc(symbol, price); - websocketPriceService.wholeBomb(); - //System.out.println("比较完毕:"+symbol+"-"+price); - + redisTemplate.convertAndSend("channel:newprice", symbol + "_" + price); +// // TODO 测试环境关闭这个插入redis +// redisUtils.set(CoinTypeConvert.convertToKey(symbol), price); +// // 比较 +// websocketPriceService.comparePriceAsc(symbol, price); +// websocketPriceService.comparePriceDesc(symbol, price); +// websocketPriceService.wholeBomb(); +// //System.out.println("比较完毕:"+symbol+"-"+price); +// } }); -- Gitblit v1.9.1