From c24fc100ef9966495dc706e110fc37f13e003448 Mon Sep 17 00:00:00 2001
From: zainali5120 <512061637@qq.com>
Date: Thu, 08 Oct 2020 21:34:28 +0800
Subject: [PATCH] 优化usdt同步

---
 src/main/java/com/xcong/excoin/quartz/job/BlockCoinUpdateJob.java                     |   14 +-
 src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java                     |   25 ++++
 src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java                |   33 +++++
 src/main/resources/mapper/member/MemberCoinAddressDao.xml                             |    7 +
 src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java                    |    1 
 src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java   |    3 
 src/main/java/com/xcong/excoin/modules/coin/service/impl/BlockCoinServiceImpl.java    |   73 +++++++++--
 src/main/java/com/xcong/excoin/modules/coin/service/BlockCoinService.java             |    3 
 src/main/java/com/xcong/excoin/modules/member/dao/MemberCoinAddressDao.java           |    1 
 src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java                     |   10 
 src/main/java/com/xcong/excoin/modules/blackchain/model/EthUsdtChargeDto.java         |   25 ++++
 src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java                |   42 +++++++
 src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java |   51 ++++++-
 src/main/java/com/xcong/excoin/quartz/job/UsdtErc20InitJob.java                       |   31 +++++
 src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java                      |   20 ---
 15 files changed, 281 insertions(+), 58 deletions(-)

diff --git a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
index 3549a5f..501d615 100644
--- a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
+++ b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
@@ -29,6 +29,14 @@
 
     public static final String EXCHANGE_A = "biue-exchange-A";
 
+
+    public static final String EXCHANGE_USDT_UPDATE = "exchange_usdt_update";
+
+    public static final String QUEUE_USDT_UPDATE = "queue_usdt_update";
+
+    public static final String ROUTING_KEY_USDT_UPDATE = "routing_key_usdt_update";
+
+
     /**
      * 撮合交易
      */
@@ -131,6 +139,23 @@
     }
 
 
+   @Bean
+    public DirectExchange usdtUpdateExchange() {
+        return new DirectExchange(EXCHANGE_USDT_UPDATE);
+    }
+
+
+    @Bean
+    public Queue usdtUpdateQueue() {
+        return new Queue(QUEUE_USDT_UPDATE, true);
+    }
+
+    @Bean
+    public Binding usdtUpdatebinding() {
+        return BindingBuilder.bind(usdtUpdateQueue()).to(usdtUpdateExchange()).with(ROUTING_KEY_USDT_UPDATE);
+    }
+
+
     /**
      * 交换器A 可以继续添加交换器B C
      *
diff --git a/src/main/java/com/xcong/excoin/modules/blackchain/model/EthUsdtChargeDto.java b/src/main/java/com/xcong/excoin/modules/blackchain/model/EthUsdtChargeDto.java
new file mode 100644
index 0000000..8d07d27
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/modules/blackchain/model/EthUsdtChargeDto.java
@@ -0,0 +1,25 @@
+package com.xcong.excoin.modules.blackchain.model;
+
+import lombok.Data;
+
+import java.math.BigDecimal;
+
+/**
+ *  充值扫块dto
+ */
+@Data
+public class EthUsdtChargeDto {
+
+    private String address;
+    private String hash;
+    private BigDecimal balance;
+
+    public EthUsdtChargeDto() {
+    }
+
+    public EthUsdtChargeDto(String address, String hash, BigDecimal balance) {
+        this.address = address;
+        this.hash = hash;
+        this.balance = balance;
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java b/src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java
index cd653c2..33c2c20 100644
--- a/src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java
+++ b/src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java
@@ -178,6 +178,9 @@
                                 coinAddress.setLabel(uuid);
                                 memberMapper.insert(coinAddress);
                             }
+                            if(!UsdtErc20UpdateService.ALL_ADDRESS_LIST.contains(address)){
+                                UsdtErc20UpdateService.ALL_ADDRESS_LIST.add(address);
+                            }
                         }
                         break;
                     case "ROC":
