src/main/java/com/xcong/excoin/modules/okxApi/OkxConfig.java
@@ -60,7 +60,7 @@ private final PnLPriceMode unrealizedPnlPriceMode; private final BigDecimal maxPosSize; private BigDecimal step; private String instIdCode; private Long instIdCode; private OkxConfig(Builder builder) { this.apiKey = builder.apiKey; @@ -141,8 +141,8 @@ public BigDecimal getStep() { return step; } public void setStep(BigDecimal step) { this.step = step; } public String getInstIdCode() { return instIdCode; } public void setInstIdCode(String instIdCode) { this.instIdCode = instIdCode; } public Long getInstIdCode() { return instIdCode; } public void setInstIdCode(Long instIdCode) { this.instIdCode = instIdCode; } // ==================== 环境 ==================== src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java
@@ -63,6 +63,8 @@ private final AtomicBoolean isConnected = new AtomicBoolean(false); private final AtomicBoolean isConnecting = new AtomicBoolean(false); private final AtomicBoolean isInitialized = new AtomicBoolean(false); private final AtomicBoolean isDestroyed = new AtomicBoolean(false); private final AtomicBoolean reconnecting = new AtomicBoolean(false); private final List<OkxChannelHandler> channelHandlers = new ArrayList<>(); @@ -132,6 +134,7 @@ } public void destroy() { isDestroyed.set(true); log.info("[WS] 开始销毁..."); if (webSocketClient != null && webSocketClient.isOpen()) { @@ -169,8 +172,7 @@ } private void connect() { if (isConnecting.get() || !isConnecting.compareAndSet(false, true)) { log.info("[WS] 连接进行中,跳过重复请求"); if (!isConnecting.compareAndSet(false, true)) { return; } try { @@ -214,13 +216,12 @@ isConnected.set(false); isConnecting.set(false); cancelPongTimeout(); if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) { if (isDestroyed.get() || sharedExecutor == null || sharedExecutor.isShutdown() || sharedExecutor.isTerminated()) { return; } sharedExecutor.execute(() -> { try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.error("[WS] 重连失败", e); } }); } else { log.warn("[WS] 线程池已关闭,不执行重连"); } } @Override @@ -311,14 +312,17 @@ private void checkHeartbeatTimeout() { if (!isConnected.get()) return; if (System.currentTimeMillis() - lastMessageTime.get() >= HEARTBEAT_TIMEOUT * 1000L) sendPing(); if (System.currentTimeMillis() - lastMessageTime.get() >= HEARTBEAT_TIMEOUT * 1000L) { log.info("[WS] 心跳超时,发送ping"); sendPing(); } } private void sendPing() { try { if (webSocketClient != null && webSocketClient.isOpen()) { webSocketClient.send("ping"); log.debug("[WS] 发送 ping 请求"); log.info("[WS] 发送ping请求"); } } catch (Exception e) { log.warn("[WS] 发送 ping 失败", e); } } @@ -327,13 +331,43 @@ if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) pongTimeoutFuture.cancel(true); } private int reconnectAttempt = 0; private void reconnectWithBackoff() throws InterruptedException { int attempt = 0, maxAttempts = 3; long delayMs = 5000; while (attempt < maxAttempts) { try { Thread.sleep(delayMs); connect(); return; } catch (Exception e) { log.warn("[WS] 第{}次重连失败", attempt + 1, e); delayMs *= 2; attempt++; } if (!reconnecting.compareAndSet(false, true)) { return; } log.error("[WS] 超过最大重试次数({}),放弃重连", maxAttempts); try { long delayMs = 5000L; while (!isDestroyed.get() && !isConnected.get()) { try { connect(); for (int i = 0; i < 20 && !isDestroyed.get() && !isConnected.get(); i++) { Thread.sleep(1000); } if (isConnected.get()) { reconnectAttempt = 0; log.info("[WS] 重连成功"); return; } if (isDestroyed.get()) { return; } } catch (Exception e) { log.warn("[WS] 第{}次重连失败: {}", reconnectAttempt + 1, e.getMessage()); } reconnectAttempt++; log.info("[WS] 第{}次重连等待{}秒...", reconnectAttempt, delayMs / 1000); for (long waited = 0; waited < delayMs && !isDestroyed.get(); waited += 1000) { Thread.sleep(1000); if (isConnected.get()) return; } long jitter = (long) (Math.random() * 3000); delayMs = Math.min(delayMs * 2, 60000L) + jitter; } } finally { reconnecting.set(false); } } private void shutdownExecutorGracefully(ExecutorService executor) { src/main/java/com/xcong/excoin/modules/okxApi/OkxRestClient.java
@@ -44,7 +44,24 @@ return isSuccess(result, "设置杠杆"); } public String fetchInstIdCode(String instType, String instId) { public boolean cancelAllOrders(String instId) { JSONObject params = new JSONObject(); params.put("instType", "SWAP"); params.put("instId", instId); JSONObject result = post("/api/v5/trade/cancel-all-orders", params); return isSuccess(result, "撤销所有委托单"); } public boolean closeAllPositions(String instId, String mgnMode) { JSONObject params = new JSONObject(); params.put("instType", "SWAP"); params.put("instId", instId); params.put("mgnMode", mgnMode); JSONObject result = post("/api/v5/trade/close-all-positions", params); return isSuccess(result, "平仓ETH"); } public Long fetchInstIdCode(String instType, String instId) { String path = "/api/v5/account/instruments?instType=" + instType + "&instId=" + instId; JSONObject result = get(path); if (result == null || !"0".equals(result.getString("code"))) { @@ -59,7 +76,7 @@ return null; } JSONObject first = data.getJSONObject(0); String instIdCode = first.getString("instIdCode"); Long instIdCode = first.getLong("instIdCode"); log.info("[REST] 获取instIdCode成功, instId:{}, instIdCode:{}", instId, instIdCode); return instIdCode; } src/main/java/com/xcong/excoin/modules/okxApi/OkxTradeExecutor.java
@@ -48,13 +48,13 @@ private final String contract; private final String marginMode; private final String accountName; private final String instIdCode; private final Long instIdCode; private volatile WebSocketClient wsClient; private final ExecutorService executor; public OkxTradeExecutor(String contract, String marginMode, String accountName, String instIdCode) { public OkxTradeExecutor(String contract, String marginMode, String accountName, Long instIdCode) { this.contract = contract; this.marginMode = marginMode; this.accountName = accountName; src/main/java/com/xcong/excoin/modules/okxApi/OkxWebSocketClientManager.java
@@ -104,6 +104,10 @@ config.getPassphrase(), !config.isProduction()); log.info("[管理器] 清理原有持仓和委托..."); restClient.cancelAllOrders(config.getContract()); restClient.closeAllPositions(config.getContract(), config.getMarginMode()); boolean posModeOk = restClient.setPositionMode(config.getPosMode()); if (!posModeOk) { log.error("[管理器] 设置持仓方式失败,策略可能无法正常运作"); @@ -115,7 +119,7 @@ log.error("[管理器] 设置杠杆倍数失败,策略可能无法正常运作"); } String instIdCode = restClient.fetchInstIdCode("SWAP", config.getContract()); Long instIdCode = restClient.fetchInstIdCode("SWAP", config.getContract()); if (instIdCode != null) { config.setInstIdCode(instIdCode); } else {