Helius
2021-02-26 e5e3bc5b8f863ce8f1069a12350432ea8cfbf965
modify
3 files added
7 files modified
251 ■■■■ changed files
src/main/java/com/xcong/excoin/configurations/RedisConfig.java 18 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/configurations/WebMvcConfig.java 10 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/configurations/listener/KeyUpdateEventMessageListener.java 47 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/configurations/listener/RedisKeyUpdateListener.java 22 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/configurations/listener/RedisReceiver.java 47 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java 2 ●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/common/ChannelManager.java 22 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/quartz/job/DayLineDataUpdateJob.java 12 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java 39 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java 32 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/configurations/RedisConfig.java
@@ -3,11 +3,15 @@
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.xcong.excoin.configurations.listener.RedisReceiver;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@@ -44,4 +48,18 @@
        return template;
    }
    @Bean
    MessageListenerAdapter listenerAdapter(RedisReceiver receiver) {
        return new MessageListenerAdapter(receiver, "onMessage");
    }
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listenerAdapter, new PatternTopic("channel:newprice"));
        return container;
    }
}
src/main/java/com/xcong/excoin/configurations/WebMvcConfig.java
@@ -2,6 +2,8 @@
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.huobi.client.SubscriptionClient;
import com.huobi.client.SubscriptionOptions;
import com.xcong.excoin.configurations.properties.AliOssProperties;
import com.xcong.excoin.configurations.security.UserAuthenticationArgumentResolver;
import com.xcong.excoin.utils.SpringContextHolder;
@@ -51,6 +53,14 @@
        return new OSSClientBuilder().build(aliOssProperties.getEndPoint(), aliOssProperties.getAccessKeyId(), aliOssProperties.getAccessKeySecret());
    }
    @Bean
    public SubscriptionClient subscriptionClient() {
        SubscriptionOptions subscriptionOptions = new SubscriptionOptions();
        subscriptionOptions.setConnectionDelayOnFailure(3);
        subscriptionOptions.setUri("wss://api.hadax.com/ws");
        return SubscriptionClient.create("", "", subscriptionOptions);
    }
//    @Bean
//    public SpringContextHolder springContextHolder() {
//        return new SpringContextHolder();
src/main/java/com/xcong/excoin/configurations/listener/KeyUpdateEventMessageListener.java
New file
@@ -0,0 +1,47 @@
package com.xcong.excoin.configurations.listener;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisKeyExpiredEvent;
import org.springframework.data.redis.listener.KeyspaceEventMessageListener;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.lang.Nullable;
/**
 * @author wzy
 * @date 2021-02-25
 **/
public class KeyUpdateEventMessageListener extends KeyspaceEventMessageListener implements ApplicationEventPublisherAware {
    private static final Topic KEYEVENT_DELETE_TOPIC = new PatternTopic("__keyevent@*__:set");
    @Nullable
    private ApplicationEventPublisher publisher;
    public KeyUpdateEventMessageListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }
    @Override
    protected void doRegister(RedisMessageListenerContainer listenerContainer) {
        listenerContainer.addMessageListener(this, KEYEVENT_DELETE_TOPIC);
    }
    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.publisher = applicationEventPublisher;
    }
    @Override
    protected void doHandleMessage(Message message) {
        this.publishEvent(new RedisKeyExpiredEvent(message.getBody()));
    }
    protected void publishEvent(RedisKeyExpiredEvent event) {
        if (this.publisher != null) {
            this.publisher.publishEvent(event);
        }
    }
}
src/main/java/com/xcong/excoin/configurations/listener/RedisKeyUpdateListener.java
New file
@@ -0,0 +1,22 @@
package com.xcong.excoin.configurations.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
/**
 * @author wzy
 * @date 2021-02-25
 **/
@Slf4j
public class RedisKeyUpdateListener extends KeyUpdateEventMessageListener{
    public RedisKeyUpdateListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }
    @Override
    public void onMessage(Message message, byte[] pattern) {
        log.info("{}", message.toString());
    }
}
src/main/java/com/xcong/excoin/configurations/listener/RedisReceiver.java
New file
@@ -0,0 +1,47 @@
package com.xcong.excoin.configurations.listener;
import com.alibaba.fastjson.JSONObject;
import com.huobi.client.model.event.TradeEvent;
import com.xcong.excoin.modules.symbols.service.SymbolsService;
import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService;
import com.xcong.excoin.utils.CoinTypeConvert;
import com.xcong.excoin.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * @author wzy
 * @date 2021-02-25
 **/
