From 3b333e4a4ab9a358912dbd5041e68ff5ef3697af Mon Sep 17 00:00:00 2001 From: xiaoyong931011 <15274802129@163.com> Date: Fri, 28 May 2021 11:05:03 +0800 Subject: [PATCH] Merge branch 'otc' of http://120.27.238.55:7000/r/exchange into otc --- src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java | 9 +++- src/main/java/com/xcong/excoin/quartz/job/BlockCoinUpdateJob.java | 29 +++++++++----- src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java | 14 +++++++ src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java | 22 ++++++++++- src/test/java/com/xcong/excoin/TrcTest.java | 9 ++++ 5 files changed, 68 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java index 00dcc4d..d43efca 100644 --- a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java +++ b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java @@ -131,6 +131,9 @@ public static final String QUEUE_MSG_HISTORY = "queue_msg_history"; public static final String ROUTING_KEY_MSG_HISTORY = "routing_key_msg_history"; + public static final String QUEUE_TRC20_BLOCK = "QUEUE_TRC20_BLOCK"; + public static final String ROUTING_TRC20_BLOCK = "ROUTING_TRC20_BLOCK"; + @Resource private ConnectionFactory connectionFactory; @@ -196,6 +199,17 @@ } @Bean + public Queue trc20Queue() { + return new Queue(QUEUE_TRC20_BLOCK); + } + + @Bean + public Binding trc20Binding() { + return BindingBuilder.bind(trc20Queue()).to(defaultExchange()).with(ROUTING_TRC20_BLOCK); + } + + + @Bean public Queue testQueue() { return new Queue(QUEUE_TEST, true); } diff --git a/src/main/java/com/xcong/excoin/quartz/job/BlockCoinUpdateJob.java b/src/main/java/com/xcong/excoin/quartz/job/BlockCoinUpdateJob.java index 8f90542..1ad0f32 100644 --- a/src/main/java/com/xcong/excoin/quartz/job/BlockCoinUpdateJob.java +++ b/src/main/java/com/xcong/excoin/quartz/job/BlockCoinUpdateJob.java @@ -3,8 +3,10 @@ import cn.hutool.http.HttpException; import com.xcong.excoin.modules.blackchain.service.TrxUsdtUpdateService; import com.xcong.excoin.modules.coin.service.BlockCoinService; +import com.xcong.excoin.rabbit.producer.UsdtUpdateProducer; import com.xcong.excoin.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; @@ -34,6 +36,9 @@ @Resource RedisUtils redisUtils; + @Autowired + private UsdtUpdateProducer usdtUpdateProducer; + public static ConcurrentLinkedQueue<Long> TRC_BLOCK = new ConcurrentLinkedQueue<>(); @@ -41,24 +46,25 @@ * TRC20_USDT 同步 */ @Scheduled(cron = "0/2 * * * * ? ") - @Async +// @Async public void usdtTc20Update() { // 波场3秒出一个块 Long blocnNum = TRC_BLOCK.poll(); - log.info("------->{}", blocnNum); if (blocnNum == null) { return; } + log.info("=====>>{}", blocnNum); + usdtUpdateProducer.sendTrc20BlockMsg(blocnNum.toString()); redisUtils.set("USDT_TRC20_CURRENT_BLOCK_NUM", blocnNum); - try { - trxUsdtUpdateService.monitorCoinListener(blocnNum); - } catch (RestClientException | HttpException e) { - // 此时是连接问题 这个块需要重新扫描 - log.info("查询区块超时:" + blocnNum); - TRC_BLOCK.add(blocnNum); - } catch (Exception e) { - e.printStackTrace(); - } +// try { +// trxUsdtUpdateService.monitorCoinListener(blocnNum); +// } catch (RestClientException | HttpException e) { +// // 此时是连接问题 这个块需要重新扫描 +// log.info("查询区块超时:" + blocnNum); +// TRC_BLOCK.add(blocnNum); +// } catch (Exception e) { +// e.printStackTrace(); +// } } @@ -66,6 +72,7 @@ public void usdtTc20UpdateQueue() { // 查询最新区块号 long getnowblock = trxUsdtUpdateService.getnowblockFromTronScan() - 25; + log.info("=======>{}", getnowblock); // 拿到redis里最新区块 Object trc20BlockNum = redisUtils.get("USDT_TRC20_BLOCK_NUM"); if (trc20BlockNum == null) { diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java index a6a1158..6fab80b 100644 --- a/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java +++ b/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java @@ -1,5 +1,6 @@ package com.xcong.excoin.rabbit.consumer; +import cn.hutool.http.HttpException; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.configurations.RabbitMqConfig; import com.xcong.excoin.modules.blackchain.model.EthUsdtChargeDto; @@ -7,14 +8,18 @@ import com.xcong.excoin.modules.blackchain.service.TrxUsdtUpdateService; import com.xcong.excoin.modules.blackchain.service.UsdtErc20UpdateService; import com.xcong.excoin.modules.coin.service.BlockCoinService; +import com.xcong.excoin.quartz.job.BlockCoinUpdateJob; +import com.xcong.excoin.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; +import org.springframework.web.client.RestClientException; import javax.annotation.Resource; import java.math.BigDecimal; + /** * @author wzy @@ -31,7 +36,6 @@ @Resource TrxUsdtUpdateService trxUsdtUpdateService; - @RabbitListener(queues = RabbitMqConfig.QUEUE_USDT_UPDATE) public void doSomething(String content) { @@ -52,7 +56,7 @@ @RabbitListener(queues = RabbitMqConfig.QUEUE_USDT_ADDRESS) public void addUsdtAddress(String content) { if(!UsdtErc20UpdateService.ALL_ADDRESS_LIST.contains(content)){ - log.debug("#添加新地址---->{}#", content); + log.info("#添加新地址---->{}#", content); if(StringUtils.isBlank(content)){ return; } @@ -73,4 +77,18 @@ } } + + @RabbitListener(queues = RabbitMqConfig.QUEUE_TRC20_BLOCK) + public void trc20BlockMsg(String content) { + Long blocnNum = Long.parseLong(content); + try { + trxUsdtUpdateService.monitorCoinListener(blocnNum); + } catch (RestClientException | HttpException e) { + // 此时是连接问题 这个块需要重新扫描 + log.info("查询区块超时:" + blocnNum); + BlockCoinUpdateJob.TRC_BLOCK.add(blocnNum); + } catch (Exception e) { + e.printStackTrace(); + } + } } diff --git a/src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java b/src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java index c89f4c9..6065f63 100644 --- a/src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java +++ b/src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java @@ -34,12 +34,17 @@ rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_USDT_ADDRESS, RabbitMqConfig.ROUTING_KEY_USDT_ADDRESS, content, correlationData); } + public void sendTrc20BlockMsg(String content) { + CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID()); + rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_ONE, RabbitMqConfig.ROUTING_TRC20_BLOCK, content, correlationData); + } + @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { - log.info("#----->{}#", correlationData); +// log.info("#----->{}#", correlationData); if (ack) { - log.info("success"); +// log.info("success"); } else { log.info("--->{}", cause); } diff --git a/src/test/java/com/xcong/excoin/TrcTest.java b/src/test/java/com/xcong/excoin/TrcTest.java index 85ae927..3f15749 100644 --- a/src/test/java/com/xcong/excoin/TrcTest.java +++ b/src/test/java/com/xcong/excoin/TrcTest.java @@ -4,6 +4,7 @@ import cn.hutool.http.HttpUtil; import com.alibaba.fastjson.JSON; import com.xcong.excoin.modules.blackchain.service.Trc20Service; +import com.xcong.excoin.modules.blackchain.service.TrxUsdtUpdateService; import com.xcong.excoin.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; @@ -45,4 +46,12 @@ } } } + + @Autowired + private TrxUsdtUpdateService trxUsdtUpdateService; + + @Test + public void urlTest() { + trxUsdtUpdateService.getnowblockFromTronScan(); + } } -- Gitblit v1.9.1