package com.xcong.excoin.modules.okxApi;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.okxApi.wsHandler.OkxChannelHandler;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
* OKX WebSocket 连接管理器 — 双通道架构。
*
*
与 Gate 版本的关键区别
* OKX 使用两条独立的 WebSocket 连接:
*
* - 公开 WS ({@code wss://ws.okx.com:8443/ws/v5/public}):
* 无需认证,订阅 K 线等公开数据。
* - 私有 WS ({@code wss://ws.okx.com:8443/ws/v5/private}):
* 需要登录认证(login 消息),订阅仓位、条件订单等私有数据。
*
* 而 Gate 只有一条 WS 连接,通过签名区分公开/私有频道。
*
* 登录认证(私有 WS)
*
* {
* "op": "login",
* "args": [{
* "apiKey": "...",
* "passphrase": "...",
* "timestamp": "1734567890", // Unix 秒级时间戳
* "sign": "base64(HMAC-SHA256(timestamp + 'GET' + '/users/self/verify'))"
* }]
* }
*
*
* 心跳机制
* OKX 标准格式为 JSON {@code {"op":"ping"}} / {@code {"op":"pong"}},
* 同时兼容纯文本 {@code "ping"} / {@code "pong"} 格式。
*
* 消息路由
*
* onMessage → handleMessage(message, isPrivate):
* 1. "pong" (纯文本) → 日志 + cancelPongTimeout
* 2. "ping" (纯文本) → 回复 "pong"
* 3. {"op":"pong"} (JSON) → 日志 + cancelPongTimeout
* 4. {"op":"ping"} (JSON) → 回复 {"op":"pong"}
* 5. {"event":"login"} → 登录成功 → 订阅所有私有 handlers
* 6. {"event":"subscribe"} → 标记对应 handler subscribed=true
* 7. {"event":"error"} → 错误日志
* 8. {"arg":{...}, "data":[...]} → 遍历 handlers 路由
*
*
* 生命周期
*
* init() → connect(public) + connect(private,true) → startHeartbeat()
* destroy() → unsubscribe 所有 handler → closeBlocking() 两条连接 → shutdown 线程池
* onClose() → reconnectWithBackoff() 重连对应连接(最多 3 次,指数退避)
*
*
* 线程安全
* 连接状态用 AtomicBoolean(isPublicConnected, isPrivateConnected, isConnecting, isInitialized)。
* 消息时间戳用 AtomicReference。心跳任务用 synchronized 保护。
*
* @author Administrator
*/
@SuppressWarnings("ALL")
@Slf4j
public class OkxKlineWebSocketClient {
// ==================== 常量 ====================
/** 心跳超时时间(秒) */
private static final int HEARTBEAT_TIMEOUT = 10;
/** ISO 8601 时间格式化器(毫秒精度,UTC 时区) */
private static final DateTimeFormatter ISO_8601_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
.withZone(ZoneId.of("UTC"));
// ==================== 配置 ====================
/** OKX 配置(提供 WS URL、API 密钥等) */
private final OkxConfig config;
/** OKX API Key */
private final String apiKey;
/** OKX API Secret */
private final String apiSecret;
/** OKX API Passphrase */
private final String passphrase;
// ==================== WebSocket 客户端 ====================
/** 公开频道 WebSocket 客户端(K线等) */
private WebSocketClient publicWsClient;
/** 私有频道 WebSocket 客户端(仓位、条件单等) */
private WebSocketClient privateWsClient;
/** 心跳检测调度器 */
private ScheduledExecutorService heartbeatExecutor;
/** 心跳超时 Future */
private volatile ScheduledFuture> pongTimeoutFuture;
/** 最后收到消息的时间戳(毫秒) */
private final AtomicReference lastMessageTime = new AtomicReference<>(System.currentTimeMillis());
// ==================== 连接状态 ====================
/** 公开频道连接状态 */
private final AtomicBoolean isPublicConnected = new AtomicBoolean(false);
/** 私有频道连接状态 */
private final AtomicBoolean isPrivateConnected = new AtomicBoolean(false);
/** 连接中标记,防重入 */
private final AtomicBoolean isConnecting = new AtomicBoolean(false);
/** 初始化标记,防重复 init */
private final AtomicBoolean isInitialized = new AtomicBoolean(false);
/** 私有频道登录成功标记 */
private final AtomicBoolean isPrivateLoggedIn = new AtomicBoolean(false);
// ==================== 频道处理器 ====================
/** 公开频道处理器列表(如 K线) */
private final List publicHandlers = new ArrayList<>();
/** 私有频道处理器列表(如 仓位、条件单) */
private final List privateHandlers = new ArrayList<>();
// ==================== 异步线程池 ====================
/** 重连等异步任务的缓存线程池(daemon 线程) */
private final ExecutorService sharedExecutor = Executors.newCachedThreadPool(r -> {
Thread t = new Thread(r, "okx-ws-worker");
t.setDaemon(true);
return t;
});
// ==================== 重连配置 ====================
/** 重连最大次数 */
private static final int MAX_RECONNECT_ATTEMPTS = 3;
/** 重连初始延迟(毫秒) */
private static final long INITIAL_RECONNECT_DELAY_MS = 5000;
// ==================== 构造器 ====================
/**
* 构造 OKX WebSocket 客户端。
*
* @param config OKX 配置(提供 WS URL、API 密钥等)
*/
public OkxKlineWebSocketClient(OkxConfig config) {
this.config = config;
this.apiKey = config.getApiKey();
this.apiSecret = config.getApiSecret();
this.passphrase = config.getPassphrase();
}
// ==================== Handler 注册 ====================
/**
* 注册公开频道处理器(如 K线)。需在 init() 前调用。
*
* @param handler 实现了 {@link OkxChannelHandler} 接口的公开频道处理器
*/
public void addPublicHandler(OkxChannelHandler handler) {
publicHandlers.add(handler);
log.info("[OKX-WS] 注册公开频道处理器: {}", handler.getChannelName());
}
/**
* 注册私有频道处理器(如 仓位、条件单)。需在 init() 前调用。
*
* @param handler 实现了 {@link OkxChannelHandler} 接口的私有频道处理器
*/
public void addPrivateHandler(OkxChannelHandler handler) {
privateHandlers.add(handler);
log.info("[OKX-WS] 注册私有频道处理器: {}", handler.getChannelName());
}
// ==================== 生命周期 ====================
/**
* 初始化:建立公开 WS 连接 + 私有 WS 连接 → 启动心跳检测。
* 使用 {@code AtomicBoolean} 防重入,同一实例只允许初始化一次。
*/
public void init() {
if (!isInitialized.compareAndSet(false, true)) {
log.warn("[OKX-WS] 已初始化过,跳过重复初始化");
return;
}
connect(false); // 公开 WS
connect(true); // 私有 WS
startHeartbeat();
}
/**
* 销毁:取消所有频道订阅 → 关闭两条 WebSocket 连接 → 关闭线程池。
*
* 执行顺序
* 先取消订阅(等待 500ms 确保发送完成),再 closeBlocking 关闭连接,
* 最后 shutdown 线程池。先关连接再关线程池,避免 onClose 回调中的重连任务访问已关闭的线程池。
*/
public void destroy() {
log.info("[OKX-WS] 开始销毁...");
// 取消公开频道订阅
if (publicWsClient != null && publicWsClient.isOpen()) {
for (OkxChannelHandler handler : publicHandlers) {
handler.unsubscribe(publicWsClient);
}
}
// 取消私有频道订阅
if (privateWsClient != null && privateWsClient.isOpen()) {
for (OkxChannelHandler handler : privateHandlers) {
handler.unsubscribe(privateWsClient);
}
}
// 等待取消订阅消息发出
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("[OKX-WS] 取消订阅等待被中断");
}
// 关闭公开 WS
closeWebSocket(publicWsClient);
publicWsClient = null;
// 关闭私有 WS
closeWebSocket(privateWsClient);
privateWsClient = null;
// 关闭心跳
shutdownExecutorGracefully(heartbeatExecutor);
if (pongTimeoutFuture != null) {
pongTimeoutFuture.cancel(true);
}
// 关闭共享线程池
shutdownExecutorGracefully(sharedExecutor);
log.info("[OKX-WS] 销毁完成");
}
/**
* 安全关闭 WebSocket 连接。
*/
private void closeWebSocket(WebSocketClient ws) {
if (ws != null && ws.isOpen()) {
try {
ws.closeBlocking();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("[OKX-WS] 关闭连接时被中断");
}
}
}
// ==================== 连接管理 ====================
/**
* 建立 WebSocket 连接。
*
* 公开 WS 连接成功回调
* 订阅所有公开 handlers(K线等)。
*
* 私有 WS 连接成功回调
* 先发送 login 认证消息,登录成功后再订阅所有私有 handlers。
*
* 连接关闭回调
* 设置断连状态 → 异步触发指数退避重连(最多3次)。
*
* @param isPrivate true=私有 WS(需登录),false=公开 WS
*/
private void connect(boolean isPrivate) {
String wsUrl = isPrivate ? config.getWsPrivateUrl() : config.getWsPublicUrl();
String label = isPrivate ? "私有" : "公开";
if (isConnecting.get() || !isConnecting.compareAndSet(false, true)) {
log.info("[OKX-WS] 连接进行中,跳过重复{} WS请求", label);
return;
}
try {
SSLConfig.configureSSL();
System.setProperty("https.protocols", "TLSv1.2,TLSv1.3");
URI uri = new URI(wsUrl);
WebSocketClient client = new WebSocketClient(uri) {
@Override
public void onOpen(ServerHandshake handshake) {
log.info("[OKX-WS] {} WS连接成功", label);
isConnecting.set(false);
if (isPrivate) {
isPrivateConnected.set(true);
// 私有 WS 需要先登录
sendLogin(this);
} else {
isPublicConnected.set(true);
// 公开 WS 直接订阅
if (sharedExecutor != null && !sharedExecutor.isShutdown()) {
resetHeartbeatTimer();
for (OkxChannelHandler handler : publicHandlers) {
handler.subscribe(this);
}
} else {
log.warn("[OKX-WS] 应用正在关闭,忽略{} WS连接成功回调", label);
}
}
}
@Override
public void onMessage(String message) {
lastMessageTime.set(System.currentTimeMillis());
handleMessage(message, isPrivate, this);
resetHeartbeatTimer();
}
@Override
public void onClose(int code, String reason, boolean remote) {
log.warn("[OKX-WS] {} WS连接关闭, code:{}, reason:{}, remote:{}", label, code, reason, remote);
if (isPrivate) {
isPrivateConnected.set(false);
isPrivateLoggedIn.set(false);
} else {
isPublicConnected.set(false);
}
isConnecting.set(false);
cancelPongTimeout();
if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) {
sharedExecutor.execute(() -> {
try {
reconnectWithBackoff(isPrivate);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("[OKX-WS] {} WS重连失败", label, e);
}
});
} else {
log.warn("[OKX-WS] 线程池已关闭,不执行{} WS重连", label);
}
}
@Override
public void onError(Exception ex) {
log.error("[OKX-WS] {} WS发生错误", label, ex);
if (isPrivate) {
isPrivateConnected.set(false);
} else {
isPublicConnected.set(false);
}
}
};
client.setConnectionLostTimeout(0);
client.connect();
if (isPrivate) {
this.privateWsClient = client;
} else {
this.publicWsClient = client;
}
} catch (URISyntaxException e) {
log.error("[OKX-WS] URI格式错误: {}", wsUrl, e);
isConnecting.set(false);
}
}
// ==================== 登录认证 ====================
/**
* 发送 OKX 私有 WS 登录消息。
*
* 签名算法
*
* timestamp = ISO 8601 当前时间 (UTC, 毫秒精度)
* message = timestamp + "GET" + "/users/self/verify" + ""
* sign = Base64(HMAC-SHA256(apiSecret, message))
*
*
* 登录消息格式
*
* {
* "op": "login",
* "args": [{
* "apiKey": "...",
* "passphrase": "...",
* "timestamp": "2023-01-01T00:00:00.000Z",
* "sign": "..."
* }]
* }
*
*
* @param ws 私有频道 WebSocket 客户端
*/
private void sendLogin(WebSocketClient ws) {
try {
// OKX WS 登录必须使用 Unix 秒级时间戳(非 ISO 8601!)
String timestamp = String.valueOf(System.currentTimeMillis() / 1000);
String message = timestamp + "GET" + "/users/self/verify";
String sign = hmacSha256Base64(apiSecret, message);
JSONObject loginMsg = new JSONObject();
loginMsg.put("op", "login");
JSONArray args = new JSONArray();
JSONObject arg = new JSONObject();
arg.put("apiKey", apiKey);
arg.put("passphrase", passphrase);
arg.put("timestamp", timestamp);
arg.put("sign", sign);
args.add(arg);
loginMsg.put("args", args);
ws.send(loginMsg.toJSONString());
log.info("[OKX-WS] 发送登录消息, timestamp: {}", timestamp);
} catch (Exception e) {
log.error("[OKX-WS] 发送登录消息失败", e);
}
}
/**
* HMAC-SHA256 签名并 Base64 编码。
*
* @param secret 密钥
* @param message 待签名消息
* @return Base64 编码的签名字符串
*/
private String hmacSha256Base64(String secret, String message) {
try {
Mac mac = Mac.getInstance("HmacSHA256");
SecretKeySpec spec = new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256");
mac.init(spec);
byte[] hash = mac.doFinal(message.getBytes(StandardCharsets.UTF_8));
return Base64.getEncoder().encodeToString(hash);
} catch (Exception e) {
log.error("[OKX-WS] HMAC-SHA256签名失败", e);
return "";
}
}
// ==================== 消息路由 ====================
/**
* 消息分发:先处理系统事件(ping/pong/login/subscribe/error),
* 再把数据推送路由到对应的 channelHandler。
*
* 路由规则
*
* - "pong" → 日志(忽略)
* - "ping" → 回复 "pong"
* - {"event":"login"} → 登录成功 → 订阅所有私有 handlers
* - {"event":"subscribe"} → 标记对应 handler subscribed=true
* - {"event":"error"} → 错误日志
* - {"arg":{...}, "data":[...]} → 遍历 handlers 路由
*
*
* @param message 原始消息文本
* @param isPrivate true=私有频道消息,false=公开频道消息
* @param ws 接收消息的 WebSocket 客户端
*/
private void handleMessage(String message, boolean isPrivate, WebSocketClient ws) {
try {
// OKX ping/pong 混合格式兼容:JSON {"op":"ping"} 与纯文本 "ping" 均支持
if ("pong".equals(message)) {
log.debug("[OKX-WS] 收到 pong 响应(纯文本)");
cancelPongTimeout();
return;
}
if ("ping".equals(message)) {
log.debug("[OKX-WS] 收到 ping(纯文本),回复 pong");
if (ws != null && ws.isOpen()) {
ws.send("pong");
}
return;
}
JSONObject response = JSON.parseObject(message);
// JSON 格式的 ping/pong (OKX 文档标准格式)
String op = response.getString("op");
if ("pong".equals(op)) {
log.debug("[OKX-WS] 收到 pong 响应");
cancelPongTimeout();
return;
}
if ("ping".equals(op)) {
log.debug("[OKX-WS] 收到 ping,回复 pong");
if (ws != null && ws.isOpen()) {
ws.send("{\"op\":\"pong\"}");
}
return;
}
// 登录响应
String event = response.getString("event");
if ("login".equals(event)) {
String code = response.getString("code");
if ("0".equals(code)) {
log.info("[OKX-WS] 私有频道登录成功");
isPrivateLoggedIn.set(true);
// 登录成功后订阅所有私有频道
for (OkxChannelHandler handler : privateHandlers) {
handler.subscribe(ws);
}
} else {
log.error("[OKX-WS] 私有频道登录失败, code:{}, msg:{}",
code, response.getString("msg"));
}
return;
}
// 订阅确认
if ("subscribe".equals(event)) {
JSONObject arg = response.getJSONObject("arg");
if (arg != null) {
String channel = arg.getString("channel");
log.info("[OKX-WS] 订阅成功: {}", channel);
List handlers = isPrivate ? privateHandlers : publicHandlers;
for (OkxChannelHandler handler : handlers) {
if (channel.equals(handler.getChannelName())) {
handler.setSubscribed(true);
break;
}
}
}
return;
}
// 取消订阅确认
if ("unsubscribe".equals(event)) {
JSONObject arg = response.getJSONObject("arg");
log.info("[OKX-WS] 取消订阅成功: {}",
arg != null ? arg.getString("channel") : "unknown");
return;
}
// 错误
if ("error".equals(event)) {
log.error("[OKX-WS] 错误, code:{}, msg:{}",
response.getString("code"), response.getString("msg"));
return;
}
// 数据推送: {"arg":{"channel":"positions",...}, "data":[...]}
JSONObject arg = response.getJSONObject("arg");
if (arg != null && response.getJSONArray("data") != null) {
String channel = arg.getString("channel");
if (channel == null) {
return;
}
List handlers = isPrivate ? privateHandlers : publicHandlers;
for (OkxChannelHandler handler : handlers) {
if (handler.handleMessage(response)) {
return;
}
}
}
} catch (Exception e) {
log.error("[OKX-WS] 处理消息失败: {}", message, e);
}
}
// ==================== 订阅状态检查 ====================
/**
* 检查所有已注册的频道是否都已收到订阅成功确认。
* 同时检查公开和私有频道的 handlers。
*
* @return true 如果所有 handlers 都已订阅确认
*/
public boolean areAllSubscribed() {
List allHandlers = new ArrayList<>();
allHandlers.addAll(publicHandlers);
allHandlers.addAll(privateHandlers);
if (allHandlers.isEmpty()) {
return false;
}
for (OkxChannelHandler h : allHandlers) {
if (!h.isSubscribed()) {
return false;
}
}
return true;
}
// ==================== 心跳机制 ====================
/**
* 启动心跳检测器。
* 使用单线程 ScheduledExecutor,每 25 秒检查一次心跳超时。
*/
private void startHeartbeat() {
if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) {
heartbeatExecutor.shutdownNow();
}
heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "okx-ws-heartbeat");
t.setDaemon(true);
return t;
});
heartbeatExecutor.scheduleWithFixedDelay(
this::checkHeartbeatTimeout, 25, 25, TimeUnit.SECONDS);
}
/**
* 重置心跳计时器:取消旧超时任务,提交新的 10 秒超时检测。
*/
private synchronized void resetHeartbeatTimer() {
cancelPongTimeout();
if (heartbeatExecutor != null && !heartbeatExecutor.isShutdown()) {
pongTimeoutFuture = heartbeatExecutor.schedule(
this::checkHeartbeatTimeout, HEARTBEAT_TIMEOUT, TimeUnit.SECONDS);
}
}
/**
* 检查心跳超时:如果距离上次收到消息超过 10 秒,主动发送 ping。
* OKX 服务端收到 ping 后会回复 pong。
*/
private void checkHeartbeatTimeout() {
boolean isAnyConnected = isPublicConnected.get() || isPrivateConnected.get();
if (!isAnyConnected) {
return;
}
long elapsed = System.currentTimeMillis() - lastMessageTime.get();
if (elapsed >= HEARTBEAT_TIMEOUT * 1000L) {
log.debug("[OKX-WS] 心跳超时 {}ms, 主动发送ping", elapsed);
sendPing();
}
}
/**
* 向两条 WS 连接主动发送 ping(OKX 文档标准 JSON 格式)。
*/
private void sendPing() {
try {
String pingMsg = "{\"op\":\"ping\"}";
if (publicWsClient != null && publicWsClient.isOpen()) {
publicWsClient.send(pingMsg);
}
if (privateWsClient != null && privateWsClient.isOpen()) {
privateWsClient.send(pingMsg);
}
} catch (Exception e) {
log.warn("[OKX-WS] 发送ping失败", e);
}
}
/**
* 取消心跳超时检测任务。
*/
private synchronized void cancelPongTimeout() {
if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) {
pongTimeoutFuture.cancel(true);
}
}
// ==================== 重连机制 ====================
/**
* 指数退避重连。初始延迟 5 秒,每次翻倍,最多重试 3 次。
*
* @param isPrivate true=重连私有 WS,false=重连公开 WS
* @throws InterruptedException 线程被中断
*/
private void reconnectWithBackoff(boolean isPrivate) throws InterruptedException {
String label = isPrivate ? "私有" : "公开";
int attempt = 0;
long delayMs = INITIAL_RECONNECT_DELAY_MS;
while (attempt < MAX_RECONNECT_ATTEMPTS) {
try {
Thread.sleep(delayMs);
connect(isPrivate);
log.info("[OKX-WS] {} WS第{}次重连成功", label, attempt + 1);
return;
} catch (Exception e) {
log.warn("[OKX-WS] {} WS第{}次重连失败", label, attempt + 1, e);
delayMs *= 2;
attempt++;
}
}
log.error("[OKX-WS] {} WS超过最大重试次数({}),放弃重连", label, MAX_RECONNECT_ATTEMPTS);
}
// ==================== 工具方法 ====================
/**
* 优雅关闭线程池:先 shutdown,等待 5 秒,超时则 shutdownNow 强制中断。
*
* @param executor 需要关闭的线程池
*/
private void shutdownExecutorGracefully(ExecutorService executor) {
if (executor == null || executor.isTerminated()) {
return;
}
try {
executor.shutdown();
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
executor.shutdownNow();
}
}
// ==================== 状态查询 ====================
/** @return 公开 WS 是否已连接 */
public boolean isPublicConnected() {
return isPublicConnected.get();
}
/** @return 私有 WS 是否已连接并登录成功 */
public boolean isPrivateConnected() {
return isPrivateConnected.get() && isPrivateLoggedIn.get();
}
}