From f38db4c91a3655a9024aa2f1ada81a153741b855 Mon Sep 17 00:00:00 2001 From: xiaoyong931011 <15274802129@163.com> Date: Fri, 24 Mar 2023 14:29:28 +0800 Subject: [PATCH] 后台修改 --- src/main/java/cc/mrbird/febs/mall/chain/service/ChainService.java | 144 +++++++++++++++++++++++++++++++++++++++++++++-- 1 files changed, 136 insertions(+), 8 deletions(-) diff --git a/src/main/java/cc/mrbird/febs/mall/chain/service/ChainService.java b/src/main/java/cc/mrbird/febs/mall/chain/service/ChainService.java index 732ec34..295e271 100644 --- a/src/main/java/cc/mrbird/febs/mall/chain/service/ChainService.java +++ b/src/main/java/cc/mrbird/febs/mall/chain/service/ChainService.java @@ -3,9 +3,13 @@ import cc.mrbird.febs.common.exception.FebsException; import cc.mrbird.febs.mall.chain.enums.ChainEnum; import cc.mrbird.febs.mall.chain.enums.EthService; +import cn.hutool.core.util.ObjectUtil; import io.reactivex.Flowable; import io.reactivex.disposables.Disposable; +import io.reactivex.schedulers.Schedulers; import lombok.extern.slf4j.Slf4j; +import org.java_websocket.WebSocket; +import org.java_websocket.exceptions.WebsocketNotConnectedException; import org.web3j.crypto.Credentials; import org.web3j.protocol.Web3j; import org.web3j.protocol.core.DefaultBlockParameter; @@ -18,9 +22,14 @@ import org.web3j.tx.gas.StaticGasProvider; import java.math.BigInteger; +import java.net.ConnectException; import java.net.URI; +import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * @author @@ -78,27 +87,146 @@ WebSocketService ws = null; WebSocketClient webSocketClient = null; Web3j web3j = null; + ChainEnum chain = ChainEnum.getValueByName(type); + +// try { +// webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/78074065950e4915aef4f12b6f357d16")); +// ws = new WebSocketService(webSocketClient, true); +// ws.connect(); +// } catch (Exception e) { +// try { +// ws.close(); +// //TODO +// if(ObjectUtil.isEmpty(webSocketClient)){ +// webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/78074065950e4915aef4f12b6f357d16")); +// ws = new WebSocketService(webSocketClient, true); +// ws.connect(); +// } +// WebSocket.READYSTATE readyState = webSocketClient.getReadyState(); +// if((WebSocket.READYSTATE.OPEN != readyState +// && WebSocket.READYSTATE.CONNECTING != readyState)){ +// webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/78074065950e4915aef4f12b6f357d16")); +// ws = new WebSocketService(webSocketClient, true); +// ws.connect(); +// } +// } catch (Exception connectException) { +// connectException.printStackTrace(); +// } +// } + try { webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/78074065950e4915aef4f12b6f357d16")); ws = new WebSocketService(webSocketClient, true); ws.connect(); web3j = Web3j.build(ws); - ChainEnum chain = ChainEnum.getValueByName(type); + assert chain != null; + try{ + EthUsdtContract ethUsdtContract = wssContract(chain.getPrivateKey(), chain.getContractAddress(), web3j); + EthFilter filter = getFilter(startBlock, startBlock, chain.getContractAddress()); + Flowable<EthUsdtContract.TransferEventResponse> eventFlowable = ethUsdtContract.transferEventFlowable(filter); + Disposable subscribe = eventFlowable.subscribe(event::compile, error -> { + log.error("币安监听异常", error); - EthUsdtContract ethUsdtContract = wssContract(chain.getPrivateKey(), chain.getContractAddress(), web3j); - EthFilter filter = getFilter(startBlock, startBlock, chain.getContractAddress()); + }); + }catch(WebsocketNotConnectedException e){ + ws.close(); + webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/78074065950e4915aef4f12b6f357d16")); + ws = new WebSocketService(webSocketClient, true); + ws.connect(); + web3j = Web3j.build(ws); + assert chain != null; + EthUsdtContract ethUsdtContract = wssContract(chain.getPrivateKey(), chain.getContractAddress(), web3j); + EthFilter filter = getFilter(startBlock, startBlock, chain.getContractAddress()); + Flowable<EthUsdtContract.TransferEventResponse> eventFlowable = ethUsdtContract.transferEventFlowable(filter); + Disposable subscribe = eventFlowable.subscribe(event::compile, error -> { + log.error("币安监听异常", error); + }); + } - Flowable<EthUsdtContract.TransferEventResponse> eventFlowable = ethUsdtContract.transferEventFlowable(filter); - Disposable subscribe = eventFlowable.subscribe(event::compile, error -> { - log.error("币安监听异常", error); - }); } catch (Exception e) { e.printStackTrace(); } + } + + public static void wssContractEventListener2(BigInteger startBlock, ContractEventService event, String type) { + ChainEnum chain = ChainEnum.getValueByName(type); + + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + WebSocketService ws = null; + try{ + WebSocketClient webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/fcb1ae31845147dcabb183db57336218")); + ws = new WebSocketService(webSocketClient, true); + ws.connect(); + Web3j web3j = Web3j.build(ws); + + EthUsdtContract ethUsdtContract = wssContract(chain.getPrivateKey(), chain.getContractAddress(), web3j); + EthFilter filter = getFilter(startBlock, startBlock, chain.getContractAddress()); + Flowable<EthUsdtContract.TransferEventResponse> eventFlowable = ethUsdtContract.transferEventFlowable(filter); + + // 订阅事件流,将事件编译和处理放到单独的线程中执行 + Disposable subscribe = eventFlowable.observeOn(Schedulers.from(executor)) + .subscribe(event::compile, error -> { + log.error("币安监听异常", error); + error = error.getCause(); + if (error instanceof WebsocketNotConnectedException) { + log.error("WebSocket连接已断开,正在尝试重新连接..."); + // 异常情况下,递归重试连接和订阅 + int retries = 3; + while (retries-- > 0) { + try { + Thread.sleep(1000); + wssContractEventListener2(startBlock, event, type); + return; + } catch (Exception ex) { + log.error("币安监听异常", ex); + } + } + } else { + log.error("币安监听异常", error); + } + }); + + // 使用try-with-resources语句,确保WebSocket连接和Disposable对象能够被及时关闭和释放 + try{ + // 等待订阅事件流的线程结束,然后关闭连接和释放资源 + executor.schedule(ws::close, 1, TimeUnit.SECONDS); + executor.awaitTermination(2, TimeUnit.SECONDS); + subscribe.dispose(); + } catch (InterruptedException e) { + e.printStackTrace(); + // 将递归重试连接和订阅改为有限次数的重试,避免出现无限递归的情况 + int retries = 3; + while (retries-- > 0) { + try { + Thread.sleep(1000); + wssContractEventListener2(startBlock, event, type); + return; + } catch (Exception ex) { + log.error("币安监听异常", ex); + } + } + } + } catch (Exception e) { + log.error("币安监听异常", e); + + // 将递归重试连接和订阅改为有限次数的重试,避免出现无限递归的情况 + int retries = 3; + while (retries-- > 0) { + try { + Thread.sleep(1000); + wssContractEventListener2(startBlock, event, type); + return; + } catch (Exception ex) { + log.error("币安监听异常", ex); + } + } + } finally { + executor.shutdownNow(); + } } @@ -109,7 +237,7 @@ Web3j web3j = null; try { - webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/78074065950e4915aef4f12b6f357d16")); + webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/fcb1ae31845147dcabb183db57336218")); ws = new WebSocketService(webSocketClient, true); ws.connect(); web3j = Web3j.build(ws); -- Gitblit v1.9.1