src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
@@ -131,6 +131,9 @@
    public static final String QUEUE_MSG_HISTORY = "queue_msg_history";
    public static final String ROUTING_KEY_MSG_HISTORY = "routing_key_msg_history";

    /** Name of the queue declared by {@link #trc20Queue()}. */
    public static final String QUEUE_TRC20_BLOCK = "QUEUE_TRC20_BLOCK";
    /** Routing key used by {@link #trc20Binding()}. */
    public static final String ROUTING_TRC20_BLOCK = "ROUTING_TRC20_BLOCK";

    @Resource
    private ConnectionFactory connectionFactory;
@@ -196,6 +199,17 @@
    }

    /**
     * Declares the queue named {@link #QUEUE_TRC20_BLOCK}.
     *
     * @return the queue definition registered as a Spring bean
     */
    @Bean
    public Queue trc20Queue() {
        return new Queue(QUEUE_TRC20_BLOCK);
    }

    /**
     * Binds {@link #trc20Queue()} to the exchange returned by {@code defaultExchange()} under
     * the routing key {@link #ROUTING_TRC20_BLOCK}.
     *
     * @return the binding registered as a Spring bean
     */
    @Bean
    public Binding trc20Binding() {
        return BindingBuilder.bind(trc20Queue()).to(defaultExchange()).with(ROUTING_TRC20_BLOCK);
    }

    /**
     * Declares the queue named {@code QUEUE_TEST}; the second constructor argument
     * ({@code true}) is the durable flag of {@code Queue(String, boolean)}.
     *
     * @return the queue definition registered as a Spring bean
     */
    @Bean
    public Queue testQueue() {
        return new Queue(QUEUE_TEST, true);
    }
