From f85e2f339dc29bf0abd2e02c2696af73137b6d5d Mon Sep 17 00:00:00 2001
From: KKSU <15274802129@163.com>
Date: Fri, 07 Jun 2024 11:32:45 +0800
Subject: [PATCH] 将监听充值新建一个项目

---
 src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java |   95 ++++++++++++++++++++++++++++++++++++-----------
 1 files changed, 73 insertions(+), 22 deletions(-)

diff --git a/src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java b/src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java
index cb25b7c..f21a835 100644
--- a/src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java
+++ b/src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java
@@ -1,47 +1,27 @@
 package cc.mrbird.febs.dapp.chain;
 
 import cc.mrbird.febs.common.exception.FebsException;
-import cn.hutool.core.codec.Base64;
-import cn.hutool.core.util.StrUtil;
-import cn.hutool.http.HttpUtil;
-import com.alibaba.fastjson.JSONObject;
 import io.reactivex.Flowable;
 import io.reactivex.disposables.Disposable;
+import io.reactivex.schedulers.Schedulers;
 import lombok.extern.slf4j.Slf4j;
-import okhttp3.Interceptor;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.Response;
-import org.springframework.data.repository.query.ParameterOutOfBoundsException;
-import org.springframework.util.Base64Utils;
-import org.web3j.abi.FunctionReturnDecoder;
-import org.web3j.abi.TypeReference;
-import org.web3j.abi.datatypes.Address;
-import org.web3j.abi.datatypes.Type;
-import org.web3j.abi.datatypes.generated.Uint256;
 import org.web3j.crypto.Credentials;
 import org.web3j.protocol.Web3j;
 import org.web3j.protocol.core.DefaultBlockParameter;
 import org.web3j.protocol.core.DefaultBlockParameterName;
 import org.web3j.protocol.core.DefaultBlockParameterNumber;
 import org.web3j.protocol.core.methods.request.EthFilter;
-import org.web3j.protocol.core.methods.response.TransactionReceipt;
 import org.web3j.protocol.http.HttpService;
 import org.web3j.protocol.websocket.WebSocketClient;
 import org.web3j.protocol.websocket.WebSocketService;
 import org.web3j.tx.gas.StaticGasProvider;
 
-import java.io.IOException;
-import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.net.URI;
-import java.nio.charset.StandardCharsets;
-import java.rmi.activation.UnknownObjectException;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * @author
@@ -99,6 +79,77 @@
         });
     }
 
+    public static void sdmUSDTEventListener(BigInteger startBlock, BigInteger endBlock, ContractEventService event, String type) {
+        ChainEnum chain = ChainEnum.getValueByName(type);
+        assert chain != null;
+
+        EthUsdtContract contract = contract(chain.getPrivateKey(), chain.getContractAddress(), chain.getUrl());
+        EthFilter filter = getFilter(startBlock, endBlock, chain.getContractAddress());
+
+        Flowable<EthUsdtContract.TransferEventResponse> eventFlowable = contract.transferEventFlowable(filter)
+                .doOnError(throwable ->
+                        log.error("合约事件监听发生错误: " + throwable.getMessage(), throwable)) // 更具体的错误日志记录
+                .retryWhen(errors -> {
+                    AtomicInteger counter = new AtomicInteger();
+                    return errors.takeWhile(e -> counter.getAndIncrement() != 3)
+                            .flatMap(e -> {
+                                System.out.println("delay retry by " + counter.get() + " second(s)");
+                                return Flowable.timer(counter.get(), TimeUnit.SECONDS);
+                            });
+                })
+                .subscribeOn(Schedulers.io()); // 指定subscribe操作在IO线程中执行,避免阻塞主线程
+
+        eventFlowable.subscribe(
+                e -> {
+                    try {
+                        event.sdmUSDT(e); // 处理事件
+                    } catch (Exception ex) {
+                        // 处理事件时可能出现的异常
+                        log.error("处理合约事件时出错", ex);
+                    }
+                },
+                Throwable::printStackTrace, // 打印错误堆栈,或者可以替换为更具体的错误处理逻辑
+                () -> log.info("合约事件监听已完成") // 在Flowable完成时执行的逻辑,如记录日志等
+        );
+
+    }
+
+
+    public static void sdmChargeEventListener(BigInteger startBlock, BigInteger endBlock, ContractEventService event, String type) {
+        ChainEnum chain = ChainEnum.getValueByName(type);
+        assert chain != null;
+
+        EthUsdtContract contract = contract(chain.getPrivateKey(), chain.getContractAddress(), chain.getUrl());
+        EthFilter filter = getFilter(startBlock, endBlock, chain.getContractAddress());
+
+        Flowable<EthUsdtContract.TransferEventResponse> eventFlowable = contract.transferEventFlowable(filter)
+                .doOnError(throwable ->
+                        log.error("合约事件监听发生错误: " + throwable.getMessage(), throwable)) // 更具体的错误日志记录
+                .retryWhen(errors -> {
+                    AtomicInteger counter = new AtomicInteger();
+                    return errors.takeWhile(e -> counter.getAndIncrement() != 3)
+                            .flatMap(e -> {
+                                System.out.println("delay retry by " + counter.get() + " second(s)");
+                                return Flowable.timer(counter.get(), TimeUnit.SECONDS);
+                            });
+                })
+                .subscribeOn(Schedulers.io()); // 指定subscribe操作在IO线程中执行,避免阻塞主线程
+
+        eventFlowable.subscribe(
+                e -> {
+                    try {
+                        event.compile(e); // 处理事件
+                    } catch (Exception ex) {
+                        // 处理事件时可能出现的异常
+                        log.error("处理合约事件时出错", ex);
+                    }
+                },
+                Throwable::printStackTrace, // 打印错误堆栈,或者可以替换为更具体的错误处理逻辑
+                () -> log.info("合约事件监听已完成") // 在Flowable完成时执行的逻辑,如记录日志等
+        );
+
+    }
+
     public static void wssContractEventListener(BigInteger startBlock, ContractEventService event, String type) {
         WebSocketService ws = null;
         WebSocketClient webSocketClient = null;

--
Gitblit v1.9.1