From f85e2f339dc29bf0abd2e02c2696af73137b6d5d Mon Sep 17 00:00:00 2001 From: KKSU <15274802129@163.com> Date: Fri, 07 Jun 2024 11:32:45 +0800 Subject: [PATCH] 将监听充值新建一个项目 --- src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java | 95 ++++++-- src/main/java/cc/mrbird/febs/job/ChainSDMListenerJob.java | 101 +++++++++ src/main/resources/application-charge.yml | 62 +++++ src/main/resources/application-prod.yml | 1 src/main/resources/application-chain.yml | 1 /dev/null | 103 --------- src/main/java/cc/mrbird/febs/dapp/service/impl/BscUsdtContractEvent.java | 86 +++++++ src/main/java/cc/mrbird/febs/dapp/service/impl/BscCoinContractEvent.java | 27 - src/main/java/cc/mrbird/febs/common/contants/AppContants.java | 3 src/main/java/cc/mrbird/febs/job/ChainSDMRunner.java | 59 +++++ src/main/java/cc/mrbird/febs/dapp/chain/ContractEventService.java | 2 src/main/java/cc/mrbird/febs/job/ChainSDMChargeListenerJob.java | 57 +++++ src/main/java/cc/mrbird/febs/job/ChainSDMChargeRunner.java | 59 +++++ 13 files changed, 513 insertions(+), 143 deletions(-) diff --git a/src/main/java/cc/mrbird/febs/common/contants/AppContants.java b/src/main/java/cc/mrbird/febs/common/contants/AppContants.java index 3b227c1..14e9f05 100644 --- a/src/main/java/cc/mrbird/febs/common/contants/AppContants.java +++ b/src/main/java/cc/mrbird/febs/common/contants/AppContants.java @@ -144,6 +144,9 @@ public static final String REDIS_KEY_BLOCK_ETH_NEWEST_NUM = "BLOCK_ETH_NEWEST_NUM"; public static final String REDIS_KEY_BLOCK_ETH_INCREMENT_NUM = "BLOCK_ETH_INCREMENT_NUM"; + public static final String REDIS_KEY_BLOCK_ETH_NEWEST_NUM_CHARGE = "BLOCK_ETH_NEWEST_NUM_CHARGE"; + public static final String REDIS_KEY_BLOCK_ETH_INCREMENT_NUM_CHARGE = "BLOCK_ETH_INCREMENT_NUM_CHARGE"; + public static final String REDIS_KEY_MAKE_POOL_CNT = "MAKE_POOL_CNT"; public static final String REDIS_KEY_IDO_USDT_MAX_BUY_DAILY = "USDT_MAX_BUY_DAILY_"; diff --git a/src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java b/src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java index cb25b7c..f21a835 100644 --- a/src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java +++ b/src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java @@ -1,47 +1,27 @@ package cc.mrbird.febs.dapp.chain; import cc.mrbird.febs.common.exception.FebsException; -import cn.hutool.core.codec.Base64; -import cn.hutool.core.util.StrUtil; -import cn.hutool.http.HttpUtil; -import com.alibaba.fastjson.JSONObject; import io.reactivex.Flowable; import io.reactivex.disposables.Disposable; +import io.reactivex.schedulers.Schedulers; import lombok.extern.slf4j.Slf4j; -import okhttp3.Interceptor; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; -import org.springframework.data.repository.query.ParameterOutOfBoundsException; -import org.springframework.util.Base64Utils; -import org.web3j.abi.FunctionReturnDecoder; -import org.web3j.abi.TypeReference; -import org.web3j.abi.datatypes.Address; -import org.web3j.abi.datatypes.Type; -import org.web3j.abi.datatypes.generated.Uint256; import org.web3j.crypto.Credentials; import org.web3j.protocol.Web3j; import org.web3j.protocol.core.DefaultBlockParameter; import org.web3j.protocol.core.DefaultBlockParameterName; import org.web3j.protocol.core.DefaultBlockParameterNumber; import org.web3j.protocol.core.methods.request.EthFilter; -import org.web3j.protocol.core.methods.response.TransactionReceipt; import org.web3j.protocol.http.HttpService; import org.web3j.protocol.websocket.WebSocketClient; import org.web3j.protocol.websocket.WebSocketService; import org.web3j.tx.gas.StaticGasProvider; -import java.io.IOException; -import java.math.BigDecimal; import java.math.BigInteger; import java.net.URI; -import java.nio.charset.StandardCharsets; -import java.rmi.activation.UnknownObjectException; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; /** * @author @@ -99,6 +79,77 @@ }); } + public static void sdmUSDTEventListener(BigInteger startBlock, BigInteger endBlock, ContractEventService event, String type) { + ChainEnum chain = ChainEnum.getValueByName(type); + assert chain != null; + + EthUsdtContract contract = contract(chain.getPrivateKey(), chain.getContractAddress(), chain.getUrl()); + EthFilter filter = getFilter(startBlock, endBlock, chain.getContractAddress()); + + Flowable<EthUsdtContract.TransferEventResponse> eventFlowable = contract.transferEventFlowable(filter) + .doOnError(throwable -> + log.error("合约事件监听发生错误: " + throwable.getMessage(), throwable)) // 更具体的错误日志记录 + .retryWhen(errors -> { + AtomicInteger counter = new AtomicInteger(); + return errors.takeWhile(e -> counter.getAndIncrement() != 3) + .flatMap(e -> { + System.out.println("delay retry by " + counter.get() + " second(s)"); + return Flowable.timer(counter.get(), TimeUnit.SECONDS); + }); + }) + .subscribeOn(Schedulers.io()); // 指定subscribe操作在IO线程中执行,避免阻塞主线程 + + eventFlowable.subscribe( + e -> { + try { + event.sdmUSDT(e); // 处理事件 + } catch (Exception ex) { + // 处理事件时可能出现的异常 + log.error("处理合约事件时出错", ex); + } + }, + Throwable::printStackTrace, // 打印错误堆栈,或者可以替换为更具体的错误处理逻辑 + () -> log.info("合约事件监听已完成") // 在Flowable完成时执行的逻辑,如记录日志等 + ); + + } + + + public static void sdmChargeEventListener(BigInteger startBlock, BigInteger endBlock, ContractEventService event, String type) { + ChainEnum chain = ChainEnum.getValueByName(type); + assert chain != null; + + EthUsdtContract contract = contract(chain.getPrivateKey(), chain.getContractAddress(), chain.getUrl()); + EthFilter filter = getFilter(startBlock, endBlock, chain.getContractAddress()); + + Flowable<EthUsdtContract.TransferEventResponse> eventFlowable = contract.transferEventFlowable(filter) + .doOnError(throwable -> + log.error("合约事件监听发生错误: " + throwable.getMessage(), throwable)) // 更具体的错误日志记录 + .retryWhen(errors -> { + AtomicInteger counter = new AtomicInteger(); + return errors.takeWhile(e -> counter.getAndIncrement() != 3) + .flatMap(e -> { + System.out.println("delay retry by " + counter.get() + " second(s)"); + return Flowable.timer(counter.get(), TimeUnit.SECONDS); + }); + }) + .subscribeOn(Schedulers.io()); // 指定subscribe操作在IO线程中执行,避免阻塞主线程 + + eventFlowable.subscribe( + e -> { + try { + event.compile(e); // 处理事件 + } catch (Exception ex) { + // 处理事件时可能出现的异常 + log.error("处理合约事件时出错", ex); + } + }, + Throwable::printStackTrace, // 打印错误堆栈,或者可以替换为更具体的错误处理逻辑 + () -> log.info("合约事件监听已完成") // 在Flowable完成时执行的逻辑,如记录日志等 + ); + + } + public static void wssContractEventListener(BigInteger startBlock, ContractEventService event, String type) { WebSocketService ws = null; WebSocketClient webSocketClient = null; diff --git a/src/main/java/cc/mrbird/febs/dapp/chain/ContractEventService.java b/src/main/java/cc/mrbird/febs/dapp/chain/ContractEventService.java index 481edd4..9ca7936 100644 --- a/src/main/java/cc/mrbird/febs/dapp/chain/ContractEventService.java +++ b/src/main/java/cc/mrbird/febs/dapp/chain/ContractEventService.java @@ -3,4 +3,6 @@ public interface ContractEventService { void compile(EthUsdtContract.TransferEventResponse e); + + void sdmUSDT(EthUsdtContract.TransferEventResponse e); } diff --git a/src/main/java/cc/mrbird/febs/dapp/service/impl/BscCoinContractEvent.java b/src/main/java/cc/mrbird/febs/dapp/service/impl/BscCoinContractEvent.java index c1747b9..1dc76ac 100644 --- a/src/main/java/cc/mrbird/febs/dapp/service/impl/BscCoinContractEvent.java +++ b/src/main/java/cc/mrbird/febs/dapp/service/impl/BscCoinContractEvent.java @@ -2,29 +2,19 @@ import cc.mrbird.febs.common.contants.AppContants; import cc.mrbird.febs.common.utils.RedisUtils; -import cc.mrbird.febs.common.utils.ShareCodeUtil; import cc.mrbird.febs.dapp.chain.ChainEnum; import cc.mrbird.febs.dapp.chain.ChainService; import cc.mrbird.febs.dapp.chain.ContractEventService; import cc.mrbird.febs.dapp.chain.EthUsdtContract; import cc.mrbird.febs.dapp.entity.DappFundFlowEntity; import cc.mrbird.febs.dapp.entity.DappMemberEntity; -import cc.mrbird.febs.dapp.entity.DappOnlineTransferEntity; import cc.mrbird.febs.dapp.entity.DappTransferRecordEntity; import cc.mrbird.febs.dapp.mapper.DappFundFlowDao; -import cc.mrbird.febs.dapp.mapper.DappMemberDao; -import cc.mrbird.febs.dapp.mapper.DappOnlineTransferDao; import cc.mrbird.febs.dapp.service.DappMemberService; -import cc.mrbird.febs.dapp.service.DappSystemService; import cc.mrbird.febs.dapp.service.DappWalletService; import cc.mrbird.febs.dapp.utils.OnlineTransferUtil; -import cc.mrbird.febs.rabbit.producer.ChainProducer; import cn.hutool.core.collection.CollUtil; -import cn.hutool.core.util.StrUtil; -import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.web3j.utils.Numeric; @@ -33,9 +23,7 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.math.RoundingMode; -import java.util.HashMap; import java.util.List; -import java.util.Map; @Slf4j @Service @@ -63,11 +51,11 @@ if (e.to != null && e.to.equals(ChainEnum.BSC_TFC.getAddress().toLowerCase())) { log.info("触发TFC监听"); - try { - Thread.sleep(5000); - } catch (InterruptedException ex) { - ex.printStackTrace(); - } +// try { +// Thread.sleep(5000); +// } catch (InterruptedException ex) { +// ex.printStackTrace(); +// } redisUtils.set(AppContants.REDIS_KEY_BLOCK_COIN_NUM, e.log.getBlockNumber()); redisUtils.set(AppContants.REDIS_KEY_BLOCK_ETH_INCREMENT_NUM, e.log.getBlockNumber()); int decimals = ChainService.getInstance(ChainEnum.BSC_TFC.name()).decimals(); @@ -117,4 +105,9 @@ dappWalletService.updateWalletMineWithLock(amount, fromMember.getId(), 1); } } + + @Override + public void sdmUSDT(EthUsdtContract.TransferEventResponse e) { + return; + } } diff --git a/src/main/java/cc/mrbird/febs/dapp/service/impl/BscUsdtContractEvent.java b/src/main/java/cc/mrbird/febs/dapp/service/impl/BscUsdtContractEvent.java index 49575d1..00513c8 100644 --- a/src/main/java/cc/mrbird/febs/dapp/service/impl/BscUsdtContractEvent.java +++ b/src/main/java/cc/mrbird/febs/dapp/service/impl/BscUsdtContractEvent.java @@ -49,6 +49,90 @@ @Override public void compile(EthUsdtContract.TransferEventResponse e) { + return; +// if (e.to == null) { +// return; +// } +// +// redisUtils.set(AppContants.REDIS_KEY_BLOCK_USDT_NUM, e.log.getBlockNumber()); +// // 判断对方打款地址是否为源池地址 +// if (ChainEnum.BSC_USDT.getAddress().toLowerCase().equals(e.to)) { +// +// redisUtils.set(AppContants.REDIS_KEY_BLOCK_ETH_INCREMENT_NUM, e.log.getBlockNumber()); +// +// // 如果得到触发,则休眠10秒。 因为此处监听器触发可能优先于前端调用transfer接口 +//// try { +//// Thread.sleep(10000); +//// } catch (InterruptedException ex) { +//// ex.printStackTrace(); +//// } +// +// ContractChainService sourceUsdtInstance = ChainService.getInstance(ChainEnum.BSC_USDT.name()); +// int decimals = sourceUsdtInstance.decimals(); +// if (e.from.equals("0xaa25aa7a19f9c426e07dee59b12f944f4d9f1dd3")) { +// return; +// } +// +// BigInteger tokens = e.tokens; +// BigDecimal amount = new BigDecimal(tokens.toString()).divide(BigDecimal.TEN.pow(decimals), decimals, RoundingMode.HALF_DOWN); +// +// DappFundFlowEntity fundFlow = dappFundFlowDao.selectByFromHash(e.log.getTransactionHash(), null); +// if(ObjectUtil.isNotEmpty(fundFlow) && 1 == fundFlow.getStatus()){ +// log.info("触发USDT合约监听事件-买入贡献值,金额:{}",amount); +// if(1 == fundFlow.getType()){//认购贡献值 1 +// if (fundFlow == null) { +// List<DappFundFlowEntity> flows = dappFundFlowDao.selectFundFlowListByAddress(e.from, 1); +// if (CollUtil.isEmpty(flows)) { +// OnlineTransferUtil.addTransferRecord(e.from, e.to, amount, e.log.getTransactionHash(), DappTransferRecordEntity.TRANSFER_SOURCE_FLAG_ONLINE, "USDT"); +// log.info("本地无交易:{}", e.log.getTransactionHash()); +// return; +// } +// +// for (DappFundFlowEntity flow : flows) { +// if (flow.getStatus() == 1) { +// if (amount.compareTo(flow.getAmount().multiply(flow.getNewestPrice()).setScale(4, RoundingMode.HALF_UP)) == 0) { +// fundFlow = flow; +// fundFlow.setFromHash(e.log.getTransactionHash()); +// break; +// } +// } +// } +// } +// +// if (fundFlow == null) { +// return; +// } +// +// fundFlow.setAmount(fundFlow.getAmount().negate()); +// // 更改状态为已同步 +// fundFlow.setStatus(2); +// dappFundFlowDao.updateById(fundFlow); +// //生成业绩数 +// chainProducer.sendAchieveTreeMsg(fundFlow.getMemberId()); +// //分发手续费给节点 +// buyNodePerk(amount); +// +// }else if(13 == fundFlow.getType()){//认购节点 13 +// +// log.info("触发USDT合约监听事件-认购节点,金额:{}",amount); +// fundFlow.setAmount(fundFlow.getAmount().negate()); +// // 更改状态为已同步 +// fundFlow.setStatus(2); +// dappFundFlowDao.updateById(fundFlow); +// +// Long memberId = fundFlow.getMemberId(); +// DappMemberEntity dappMemberEntity = dappMemberDao.selectById(memberId); +// dappMemberEntity.setBuyNode(1); +// dappMemberDao.updateById(dappMemberEntity); +// } +// }else{ +// return; +// } +// } + } + + @Override + public void sdmUSDT(EthUsdtContract.TransferEventResponse e) { if (e.to == null) { return; } @@ -129,7 +213,7 @@ } } } - + public void buyNodePerk(BigDecimal amount){ /** * 获取节点平分百分比 perkPercent diff --git a/src/main/java/cc/mrbird/febs/job/ChainListenerJob.java b/src/main/java/cc/mrbird/febs/job/ChainListenerJob.java deleted file mode 100644 index 2149097..0000000 --- a/src/main/java/cc/mrbird/febs/job/ChainListenerJob.java +++ /dev/null @@ -1,103 +0,0 @@ -package cc.mrbird.febs.job; - -import cc.mrbird.febs.common.contants.AppContants; -import cc.mrbird.febs.common.utils.RedisUtils; -import cc.mrbird.febs.dapp.chain.ChainEnum; -import cc.mrbird.febs.dapp.chain.ChainService; -import cc.mrbird.febs.dapp.chain.ContractEventService; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.ApplicationArguments; -import org.springframework.boot.ApplicationRunner; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import java.math.BigInteger; - -@Slf4j -@Component -@ConditionalOnProperty(prefix = "system", name = "chain-listener", havingValue = "true") -public class ChainListenerJob implements ApplicationRunner { - - @Autowired - private ContractEventService bscUsdtContractEvent; - @Autowired - private ContractEventService bscCoinContractEvent; - - @Autowired - private RedisUtils redisUtils; - - @Scheduled(cron = "0 0/5 * * * ? ") - public void chainBlockUpdate() { - BigInteger blockNumber = ChainService.getInstance(ChainEnum.BSC_TFC.name()).blockNumber(); - - redisUtils.set(AppContants.REDIS_KEY_BLOCK_ETH_NEWEST_NUM, blockNumber); - } - - @Scheduled(cron = "0/2 * * * * ? ") - public void chainIncrementBlock() { - Object newestBlockObj = redisUtils.get(AppContants.REDIS_KEY_BLOCK_ETH_NEWEST_NUM); - BigInteger newestBlock; - if (newestBlockObj == null) { - newestBlock = ChainService.getInstance(ChainEnum.BSC_TFC.name()).blockNumber(); - } else { - newestBlock = (BigInteger) newestBlockObj; - } - - Object incrementObj = redisUtils.get(AppContants.REDIS_KEY_BLOCK_ETH_INCREMENT_NUM); - BigInteger toIncrement; - if (incrementObj == null) { - toIncrement = newestBlock; - } else { - BigInteger incrementBlock = (BigInteger) incrementObj; - - // 最新区块小于增加区块 - if (newestBlock.compareTo(incrementBlock) <= 0) { - return; - } - toIncrement = incrementBlock.add(BigInteger.ONE); - } - - redisUtils.set(AppContants.REDIS_KEY_BLOCK_ETH_INCREMENT_NUM, toIncrement); - } - - @Override - public void run(ApplicationArguments args) throws Exception { - long start = System.currentTimeMillis(); - log.info("区块链监听开始启动"); - Object incrementObj = redisUtils.get(AppContants.REDIS_KEY_BLOCK_ETH_INCREMENT_NUM); - BigInteger newest = ChainService.getInstance(ChainEnum.BSC_TFC.name()).blockNumber(); - BigInteger block; - if (incrementObj == null) { - block = newest; - } else { - block = (BigInteger) incrementObj; - } - - ChainService.wssContractEventListener(block, bscUsdtContractEvent, ChainEnum.BSC_USDT_LISTENER.name()); - ChainService.wssContractEventListener(block, bscCoinContractEvent, ChainEnum.BSC_TFC_LISTENER.name()); - -// BigInteger section = BigInteger.valueOf(5000); -// while (newest.subtract(block).compareTo(section) > -1) { -// BigInteger end = block.add(section); -// -// BigInteger finalBlock = block; -// new Thread(() -> { -// log.info("监听:[{} - {}]", finalBlock, end); -// ChainService.contractEventListener(finalBlock, end, bscUsdtContractEvent, ChainEnum.BSC_USDT_LISTENER.name()); -// ChainService.contractEventListener(finalBlock, end, bscCoinContractEvent, ChainEnum.BSC_TFC_LISTENER.name()); -// }).start(); -// -// block = block.add(section); -// if (block.compareTo(newest) > 0) { -// block = newest; -// } -// } -// ChainService.contractEventListener(block, bscUsdtContractEvent, ChainEnum.BSC_USDT_LISTENER.name()); -// ChainService.contractEventListener(block, bscCoinContractEvent, ChainEnum.BSC_TFC_LISTENER.name()); - - long end = System.currentTimeMillis(); - log.info("区块链监听启动完成, 消耗时间{}", end - start); - } -} diff --git a/src/main/java/cc/mrbird/febs/job/ChainSDMChargeListenerJob.java b/src/main/java/cc/mrbird/febs/job/ChainSDMChargeListenerJob.java new file mode 100644 index 0000000..97b7388 --- /dev/null +++ b/src/main/java/cc/mrbird/febs/job/ChainSDMChargeListenerJob.java @@ -0,0 +1,57 @@ +package cc.mrbird.febs.job; + +import cc.mrbird.febs.common.contants.AppContants; +import cc.mrbird.febs.common.utils.RedisUtils; +import cc.mrbird.febs.dapp.chain.ChainEnum; +import cc.mrbird.febs.dapp.chain.ChainService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.math.BigInteger; + +@Slf4j +@Component +@ConditionalOnProperty(prefix = "system", name = "charge-transfer", havingValue = "true") +public class ChainSDMChargeListenerJob { + + @Autowired + private RedisUtils redisUtils; + + @Scheduled(cron = "0 0/5 * * * ? ") + public void chainBlockUpdate() { + BigInteger blockNumber = ChainService.getInstance(ChainEnum.BSC_TFC.name()).blockNumber(); + + redisUtils.set(AppContants.REDIS_KEY_BLOCK_ETH_NEWEST_NUM_CHARGE, blockNumber); + } + + @Scheduled(cron = "0/2 * * * * ? ") + public void chainIncrementBlock() { + Object newestBlockObj = redisUtils.get(AppContants.REDIS_KEY_BLOCK_ETH_NEWEST_NUM_CHARGE); + BigInteger newestBlock; + if (newestBlockObj == null) { + newestBlock = ChainService.getInstance(ChainEnum.BSC_TFC.name()).blockNumber(); + } else { + newestBlock = (BigInteger) newestBlockObj; + } + + Object incrementObj = redisUtils.get(AppContants.REDIS_KEY_BLOCK_ETH_INCREMENT_NUM_CHARGE); + BigInteger toIncrement; + if (incrementObj == null) { + toIncrement = newestBlock; + } else { + BigInteger incrementBlock = (BigInteger) incrementObj; + + // 最新区块小于增加区块 + if (newestBlock.compareTo(incrementBlock) <= 0) { + return; + } + toIncrement = incrementBlock.add(BigInteger.ONE); + } + + redisUtils.set(AppContants.REDIS_KEY_BLOCK_ETH_INCREMENT_NUM_CHARGE, toIncrement); + } + +} diff --git a/src/main/java/cc/mrbird/febs/job/ChainSDMChargeRunner.java b/src/main/java/cc/mrbird/febs/job/ChainSDMChargeRunner.java new file mode 100644 index 0000000..ad62b86 --- /dev/null +++ b/src/main/java/cc/mrbird/febs/job/ChainSDMChargeRunner.java @@ -0,0 +1,59 @@ +package cc.mrbird.febs.job; + +import cc.mrbird.febs.common.contants.AppContants; +import cc.mrbird.febs.common.utils.RedisUtils; +import cc.mrbird.febs.dapp.chain.ChainEnum; +import cc.mrbird.febs.dapp.chain.ChainService; +import cc.mrbird.febs.dapp.chain.ContractEventService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.math.BigInteger; + +@Slf4j +@Component +@ConditionalOnProperty(prefix = "system", name = "charge-transfer", havingValue = "true") +public class ChainSDMChargeRunner implements ApplicationRunner { + @Autowired + private ContractEventService bscCoinContractEvent; + + @Autowired + private RedisUtils redisUtils; + + @Override + public void run(ApplicationArguments args) throws Exception { + long start = System.currentTimeMillis(); + log.info("区块链充值开始启动"); + + Object incrementObj = redisUtils.get(AppContants.REDIS_KEY_BLOCK_ETH_INCREMENT_NUM_CHARGE); + BigInteger newest = ChainService.getInstance(ChainEnum.BSC_TFC.name()).blockNumber(); + BigInteger block; + if (incrementObj == null) { + block = newest; + } else { + block = (BigInteger) incrementObj; + } + + BigInteger section = BigInteger.valueOf(5000); + log.info("监听:[{} - {} - {}]", newest,block,newest.subtract(block).compareTo(section) > -1); + while (newest.subtract(block).compareTo(section) > -1) { + BigInteger end = block.add(section); + log.info("监听:[{} - {}]", block, end); + ChainService.sdmChargeEventListener(block, end, bscCoinContractEvent, ChainEnum.BSC_TFC.name()); + + block = block.add(section); + if (block.compareTo(newest) > 0) { + block = newest; + } + } + + ChainService.sdmChargeEventListener(block, null, bscCoinContractEvent, ChainEnum.BSC_TFC.name()); + + long end = System.currentTimeMillis(); + log.info("区块链滑点启动完成, 消耗时间{}", end - start); + } +} diff --git a/src/main/java/cc/mrbird/febs/job/ChainSDMListenerJob.java b/src/main/java/cc/mrbird/febs/job/ChainSDMListenerJob.java new file mode 100644 index 0000000..722d91c --- /dev/null +++ b/src/main/java/cc/mrbird/febs/job/ChainSDMListenerJob.java @@ -0,0 +1,101 @@ +package cc.mrbird.febs.job; + +import cc.mrbird.febs.common.contants.AppContants; +import cc.mrbird.febs.common.utils.RedisUtils; +import cc.mrbird.febs.dapp.chain.ChainEnum; +import cc.mrbird.febs.dapp.chain.ChainService; +import cc.mrbird.febs.dapp.chain.ContractEventService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.math.BigInteger; + +@Slf4j +@Component +@ConditionalOnProperty(prefix = "system", name = "chain-listener", havingValue = "true") +public class ChainSDMListenerJob{ + + @Autowired + private ContractEventService bscUsdtContractEvent; + @Autowired + private ContractEventService bscCoinContractEvent; + + @Autowired + private RedisUtils redisUtils; + + @Scheduled(cron = "0 0/5 * * * ? ") + public void chainBlockUpdate() { + BigInteger blockNumber = ChainService.getInstance(ChainEnum.BSC_TFC.name()).blockNumber(); + + redisUtils.set(AppContants.REDIS_KEY_BLOCK_ETH_NEWEST_NUM, blockNumber); + } + + @Scheduled(cron = "0/2 * * * * ? ") + public void chainIncrementBlock() { + Object newestBlockObj = redisUtils.get(AppContants.REDIS_KEY_BLOCK_ETH_NEWEST_NUM); + BigInteger newestBlock; + if (newestBlockObj == null) { + newestBlock = ChainService.getInstance(ChainEnum.BSC_TFC.name()).blockNumber(); + } else { + newestBlock = (BigInteger) newestBlockObj; + } + + Object incrementObj = redisUtils.get(AppContants.REDIS_KEY_BLOCK_ETH_INCREMENT_NUM); + BigInteger toIncrement; + if (incrementObj == null) { + toIncrement = newestBlock; + } else { + BigInteger incrementBlock = (BigInteger) incrementObj; + + // 最新区块小于增加区块 + if (newestBlock.compareTo(incrementBlock) <= 0) { + return; + } + toIncrement = incrementBlock.add(BigInteger.ONE); + } + + redisUtils.set(AppContants.REDIS_KEY_BLOCK_ETH_INCREMENT_NUM, toIncrement); + } + +// @Override +// public void run(ApplicationArguments args) throws Exception { +// long start = System.currentTimeMillis(); +// log.info("区块链监听开始启动"); +// Object incrementObj = redisUtils.get(AppContants.REDIS_KEY_BLOCK_ETH_INCREMENT_NUM); +// BigInteger newest = ChainService.getInstance(ChainEnum.BSC_TFC.name()).blockNumber(); +// BigInteger block; +// if (incrementObj == null) { +// block = newest; +// } else { +// block = (BigInteger) incrementObj; +// } +// +// ChainService.wssContractEventListener(block, bscUsdtContractEvent, ChainEnum.BSC_USDT_LISTENER.name()); +// ChainService.wssContractEventListener(block, bscCoinContractEvent, ChainEnum.BSC_TFC_LISTENER.name()); +// +//// BigInteger section = BigInteger.valueOf(5000); +//// while (newest.subtract(block).compareTo(section) > -1) { +//// BigInteger end = block.add(section); +//// +//// BigInteger finalBlock = block; +//// new Thread(() -> { +//// log.info("监听:[{} - {}]", finalBlock, end); +//// ChainService.contractEventListener(finalBlock, end, bscUsdtContractEvent, ChainEnum.BSC_USDT_LISTENER.name()); +//// ChainService.contractEventListener(finalBlock, end, bscCoinContractEvent, ChainEnum.BSC_TFC_LISTENER.name()); +//// }).start(); +//// +//// block = block.add(section); +//// if (block.compareTo(newest) > 0) { +//// block = newest; +//// } +//// } +//// ChainService.contractEventListener(block, bscUsdtContractEvent, ChainEnum.BSC_USDT_LISTENER.name()); +//// ChainService.contractEventListener(block, bscCoinContractEvent, ChainEnum.BSC_TFC_LISTENER.name()); +// +// long end = System.currentTimeMillis(); +// log.info("区块链监听启动完成, 消耗时间{}", end - start); +// } +} diff --git a/src/main/java/cc/mrbird/febs/job/ChainSDMRunner.java b/src/main/java/cc/mrbird/febs/job/ChainSDMRunner.java new file mode 100644 index 0000000..8e4c9d3 --- /dev/null +++ b/src/main/java/cc/mrbird/febs/job/ChainSDMRunner.java @@ -0,0 +1,59 @@ +package cc.mrbird.febs.job; + +import cc.mrbird.febs.common.contants.AppContants; +import cc.mrbird.febs.common.utils.RedisUtils; +import cc.mrbird.febs.dapp.chain.ChainEnum; +import cc.mrbird.febs.dapp.chain.ChainService; +import cc.mrbird.febs.dapp.chain.ContractEventService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.math.BigInteger; + +@Slf4j +@Component +@ConditionalOnProperty(prefix = "system", name = "chain-listener", havingValue = "true") +public class ChainSDMRunner implements ApplicationRunner { + @Autowired + private ContractEventService bscUsdtContractEvent; + + @Autowired + private RedisUtils redisUtils; + + @Override + public void run(ApplicationArguments args) throws Exception { + long start = System.currentTimeMillis(); + log.info("区块链USDT开始启动"); + + Object incrementObj = redisUtils.get(AppContants.REDIS_KEY_BLOCK_ETH_INCREMENT_NUM); + BigInteger newest = ChainService.getInstance(ChainEnum.BSC_USDT.name()).blockNumber(); + BigInteger block; + if (incrementObj == null) { + block = newest; + } else { + block = (BigInteger) incrementObj; + } + + BigInteger section = BigInteger.valueOf(5000); + log.info("监听:[{} - {} - {}]", newest,block,newest.subtract(block).compareTo(section) > -1); + while (newest.subtract(block).compareTo(section) > -1) { + BigInteger end = block.add(section); + log.info("监听:[{} - {}]", block, end); + ChainService.sdmUSDTEventListener(block, end, bscUsdtContractEvent, ChainEnum.BSC_USDT.name()); + + block = block.add(section); + if (block.compareTo(newest) > 0) { + block = newest; + } + } + + ChainService.sdmUSDTEventListener(block, null, bscUsdtContractEvent, ChainEnum.BSC_USDT.name()); + + long end = System.currentTimeMillis(); + log.info("区块链滑点启动完成, 消耗时间{}", end - start); + } +} diff --git a/src/main/resources/application-chain.yml b/src/main/resources/application-chain.yml index 8c3551b..27cd187 100644 --- a/src/main/resources/application-chain.yml +++ b/src/main/resources/application-chain.yml @@ -54,6 +54,7 @@ time-zone: GMT+8 system: + charge-transfer: false online-transfer: true chain-listener: true reset-job: true diff --git a/src/main/resources/application-charge.yml b/src/main/resources/application-charge.yml new file mode 100644 index 0000000..a630dae --- /dev/null +++ b/src/main/resources/application-charge.yml @@ -0,0 +1,62 @@ +spring: + datasource: + dynamic: + # 是否开启 SQL日志输出,生产环境建议关闭,有性能损耗 + p6spy: false + hikari: + connection-timeout: 30000 + max-lifetime: 1800000 + max-pool-size: 15 + min-idle: 5 + connection-test-query: select 1 + pool-name: FebsHikariCP + # 配置默认数据源 + primary: base + datasource: + # 数据源-1,名称为 base + base: + username: db_sdm + password: sdm123!@# + # 8.210.56.119 + driver-class-name: com.mysql.cj.jdbc.Driver + url: jdbc:mysql://127.0.0.1:3306/db_sdm?useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2b8 + + redis: + # Redis数据库索引(默认为 0) + database: 15 + # Redis服务器地址 + host: 127.0.0.1 + # Redis服务器连接端口 + port: 6379 + # Redis 密码 + password: 1234!@#$!QAZ + lettuce: + pool: + # 连接池中的最小空闲连接 + min-idle: 8 + # 连接池中的最大空闲连接 + max-idle: 500 + # 连接池最大连接数(使用负值表示没有限制) + max-active: 2000 + # 连接池最大阻塞等待时间(使用负值表示没有限制) + max-wait: 10000 + # 连接超时时间(毫秒) + timeout: 5000 + rabbitmq: + host: 127.0.0.1 + port: 5672 + username: xc_rabbit + password: xuncong123 + publisher-confirm-type: correlated + + jackson: + date-format: yyyy-MM-dd HH:mm:ss + time-zone: GMT+8 + +system: + charge-transfer: true + online-transfer: false + chain-listener: false + reset-job: false + quartz-job: false + debug: false \ No newline at end of file diff --git a/src/main/resources/application-prod.yml b/src/main/resources/application-prod.yml index 58fc337..ee361ed 100644 --- a/src/main/resources/application-prod.yml +++ b/src/main/resources/application-prod.yml @@ -54,6 +54,7 @@ time-zone: GMT+8 system: + charge-transfer: false online-transfer: false chain-listener: false reset-job: false -- Gitblit v1.9.1