xiaoyong931011
2023-03-24 a6595316ae375294e41cc502701814a2c7f7b994
src/main/java/cc/mrbird/febs/mall/chain/service/ChainService.java
@@ -4,8 +4,10 @@
import cc.mrbird.febs.mall.chain.enums.ChainEnum;
import cc.mrbird.febs.mall.chain.enums.EthService;
import cn.hutool.core.util.ObjectUtil;
import com.alipay.api.java_websocket.enums.ReadyState;
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.WebSocket;
import org.java_websocket.exceptions.WebsocketNotConnectedException;
@@ -26,6 +28,9 @@
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * @author
@@ -85,41 +90,42 @@
        Web3j web3j = null;
        ChainEnum chain = ChainEnum.getValueByName(type);
//        try {
//            webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/78074065950e4915aef4f12b6f357d16"));
//            ws = new WebSocketService(webSocketClient, true);
//            ws.connect();
//        } catch (Exception e) {
//            try {
//                ws.close();
//                //TODO
//                if(ObjectUtil.isEmpty(webSocketClient)){
//                    webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/78074065950e4915aef4f12b6f357d16"));
//                    ws = new WebSocketService(webSocketClient, true);
//                    ws.connect();
//                }
//                WebSocket.READYSTATE readyState = webSocketClient.getReadyState();
//                if((WebSocket.READYSTATE.OPEN != readyState
//                        && WebSocket.READYSTATE.CONNECTING != readyState)){
//                    webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/78074065950e4915aef4f12b6f357d16"));
//                    ws = new WebSocketService(webSocketClient, true);
//                    ws.connect();
//                }
//            } catch (Exception connectException) {
//                connectException.printStackTrace();
//            }
//        }
        try {
            webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/78074065950e4915aef4f12b6f357d16"));
            ws = new WebSocketService(webSocketClient, true);
            ws.connect();
        } catch (Exception e) {
            try {
                ws.close();
                //TODO
                if(ObjectUtil.isEmpty(webSocketClient)){
                    webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/78074065950e4915aef4f12b6f357d16"));
                    ws = new WebSocketService(webSocketClient, true);
                    ws.connect();
                }
                WebSocket.READYSTATE readyState = webSocketClient.getReadyState();
                if((WebSocket.READYSTATE.OPEN != readyState
                        && WebSocket.READYSTATE.CONNECTING != readyState)){
                    webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/78074065950e4915aef4f12b6f357d16"));
                    ws = new WebSocketService(webSocketClient, true);
                    ws.connect();
                }
            } catch (Exception connectException) {
                connectException.printStackTrace();
            }
        }
        try {
            web3j = Web3j.build(ws);
            assert chain != null;
            try{
                EthUsdtContract ethUsdtContract = wssContract(chain.getPrivateKey(), chain.getContractAddress(), web3j);
                EthFilter filter = getFilter(startBlock, startBlock, chain.getContractAddress());
                Flowable<EthUsdtContract.TransferEventResponse> eventFlowable = ethUsdtContract.transferEventFlowable(filter);
                Disposable subscribe = eventFlowable.subscribe(event::compile, error -> {
                    log.error("币安监听异常", error);
@@ -127,24 +133,13 @@
                });
            }catch(WebsocketNotConnectedException e){
                ws.close();
                if(ObjectUtil.isEmpty(webSocketClient)){
                    webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/78074065950e4915aef4f12b6f357d16"));
                    ws = new WebSocketService(webSocketClient, true);
                    ws.connect();
                }
                WebSocket.READYSTATE readyState = webSocketClient.getReadyState();
                if((WebSocket.READYSTATE.OPEN != readyState
                        && WebSocket.READYSTATE.CONNECTING != readyState)){
                    webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/78074065950e4915aef4f12b6f357d16"));
                    ws = new WebSocketService(webSocketClient, true);
                    ws.connect();
                }
                webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/78074065950e4915aef4f12b6f357d16"));
                ws = new WebSocketService(webSocketClient, true);
                ws.connect();
                web3j = Web3j.build(ws);
                assert chain != null;
                EthUsdtContract ethUsdtContract = wssContract(chain.getPrivateKey(), chain.getContractAddress(), web3j);
                EthFilter filter = getFilter(startBlock, startBlock, chain.getContractAddress());
                Flowable<EthUsdtContract.TransferEventResponse> eventFlowable = ethUsdtContract.transferEventFlowable(filter);
                Disposable subscribe = eventFlowable.subscribe(event::compile, error -> {
                    log.error("币安监听异常", error);
@@ -158,6 +153,98 @@
    }
    public static void wssContractEventListener2(BigInteger startBlock, ContractEventService event, String type) {
        ChainEnum chain = ChainEnum.getValueByName(type);
//        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        WebSocketService ws = null;
        try{
            WebSocketClient webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/fcb1ae31845147dcabb183db57336218"));
            ws = new WebSocketService(webSocketClient, true);
            ws.connect();
            Web3j web3j = Web3j.build(ws);
            EthUsdtContract ethUsdtContract = wssContract(chain.getPrivateKey(), chain.getContractAddress(), web3j);
            EthFilter filter = getFilter(startBlock, startBlock, chain.getContractAddress());
            Flowable<EthUsdtContract.TransferEventResponse> eventFlowable = ethUsdtContract.transferEventFlowable(filter);
            // 判断websocket是否连接成功
            if (!webSocketClient.isOpen()) {
                log.error("WebSocket连接失败");
                int retries = 3;
                while (retries-- > 0) {
                    try {
                        Thread.sleep(1000);
                        wssContractEventListener2(startBlock, event, type);
                        return;
                    } catch (Exception ex) {
                        log.error("币安监听异常", ex);
                    }
                }
                return;
            }
            // 订阅事件流,将事件编译和处理放到单独的线程中执行
            Disposable subscribe = eventFlowable.subscribe(event::compile, error -> {
                        log.error("币安监听异常", error);
                        error = error.getCause();
                        if (error instanceof WebsocketNotConnectedException) {
                            log.error("WebSocket连接已断开,正在尝试重新连接...");
                            // 异常情况下,递归重试连接和订阅
                            int retries = 3;
                            while (retries-- > 0) {
                                try {
                                    Thread.sleep(1000);
                                    wssContractEventListener2(startBlock, event, type);
                                    return;
                                } catch (Exception ex) {
                                    log.error("币安监听异常", ex);
                                }
                            }
                        } else {
                            log.error("币安监听异常", error);
                        }
                    });
            // 使用try-with-resources语句,确保WebSocket连接和Disposable对象能够被及时关闭和释放
//            try{
//                // 等待订阅事件流的线程结束,然后关闭连接和释放资源
////                executor.schedule(ws::close, 1, TimeUnit.SECONDS);
////                executor.awaitTermination(10, TimeUnit.SECONDS);
////                subscribe.dispose();
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//                // 将递归重试连接和订阅改为有限次数的重试,避免出现无限递归的情况
//                int retries = 3;
//                while (retries-- > 0) {
//                    try {
//                        Thread.sleep(1000);
//                        wssContractEventListener2(startBlock, event, type);
//                        return;
//                    } catch (Exception ex) {
//                        log.error("币安监听异常", ex);
//                    }
//                }
//            }
        } catch (Exception e) {
            log.error("币安监听异常", e);
            // 将递归重试连接和订阅改为有限次数的重试,避免出现无限递归的情况
            int retries = 3;
            while (retries-- > 0) {
                try {
                    Thread.sleep(1000);
                    wssContractEventListener2(startBlock, event, type);
                    return;
                } catch (Exception ex) {
                    log.error("币安监听异常", ex);
                }
            }
        }
//        finally {
//            executor.shutdownNow();
//        }
    }
    public static void wssBaseCoinEventListener(BigInteger startBlock, BaseCoinService event) {
@@ -166,7 +253,7 @@
        Web3j web3j = null;
        try {
            webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/78074065950e4915aef4f12b6f357d16"));
            webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/fcb1ae31845147dcabb183db57336218"));
            ws = new WebSocketService(webSocketClient, true);
            ws.connect();
            web3j = Web3j.build(ws);