src/main/java/com/xcong/excoin/quartz/job/BlockCoinUpdateJob.java
@@ -3,8 +3,10 @@ import cn.hutool.http.HttpException; import com.xcong.excoin.modules.blackchain.service.TrxUsdtUpdateService; import com.xcong.excoin.modules.coin.service.BlockCoinService; import com.xcong.excoin.rabbit.producer.UsdtUpdateProducer; import com.xcong.excoin.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; @@ -34,6 +36,9 @@ @Resource RedisUtils redisUtils; @Autowired private UsdtUpdateProducer usdtUpdateProducer; public static ConcurrentLinkedQueue<Long> TRC_BLOCK = new ConcurrentLinkedQueue<>(); @@ -45,20 +50,21 @@ public void usdtTc20Update() { // 波场3秒出一个块 Long blocnNum = TRC_BLOCK.poll(); log.info("------->{}", blocnNum); if (blocnNum == null) { return; } redisUtils.set("USDT_TRC20_CURRENT_BLOCK_NUM", blocnNum); try { trxUsdtUpdateService.monitorCoinListener(blocnNum); } catch (RestClientException | HttpException e) { // 此时是连接问题 这个块需要重新扫描 log.info("查询区块超时:" + blocnNum); TRC_BLOCK.add(blocnNum); } catch (Exception e) { e.printStackTrace(); } // log.info("=====>>{}", blocnNum); usdtUpdateProducer.sendTrc20BlockMsg(blocnNum.toString()); // redisUtils.set("USDT_TRC20_CURRENT_BLOCK_NUM", blocnNum); // try { // trxUsdtUpdateService.monitorCoinListener(blocnNum); // } catch (RestClientException | HttpException e) { // // 此时是连接问题 这个块需要重新扫描 // log.info("查询区块超时:" + blocnNum); // TRC_BLOCK.add(blocnNum); // } catch (Exception e) { // e.printStackTrace(); // } } @@ -66,6 +72,7 @@ public void usdtTc20UpdateQueue() { // 查询最新区块号 long getnowblock = trxUsdtUpdateService.getnowblockFromTronScan() - 25; log.info("=======>{}", getnowblock); // 拿到redis里最新区块 Object trc20BlockNum = redisUtils.get("USDT_TRC20_BLOCK_NUM"); if (trc20BlockNum == null) { 
src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java
@@ -1,5 +1,6 @@ package com.xcong.excoin.rabbit.consumer; import cn.hutool.http.HttpException; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.configurations.RabbitMqConfig; import com.xcong.excoin.modules.blackchain.model.EthUsdtChargeDto; @@ -7,14 +8,18 @@ import com.xcong.excoin.modules.blackchain.service.TrxUsdtUpdateService; import com.xcong.excoin.modules.blackchain.service.UsdtErc20UpdateService; import com.xcong.excoin.modules.coin.service.BlockCoinService; import com.xcong.excoin.quartz.job.BlockCoinUpdateJob; import com.xcong.excoin.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import org.springframework.web.client.RestClientException; import javax.annotation.Resource; import java.math.BigDecimal; /** * @author wzy @@ -31,6 +36,9 @@ @Resource TrxUsdtUpdateService trxUsdtUpdateService; @Resource private RedisUtils redisUtils; @RabbitListener(queues = RabbitMqConfig.QUEUE_USDT_UPDATE) @@ -52,7 +60,7 @@ @RabbitListener(queues = RabbitMqConfig.QUEUE_USDT_ADDRESS) public void addUsdtAddress(String content) { if(!UsdtErc20UpdateService.ALL_ADDRESS_LIST.contains(content)){ log.debug("#添加新地址---->{}#", content); log.info("#添加新地址---->{}#", content); if(StringUtils.isBlank(content)){ return; } @@ -73,4 +81,19 @@ } } @RabbitListener(queues = RabbitMqConfig.QUEUE_TRC20_BLOCK) public void trc20BlockMsg(String content) { Long blocnNum = Long.parseLong(content); redisUtils.set("USDT_TRC20_CURRENT_BLOCK_NUM", blocnNum); try { trxUsdtUpdateService.monitorCoinListener(blocnNum); } catch (RestClientException | HttpException e) { // 此时是连接问题 这个块需要重新扫描 log.info("查询区块超时:" + blocnNum); BlockCoinUpdateJob.TRC_BLOCK.add(blocnNum); } catch (Exception e) { e.printStackTrace(); } } } 
src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java
@@ -34,12 +34,17 @@
        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_USDT_ADDRESS, RabbitMqConfig.ROUTING_KEY_USDT_ADDRESS, content, correlationData);
    }

    /**
     * Publishes {@code content} to {@code RabbitMqConfig.EXCHANGE_ONE} with routing key
     * {@code RabbitMqConfig.ROUTING_TRC20_BLOCK}, tagged with a freshly generated
     * correlation id.
     *
     * @param content message payload to publish
     */
    public void sendTrc20BlockMsg(String content) {
        // A new UUID per message is used as the correlation id.
        CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID());
        // NOTE(review): the TRC20 queue is bound via RabbitMqConfig.defaultExchange();
        // presumably that exchange is EXCHANGE_ONE, otherwise this message is never routed
        // to the queue. TODO confirm against RabbitMqConfig.
        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_ONE, RabbitMqConfig.ROUTING_TRC20_BLOCK, content, correlationData);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("#----->{}#", correlationData);
//        log.info("#----->{}#", correlationData);
        if (ack) {
            log.info("success");
//            log.info("success");
        } else {
            // Not acknowledged: log the cause passed to this callback.
            log.info("--->{}", cause);
        }
src/test/java/com/xcong/excoin/TrcTest.java
@@ -4,6 +4,7 @@
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSON;
import com.xcong.excoin.modules.blackchain.service.Trc20Service;
import com.xcong.excoin.modules.blackchain.service.TrxUsdtUpdateService;
import com.xcong.excoin.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
@@ -45,4 +46,12 @@
            }
        }
    }

    @Autowired
    private TrxUsdtUpdateService trxUsdtUpdateService;

    /**
     * Calls {@code getnowblockFromTronScan()} and discards the result. There are no
     * assertions, so this test passes whenever the call does not throw.
     */
    @Test
    public void urlTest() {
        trxUsdtUpdateService.getnowblockFromTronScan();
    }
}