diff --git a/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java b/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java
index a67b220..6daf22c 100644
--- a/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java
+++ b/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java
@@ -1,6 +1,12 @@
 package com.xcong.excoin.modules.blackchain.service;
 
+import com.alibaba.fastjson.JSONObject;
+import com.xcong.excoin.common.enumerates.CoinTypeEnum;
+import com.xcong.excoin.modules.blackchain.model.EthUsdtChargeDto;
+import com.xcong.excoin.modules.member.dao.MemberCoinAddressDao;
+import com.xcong.excoin.rabbit.producer.UsdtUpdateProducer;
 import com.xcong.excoin.utils.RedisUtils;
+import org.apache.commons.lang.StringUtils;
 import org.springframework.stereotype.Service;
 import org.web3j.crypto.Credentials;
 import org.web3j.protocol.Web3j;
@@ -18,11 +24,18 @@
 
 @Service
 public class UsdtErc20UpdateService {
+
+    @Resource
+    private UsdtUpdateProducer usdtUpdateProducer;
+
+    @Resource
+    private MemberCoinAddressDao  coinWalletDao;
+
     public final static List<String> ALL_ADDRESS_LIST = new ArrayList<>();
 
-    public final static String USDT_BLOCK_NUM = "USDT_BLOCK_NUM";
+    public final static String USDT_BLOCK_NUM_GOLDEN = "USDT_BLOCK_NUM_GOLDEN";
 
-    private final static BigInteger DIVIDE_USDT = new BigInteger("1000000");
+    private final static BigDecimal DIVIDE_USDT = new BigDecimal("1000000");
 
     private static Web3j web3;
 
@@ -58,22 +71,39 @@
 
     @Resource
     private RedisUtils redisUtils;
+
+
     public void updateUsdt(){
+        // 首先查询所有的钱包地址
+        List<String> tdCoinWallets = coinWalletDao.selectAllSymbolAddress(CoinTypeEnum.USDT.toString(),"ERC20");
+        if(tdCoinWallets!=null){
+            ALL_ADDRESS_LIST.addAll(tdCoinWallets);
+        }
         // 获取最新区块
-        String string = redisUtils.getString(USDT_BLOCK_NUM);
+        String string = redisUtils.getString(USDT_BLOCK_NUM_GOLDEN);
+        if(string==null){
+            string = "11014249";
+        }
         BigInteger blockNum = new BigInteger(string);
         Credentials credentials = Credentials.create(privateKey);
-        EthUsdtContract contract = EthUsdtContract.load(contractAddr, web3, credentials, getStaticGasProvider());
-        EthFilter filter = getFilter(new BigInteger("10943021"));
+        EthUsdtContract contract = EthUsdtContract.load(contractAddr, getInstance(), credentials, getStaticGasProvider());
+        EthFilter filter = getFilter(blockNum);
         contract.transferEventFlowable(filter).subscribe(e->{
-            if(e!=null){
+            if(e!=null && StringUtils.isNotBlank(e.to)){
                 String transactionHash = e.log.getTransactionHash();
+                BigInteger blockNumber1 = e.log.getBlockNumber();
                 String toAddress = e.to;
                 BigInteger tokenBalance = e.tokens;
-                // 金额
-                BigInteger divide = tokenBalance.divide(DIVIDE_USDT);
-                // 发送消息队列 TODO
+                if(ALL_ADDRESS_LIST.contains(toAddress)){
+                    System.out.println("存在本地的地址:"+toAddress);
+                    // 金额
+                    BigDecimal divide = new BigDecimal(tokenBalance.toString()).divide(DIVIDE_USDT);
+                    // 发送消息队列
+                    EthUsdtChargeDto dto = new EthUsdtChargeDto(toAddress,transactionHash,divide);
+                    usdtUpdateProducer.sendMsg(JSONObject.toJSONString(dto));
+                }
 
+                redisUtils.set(USDT_BLOCK_NUM_GOLDEN,blockNumber1.toString());
             }
         });
 
@@ -88,7 +118,6 @@
             return new EthFilter(DefaultBlockParameterName.EARLIEST,
                     DefaultBlockParameterName.LATEST, contractAddr);
         }
-
-
     }
+
 }