@Slf4j
@Component
public class RedisReceiver implements MessageListener {
    @Resource
    private RedisUtils redisUtils;
    @Resource
    private SymbolsService symbolsService;
    @Resource
    private WebsocketPriceService websocketPriceService;
    @Override
    public void onMessage(Message message, byte[] bytes) {
        String data = message.toString().replaceAll("\"", "");
        String[] dataArr = data.split("_");
        String symbol = dataArr[0];
        String price = dataArr[1];
        redisUtils.set(CoinTypeConvert.convertToKey(symbol), price);
        // 比较
        websocketPriceService.comparePriceAsc(symbol, price);
        websocketPriceService.comparePriceDesc(symbol, price);
        websocketPriceService.wholeBomb();
    }
}
src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java
@@ -85,7 +85,7 @@
        // 获取最新区块
        String string = redisUtils.getString(ETH_USDT_BLOCK_NUM);
        if(string==null){
            string = "11892420";
            string = "11925303";
        }
        BigInteger blockNum = new BigInteger(string);
        Credentials credentials = Credentials.create(privateKey);
src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -61,6 +61,12 @@
    }
    public static void putSymbolSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
        String t = "";
        if (type.contains("depth")) {
            String[] s = type.split("_");
            type = s[0];
            t = s[1];
        }
        switch (type) {
            case "kline" :
                ChannelGroup kline = KLINE_MAP.get(symbol);
@@ -71,12 +77,13 @@
                KLINE_MAP.put(symbol, kline);
                break;
            case "depth" :
                ChannelGroup depth = DEPTH_MAP.get(symbol);
                String key = symbol + "_" + t;
                ChannelGroup depth = DEPTH_MAP.get(key);
                if (depth == null) {
                    depth = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
                }
                depth.add(channel);
                DEPTH_MAP.put(symbol, depth);
                DEPTH_MAP.put(key, depth);
                break;
            case "trade" :
                ChannelGroup trade = TRADE_MAP.get(symbol);
@@ -92,6 +99,12 @@
    }
    public static void removeSymbolUnSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) {
        String t = "";
        if (type.contains("depth")) {
            String[] s = type.split("_");
            type = s[0];
            t = s[1];
        }
        switch (type) {
            case "kline" :
                ChannelGroup kline = KLINE_MAP.get(symbol);
@@ -102,12 +115,13 @@
                KLINE_MAP.put(symbol, kline);
                break;
            case "depth" :
                ChannelGroup depth = DEPTH_MAP.get(symbol);
                String key = symbol + "_" + t;
                ChannelGroup depth = DEPTH_MAP.get(key);
                if (depth == null) {
                    return;
                }
                depth.remove(channel);
                DEPTH_MAP.put(symbol, depth);
                DEPTH_MAP.put(key, depth);
                break;
            case "trade" :
                ChannelGroup trade = TRADE_MAP.get(symbol);
src/main/java/com/xcong/excoin/quartz/job/DayLineDataUpdateJob.java
@@ -9,6 +9,7 @@
import com.xcong.excoin.utils.CoinTypeConvert;
import com.xcong.excoin.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
@@ -35,13 +36,16 @@
    @Resource
    private WebsocketPriceService websocketPriceService;
    @Autowired
    private SubscriptionClient subscriptionClient;
    @PostConstruct
    public void initNewestPrice() {
        log.info("#=======价格更新开启=======#");
        SubscriptionOptions subscriptionOptions = new SubscriptionOptions();
        subscriptionOptions.setConnectionDelayOnFailure(5);
        subscriptionOptions.setUri("wss://api.hadax.com/ws");
        SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions);
//        SubscriptionOptions subscriptionOptions = new SubscriptionOptions();
//        subscriptionOptions.setConnectionDelayOnFailure(5);
//        subscriptionOptions.setUri("wss://api.hadax.com/ws");
//        SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions);
        subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> {
            Candlestick data = candlestickEvent.getData();
src/main/java/com/xcong/excoin/quartz/job/KLineDataJob.java
@@ -26,22 +26,49 @@
    @Autowired
    WebSocketServer webSocketServer;
    @PostConstruct
    @Autowired
    private SubscriptionClient subscriptionClient;
//    @PostConstruct
    public void data() throws Exception {
        webSocketServer.start();
        log.info("==================");
        SubscriptionOptions subscriptionOptions = new SubscriptionOptions();
        subscriptionOptions.setConnectionDelayOnFailure(5);
        subscriptionOptions.setUri("wss://api.hadax.com/ws");
        SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions);
        subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> {
            Candlestick data = candlestickEvent.getData();
        });
        subscriptionClient.subscribePriceDepthEvent("btcusdt", priceDepthEvent -> {
//        subscriptionClient.subscribePriceDepthEvent("btcusdt", priceDepthEvent -> {
//            log.info("bids:{}", JSONObject.toJSONString(priceDepthEvent.getData().getBids()));
//            log.info("asks:{}", JSONObject.toJSONString(priceDepthEvent.getData().getAsks()));
//        });
        subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.MIN1, (candlestickEvent) -> {
            Candlestick data = candlestickEvent.getData();
        });
        subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.MIN5, (candlestickEvent) -> {
            Candlestick data = candlestickEvent.getData();
        });
        subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.MIN15, (candlestickEvent) -> {
            Candlestick data = candlestickEvent.getData();
        });
        subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.MIN60, (candlestickEvent) -> {
            Candlestick data = candlestickEvent.getData();
        });
        subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.HOUR4, (candlestickEvent) -> {
            Candlestick data = candlestickEvent.getData();
        });
        subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> {
            Candlestick data = candlestickEvent.getData();
        });
        subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.WEEK1, (candlestickEvent) -> {
            Candlestick data = candlestickEvent.getData();
        });
    }
}
src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java
@@ -1,5 +1,6 @@
package com.xcong.excoin.quartz.job;
import com.alibaba.fastjson.JSONObject;
import com.huobi.client.SubscriptionClient;
import com.huobi.client.SubscriptionOptions;
import com.huobi.client.model.Candlestick;
@@ -11,6 +12,7 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@@ -36,27 +38,31 @@
    @Resource
    private WebsocketPriceService websocketPriceService;
    @Autowired
    private RedisTemplate redisTemplate;
    @Autowired
    private SubscriptionClient subscriptionClient;
    @PostConstruct
    public void initNewestPrice() {
        log.info("#=======价格更新开启=======#");
        SubscriptionOptions subscriptionOptions = new SubscriptionOptions();
        subscriptionOptions.setConnectionDelayOnFailure(5);
        subscriptionOptions.setUri("wss://api.hadax.com/ws");
        SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions);
        subscriptionClient.subscribeTradeEvent("btcusdt,ethusdt,xrpusdt,ltcusdt,bchusdt,eosusdt,etcusdt", tradeEvent -> {
            String symbol = tradeEvent.getSymbol();
            // 根据symbol判断做什么操作
//            // 根据symbol判断做什么操作
            symbol = CoinTypeConvert.convert(symbol);
            if (null != symbol) {
                String price = tradeEvent.getTradeList().get(0).getPrice().toPlainString();
                // TODO 测试环境关闭这个插入redis
                redisUtils.set(CoinTypeConvert.convertToKey(symbol), price);
                // 比较
                websocketPriceService.comparePriceAsc(symbol, price);
                websocketPriceService.comparePriceDesc(symbol, price);
                websocketPriceService.wholeBomb();
                //System.out.println("比较完毕:"+symbol+"-"+price);
                redisTemplate.convertAndSend("channel:newprice", symbol + "_" + price);
//                // TODO 测试环境关闭这个插入redis
//                redisUtils.set(CoinTypeConvert.convertToKey(symbol), price);
//                // 比较
//                websocketPriceService.comparePriceAsc(symbol, price);
//                websocketPriceService.comparePriceDesc(symbol, price);
//                websocketPriceService.wholeBomb();
//                //System.out.println("比较完毕:"+symbol+"-"+price);
//
            }
        });