| | |
| | | import cn.hutool.core.util.ObjectUtil; |
| | | import io.reactivex.Flowable; |
| | | import io.reactivex.disposables.Disposable; |
| | | import io.reactivex.schedulers.Schedulers; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.java_websocket.WebSocket; |
| | | import org.java_websocket.exceptions.WebsocketNotConnectedException; |
| | |
| | | import java.net.URISyntaxException; |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | import java.util.concurrent.Executors; |
| | | import java.util.concurrent.ScheduledExecutorService; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | /** |
| | | * @author |
| | |
| | | |
| | | } |
| | | |
| | | public static void wssContractEventListener2(BigInteger startBlock, ContractEventService event, String type) { |
| | | ChainEnum chain = ChainEnum.getValueByName(type); |
| | | |
| | | ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); |
| | | WebSocketService ws = null; |
| | | try{ |
| | | WebSocketClient webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/78074065950e4915aef4f12b6f357d16")); |
| | | ws = new WebSocketService(webSocketClient, true); |
| | | ws.connect(); |
| | | Web3j web3j = Web3j.build(ws); |
| | | |
| | | EthUsdtContract ethUsdtContract = wssContract(chain.getPrivateKey(), chain.getContractAddress(), web3j); |
| | | EthFilter filter = getFilter(startBlock, startBlock, chain.getContractAddress()); |
| | | Flowable<EthUsdtContract.TransferEventResponse> eventFlowable = ethUsdtContract.transferEventFlowable(filter); |
| | | |
| | | // 订阅事件流,将事件编译和处理放到单独的线程中执行 |
| | | Disposable subscribe = eventFlowable.observeOn(Schedulers.from(executor)) |
| | | .subscribe(event::compile, error -> { |
| | | log.error("币安监听异常", error); |
| | | // 异常情况下,递归重试连接和订阅 |
| | | int retries = 3; |
| | | while (retries-- > 0) { |
| | | try { |
| | | Thread.sleep(1000); |
| | | wssContractEventListener(startBlock, event, type); |
| | | return; |
| | | } catch (Exception ex) { |
| | | log.error("币安监听异常", ex); |
| | | } |
| | | } |
| | | }); |
| | | |
| | | // 使用try-with-resources语句,确保WebSocket连接和Disposable对象能够被及时关闭和释放 |
| | | try{ |
| | | // 等待订阅事件流的线程结束,然后关闭连接和释放资源 |
| | | executor.schedule(ws::close, 1, TimeUnit.SECONDS); |
| | | executor.awaitTermination(2, TimeUnit.SECONDS); |
| | | subscribe.dispose(); |
| | | } catch (InterruptedException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("币安监听异常", e); |
| | | |
| | | // 将递归重试连接和订阅改为有限次数的重试,避免出现无限递归的情况 |
| | | int retries = 3; |
| | | while (retries-- > 0) { |
| | | try { |
| | | Thread.sleep(1000); |
| | | wssContractEventListener(startBlock, event, type); |
| | | return; |
| | | } catch (Exception ex) { |
| | | log.error("币安监听异常", ex); |
| | | } |
| | | } |
| | | } finally { |
| | | executor.shutdownNow(); |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | public static void wssBaseCoinEventListener(BigInteger startBlock, BaseCoinService event) { |