package cc.mrbird.febs.mall.chain.ercCoin; import cc.mrbird.febs.common.exception.FebsException; import io.reactivex.Flowable; import io.reactivex.schedulers.Schedulers; import lombok.extern.slf4j.Slf4j; import org.web3j.crypto.Credentials; import org.web3j.protocol.Web3j; import org.web3j.protocol.core.DefaultBlockParameter; import org.web3j.protocol.core.DefaultBlockParameterName; import org.web3j.protocol.core.DefaultBlockParameterNumber; import org.web3j.protocol.core.methods.request.EthFilter; import org.web3j.protocol.http.HttpService; import org.web3j.tx.gas.StaticGasProvider; import java.math.BigInteger; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @Slf4j public class ChainService { private final static Map contractMap = new HashMap<>(); static { for (ChainEnum chain : ChainEnum.values()) { contractMap.put(chain.name(), new EthService(chain.getUrl(), chain.getAddress(), chain.getPrivateKey(), chain.getContractAddress())); } } private ChainService() { } public final static ChainService INSTANCE = new ChainService(); public static ContractChainService getInstance(String chainType) { ContractChainService contract = contractMap.get(chainType); if (contract == null) { throw new FebsException("参数错误"); } return contract; } public static void sdmUSDTEventListener(BigInteger startBlock, BigInteger endBlock, ContractEventService event, String type) { ChainEnum chain = ChainEnum.getValueByName(type); assert chain != null; EthUsdtContract contract = contract(chain.getPrivateKey(), chain.getContractAddress(), chain.getUrl()); EthFilter filter = getFilter(startBlock, endBlock, chain.getContractAddress()); Flowable eventFlowable = contract.transferEventFlowable(filter) .doOnError(throwable -> log.error("合约事件监听发生错误: " + throwable.getMessage(), throwable)) // 更具体的错误日志记录 .retryWhen(errors -> { AtomicInteger counter = new AtomicInteger(); return errors.takeWhile(e -> counter.getAndIncrement() != 3) .flatMap(e -> { System.out.println("delay retry by " + counter.get() + " second(s)"); return Flowable.timer(counter.get(), TimeUnit.SECONDS); }); }) .subscribeOn(Schedulers.io()); // 指定subscribe操作在IO线程中执行,避免阻塞主线程 eventFlowable.subscribe( e -> { try { event.sdmUSDT(e); // 处理事件 } catch (Exception ex) { // 处理事件时可能出现的异常 log.error("处理合约事件时出错", ex); } }, Throwable::printStackTrace, // 打印错误堆栈,或者可以替换为更具体的错误处理逻辑 () -> log.info("合约事件监听已完成") // 在Flowable完成时执行的逻辑,如记录日志等 ); } private static EthUsdtContract contract(String privateKey, String contractAddress, String url) { Credentials credentials = Credentials.create(privateKey); HttpService httpService = new HttpService(url); return EthUsdtContract.load(contractAddress, Web3j.build(httpService), credentials, new StaticGasProvider(BigInteger.valueOf(4500000L), BigInteger.valueOf(200000L))); } private static EthFilter getFilter(BigInteger startBlock, BigInteger endBlock, String contractAddress) { DefaultBlockParameter startParameterName = null; DefaultBlockParameter endParameterName = null; if (startBlock != null) { startParameterName = new DefaultBlockParameterNumber(startBlock); } else { startParameterName = DefaultBlockParameterName.EARLIEST; } if (endBlock != null) { endParameterName = new DefaultBlockParameterNumber(endBlock); } else { endParameterName = DefaultBlockParameterName.LATEST; } return new EthFilter(startParameterName, endParameterName, contractAddress); } }