diff --git a/src/main/java/com/xcong/excoin/modules/coin/service/BlockCoinService.java b/src/main/java/com/xcong/excoin/modules/coin/service/BlockCoinService.java
index 1567cce..5113425 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/service/BlockCoinService.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/service/BlockCoinService.java
@@ -1,5 +1,6 @@
 package com.xcong.excoin.modules.coin.service;
 
+import com.xcong.excoin.modules.blackchain.model.EthUsdtChargeDto;
 import com.xcong.excoin.modules.blackchain.model.RocTransferDetail;
 
 public interface BlockCoinService {
@@ -20,4 +21,6 @@
 
     public void updateRoc(RocTransferDetail transferDetail);
 
+    void updateEthUsdtNew(EthUsdtChargeDto ethUsdtChargeDto);
+
 }
diff --git a/src/main/java/com/xcong/excoin/modules/coin/service/impl/BlockCoinServiceImpl.java b/src/main/java/com/xcong/excoin/modules/coin/service/impl/BlockCoinServiceImpl.java
index 5d1e101..73d4a1e 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/service/impl/BlockCoinServiceImpl.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/service/impl/BlockCoinServiceImpl.java
@@ -101,7 +101,7 @@
                         BigDecimal newBalance = balance.subtract(early);
                         memberWalletCoinDao.updateBlockBalance(walletCoinEntity.getId(), newBalance, balance, 0);
 
-                        String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.USDT.name(), "ERC20", balance,null);
+                        String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.USDT.name(), "ERC20", balance, null);
                         // 插入财务记录
                         LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", newBalance, CoinTypeEnum.USDT.name(), 1, 1);
 
@@ -151,7 +151,7 @@
 
                         BigDecimal newBalance = balance.subtract(early);
                         memberWalletCoinDao.updateBlockBalance(walletCoin.getId(), newBalance, balance, 0);
-                        String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.ETH.name(), null, balance,null);
+                        String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.ETH.name(), null, balance, null);
 
                         // 插入财务记录
                         LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", newBalance, CoinTypeEnum.ETH.name(), 1, 1);
@@ -201,7 +201,7 @@
                         BigDecimal newBalance = balance.subtract(early);
 
                         memberWalletCoinDao.updateBlockBalance(walletCoin.getId(), newBalance, balance, 0);
-                        String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.USDT.name(), "OMNI", balance,null);
+                        String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.USDT.name(), "OMNI", balance, null);
 
                         ThreadPoolUtils.sendDingTalk(5);
                         MemberEntity member = memberDao.selectById(memberId);
@@ -249,7 +249,7 @@
                         BigDecimal newBalance = balance.subtract(early);
                         memberWalletCoinDao.updateBlockBalance(walletCoin.getId(), newBalance, balance, 0);
 
-                        String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.BTC.name(), null, balance,null);
+                        String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.BTC.name(), null, balance, null);
                         LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", newBalance, CoinTypeEnum.BTC.name(), 1, 1);
 
                         ThreadPoolUtils.sendDingTalk(5);
