From 953f7999c90d5bfddba501d64f6f89bd6d95c427 Mon Sep 17 00:00:00 2001
From: Helius <wangdoubleone@gmail.com>
Date: Mon, 06 Jun 2022 14:18:22 +0800
Subject: [PATCH] fix block listener

---
 src/main/java/cc/mrbird/febs/dapp/chain/EthService.java           |   41 ++++++++++---
 src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java         |   16 +++-
 src/main/java/cc/mrbird/febs/job/ChainListenerJob.java            |   76 ++++++++++++++++++++-----
 src/main/java/cc/mrbird/febs/common/contants/AppContants.java     |    2 
 src/main/java/cc/mrbird/febs/dapp/chain/ContractChainService.java |    2 
 src/main/java/cc/mrbird/febs/dapp/chain/TrxService.java           |    5 +
 6 files changed, 111 insertions(+), 31 deletions(-)

diff --git a/src/main/java/cc/mrbird/febs/common/contants/AppContants.java b/src/main/java/cc/mrbird/febs/common/contants/AppContants.java
index 0d07cff..1a2ccce 100644
--- a/src/main/java/cc/mrbird/febs/common/contants/AppContants.java
+++ b/src/main/java/cc/mrbird/febs/common/contants/AppContants.java
@@ -139,6 +139,8 @@
 
     public static final String REDIS_KEY_BLOCK_COIN_NUM = "BLOCK_COIN_NUM";
     public static final String REDIS_KEY_BLOCK_USDT_NUM = "BLOCK_USDT_NUM";
+    public static final String REDIS_KEY_BLOCK_ETH_NEWEST_NUM = "BLOCK_ETH_NEWEST_NUM";
+    public static final String REDIS_KEY_BLOCK_ETH_INCREMENT_NUM = "BLOCK_ETH_INCREMENT_NUM";
 
 
     public static final String DIC_TYPE_DISTRIBUTE_PROP = "DISTRIBUTE_PROP";
diff --git a/src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java b/src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java
index b4cda76..641d88b 100644
--- a/src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java
+++ b/src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java
@@ -4,6 +4,7 @@
 import cn.hutool.core.util.StrUtil;
 import cn.hutool.http.HttpUtil;
 import com.alibaba.fastjson.JSONObject;
+import io.reactivex.Flowable;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.data.repository.query.ParameterOutOfBoundsException;
 import org.web3j.crypto.Credentials;
