From b50ce4f17ad242f8e27d9fae71ee4c01623a5983 Mon Sep 17 00:00:00 2001
From: xiaoyong931011 <15274802129@163.com>
Date: Wed, 29 Mar 2023 10:45:50 +0800
Subject: [PATCH] 后台修改
---
src/main/java/cc/mrbird/febs/mall/chain/service/ChainService.java | 66 ++++++++++++++++++++------------
1 files changed, 41 insertions(+), 25 deletions(-)
diff --git a/src/main/java/cc/mrbird/febs/mall/chain/service/ChainService.java b/src/main/java/cc/mrbird/febs/mall/chain/service/ChainService.java
index 295e271..ee03afd 100644
--- a/src/main/java/cc/mrbird/febs/mall/chain/service/ChainService.java
+++ b/src/main/java/cc/mrbird/febs/mall/chain/service/ChainService.java
@@ -4,6 +4,7 @@
import cc.mrbird.febs.mall.chain.enums.ChainEnum;
import cc.mrbird.febs.mall.chain.enums.EthService;
import cn.hutool.core.util.ObjectUtil;
+import com.alipay.api.java_websocket.enums.ReadyState;
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
@@ -155,7 +156,7 @@
public static void wssContractEventListener2(BigInteger startBlock, ContractEventService event, String type) {
ChainEnum chain = ChainEnum.getValueByName(type);
- ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
+// ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
WebSocketService ws = null;
try{
WebSocketClient webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/fcb1ae31845147dcabb183db57336218"));
@@ -164,12 +165,26 @@
Web3j web3j = Web3j.build(ws);
EthUsdtContract ethUsdtContract = wssContract(chain.getPrivateKey(), chain.getContractAddress(), web3j);
+
EthFilter filter = getFilter(startBlock, startBlock, chain.getContractAddress());
Flowable<EthUsdtContract.TransferEventResponse> eventFlowable = ethUsdtContract.transferEventFlowable(filter);
-
+ // 判断websocket是否连接成功
+ if (!webSocketClient.isOpen()) {
+ log.error("WebSocket连接失败");
+ int retries = 3;
+ while (retries-- > 0) {
+ try {
+ Thread.sleep(1000);
+ wssContractEventListener2(startBlock, event, type);
+ return;
+ } catch (Exception ex) {
+ log.error("币安监听异常", ex);
+ }
+ }
+ return;
+ }
// 订阅事件流,将事件编译和处理放到单独的线程中执行
- Disposable subscribe = eventFlowable.observeOn(Schedulers.from(executor))
- .subscribe(event::compile, error -> {
+ Disposable subscribe = eventFlowable.subscribe(event::compile, error -> {
log.error("币安监听异常", error);
error = error.getCause();
if (error instanceof WebsocketNotConnectedException) {
@@ -191,25 +206,25 @@
});
// 使用try-with-resources语句,确保WebSocket连接和Disposable对象能够被及时关闭和释放
- try{
- // 等待订阅事件流的线程结束,然后关闭连接和释放资源
- executor.schedule(ws::close, 1, TimeUnit.SECONDS);
- executor.awaitTermination(2, TimeUnit.SECONDS);
- subscribe.dispose();
- } catch (InterruptedException e) {
- e.printStackTrace();
- // 将递归重试连接和订阅改为有限次数的重试,避免出现无限递归的情况
- int retries = 3;
- while (retries-- > 0) {
- try {
- Thread.sleep(1000);
- wssContractEventListener2(startBlock, event, type);
- return;
- } catch (Exception ex) {
- log.error("币安监听异常", ex);
- }
- }
- }
+// try{
+// // 等待订阅事件流的线程结束,然后关闭连接和释放资源
+//// executor.schedule(ws::close, 1, TimeUnit.SECONDS);
+//// executor.awaitTermination(10, TimeUnit.SECONDS);
+//// subscribe.dispose();
+// } catch (InterruptedException e) {
+// e.printStackTrace();
+// // 将递归重试连接和订阅改为有限次数的重试,避免出现无限递归的情况
+// int retries = 3;
+// while (retries-- > 0) {
+// try {
+// Thread.sleep(1000);
+// wssContractEventListener2(startBlock, event, type);
+// return;
+// } catch (Exception ex) {
+// log.error("币安监听异常", ex);
+// }
+// }
+// }
} catch (Exception e) {
log.error("币安监听异常", e);
@@ -224,9 +239,10 @@
log.error("币安监听异常", ex);
}
}
- } finally {
- executor.shutdownNow();
}
+// finally {
+// executor.shutdownNow();
+// }
}
--
Gitblit v1.9.1