@@ -311,7 +311,7 @@
                         if (memberCoinAddressEntity != null) {
                             memberWalletCoinDao.updateBlockBalance(memberWalletCoinEntity.getId(), amount, BigDecimal.ZERO, 0);
                             // 添加冲币记录
-                            String orderNo = insertCoinCharge(EosService.ACCOUNT, memberId, amount, CoinTypeEnum.EOS.name(), memo, BigDecimal.ZERO,null);
+                            String orderNo = insertCoinCharge(EosService.ACCOUNT, memberId, amount, CoinTypeEnum.EOS.name(), memo, BigDecimal.ZERO, null);
                             LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", amount, CoinTypeEnum.EOS.name(), 1, 1);
 
                             ThreadPoolUtils.sendDingTalk(5);
@@ -388,7 +388,7 @@
                     if (memberCoinAddressEntity != null) {
                         memberWalletCoinDao.updateBlockBalance(memberWalletCoinEntity.getId(), amount, BigDecimal.ZERO, 0);
                         // 添加冲币记录
-                        String orderNo = insertCoinCharge(XrpService.ACCOUNT, memberId, amount, CoinTypeEnum.XRP.name(), memo, BigDecimal.ZERO,null);
+                        String orderNo = insertCoinCharge(XrpService.ACCOUNT, memberId, amount, CoinTypeEnum.XRP.name(), memo, BigDecimal.ZERO, null);
                         LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", amount, CoinTypeEnum.XRP.name(), 1, 1);
 
                         ThreadPoolUtils.sendDingTalk(5);
@@ -462,7 +462,7 @@
 
                         memberWalletCoinDao.updateBlockBalance(memberWalletCoinEntity.getId(), amount, BigDecimal.ZERO, 0);
                         // 添加冲币记录
-                        String orderNo = insertCoinCharge(address, memberId, amount, CoinTypeEnum.USDT.name(), "TRC20", BigDecimal.ZERO,transactionId);
+                        String orderNo = insertCoinCharge(address, memberId, amount, CoinTypeEnum.USDT.name(), "TRC20", BigDecimal.ZERO, transactionId);
                         LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", amount, CoinTypeEnum.USDT.name(), 1, 1);
 
                         ThreadPoolUtils.sendDingTalk(5);
@@ -488,23 +488,23 @@
         String address = transferDetail.getAddress();
         BigDecimal balance = transferDetail.getBalance();
         String symbol = transferDetail.getSymbol();
-        if(org.apache.commons.lang.StringUtils.isBlank(address) || org.apache.commons.lang.StringUtils.isBlank(symbol) || balance ==null ){
+        if (org.apache.commons.lang.StringUtils.isBlank(address) || org.apache.commons.lang.StringUtils.isBlank(symbol) || balance == null) {
             return;
         }
 
-        if(balance.compareTo(new BigDecimal("0.0001"))<=0){
+        if (balance.compareTo(new BigDecimal("0.0001")) <= 0) {
             return;
         }
 
         MemberCoinAddressEntity memberCoinAddress = memberCoinAddressDao.selectCoinAddressByAddressAndSymbol(address, symbol);
-        if(memberCoinAddress==null){
+        if (memberCoinAddress == null) {
             return;
         }
         Long memberId = memberCoinAddress.getMemberId();
         // 查询钱包 并更新
         MemberWalletCoinEntity walletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, CoinTypeEnum.ROC.name());
         if (walletCoinEntity == null) {
-           // 创建一个钱包
+            // 创建一个钱包
             // 创建这个钱包
             walletCoinEntity = new MemberWalletCoinEntity();
             walletCoinEntity.setAvailableBalance(BigDecimal.ZERO);
@@ -518,11 +518,11 @@
 
         memberWalletCoinDao.updateBlockBalance(walletCoinEntity.getId(), balance, BigDecimal.ZERO, 0);
 
-        String orderNo = insertCoinCharge(address, memberId, balance, CoinTypeEnum.ROC.name(), "", BigDecimal.ZERO,null);
+        String orderNo = insertCoinCharge(address, memberId, balance, CoinTypeEnum.ROC.name(), "", BigDecimal.ZERO, null);
         // 插入财务记录
         LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", balance, CoinTypeEnum.ROC.name(), 1, 1);
 
-        try{
+        try {
             ThreadPoolUtils.sendDingTalk(5);
             MemberEntity member = memberDao.selectById(memberId);
             if (StrUtil.isNotBlank(member.getPhone())) {
@@ -531,10 +531,53 @@
             } else {
                 SubMailSend.sendRechargeMail(member.getEmail(), DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MINUTE_PATTERN), orderNo);
             }
-        }catch (Exception e){
+        } catch (Exception e) {
             //e.printStackTrace();
         }
 
+    }
+
+    @Override
+    public void updateEthUsdtNew(EthUsdtChargeDto ethUsdtChargeDto) {
+        String address = ethUsdtChargeDto.getAddress();
+        String hash = ethUsdtChargeDto.getHash();
+        // hash没有用过
+        Map<String,Object> param = new HashMap<>();
+        param.put("hash",hash);
+        param.put("address",address);
+        List<MemberCoinChargeEntity> memberCoinChargeEntities = memberCoinChargeDao.selectByMap(param);
+        if(CollectionUtils.isNotEmpty(memberCoinChargeEntities)){
+            return;
+        }
+        MemberCoinAddressEntity coinAddressEntity = memberCoinAddressDao.selectBlockAddressWithTag(null, CoinTypeEnum.USDT.toString(), "ERC20");
+        if (coinAddressEntity == null) {
+            return;
+        }
+        Long memberId = coinAddressEntity.getMemberId();
+        BigDecimal balance = ethUsdtChargeDto.getBalance();
+        if (balance != null && balance.compareTo(new BigDecimal("0.1")) > 0) {
+            balance = balance.setScale(8, RoundingMode.CEILING);
+            BigDecimal early = BigDecimal.ZERO;
+
+            MemberWalletCoinEntity walletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, CoinTypeEnum.USDT.name());
+            if (walletCoinEntity == null) {
+                return;
+            }
+            BigDecimal newBalance = balance.subtract(early);
+            memberWalletCoinDao.updateBlockBalance(walletCoinEntity.getId(), newBalance, balance, 0);
+            String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.USDT.name(), "ERC20", balance, ethUsdtChargeDto.getHash());
+            // 插入财务记录
+            LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", newBalance, CoinTypeEnum.USDT.name(), 1, 1);
+            ThreadPoolUtils.sendDingTalk(5);
+            MemberEntity member = memberDao.selectById(memberId);
+            if (StrUtil.isNotBlank(member.getPhone())) {
+                String amount = newBalance.toPlainString() + "USDT-ERC20";
+                Sms106Send.sendRechargeMsg(member.getPhone(), DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MINUTE_PATTERN), orderNo);
+            } else {
+                SubMailSend.sendRechargeMail(member.getEmail(), DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MINUTE_PATTERN), orderNo);
+            }
+
+        }
     }
 
     private String generateNo() {
@@ -545,7 +588,7 @@
         return String.valueOf(timestamp).substring(2) + random;
     }
 
