package com.xcong.excoin.modules.okxNewPrice;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.okxNewPrice.gridWs.OkxGridChannelHandler;
import com.xcong.excoin.modules.okxNewPrice.okxpi.config.utils.SignUtils;
import com.xcong.excoin.modules.okxNewPrice.okxpi.config.utils.SSLConfig;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
* OKX 网格交易专用 WebSocket 客户端,对齐 Gate 的 GateKlineWebSocketClient 模式。
*
*
职责
* 负责 TCP 连接的建立、维持和恢复。频道逻辑(订阅/解析)全部委托给
* {@link OkxGridChannelHandler} 实现类。
*
* 生命周期
*
* init() → connect() → login() → subscribe handlers → startHeartbeat()
* destroy() → unsubscribe 所有 handler → closeBlocking() → shutdown 线程池
* onClose() → reconnectWithBackoff() (最多 3 次,指数退避)
*
*
* @author Administrator
*/
@Slf4j
public class OkxGridWsClient {
private static final int HEARTBEAT_TIMEOUT = 10;
/** 模拟盘 WS 地址 */
private static final String WS_URL_SIM = "wss://wspap.okx.com:8443/ws/v5/private";
/** 实盘 WS 地址 */
private static final String WS_URL_PROD = "wss://ws.okx.com:8443/ws/v5/private";
private final ExchangeInfoEnum account;
private WebSocketClient webSocketClient;
private ScheduledExecutorService heartbeatExecutor;
private volatile ScheduledFuture> pongTimeoutFuture;
private final AtomicReference lastMessageTime = new AtomicReference<>(System.currentTimeMillis());
private final AtomicBoolean isConnected = new AtomicBoolean(false);
private final AtomicBoolean isConnecting = new AtomicBoolean(false);
private final AtomicBoolean isInitialized = new AtomicBoolean(false);
/** 频道处理器列表 */
private final List channelHandlers = new ArrayList<>();
/** 共享线程池 */
private final ExecutorService sharedExecutor = Executors.newCachedThreadPool(r -> {
Thread t = new Thread(r, "okx-grid-ws-worker");
t.setDaemon(true);
return t;
});
public OkxGridWsClient(ExchangeInfoEnum account) {
this.account = account;
}
public void addChannelHandler(OkxGridChannelHandler handler) {
channelHandlers.add(handler);
}
public void init() {
if (!isInitialized.compareAndSet(false, true)) {
log.warn("[OKX-Grid-WS] 已初始化过,跳过重复初始化");
return;
}
connect();
startHeartbeat();
}
public void destroy() {
log.info("[OKX-Grid-WS] 开始销毁...");
if (webSocketClient != null && webSocketClient.isOpen()) {
for (OkxGridChannelHandler handler : channelHandlers) {
handler.unsubscribe(webSocketClient);
}
try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
if (webSocketClient != null && webSocketClient.isOpen()) {
try {
webSocketClient.closeBlocking();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
shutdownExecutorGracefully(heartbeatExecutor);
if (pongTimeoutFuture != null) pongTimeoutFuture.cancel(true);
shutdownExecutorGracefully(sharedExecutor);
log.info("[OKX-Grid-WS] 销毁完成");
}
private void connect() {
if (isConnecting.get() || !isConnecting.compareAndSet(false, true)) {
log.info("[OKX-Grid-WS] 连接进行中,跳过重复请求");
return;
}
try {
SSLConfig.configureSSL();
System.setProperty("https.protocols", "TLSv1.2,TLSv1.3");
String wsUrl = account.isAccountType() ? WS_URL_PROD : WS_URL_SIM;
URI uri = new URI(wsUrl);
if (webSocketClient != null) {
try { webSocketClient.closeBlocking(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
webSocketClient = new WebSocketClient(uri) {
@Override
public void onOpen(ServerHandshake handshake) {
log.info("[OKX-Grid-WS] 连接成功");
isConnected.set(true);
isConnecting.set(false);
if (sharedExecutor != null && !sharedExecutor.isShutdown()) {
resetHeartbeatTimer();
wsLogin();
}
}
@Override
public void onMessage(String message) {
lastMessageTime.set(System.currentTimeMillis());
handleMessage(message);
resetHeartbeatTimer();
}
@Override
public void onClose(int code, String reason, boolean remote) {
log.warn("[OKX-Grid-WS] 连接关闭, code:{}, reason:{}", code, reason);
isConnected.set(false);
isConnecting.set(false);
cancelPongTimeout();
if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) {
sharedExecutor.execute(() -> {
try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.error("[OKX-Grid-WS] 重连失败", e); }
});
}
}
@Override
public void onError(Exception ex) {
log.error("[OKX-Grid-WS] 发生错误", ex);
isConnected.set(false);
}
};
webSocketClient.connect();
} catch (URISyntaxException e) {
log.error("[OKX-Grid-WS] URI格式错误", e);
isConnecting.set(false);
}
}
/**
* WebSocket 登录认证
*/
private void wsLogin() {
try {
String timestamp = String.valueOf(System.currentTimeMillis() / 1000);
String sign = SignUtils.signWebsocket(timestamp, account.getSecretKey());
JSONObject msg = new JSONObject();
msg.put("op", "login");
com.alibaba.fastjson.JSONArray args = new com.alibaba.fastjson.JSONArray();
JSONObject loginArgs = new JSONObject();
loginArgs.put("apiKey", account.getApiKey());
loginArgs.put("passphrase", account.getPassphrase());
loginArgs.put("timestamp", timestamp);
loginArgs.put("sign", sign);
args.add(loginArgs);
msg.put("args", args);
webSocketClient.send(msg.toJSONString());
log.info("[OKX-Grid-WS] 发送登录请求");
} catch (Exception e) {
log.error("[OKX-Grid-WS] 登录请求构建失败", e);
}
}
private void handleMessage(String message) {
try {
JSONObject response = JSON.parseObject(message);
String event = response.getString("event");
String op = response.getString("op");
// 登录成功 → 订阅所有频道
if ("login".equals(event) || ("login".equals(op))) {
log.info("[OKX-Grid-WS] 登录成功, 开始订阅频道");
for (OkxGridChannelHandler handler : channelHandlers) {
handler.subscribe(webSocketClient);
}
return;
}
// 订阅确认
if ("subscribe".equals(event) || "unsubscribe".equals(event)) {
log.info("[OKX-Grid-WS] {}事件: {}", event, response.getString("arg"));
return;
}
// 错误
if ("error".equals(event)) {
log.error("[OKX-Grid-WS] 错误: {}", message);
return;
}
// 数据推送 → 路由到 handler
for (OkxGridChannelHandler handler : channelHandlers) {
if (handler.handleMessage(response)) return;
}
} catch (Exception e) {
log.error("[OKX-Grid-WS] 处理消息失败: {}", message, e);
}
}
// ---- heartbeat ----
private void startHeartbeat() {
if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) heartbeatExecutor.shutdownNow();
heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "okx-grid-ws-heartbeat");
t.setDaemon(true);
return t;
});
heartbeatExecutor.scheduleWithFixedDelay(this::checkHeartbeatTimeout, 25, 25, TimeUnit.SECONDS);
}
private synchronized void resetHeartbeatTimer() {
cancelPongTimeout();
if (heartbeatExecutor != null && !heartbeatExecutor.isShutdown()) {
pongTimeoutFuture = heartbeatExecutor.schedule(this::sendPing, HEARTBEAT_TIMEOUT, TimeUnit.SECONDS);
}
}
private void checkHeartbeatTimeout() {
if (!isConnected.get()) return;
if (System.currentTimeMillis() - lastMessageTime.get() >= HEARTBEAT_TIMEOUT * 1000L) sendPing();
}
private void sendPing() {
try {
if (webSocketClient != null && webSocketClient.isOpen()) {
webSocketClient.send("ping");
log.debug("[OKX-Grid-WS] 发送 ping");
}
} catch (Exception e) { log.warn("[OKX-Grid-WS] 发送 ping 失败", e); }
}
private synchronized void cancelPongTimeout() {
if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) pongTimeoutFuture.cancel(true);
}
private void reconnectWithBackoff() throws InterruptedException {
int attempt = 0, maxAttempts = 3;
long delayMs = 5000;
while (attempt < maxAttempts) {
try { Thread.sleep(delayMs); connect(); return; }
catch (Exception e) { log.warn("[OKX-Grid-WS] 第{}次重连失败", attempt + 1, e); delayMs *= 2; attempt++; }
}
log.error("[OKX-Grid-WS] 超过最大重试次数({}),放弃重连", maxAttempts);
}
private void shutdownExecutorGracefully(ExecutorService executor) {
if (executor == null || executor.isTerminated()) return;
try {
executor.shutdown();
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) executor.shutdownNow();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
executor.shutdownNow();
}
}
}