From afc858458cbdde1d5f3f2cfce6d056656bf75c16 Mon Sep 17 00:00:00 2001
From: Administrator <15274802129@163.com>
Date: Wed, 13 May 2026 21:39:39 +0800
Subject: [PATCH] refactor(okxNewPrice): 账户配置
---
src/main/java/com/xcong/excoin/modules/okxApi/OkxConfig.java | 6 +-
src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java | 66 +++++++++++++++++++++++++--------
src/main/java/com/xcong/excoin/modules/okxApi/OkxRestClient.java | 21 +++++++++-
src/main/java/com/xcong/excoin/modules/okxApi/OkxTradeExecutor.java | 4 +-
src/main/java/com/xcong/excoin/modules/okxApi/OkxWebSocketClientManager.java | 6 ++
5 files changed, 79 insertions(+), 24 deletions(-)
diff --git a/src/main/java/com/xcong/excoin/modules/okxApi/OkxConfig.java b/src/main/java/com/xcong/excoin/modules/okxApi/OkxConfig.java
index cdcd8de..50332dd 100644
--- a/src/main/java/com/xcong/excoin/modules/okxApi/OkxConfig.java
+++ b/src/main/java/com/xcong/excoin/modules/okxApi/OkxConfig.java
@@ -60,7 +60,7 @@
private final PnLPriceMode unrealizedPnlPriceMode;
private final BigDecimal maxPosSize;
private BigDecimal step;
- private String instIdCode;
+ private Long instIdCode;
private OkxConfig(Builder builder) {
this.apiKey = builder.apiKey;
@@ -141,8 +141,8 @@
public BigDecimal getStep() { return step; }
public void setStep(BigDecimal step) { this.step = step; }
- public String getInstIdCode() { return instIdCode; }
- public void setInstIdCode(String instIdCode) { this.instIdCode = instIdCode; }
+ public Long getInstIdCode() { return instIdCode; }
+ public void setInstIdCode(Long instIdCode) { this.instIdCode = instIdCode; }
// ==================== 环境 ====================
diff --git a/src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java b/src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java
index 7af9646..19a423f 100644
--- a/src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java
+++ b/src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java
@@ -63,6 +63,8 @@
private final AtomicBoolean isConnected = new AtomicBoolean(false);
private final AtomicBoolean isConnecting = new AtomicBoolean(false);
private final AtomicBoolean isInitialized = new AtomicBoolean(false);
+ private final AtomicBoolean isDestroyed = new AtomicBoolean(false);
+ private final AtomicBoolean reconnecting = new AtomicBoolean(false);
private final List<OkxChannelHandler> channelHandlers = new ArrayList<>();
@@ -132,6 +134,7 @@
}
public void destroy() {
+ isDestroyed.set(true);
log.info("[WS] 开始销毁...");
if (webSocketClient != null && webSocketClient.isOpen()) {
@@ -169,8 +172,7 @@
}
private void connect() {
- if (isConnecting.get() || !isConnecting.compareAndSet(false, true)) {
- log.info("[WS] 连接进行中,跳过重复请求");
+ if (!isConnecting.compareAndSet(false, true)) {
return;
}
try {
@@ -214,13 +216,12 @@
isConnected.set(false);
isConnecting.set(false);
cancelPongTimeout();
- if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) {
- sharedExecutor.execute(() -> {
- try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.error("[WS] 重连失败", e); }
- });
- } else {
- log.warn("[WS] 线程池已关闭,不执行重连");
+ if (isDestroyed.get() || sharedExecutor == null || sharedExecutor.isShutdown() || sharedExecutor.isTerminated()) {
+ return;
}
+ sharedExecutor.execute(() -> {
+ try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.error("[WS] 重连失败", e); }
+ });
}
@Override
@@ -311,29 +312,62 @@
private void checkHeartbeatTimeout() {
if (!isConnected.get()) return;
- if (System.currentTimeMillis() - lastMessageTime.get() >= HEARTBEAT_TIMEOUT * 1000L) sendPing();
+ if (System.currentTimeMillis() - lastMessageTime.get() >= HEARTBEAT_TIMEOUT * 1000L) {
+ log.info("[WS] 心跳超时,发送ping");
+ sendPing();
+ }
}
private void sendPing() {
try {
if (webSocketClient != null && webSocketClient.isOpen()) {
webSocketClient.send("ping");
- log.debug("[WS] 发送 ping 请求");
+ log.info("[WS] 发送ping请求");
}
- } catch (Exception e) { log.warn("[WS] 发送 ping 失败", e); }
+ } catch (Exception e) { log.warn("[WS] 发送ping失败", e); }
}
private synchronized void cancelPongTimeout() {
if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) pongTimeoutFuture.cancel(true);
}
+ private int reconnectAttempt = 0;
+
private void reconnectWithBackoff() throws InterruptedException {
- int attempt = 0, maxAttempts = 3;
- long delayMs = 5000;
- while (attempt < maxAttempts) {
- try { Thread.sleep(delayMs); connect(); return; } catch (Exception e) { log.warn("[WS] 第{}次重连失败", attempt + 1, e); delayMs *= 2; attempt++; }
+ if (!reconnecting.compareAndSet(false, true)) {
+ return;
}
- log.error("[WS] 超过最大重试次数({}),放弃重连", maxAttempts);
+ try {
+ long delayMs = 5000L;
+ while (!isDestroyed.get() && !isConnected.get()) {
+ try {
+ connect();
+ for (int i = 0; i < 20 && !isDestroyed.get() && !isConnected.get(); i++) {
+ Thread.sleep(1000);
+ }
+ if (isConnected.get()) {
+ reconnectAttempt = 0;
+ log.info("[WS] 重连成功");
+ return;
+ }
+ if (isDestroyed.get()) {
+ return;
+ }
+ } catch (Exception e) {
+ log.warn("[WS] 第{}次重连失败: {}", reconnectAttempt + 1, e.getMessage());
+ }
+ reconnectAttempt++;
+ log.info("[WS] 第{}次重连等待{}秒...", reconnectAttempt, delayMs / 1000);
+ for (long waited = 0; waited < delayMs && !isDestroyed.get(); waited += 1000) {
+ Thread.sleep(1000);
+ if (isConnected.get()) return;
+ }
+ long jitter = (long) (Math.random() * 3000);
+ delayMs = Math.min(delayMs * 2, 60000L) + jitter;
+ }
+ } finally {
+ reconnecting.set(false);
+ }
}
private void shutdownExecutorGracefully(ExecutorService executor) {
diff --git a/src/main/java/com/xcong/excoin/modules/okxApi/OkxRestClient.java b/src/main/java/com/xcong/excoin/modules/okxApi/OkxRestClient.java
index 35f9a9f..9bc24f5 100644
--- a/src/main/java/com/xcong/excoin/modules/okxApi/OkxRestClient.java
+++ b/src/main/java/com/xcong/excoin/modules/okxApi/OkxRestClient.java
@@ -44,7 +44,24 @@
return isSuccess(result, "设置杠杆");
}
- public String fetchInstIdCode(String instType, String instId) {
+ public boolean cancelAllOrders(String instId) {
+ JSONObject params = new JSONObject();
+ params.put("instType", "SWAP");
+ params.put("instId", instId);
+ JSONObject result = post("/api/v5/trade/cancel-all-orders", params);
+ return isSuccess(result, "撤销所有委托单");
+ }
+
+ public boolean closeAllPositions(String instId, String mgnMode) {
+ JSONObject params = new JSONObject();
+ params.put("instType", "SWAP");
+ params.put("instId", instId);
+ params.put("mgnMode", mgnMode);
+ JSONObject result = post("/api/v5/trade/close-all-positions", params);
+ return isSuccess(result, "平仓ETH");
+ }
+
+ public Long fetchInstIdCode(String instType, String instId) {
String path = "/api/v5/account/instruments?instType=" + instType + "&instId=" + instId;
JSONObject result = get(path);
if (result == null || !"0".equals(result.getString("code"))) {
@@ -59,7 +76,7 @@
return null;
}
JSONObject first = data.getJSONObject(0);
- String instIdCode = first.getString("instIdCode");
+ Long instIdCode = first.getLong("instIdCode");
log.info("[REST] 获取instIdCode成功, instId:{}, instIdCode:{}", instId, instIdCode);
return instIdCode;
}
diff --git a/src/main/java/com/xcong/excoin/modules/okxApi/OkxTradeExecutor.java b/src/main/java/com/xcong/excoin/modules/okxApi/OkxTradeExecutor.java
index d079ac8..05047a6 100644
--- a/src/main/java/com/xcong/excoin/modules/okxApi/OkxTradeExecutor.java
+++ b/src/main/java/com/xcong/excoin/modules/okxApi/OkxTradeExecutor.java
@@ -48,13 +48,13 @@
private final String contract;
private final String marginMode;
private final String accountName;
- private final String instIdCode;
+ private final Long instIdCode;
private volatile WebSocketClient wsClient;
private final ExecutorService executor;
- public OkxTradeExecutor(String contract, String marginMode, String accountName, String instIdCode) {
+ public OkxTradeExecutor(String contract, String marginMode, String accountName, Long instIdCode) {
this.contract = contract;
this.marginMode = marginMode;
this.accountName = accountName;
diff --git a/src/main/java/com/xcong/excoin/modules/okxApi/OkxWebSocketClientManager.java b/src/main/java/com/xcong/excoin/modules/okxApi/OkxWebSocketClientManager.java
index bee8c45..6dc10ac 100644
--- a/src/main/java/com/xcong/excoin/modules/okxApi/OkxWebSocketClientManager.java
+++ b/src/main/java/com/xcong/excoin/modules/okxApi/OkxWebSocketClientManager.java
@@ -104,6 +104,10 @@
config.getPassphrase(),
!config.isProduction());
+ log.info("[管理器] 清理原有持仓和委托...");
+ restClient.cancelAllOrders(config.getContract());
+ restClient.closeAllPositions(config.getContract(), config.getMarginMode());
+
boolean posModeOk = restClient.setPositionMode(config.getPosMode());
if (!posModeOk) {
log.error("[管理器] 设置持仓方式失败,策略可能无法正常运作");
@@ -115,7 +119,7 @@
log.error("[管理器] 设置杠杆倍数失败,策略可能无法正常运作");
}
- String instIdCode = restClient.fetchInstIdCode("SWAP", config.getContract());
+ Long instIdCode = restClient.fetchInstIdCode("SWAP", config.getContract());
if (instIdCode != null) {
config.setInstIdCode(instIdCode);
} else {
--
Gitblit v1.9.1