xiaoyong931011
2023-03-24 4d181e48d4730e94557d30fbe5a4b4fca65f8912
后台修改
2 files modified
50 ■■■■ changed files
src/main/java/cc/mrbird/febs/mall/chain/service/ChainService.java 48 ●●●● patch | view | raw | blame | history
src/test/java/cc/mrbird/febs/ProfitTest.java 2 ●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/mall/chain/service/ChainService.java
@@ -155,7 +155,7 @@
    public static void wssContractEventListener2(BigInteger startBlock, ContractEventService event, String type) {
        ChainEnum chain = ChainEnum.getValueByName(type);
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
//        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        WebSocketService ws = null;
        try{
            WebSocketClient webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/fcb1ae31845147dcabb183db57336218"));
@@ -172,8 +172,7 @@
                return;
            }
            // 订阅事件流,将事件编译和处理放到单独的线程中执行
            Disposable subscribe = eventFlowable.observeOn(Schedulers.from(executor))
                    .subscribe(event::compile, error -> {
            Disposable subscribe = eventFlowable.subscribe(event::compile, error -> {
                        log.error("币安监听异常", error);
                        error = error.getCause();
                        if (error instanceof WebsocketNotConnectedException) {
@@ -195,25 +194,25 @@
                    });
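            // Intended to ensure the WebSocket connection and the Disposable are closed and released promptly; note that the block below is a plain try/catch, not a try-with-resources statement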
            try{
                // 等待订阅事件流的线程结束,然后关闭连接和释放资源
                executor.schedule(ws::close, 1, TimeUnit.SECONDS);
                executor.awaitTermination(10, TimeUnit.SECONDS);
//                subscribe.dispose();
            } catch (InterruptedException e) {
                e.printStackTrace();
                // 将递归重试连接和订阅改为有限次数的重试,避免出现无限递归的情况
                int retries = 3;
                while (retries-- > 0) {
                    try {
                        Thread.sleep(1000);
                        wssContractEventListener2(startBlock, event, type);
                        return;
                    } catch (Exception ex) {
                        log.error("币安监听异常", ex);
                    }
                }
            }
//            try{
//                // 等待订阅事件流的线程结束,然后关闭连接和释放资源
////                executor.schedule(ws::close, 1, TimeUnit.SECONDS);
////                executor.awaitTermination(10, TimeUnit.SECONDS);
////                subscribe.dispose();
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//                // 将递归重试连接和订阅改为有限次数的重试,避免出现无限递归的情况
//                int retries = 3;
//                while (retries-- > 0) {
//                    try {
//                        Thread.sleep(1000);
//                        wssContractEventListener2(startBlock, event, type);
//                        return;
//                    } catch (Exception ex) {
//                        log.error("币安监听异常", ex);
//                    }
//                }
//            }
        } catch (Exception e) {
            log.error("币安监听异常", e);
@@ -228,9 +227,10 @@
                    log.error("币安监听异常", ex);
                }
            }
        } finally {
            executor.shutdownNow();
        }
//        finally {
//            executor.shutdownNow();
//        }
    }
src/test/java/cc/mrbird/febs/ProfitTest.java
@@ -510,7 +510,7 @@
            block = newest;
        }
//        ChainService.wssBaseCoinEventListener(BigInteger.valueOf(26737044), baseCoinService);
        ChainService.wssContractEventListener2(BigInteger.valueOf(26737928), bscUsdtContractEvent, ChainEnum.BSC_USDT.name());
        ChainService.wssContractEventListener2(BigInteger.valueOf(26738831), bscUsdtContractEvent, ChainEnum.BSC_USDT.name());
    }
//
//    @Test