@@ -60,25 +61,30 @@
      * @param startBlock 开始区块
      */
     public static void contractEventListener(BigInteger startBlock, ContractEventService event, String type) {
+        contractEventListener(startBlock, null, event, type);
+    }
+
+    public static void contractEventListener(BigInteger startBlock, BigInteger endBlock, ContractEventService event, String type) {
         ChainEnum chain = ChainEnum.getValueByName(type);
         assert chain != null;
 
         EthUsdtContract contract = contract(chain.getPrivateKey(), chain.getContractAddress(), chain.getUrl());
-        EthFilter filter = getFilter(startBlock, chain.getContractAddress());
+        EthFilter filter = getFilter(startBlock, endBlock, chain.getContractAddress());
 
-        contract.transferEventFlowable(filter).subscribe(e -> {
+        Flowable<EthUsdtContract.TransferEventResponse> eventFlowable = contract.transferEventFlowable(filter);
+        eventFlowable.subscribe(e -> {
             event.compile(e);
         }, error -> {
-            log.error("--->", error);
+            log.error("合约监听启动报错", error);
         });
     }
+
 
     private static EthUsdtContract contract(String privateKey, String contractAddress, String url) {
         Credentials credentials = Credentials.create(privateKey);
         return EthUsdtContract.load(contractAddress, Web3j.build(new HttpService(url)), credentials, new StaticGasProvider(BigInteger.valueOf(4500000L), BigInteger.valueOf(200000L)));
     }
 
-    // 18097238  18098663
     private static EthFilter getFilter(BigInteger startBlock, String contractAddress) {
         return getFilter(startBlock, null, contractAddress);
     }
@@ -106,7 +112,7 @@
         assert chain != null;
 
         EthUsdtContract contract = contract(chain.getPrivateKey(), chain.getContractAddress(), chain.getUrl());
-        EthFilter filter = getFilter(new BigInteger("18097238"), new BigInteger("18098663"), chain.getContractAddress());
+        EthFilter filter = getFilter(new BigInteger("18097238"), chain.getContractAddress());
 
         contract.transferEventFlowable(filter).subscribe(e -> {
             System.out.println(1);
diff --git a/src/main/java/cc/mrbird/febs/dapp/chain/ContractChainService.java b/src/main/java/cc/mrbird/febs/dapp/chain/ContractChainService.java
index 3791a67..67af948 100644
--- a/src/main/java/cc/mrbird/febs/dapp/chain/ContractChainService.java
+++ b/src/main/java/cc/mrbird/febs/dapp/chain/ContractChainService.java
@@ -20,4 +20,6 @@
     int allowanceCnt(String address);
 
     int decimals();
+
+    BigInteger blockNumber();
 }
diff --git a/src/main/java/cc/mrbird/febs/dapp/chain/EthService.java b/src/main/java/cc/mrbird/febs/dapp/chain/EthService.java
index 9eda396..a30bf95 100644
--- a/src/main/java/cc/mrbird/febs/dapp/chain/EthService.java
+++ b/src/main/java/cc/mrbird/febs/dapp/chain/EthService.java
@@ -59,18 +59,19 @@
     public static void main(String[] args) throws IOException {
         HttpService service = new HttpService("https://bsc-dataseed1.ninicoin.io");
         Web3j web3j = Web3j.build(service);
-//        Request<?, EthBlockNumber> request = web3j.ethBlockNumber();
-//        EthBlockNumber send = request.send();
-//        BigInteger bigInteger = Numeric.decodeQuantity(send.getResult());
-//        System.out.println("0x113d6d0");
+        long start = System.currentTimeMillis();
+        Request<?, EthBlockNumber> request = web3j.ethBlockNumber();
+        EthBlockNumber send = request.send();
+        BigInteger bigInteger = Numeric.decodeQuantity(send.getResult());
+        long end = System.currentTimeMillis();
+        System.out.println(end - start);
 
-
-        String address = "0x971c09aa9735eb98459b17ec8b48932d24cbb931";
-        String nonce = "0x1d5f7444107bc02e980deda39d0fce21b06c9da4233a19cb11124cb5bfefc9ec";
-        String sign = "0x8f92cee24906122e26c3cc6cbd72f851cfe2c9574aa03bf3371e5d506fbec68b2ad22bbbc19b00ed21d26ab5a6871507831e2c902d8ed8c33301addc2b57a7731b";
-
-        String result = address + ":" + nonce + ":" + sign;
-        System.out.println(Hash.sha3(result));
+//        String address = "0x971c09aa9735eb98459b17ec8b48932d24cbb931";
+//        String nonce = "0x1d5f7444107bc02e980deda39d0fce21b06c9da4233a19cb11124cb5bfefc9ec";
+//        String sign = "0x8f92cee24906122e26c3cc6cbd72f851cfe2c9574aa03bf3371e5d506fbec68b2ad22bbbc19b00ed21d26ab5a6871507831e2c902d8ed8c33301addc2b57a7731b";
+//
+//        String result = address + ":" + nonce + ":" + sign;
+//        System.out.println(Hash.sha3(result));
 
 
 //        Web3Sha3 send = web3j.web3Sha3(result).send();
@@ -275,4 +276,22 @@
         JSONObject result = JSONObject.parseObject(jsonObject.getString("d"));
         return result.getIntValue("recordsTotal");
     }
+
+    @Override
+    public BigInteger blockNumber() {
+        Request<?, EthBlockNumber> request = web3j.ethBlockNumber();
+        EthBlockNumber send = null;
+        try {
+            send = request.send();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        if (send != null) {
+            return Numeric.decodeQuantity(send.getResult());
+        }
+
+        return new BigInteger("1");
+    }
+
 }
diff --git a/src/main/java/cc/mrbird/febs/dapp/chain/TrxService.java b/src/main/java/cc/mrbird/febs/dapp/chain/TrxService.java
index 864c65d..f75b057 100644
--- a/src/main/java/cc/mrbird/febs/dapp/chain/TrxService.java
+++ b/src/main/java/cc/mrbird/febs/dapp/chain/TrxService.java
@@ -103,4 +103,9 @@
     public static void main(String[] args) {
 //        System.out.println(INSTANCE.transfer("TFGbYzGv4Zt2nzFM3uU3uCJZY67WKSveG9", BigDecimal.valueOf(5)));;
     }
+
+    @Override
+    public BigInteger blockNumber() {
+        return null;
+    }
 }
diff --git a/src/main/java/cc/mrbird/febs/job/ChainListenerJob.java b/src/main/java/cc/mrbird/febs/job/ChainListenerJob.java
index 04c0bbb..a12e001 100644
--- a/src/main/java/cc/mrbird/febs/job/ChainListenerJob.java
+++ b/src/main/java/cc/mrbird/febs/job/ChainListenerJob.java
@@ -7,6 +7,8 @@
 import cc.mrbird.febs.dapp.chain.ContractEventService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.PostConstruct;
@@ -27,24 +29,68 @@
 
     @PostConstruct
     public void chainListenerJob() {
-        log.info("监听打开");
-        BigInteger usdtBlock;
-        BigInteger coinBlock;
-        Object usdt = redisUtils.get(AppContants.REDIS_KEY_BLOCK_USDT_NUM);
-        if (usdt == null) {
-            usdtBlock = new BigInteger("19811973");
+        long start = System.currentTimeMillis();
+        log.info("区块链监听开始启动");
+        Object incrementObj = redisUtils.get(AppContants.REDIS_KEY_BLOCK_ETH_INCREMENT_NUM);
+        BigInteger newest = ChainService.getInstance(ChainEnum.BSC_TFC.name()).blockNumber();
+        BigInteger block;
+        if (incrementObj == null) {
+            block = newest;
         } else {
-            usdtBlock = (BigInteger) usdt;
+            block = (BigInteger) incrementObj;
         }
 
-        Object coin = redisUtils.get(AppContants.REDIS_KEY_BLOCK_COIN_NUM);
-        if (coin == null) {
-            coinBlock = new BigInteger("19811973");
-        } else {
-            coinBlock = (BigInteger) coin;
-        }
+        BigInteger section = BigInteger.valueOf(5000);
+        while (newest.subtract(block).compareTo(section) > -1) {
+            BigInteger end = block.add(section);
+            log.info("监听:[{} - {}]", block, end);
+            ChainService.contractEventListener(block, end, bscUsdtContractEvent, ChainEnum.BSC_USDT.name());
+            ChainService.contractEventListener(block, end, bscCoinContractEvent, ChainEnum.BSC_TFC.name());
 
-        ChainService.contractEventListener(usdtBlock, bscUsdtContractEvent, ChainEnum.BSC_USDT.name());
-        ChainService.contractEventListener(coinBlock, bscCoinContractEvent, ChainEnum.BSC_TFC.name());
+            block = block.add(section);
+            if (block.compareTo(newest) > 0) {
+                block = newest;
+            }
+        }
+        ChainService.contractEventListener(block, bscUsdtContractEvent, ChainEnum.BSC_USDT.name());
+        ChainService.contractEventListener(block, bscCoinContractEvent, ChainEnum.BSC_TFC.name());
+
+        long end = System.currentTimeMillis();
+        log.info("区块链监听启动完成, 消耗时间{}", end - start);
     }
+
+    @Scheduled(cron = "0 0/5 * * * ? ")
+    public void chainBlockUpdate() {
+        BigInteger blockNumber = ChainService.getInstance(ChainEnum.BSC_TFC.name()).blockNumber();
+
+        redisUtils.set(AppContants.REDIS_KEY_BLOCK_ETH_NEWEST_NUM, blockNumber);
+    }
+
+    @Scheduled(cron = "0/2 * * * * ? ")
+    public void chainIncrementBlock() {
+        Object newestBlockObj = redisUtils.get(AppContants.REDIS_KEY_BLOCK_ETH_NEWEST_NUM);
+        BigInteger newestBlock;
+        if (newestBlockObj == null) {
+            newestBlock = ChainService.getInstance(ChainEnum.BSC_TFC.name()).blockNumber();
+        } else {
+            newestBlock = (BigInteger) newestBlockObj;
+        }
+
+        Object incrementObj = redisUtils.get(AppContants.REDIS_KEY_BLOCK_ETH_INCREMENT_NUM);
+        BigInteger toIncrement;
+        if (incrementObj == null) {
+            toIncrement = newestBlock;
+        } else {
+            BigInteger incrementBlock = (BigInteger) incrementObj;
+
+            // 最新区块小于增加区块
+            if (newestBlock.compareTo(incrementBlock) < 0) {
+                return;
+            }
+            toIncrement = incrementBlock.add(BigInteger.ONE);
+        }
+
+        redisUtils.set(AppContants.REDIS_KEY_BLOCK_ETH_INCREMENT_NUM, toIncrement);
+    }
+
 }

--
Gitblit v1.9.1