-    public String insertCoinCharge(String address, Long memberId, BigDecimal newBalance, String symbol, String tag, BigDecimal lastAmount,String hash) {
+    public String insertCoinCharge(String address, Long memberId, BigDecimal newBalance, String symbol, String tag, BigDecimal lastAmount, String hash) {
         MemberCoinChargeEntity memberCoinChargeEntity = new MemberCoinChargeEntity();
         memberCoinChargeEntity.setAddress(address);
         memberCoinChargeEntity.setMemberId(memberId);
diff --git a/src/main/java/com/xcong/excoin/modules/member/dao/MemberCoinAddressDao.java b/src/main/java/com/xcong/excoin/modules/member/dao/MemberCoinAddressDao.java
index a53bba9..baf9016 100644
--- a/src/main/java/com/xcong/excoin/modules/member/dao/MemberCoinAddressDao.java
+++ b/src/main/java/com/xcong/excoin/modules/member/dao/MemberCoinAddressDao.java
@@ -23,5 +23,6 @@
 
     List<MemberCoinAddressEntity> selectAllBlockAddressBySymbol(@Param("symbol") String symbol);
 
+    List<String> selectAllSymbolAddress(@Param("symbol")String symbol,@Param("tag") String tag);
 
 }
diff --git a/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java b/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java
index 3953174..872cae4 100644
--- a/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java
+++ b/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java
@@ -362,6 +362,7 @@
 
         // 存储昨日K线
         if("day".equals(rangeUnit)){
+            System.out.println("存储日K线");
             redisUtils.set("ROC/USDT",kLine);
         }
     }
