package com.xcong.excoin.modules.okxApi;
|
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSONArray;
|
import com.alibaba.fastjson.JSONObject;
|
import com.xcong.excoin.modules.okxApi.wsHandler.OkxChannelHandler;
|
import lombok.extern.slf4j.Slf4j;
|
import org.java_websocket.client.WebSocketClient;
|
import org.java_websocket.handshake.ServerHandshake;
|
|
import javax.crypto.Mac;
|
import javax.crypto.spec.SecretKeySpec;
|
import java.net.URI;
|
import java.net.URISyntaxException;
|
import java.nio.charset.StandardCharsets;
|
import java.time.ZoneId;
|
import java.time.format.DateTimeFormatter;
|
import java.util.ArrayList;
|
import java.util.Base64;
|
import java.util.List;
|
import java.util.concurrent.*;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicReference;
|
|
/**
|
* OKX WebSocket 连接管理器 — 双通道架构。
|
*
|
* <h3>与 Gate 版本的关键区别</h3>
|
* OKX 使用<b>两条独立的 WebSocket 连接</b>:
|
* <ul>
|
* <li><b>公开 WS</b> ({@code wss://ws.okx.com:8443/ws/v5/public}):
|
* 无需认证,订阅 K 线等公开数据。</li>
|
* <li><b>私有 WS</b> ({@code wss://ws.okx.com:8443/ws/v5/private}):
|
* 需要登录认证(login 消息),订阅仓位、条件订单等私有数据。</li>
|
* </ul>
|
* 而 Gate 只有一条 WS 连接,通过签名区分公开/私有频道。
|
*
|
* <h3>登录认证(私有 WS)</h3>
|
* <pre>
|
* {
|
* "op": "login",
|
* "args": [{
|
* "apiKey": "...",
|
* "passphrase": "...",
|
* "timestamp": "1734567890", // Unix 秒级时间戳
|
* "sign": "base64(HMAC-SHA256(timestamp + 'GET' + '/users/self/verify'))"
|
* }]
|
* }
|
* </pre>
|
*
|
* <h3>心跳机制</h3>
|
* OKX 标准格式为 JSON {@code {"op":"ping"}} / {@code {"op":"pong"}},
|
* 同时兼容纯文本 {@code "ping"} / {@code "pong"} 格式。
|
*
|
* <h3>消息路由</h3>
|
* <pre>
|
* onMessage → handleMessage(message, isPrivate):
|
* 1. "pong" (纯文本) → 日志 + cancelPongTimeout
|
* 2. "ping" (纯文本) → 回复 "pong"
|
* 3. {"op":"pong"} (JSON) → 日志 + cancelPongTimeout
|
* 4. {"op":"ping"} (JSON) → 回复 {"op":"pong"}
|
* 5. {"event":"login"} → 登录成功 → 订阅所有私有 handlers
|
* 6. {"event":"subscribe"} → 标记对应 handler subscribed=true
|
* 7. {"event":"error"} → 错误日志
|
* 8. {"arg":{...}, "data":[...]} → 遍历 handlers 路由
|
* </pre>
|
*
|
* <h3>生命周期</h3>
|
* <pre>
|
* init() → connect(public) + connect(private,true) → startHeartbeat()
|
* destroy() → unsubscribe 所有 handler → closeBlocking() 两条连接 → shutdown 线程池
|
* onClose() → reconnectWithBackoff() 重连对应连接(最多 3 次,指数退避)
|
* </pre>
|
*
|
* <h3>线程安全</h3>
|
* 连接状态用 AtomicBoolean(isPublicConnected, isPrivateConnected, isConnecting, isInitialized)。
|
* 消息时间戳用 AtomicReference。心跳任务用 synchronized 保护。
|
*
|
* @author Administrator
|
*/
|
@SuppressWarnings("ALL")
|
@Slf4j
|
public class OkxKlineWebSocketClient {
|
|
// ==================== 常量 ====================
|
|
/** 心跳超时时间(秒) */
|
private static final int HEARTBEAT_TIMEOUT = 10;
|
|
/** ISO 8601 时间格式化器(毫秒精度,UTC 时区) */
|
private static final DateTimeFormatter ISO_8601_FORMATTER =
|
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
|
.withZone(ZoneId.of("UTC"));
|
|
// ==================== 配置 ====================
|
|
/** OKX 配置(提供 WS URL、API 密钥等) */
|
private final OkxConfig config;
|
|
/** OKX API Key */
|
private final String apiKey;
|
|
/** OKX API Secret */
|
private final String apiSecret;
|
|
/** OKX API Passphrase */
|
private final String passphrase;
|
|
// ==================== WebSocket 客户端 ====================
|
|
/** 公开频道 WebSocket 客户端(K线等) */
|
private WebSocketClient publicWsClient;
|
|
/** 私有频道 WebSocket 客户端(仓位、条件单等) */
|
private WebSocketClient privateWsClient;
|
|
/** 心跳检测调度器 */
|
private ScheduledExecutorService heartbeatExecutor;
|
|
/** 心跳超时 Future */
|
private volatile ScheduledFuture<?> pongTimeoutFuture;
|
|
/** 最后收到消息的时间戳(毫秒) */
|
private final AtomicReference<Long> lastMessageTime = new AtomicReference<>(System.currentTimeMillis());
|
|
// ==================== 连接状态 ====================
|
|
/** 公开频道连接状态 */
|
private final AtomicBoolean isPublicConnected = new AtomicBoolean(false);
|
|
/** 私有频道连接状态 */
|
private final AtomicBoolean isPrivateConnected = new AtomicBoolean(false);
|
|
/** 连接中标记,防重入 */
|
private final AtomicBoolean isConnecting = new AtomicBoolean(false);
|
|
/** 初始化标记,防重复 init */
|
private final AtomicBoolean isInitialized = new AtomicBoolean(false);
|
|
/** 私有频道登录成功标记 */
|
private final AtomicBoolean isPrivateLoggedIn = new AtomicBoolean(false);
|
|
// ==================== 频道处理器 ====================
|
|
/** 公开频道处理器列表(如 K线) */
|
private final List<OkxChannelHandler> publicHandlers = new ArrayList<>();
|
|
/** 私有频道处理器列表(如 仓位、条件单) */
|
private final List<OkxChannelHandler> privateHandlers = new ArrayList<>();
|
|
// ==================== 异步线程池 ====================
|
|
/** 重连等异步任务的缓存线程池(daemon 线程) */
|
private final ExecutorService sharedExecutor = Executors.newCachedThreadPool(r -> {
|
Thread t = new Thread(r, "okx-ws-worker");
|
t.setDaemon(true);
|
return t;
|
});
|
|
// ==================== 重连配置 ====================
|
|
/** 重连最大次数 */
|
private static final int MAX_RECONNECT_ATTEMPTS = 3;
|
|
/** 重连初始延迟(毫秒) */
|
private static final long INITIAL_RECONNECT_DELAY_MS = 5000;
|
|
// ==================== 构造器 ====================
|
|
/**
|
* 构造 OKX WebSocket 客户端。
|
*
|
* @param config OKX 配置(提供 WS URL、API 密钥等)
|
*/
|
public OkxKlineWebSocketClient(OkxConfig config) {
|
this.config = config;
|
this.apiKey = config.getApiKey();
|
this.apiSecret = config.getApiSecret();
|
this.passphrase = config.getPassphrase();
|
}
|
|
// ==================== Handler 注册 ====================
|
|
/**
|
* 注册公开频道处理器(如 K线)。需在 init() 前调用。
|
*
|
* @param handler 实现了 {@link OkxChannelHandler} 接口的公开频道处理器
|
*/
|
public void addPublicHandler(OkxChannelHandler handler) {
|
publicHandlers.add(handler);
|
log.info("[OKX-WS] 注册公开频道处理器: {}", handler.getChannelName());
|
}
|
|
/**
|
* 注册私有频道处理器(如 仓位、条件单)。需在 init() 前调用。
|
*
|
* @param handler 实现了 {@link OkxChannelHandler} 接口的私有频道处理器
|
*/
|
public void addPrivateHandler(OkxChannelHandler handler) {
|
privateHandlers.add(handler);
|
log.info("[OKX-WS] 注册私有频道处理器: {}", handler.getChannelName());
|
}
|
|
// ==================== 生命周期 ====================
|
|
/**
|
* 初始化:建立公开 WS 连接 + 私有 WS 连接 → 启动心跳检测。
|
* 使用 {@code AtomicBoolean} 防重入,同一实例只允许初始化一次。
|
*/
|
public void init() {
|
if (!isInitialized.compareAndSet(false, true)) {
|
log.warn("[OKX-WS] 已初始化过,跳过重复初始化");
|
return;
|
}
|
connect(false); // 公开 WS
|
connect(true); // 私有 WS
|
startHeartbeat();
|
}
|
|
/**
|
* 销毁:取消所有频道订阅 → 关闭两条 WebSocket 连接 → 关闭线程池。
|
*
|
* <h3>执行顺序</h3>
|
* 先取消订阅(等待 500ms 确保发送完成),再 closeBlocking 关闭连接,
|
* 最后 shutdown 线程池。先关连接再关线程池,避免 onClose 回调中的重连任务访问已关闭的线程池。
|
*/
|
public void destroy() {
|
log.info("[OKX-WS] 开始销毁...");
|
|
// 取消公开频道订阅
|
if (publicWsClient != null && publicWsClient.isOpen()) {
|
for (OkxChannelHandler handler : publicHandlers) {
|
handler.unsubscribe(publicWsClient);
|
}
|
}
|
// 取消私有频道订阅
|
if (privateWsClient != null && privateWsClient.isOpen()) {
|
for (OkxChannelHandler handler : privateHandlers) {
|
handler.unsubscribe(privateWsClient);
|
}
|
}
|
|
// 等待取消订阅消息发出
|
try {
|
Thread.sleep(500);
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
log.warn("[OKX-WS] 取消订阅等待被中断");
|
}
|
|
// 关闭公开 WS
|
closeWebSocket(publicWsClient);
|
publicWsClient = null;
|
|
// 关闭私有 WS
|
closeWebSocket(privateWsClient);
|
privateWsClient = null;
|
|
// 关闭心跳
|
shutdownExecutorGracefully(heartbeatExecutor);
|
if (pongTimeoutFuture != null) {
|
pongTimeoutFuture.cancel(true);
|
}
|
|
// 关闭共享线程池
|
shutdownExecutorGracefully(sharedExecutor);
|
|
log.info("[OKX-WS] 销毁完成");
|
}
|
|
/**
|
* 安全关闭 WebSocket 连接。
|
*/
|
private void closeWebSocket(WebSocketClient ws) {
|
if (ws != null && ws.isOpen()) {
|
try {
|
ws.closeBlocking();
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
log.warn("[OKX-WS] 关闭连接时被中断");
|
}
|
}
|
}
|
|
// ==================== 连接管理 ====================
|
|
/**
|
* 建立 WebSocket 连接。
|
*
|
* <h3>公开 WS 连接成功回调</h3>
|
* 订阅所有公开 handlers(K线等)。
|
*
|
* <h3>私有 WS 连接成功回调</h3>
|
* 先发送 login 认证消息,登录成功后再订阅所有私有 handlers。
|
*
|
* <h3>连接关闭回调</h3>
|
* 设置断连状态 → 异步触发指数退避重连(最多3次)。
|
*
|
* @param isPrivate true=私有 WS(需登录),false=公开 WS
|
*/
|
private void connect(boolean isPrivate) {
|
String wsUrl = isPrivate ? config.getWsPrivateUrl() : config.getWsPublicUrl();
|
String label = isPrivate ? "私有" : "公开";
|
|
if (isConnecting.get() || !isConnecting.compareAndSet(false, true)) {
|
log.info("[OKX-WS] 连接进行中,跳过重复{} WS请求", label);
|
return;
|
}
|
try {
|
SSLConfig.configureSSL();
|
System.setProperty("https.protocols", "TLSv1.2,TLSv1.3");
|
URI uri = new URI(wsUrl);
|
|
WebSocketClient client = new WebSocketClient(uri) {
|
@Override
|
public void onOpen(ServerHandshake handshake) {
|
log.info("[OKX-WS] {} WS连接成功", label);
|
isConnecting.set(false);
|
|
if (isPrivate) {
|
isPrivateConnected.set(true);
|
// 私有 WS 需要先登录
|
sendLogin(this);
|
} else {
|
isPublicConnected.set(true);
|
// 公开 WS 直接订阅
|
if (sharedExecutor != null && !sharedExecutor.isShutdown()) {
|
resetHeartbeatTimer();
|
for (OkxChannelHandler handler : publicHandlers) {
|
handler.subscribe(this);
|
}
|
} else {
|
log.warn("[OKX-WS] 应用正在关闭,忽略{} WS连接成功回调", label);
|
}
|
}
|
}
|
|
@Override
|
public void onMessage(String message) {
|
lastMessageTime.set(System.currentTimeMillis());
|
handleMessage(message, isPrivate, this);
|
resetHeartbeatTimer();
|
}
|
|
@Override
|
public void onClose(int code, String reason, boolean remote) {
|
log.warn("[OKX-WS] {} WS连接关闭, code:{}, reason:{}, remote:{}", label, code, reason, remote);
|
if (isPrivate) {
|
isPrivateConnected.set(false);
|
isPrivateLoggedIn.set(false);
|
} else {
|
isPublicConnected.set(false);
|
}
|
isConnecting.set(false);
|
cancelPongTimeout();
|
|
if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) {
|
sharedExecutor.execute(() -> {
|
try {
|
reconnectWithBackoff(isPrivate);
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
} catch (Exception e) {
|
log.error("[OKX-WS] {} WS重连失败", label, e);
|
}
|
});
|
} else {
|
log.warn("[OKX-WS] 线程池已关闭,不执行{} WS重连", label);
|
}
|
}
|
|
@Override
|
public void onError(Exception ex) {
|
log.error("[OKX-WS] {} WS发生错误", label, ex);
|
if (isPrivate) {
|
isPrivateConnected.set(false);
|
} else {
|
isPublicConnected.set(false);
|
}
|
}
|
};
|
client.setConnectionLostTimeout(0);
|
client.connect();
|
|
if (isPrivate) {
|
this.privateWsClient = client;
|
} else {
|
this.publicWsClient = client;
|
}
|
} catch (URISyntaxException e) {
|
log.error("[OKX-WS] URI格式错误: {}", wsUrl, e);
|
isConnecting.set(false);
|
}
|
}
|
|
// ==================== 登录认证 ====================
|
|
/**
|
* 发送 OKX 私有 WS 登录消息。
|
*
|
* <h3>签名算法</h3>
|
* <pre>
|
* timestamp = ISO 8601 当前时间 (UTC, 毫秒精度)
|
* message = timestamp + "GET" + "/users/self/verify" + ""
|
* sign = Base64(HMAC-SHA256(apiSecret, message))
|
* </pre>
|
*
|
* <h3>登录消息格式</h3>
|
* <pre>
|
* {
|
* "op": "login",
|
* "args": [{
|
* "apiKey": "...",
|
* "passphrase": "...",
|
* "timestamp": "2023-01-01T00:00:00.000Z",
|
* "sign": "..."
|
* }]
|
* }
|
* </pre>
|
*
|
* @param ws 私有频道 WebSocket 客户端
|
*/
|
private void sendLogin(WebSocketClient ws) {
|
try {
|
// OKX WS 登录必须使用 Unix 秒级时间戳(非 ISO 8601!)
|
String timestamp = String.valueOf(System.currentTimeMillis() / 1000);
|
String message = timestamp + "GET" + "/users/self/verify";
|
String sign = hmacSha256Base64(apiSecret, message);
|
|
JSONObject loginMsg = new JSONObject();
|
loginMsg.put("op", "login");
|
|
JSONArray args = new JSONArray();
|
JSONObject arg = new JSONObject();
|
arg.put("apiKey", apiKey);
|
arg.put("passphrase", passphrase);
|
arg.put("timestamp", timestamp);
|
arg.put("sign", sign);
|
args.add(arg);
|
loginMsg.put("args", args);
|
|
ws.send(loginMsg.toJSONString());
|
log.info("[OKX-WS] 发送登录消息, timestamp: {}", timestamp);
|
} catch (Exception e) {
|
log.error("[OKX-WS] 发送登录消息失败", e);
|
}
|
}
|
|
/**
|
* HMAC-SHA256 签名并 Base64 编码。
|
*
|
* @param secret 密钥
|
* @param message 待签名消息
|
* @return Base64 编码的签名字符串
|
*/
|
private String hmacSha256Base64(String secret, String message) {
|
try {
|
Mac mac = Mac.getInstance("HmacSHA256");
|
SecretKeySpec spec = new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256");
|
mac.init(spec);
|
byte[] hash = mac.doFinal(message.getBytes(StandardCharsets.UTF_8));
|
return Base64.getEncoder().encodeToString(hash);
|
} catch (Exception e) {
|
log.error("[OKX-WS] HMAC-SHA256签名失败", e);
|
return "";
|
}
|
}
|
|
// ==================== 消息路由 ====================
|
|
/**
|
* 消息分发:先处理系统事件(ping/pong/login/subscribe/error),
|
* 再把数据推送路由到对应的 channelHandler。
|
*
|
* <h3>路由规则</h3>
|
* <ol>
|
* <li>"pong" → 日志(忽略)</li>
|
* <li>"ping" → 回复 "pong"</li>
|
* <li>{"event":"login"} → 登录成功 → 订阅所有私有 handlers</li>
|
* <li>{"event":"subscribe"} → 标记对应 handler subscribed=true</li>
|
* <li>{"event":"error"} → 错误日志</li>
|
* <li>{"arg":{...}, "data":[...]} → 遍历 handlers 路由</li>
|
* </ol>
|
*
|
* @param message 原始消息文本
|
* @param isPrivate true=私有频道消息,false=公开频道消息
|
* @param ws 接收消息的 WebSocket 客户端
|
*/
|
private void handleMessage(String message, boolean isPrivate, WebSocketClient ws) {
|
try {
|
// OKX ping/pong 混合格式兼容:JSON {"op":"ping"} 与纯文本 "ping" 均支持
|
if ("pong".equals(message)) {
|
log.debug("[OKX-WS] 收到 pong 响应(纯文本)");
|
cancelPongTimeout();
|
return;
|
}
|
if ("ping".equals(message)) {
|
log.debug("[OKX-WS] 收到 ping(纯文本),回复 pong");
|
if (ws != null && ws.isOpen()) {
|
ws.send("pong");
|
}
|
return;
|
}
|
|
JSONObject response = JSON.parseObject(message);
|
|
// JSON 格式的 ping/pong (OKX 文档标准格式)
|
String op = response.getString("op");
|
if ("pong".equals(op)) {
|
log.debug("[OKX-WS] 收到 pong 响应");
|
cancelPongTimeout();
|
return;
|
}
|
if ("ping".equals(op)) {
|
log.debug("[OKX-WS] 收到 ping,回复 pong");
|
if (ws != null && ws.isOpen()) {
|
ws.send("{\"op\":\"pong\"}");
|
}
|
return;
|
}
|
|
// 登录响应
|
String event = response.getString("event");
|
if ("login".equals(event)) {
|
String code = response.getString("code");
|
if ("0".equals(code)) {
|
log.info("[OKX-WS] 私有频道登录成功");
|
isPrivateLoggedIn.set(true);
|
// 登录成功后订阅所有私有频道
|
for (OkxChannelHandler handler : privateHandlers) {
|
handler.subscribe(ws);
|
}
|
} else {
|
log.error("[OKX-WS] 私有频道登录失败, code:{}, msg:{}",
|
code, response.getString("msg"));
|
}
|
return;
|
}
|
|
// 订阅确认
|
if ("subscribe".equals(event)) {
|
JSONObject arg = response.getJSONObject("arg");
|
if (arg != null) {
|
String channel = arg.getString("channel");
|
log.info("[OKX-WS] 订阅成功: {}", channel);
|
List<OkxChannelHandler> handlers = isPrivate ? privateHandlers : publicHandlers;
|
for (OkxChannelHandler handler : handlers) {
|
if (channel.equals(handler.getChannelName())) {
|
handler.setSubscribed(true);
|
break;
|
}
|
}
|
}
|
return;
|
}
|
|
// 取消订阅确认
|
if ("unsubscribe".equals(event)) {
|
JSONObject arg = response.getJSONObject("arg");
|
log.info("[OKX-WS] 取消订阅成功: {}",
|
arg != null ? arg.getString("channel") : "unknown");
|
return;
|
}
|
|
// 错误
|
if ("error".equals(event)) {
|
log.error("[OKX-WS] 错误, code:{}, msg:{}",
|
response.getString("code"), response.getString("msg"));
|
return;
|
}
|
|
// 数据推送: {"arg":{"channel":"positions",...}, "data":[...]}
|
JSONObject arg = response.getJSONObject("arg");
|
if (arg != null && response.getJSONArray("data") != null) {
|
String channel = arg.getString("channel");
|
if (channel == null) {
|
return;
|
}
|
List<OkxChannelHandler> handlers = isPrivate ? privateHandlers : publicHandlers;
|
for (OkxChannelHandler handler : handlers) {
|
if (handler.handleMessage(response)) {
|
return;
|
}
|
}
|
}
|
} catch (Exception e) {
|
log.error("[OKX-WS] 处理消息失败: {}", message, e);
|
}
|
}
|
|
// ==================== 订阅状态检查 ====================
|
|
/**
|
* 检查所有已注册的频道是否都已收到订阅成功确认。
|
* 同时检查公开和私有频道的 handlers。
|
*
|
* @return true 如果所有 handlers 都已订阅确认
|
*/
|
public boolean areAllSubscribed() {
|
List<OkxChannelHandler> allHandlers = new ArrayList<>();
|
allHandlers.addAll(publicHandlers);
|
allHandlers.addAll(privateHandlers);
|
|
if (allHandlers.isEmpty()) {
|
return false;
|
}
|
for (OkxChannelHandler h : allHandlers) {
|
if (!h.isSubscribed()) {
|
return false;
|
}
|
}
|
return true;
|
}
|
|
// ==================== 心跳机制 ====================
|
|
/**
|
* 启动心跳检测器。
|
* 使用单线程 ScheduledExecutor,每 25 秒检查一次心跳超时。
|
*/
|
private void startHeartbeat() {
|
if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) {
|
heartbeatExecutor.shutdownNow();
|
}
|
heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
|
Thread t = new Thread(r, "okx-ws-heartbeat");
|
t.setDaemon(true);
|
return t;
|
});
|
heartbeatExecutor.scheduleWithFixedDelay(
|
this::checkHeartbeatTimeout, 25, 25, TimeUnit.SECONDS);
|
}
|
|
/**
|
* 重置心跳计时器:取消旧超时任务,提交新的 10 秒超时检测。
|
*/
|
private synchronized void resetHeartbeatTimer() {
|
cancelPongTimeout();
|
if (heartbeatExecutor != null && !heartbeatExecutor.isShutdown()) {
|
pongTimeoutFuture = heartbeatExecutor.schedule(
|
this::checkHeartbeatTimeout, HEARTBEAT_TIMEOUT, TimeUnit.SECONDS);
|
}
|
}
|
|
/**
|
* 检查心跳超时:如果距离上次收到消息超过 10 秒,主动发送 ping。
|
* OKX 服务端收到 ping 后会回复 pong。
|
*/
|
private void checkHeartbeatTimeout() {
|
boolean isAnyConnected = isPublicConnected.get() || isPrivateConnected.get();
|
if (!isAnyConnected) {
|
return;
|
}
|
long elapsed = System.currentTimeMillis() - lastMessageTime.get();
|
if (elapsed >= HEARTBEAT_TIMEOUT * 1000L) {
|
log.debug("[OKX-WS] 心跳超时 {}ms, 主动发送ping", elapsed);
|
sendPing();
|
}
|
}
|
|
/**
|
* 向两条 WS 连接主动发送 ping(OKX 文档标准 JSON 格式)。
|
*/
|
private void sendPing() {
|
try {
|
String pingMsg = "{\"op\":\"ping\"}";
|
if (publicWsClient != null && publicWsClient.isOpen()) {
|
publicWsClient.send(pingMsg);
|
}
|
if (privateWsClient != null && privateWsClient.isOpen()) {
|
privateWsClient.send(pingMsg);
|
}
|
} catch (Exception e) {
|
log.warn("[OKX-WS] 发送ping失败", e);
|
}
|
}
|
|
/**
|
* 取消心跳超时检测任务。
|
*/
|
private synchronized void cancelPongTimeout() {
|
if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) {
|
pongTimeoutFuture.cancel(true);
|
}
|
}
|
|
// ==================== 重连机制 ====================
|
|
/**
|
* 指数退避重连。初始延迟 5 秒,每次翻倍,最多重试 3 次。
|
*
|
* @param isPrivate true=重连私有 WS,false=重连公开 WS
|
* @throws InterruptedException 线程被中断
|
*/
|
private void reconnectWithBackoff(boolean isPrivate) throws InterruptedException {
|
String label = isPrivate ? "私有" : "公开";
|
int attempt = 0;
|
long delayMs = INITIAL_RECONNECT_DELAY_MS;
|
|
while (attempt < MAX_RECONNECT_ATTEMPTS) {
|
try {
|
Thread.sleep(delayMs);
|
connect(isPrivate);
|
log.info("[OKX-WS] {} WS第{}次重连成功", label, attempt + 1);
|
return;
|
} catch (Exception e) {
|
log.warn("[OKX-WS] {} WS第{}次重连失败", label, attempt + 1, e);
|
delayMs *= 2;
|
attempt++;
|
}
|
}
|
log.error("[OKX-WS] {} WS超过最大重试次数({}),放弃重连", label, MAX_RECONNECT_ATTEMPTS);
|
}
|
|
// ==================== 工具方法 ====================
|
|
/**
|
* 优雅关闭线程池:先 shutdown,等待 5 秒,超时则 shutdownNow 强制中断。
|
*
|
* @param executor 需要关闭的线程池
|
*/
|
private void shutdownExecutorGracefully(ExecutorService executor) {
|
if (executor == null || executor.isTerminated()) {
|
return;
|
}
|
try {
|
executor.shutdown();
|
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
|
executor.shutdownNow();
|
}
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
executor.shutdownNow();
|
}
|
}
|
|
// ==================== 状态查询 ====================
|
|
/** @return 公开 WS 是否已连接 */
|
public boolean isPublicConnected() {
|
return isPublicConnected.get();
|
}
|
|
/** @return 私有 WS 是否已连接并登录成功 */
|
public boolean isPrivateConnected() {
|
return isPrivateConnected.get() && isPrivateLoggedIn.get();
|
}
|
}
|