From 953f7999c90d5bfddba501d64f6f89bd6d95c427 Mon Sep 17 00:00:00 2001 From: Helius <wangdoubleone@gmail.com> Date: Mon, 06 Jun 2022 14:18:22 +0800 Subject: [PATCH] fix block listener --- src/main/java/cc/mrbird/febs/dapp/chain/EthService.java | 41 ++++++++++--- src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java | 16 +++- src/main/java/cc/mrbird/febs/job/ChainListenerJob.java | 76 ++++++++++++++++++++----- src/main/java/cc/mrbird/febs/common/contants/AppContants.java | 2 src/main/java/cc/mrbird/febs/dapp/chain/ContractChainService.java | 2 src/main/java/cc/mrbird/febs/dapp/chain/TrxService.java | 5 + 6 files changed, 111 insertions(+), 31 deletions(-) diff --git a/src/main/java/cc/mrbird/febs/common/contants/AppContants.java b/src/main/java/cc/mrbird/febs/common/contants/AppContants.java index 0d07cff..1a2ccce 100644 --- a/src/main/java/cc/mrbird/febs/common/contants/AppContants.java +++ b/src/main/java/cc/mrbird/febs/common/contants/AppContants.java @@ -139,6 +139,8 @@ public static final String REDIS_KEY_BLOCK_COIN_NUM = "BLOCK_COIN_NUM"; public static final String REDIS_KEY_BLOCK_USDT_NUM = "BLOCK_USDT_NUM"; + public static final String REDIS_KEY_BLOCK_ETH_NEWEST_NUM = "BLOCK_ETH_NEWEST_NUM"; + public static final String REDIS_KEY_BLOCK_ETH_INCREMENT_NUM = "BLOCK_ETH_INCREMENT_NUM"; public static final String DIC_TYPE_DISTRIBUTE_PROP = "DISTRIBUTE_PROP"; diff --git a/src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java b/src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java index b4cda76..641d88b 100644 --- a/src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java +++ b/src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java @@ -4,6 +4,7 @@ import cn.hutool.core.util.StrUtil; import cn.hutool.http.HttpUtil; import com.alibaba.fastjson.JSONObject; +import io.reactivex.Flowable; import lombok.extern.slf4j.Slf4j; import org.springframework.data.repository.query.ParameterOutOfBoundsException; import org.web3j.crypto.Credentials; @@ -60,25 +61,30 @@ * @param startBlock 开始区块 */ public static void contractEventListener(BigInteger startBlock, ContractEventService event, String type) { + contractEventListener(startBlock, null, event, type); + } + + public static void contractEventListener(BigInteger startBlock, BigInteger endBlock, ContractEventService event, String type) { ChainEnum chain = ChainEnum.getValueByName(type); assert chain != null; EthUsdtContract contract = contract(chain.getPrivateKey(), chain.getContractAddress(), chain.getUrl()); - EthFilter filter = getFilter(startBlock, chain.getContractAddress()); + EthFilter filter = getFilter(startBlock, endBlock, chain.getContractAddress()); - contract.transferEventFlowable(filter).subscribe(e -> { + Flowable<EthUsdtContract.TransferEventResponse> eventFlowable = contract.transferEventFlowable(filter); + eventFlowable.subscribe(e -> { event.compile(e); }, error -> { - log.error("--->", error); + log.error("合约监听启动报错", error); }); } + private static EthUsdtContract contract(String privateKey, String contractAddress, String url) { Credentials credentials = Credentials.create(privateKey); return EthUsdtContract.load(contractAddress, Web3j.build(new HttpService(url)), credentials, new StaticGasProvider(BigInteger.valueOf(4500000L), BigInteger.valueOf(200000L))); } - // 18097238 18098663 private static EthFilter getFilter(BigInteger startBlock, String contractAddress) { return getFilter(startBlock, null, contractAddress); } @@ -106,7 +112,7 @@ assert chain != null; EthUsdtContract contract = contract(chain.getPrivateKey(), chain.getContractAddress(), chain.getUrl()); - EthFilter filter = getFilter(new BigInteger("18097238"), new BigInteger("18098663"), chain.getContractAddress()); + EthFilter filter = getFilter(new BigInteger("18097238"), chain.getContractAddress()); contract.transferEventFlowable(filter).subscribe(e -> { System.out.println(1); diff --git a/src/main/java/cc/mrbird/febs/dapp/chain/ContractChainService.java b/src/main/java/cc/mrbird/febs/dapp/chain/ContractChainService.java index 3791a67..67af948 100644 --- a/src/main/java/cc/mrbird/febs/dapp/chain/ContractChainService.java +++ b/src/main/java/cc/mrbird/febs/dapp/chain/ContractChainService.java @@ -20,4 +20,6 @@ int allowanceCnt(String address); int decimals(); + + BigInteger blockNumber(); } diff --git a/src/main/java/cc/mrbird/febs/dapp/chain/EthService.java b/src/main/java/cc/mrbird/febs/dapp/chain/EthService.java index 9eda396..a30bf95 100644 --- a/src/main/java/cc/mrbird/febs/dapp/chain/EthService.java +++ b/src/main/java/cc/mrbird/febs/dapp/chain/EthService.java @@ -59,18 +59,19 @@ public static void main(String[] args) throws IOException { HttpService service = new HttpService("https://bsc-dataseed1.ninicoin.io"); Web3j web3j = Web3j.build(service); -// Request<?, EthBlockNumber> request = web3j.ethBlockNumber(); -// EthBlockNumber send = request.send(); -// BigInteger bigInteger = Numeric.decodeQuantity(send.getResult()); -// System.out.println("0x113d6d0"); + long start = System.currentTimeMillis(); + Request<?, EthBlockNumber> request = web3j.ethBlockNumber(); + EthBlockNumber send = request.send(); + BigInteger bigInteger = Numeric.decodeQuantity(send.getResult()); + long end = System.currentTimeMillis(); + System.out.println(end - start); - - String address = "0x971c09aa9735eb98459b17ec8b48932d24cbb931"; - String nonce = "0x1d5f7444107bc02e980deda39d0fce21b06c9da4233a19cb11124cb5bfefc9ec"; - String sign = "0x8f92cee24906122e26c3cc6cbd72f851cfe2c9574aa03bf3371e5d506fbec68b2ad22bbbc19b00ed21d26ab5a6871507831e2c902d8ed8c33301addc2b57a7731b"; - - String result = address + ":" + nonce + ":" + sign; - System.out.println(Hash.sha3(result)); +// String address = "0x971c09aa9735eb98459b17ec8b48932d24cbb931"; +// String nonce = "0x1d5f7444107bc02e980deda39d0fce21b06c9da4233a19cb11124cb5bfefc9ec"; +// String sign = "0x8f92cee24906122e26c3cc6cbd72f851cfe2c9574aa03bf3371e5d506fbec68b2ad22bbbc19b00ed21d26ab5a6871507831e2c902d8ed8c33301addc2b57a7731b"; +// +// String result = address + ":" + nonce + ":" + sign; +// System.out.println(Hash.sha3(result)); // Web3Sha3 send = web3j.web3Sha3(result).send(); @@ -275,4 +276,22 @@ JSONObject result = JSONObject.parseObject(jsonObject.getString("d")); return result.getIntValue("recordsTotal"); } + + @Override + public BigInteger blockNumber() { + Request<?, EthBlockNumber> request = web3j.ethBlockNumber(); + EthBlockNumber send = null; + try { + send = request.send(); + } catch (IOException e) { + e.printStackTrace(); + } + + if (send != null) { + return Numeric.decodeQuantity(send.getResult()); + } + + return new BigInteger("1"); + } + } diff --git a/src/main/java/cc/mrbird/febs/dapp/chain/TrxService.java b/src/main/java/cc/mrbird/febs/dapp/chain/TrxService.java index 864c65d..f75b057 100644 --- a/src/main/java/cc/mrbird/febs/dapp/chain/TrxService.java +++ b/src/main/java/cc/mrbird/febs/dapp/chain/TrxService.java @@ -103,4 +103,9 @@ public static void main(String[] args) { // System.out.println(INSTANCE.transfer("TFGbYzGv4Zt2nzFM3uU3uCJZY67WKSveG9", BigDecimal.valueOf(5)));; } + + @Override + public BigInteger blockNumber() { + return null; + } } diff --git a/src/main/java/cc/mrbird/febs/job/ChainListenerJob.java b/src/main/java/cc/mrbird/febs/job/ChainListenerJob.java index 04c0bbb..a12e001 100644 --- a/src/main/java/cc/mrbird/febs/job/ChainListenerJob.java +++ b/src/main/java/cc/mrbird/febs/job/ChainListenerJob.java @@ -7,6 +7,8 @@ import cc.mrbird.febs.dapp.chain.ContractEventService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.scheduling.annotation.Async; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @@ -27,24 +29,68 @@ @PostConstruct public void chainListenerJob() { - log.info("监听打开"); - BigInteger usdtBlock; - BigInteger coinBlock; - Object usdt = redisUtils.get(AppContants.REDIS_KEY_BLOCK_USDT_NUM); - if (usdt == null) { - usdtBlock = new BigInteger("19811973"); + long start = System.currentTimeMillis(); + log.info("区块链监听开始启动"); + Object incrementObj = redisUtils.get(AppContants.REDIS_KEY_BLOCK_ETH_INCREMENT_NUM); + BigInteger newest = ChainService.getInstance(ChainEnum.BSC_TFC.name()).blockNumber(); + BigInteger block; + if (incrementObj == null) { + block = newest; } else { - usdtBlock = (BigInteger) usdt; + block = (BigInteger) incrementObj; } - Object coin = redisUtils.get(AppContants.REDIS_KEY_BLOCK_COIN_NUM); - if (coin == null) { - coinBlock = new BigInteger("19811973"); - } else { - coinBlock = (BigInteger) coin; - } + BigInteger section = BigInteger.valueOf(5000); + while (newest.subtract(block).compareTo(section) > -1) { + BigInteger end = block.add(section); + log.info("监听:[{} - {}]", block, end); + ChainService.contractEventListener(block, end, bscUsdtContractEvent, ChainEnum.BSC_USDT.name()); + ChainService.contractEventListener(block, end, bscCoinContractEvent, ChainEnum.BSC_TFC.name()); - ChainService.contractEventListener(usdtBlock, bscUsdtContractEvent, ChainEnum.BSC_USDT.name()); - ChainService.contractEventListener(coinBlock, bscCoinContractEvent, ChainEnum.BSC_TFC.name()); + block = block.add(section); + if (block.compareTo(newest) > 0) { + block = newest; + } + } + ChainService.contractEventListener(block, bscUsdtContractEvent, ChainEnum.BSC_USDT.name()); + ChainService.contractEventListener(block, bscCoinContractEvent, ChainEnum.BSC_TFC.name()); + + long end = System.currentTimeMillis(); + log.info("区块链监听启动完成, 消耗时间{}", end - start); } + + @Scheduled(cron = "0 0/5 * * * ? ") + public void chainBlockUpdate() { + BigInteger blockNumber = ChainService.getInstance(ChainEnum.BSC_TFC.name()).blockNumber(); + + redisUtils.set(AppContants.REDIS_KEY_BLOCK_ETH_NEWEST_NUM, blockNumber); + } + + @Scheduled(cron = "0/2 * * * * ? ") + public void chainIncrementBlock() { + Object newestBlockObj = redisUtils.get(AppContants.REDIS_KEY_BLOCK_ETH_NEWEST_NUM); + BigInteger newestBlock; + if (newestBlockObj == null) { + newestBlock = ChainService.getInstance(ChainEnum.BSC_TFC.name()).blockNumber(); + } else { + newestBlock = (BigInteger) newestBlockObj; + } + + Object incrementObj = redisUtils.get(AppContants.REDIS_KEY_BLOCK_ETH_INCREMENT_NUM); + BigInteger toIncrement; + if (incrementObj == null) { + toIncrement = newestBlock; + } else { + BigInteger incrementBlock = (BigInteger) incrementObj; + + // 最新区块小于增加区块 + if (newestBlock.compareTo(incrementBlock) < 0) { + return; + } + toIncrement = incrementBlock.add(BigInteger.ONE); + } + + redisUtils.set(AppContants.REDIS_KEY_BLOCK_ETH_INCREMENT_NUM, toIncrement); + } + } -- Gitblit v1.9.1