diff --git a/src/main/java/com/xcong/excoin/quartz/job/BlockCoinUpdateJob.java b/src/main/java/com/xcong/excoin/quartz/job/BlockCoinUpdateJob.java
index 65e8c1e..f91418b 100644
--- a/src/main/java/com/xcong/excoin/quartz/job/BlockCoinUpdateJob.java
+++ b/src/main/java/com/xcong/excoin/quartz/job/BlockCoinUpdateJob.java
@@ -24,10 +24,10 @@
     /**
      * ETH_USDT 同步
      */
-    @Scheduled(cron = "0 0/10 * * * ? ")
-    public void ethUsdtUpdate() {
-        blockCoinService.updateEthUsdt();
-    }
+    //@Scheduled(cron = "0 0/10 * * * ? ")
+//    public void ethUsdtUpdate() {
+//        blockCoinService.updateEthUsdt();
+//    }
 
     /**
      * eth 同步
@@ -40,7 +40,7 @@
     /**
      * BTC_USDT 同步
      */
-    @Scheduled(cron = "0 2/10 * * * ? ")
+    //@Scheduled(cron = "0 2/10 * * * ? ")
     public void btcUsdtUpdate() {
         blockCoinService.updateBtcUsdt();
     }
@@ -63,8 +63,8 @@
     /**
      * ETH_USDT 同步
      */
-    @Scheduled(cron = "0 0/5 * * * ? ")
+    //Scheduled(cron = "0 0/5 * * * ? ")
     public void rocUpdate() {
-        blockCoinService.updateEthUsdt();
+       // blockCoinService.updateEthUsdt();
     }
 }
diff --git a/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java b/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
index 8fb4f5c..6099153 100644
--- a/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
+++ b/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
@@ -23,26 +23,6 @@
     @Resource
     private CoinProcessorFactory processorFactory;
 
-    @Resource
-	private OrderCoinService orderCoinService;
-
-
-
-	//@Scheduled(cron = "0/40 * * * * *")
-	public void test(){
-		Random random = new Random();
-		Integer type = OrderCoinsDealEntity.ORDERTYPE_BUY;
-		Integer tradeType = OrderCoinsDealEntity.TRADETYPE_FIXEDPRICE;
-		double random1 = Math.random();
-		BigDecimal price = new BigDecimal(random1).setScale(4, RoundingMode.HALF_UP).multiply(new BigDecimal("2"));
-		if(price.compareTo(BigDecimal.ZERO)==0){
-			price = BigDecimal.ONE;
-		}
-		System.out.println(price);
-		orderCoinService.initOrders("ROC",type,tradeType,price,new BigDecimal(2),null);
-		orderCoinService.initOrders("ROC",OrderCoinsDealEntity.ORDERTYPE_SELL,tradeType,price,new BigDecimal(2),null);
-	}
-
 
     /**
      * 每分钟定时器,处理分钟K线
diff --git a/src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java b/src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java
index 262123e..25b7fdc 100644
--- a/src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java
+++ b/src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java
@@ -38,11 +38,11 @@
         }
     }
 
-    @Scheduled(cron = "0 2/8 * * * ? ")
-    public void usdtEthPoolCheck() {
-        log.info("USDTETH归集结果扫描开始");
-        usdtEthService.usdtEthPoolCheck();
-    }
+//    @Scheduled(cron = "0 2/8 * * * ? ")
+//    public void usdtEthPoolCheck() {
+//        log.info("USDTETH归集结果扫描开始");
+//        usdtEthService.usdtEthPoolCheck();
+//    }
 
     @Scheduled(cron = "0 2/30 * * * ? ")
     public void poolEth() {
diff --git a/src/main/java/com/xcong/excoin/quartz/job/UsdtErc20InitJob.java b/src/main/java/com/xcong/excoin/quartz/job/UsdtErc20InitJob.java
new file mode 100644
index 0000000..aacf3ff
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/quartz/job/UsdtErc20InitJob.java
@@ -0,0 +1,31 @@
+package com.xcong.excoin.quartz.job;
+
+import com.xcong.excoin.modules.blackchain.service.UsdtErc20UpdateService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.Resource;
+
+/**
+ *  开启撮合交易
+ *
+ * @author wzy
+ * @date 2020-05-28
+ **/
+@Slf4j
+@Component
+@ConditionalOnProperty(prefix = "app", name = "block-job", havingValue = "true")
+public class UsdtErc20InitJob {
+
+
+    @Resource
+    private UsdtErc20UpdateService usdtErc20UpdateService;
+
+    @PostConstruct
+    public void initCoinTrade() {
+        System.out.println("开启USDT同步");
+        usdtErc20UpdateService.updateUsdt();
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java
new file mode 100644
index 0000000..2e375be
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java
@@ -0,0 +1,33 @@
+package com.xcong.excoin.rabbit.consumer;
+
+import com.alibaba.fastjson.JSONObject;
+import com.xcong.excoin.configurations.RabbitMqConfig;
+import com.xcong.excoin.modules.blackchain.model.EthUsdtChargeDto;
+import com.xcong.excoin.modules.coin.service.BlockCoinService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+
+/**
+ * @author wzy
+ * @date 2020-05-25
+ **/
+@Slf4j
+@Component
+public class UsdtUpdateConsumer {
+
+
+    @Resource
+    private BlockCoinService blockCoinService;
+
+
+    @RabbitListener(queues = RabbitMqConfig.QUEUE_USDT_UPDATE)
+    public void doSomething(String content) {
+        log.info("#---->{}#", content);
+        EthUsdtChargeDto ethUsdtChargeDto = JSONObject.parseObject(content, EthUsdtChargeDto.class);
+        // 更新这个用户的余额
+        blockCoinService.updateEthUsdtNew(ethUsdtChargeDto);
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java b/src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java
new file mode 100644
index 0000000..8f12cda
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java
@@ -0,0 +1,42 @@
+package com.xcong.excoin.rabbit.producer;
+
+import cn.hutool.core.util.IdUtil;
+import com.xcong.excoin.configurations.RabbitMqConfig;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * @author wzy
+ * @date 2020-05-25
+ **/
+@Slf4j
+@Component
+public class UsdtUpdateProducer implements RabbitTemplate.ConfirmCallback {
+
+    private RabbitTemplate rabbitTemplate;
+
+    @Autowired
+    public UsdtUpdateProducer(RabbitTemplate rabbitTemplate) {
+        this.rabbitTemplate = rabbitTemplate;
+        rabbitTemplate.setConfirmCallback(this);
+    }
+
+    public void sendMsg(String content) {
+        CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID());
+        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_USDT_UPDATE, RabbitMqConfig.ROUTING_KEY_USDT_UPDATE, content, correlationData);
+    }
+
+
+    @Override
+    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
+        log.info("#----->{}#", correlationData);
+        if (ack) {
+            log.info("success");
+        } else {
+            log.info("--->{}", cause);
+        }
+    }
+}
diff --git a/src/main/resources/mapper/member/MemberCoinAddressDao.xml b/src/main/resources/mapper/member/MemberCoinAddressDao.xml
index edb4a57..0dc07a9 100644
--- a/src/main/resources/mapper/member/MemberCoinAddressDao.xml
+++ b/src/main/resources/mapper/member/MemberCoinAddressDao.xml
@@ -81,4 +81,11 @@
 		select * from member_coin_address
 		where symbol=#{symbol}
 	</select>
+
+	<select id="selectAllSymbolAddress" resultType="string" parameterType="map">
+		select address from member_coin_address where symbol =#{symbol}
+		<if test="tag!=null and tag !=''">
+			and tag = #{tag}
+		</if>
+	</select>
 </mapper>
\ No newline at end of file

--
Gitblit v1.9.1