package com.xcong.excoin.modules.okxNewPrice;
|
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSONObject;
|
import com.xcong.excoin.modules.okxNewPrice.gridWs.OkxGridChannelHandler;
|
import com.xcong.excoin.modules.okxNewPrice.okxpi.config.utils.SignUtils;
|
import com.xcong.excoin.modules.okxNewPrice.okxpi.config.utils.SSLConfig;
|
import lombok.extern.slf4j.Slf4j;
|
import org.java_websocket.client.WebSocketClient;
|
import org.java_websocket.handshake.ServerHandshake;
|
|
import java.net.URI;
|
import java.net.URISyntaxException;
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.concurrent.*;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicReference;
|
|
/**
|
* OKX 网格交易专用 WebSocket 客户端,对齐 Gate 的 GateKlineWebSocketClient 模式。
|
*
|
* <h3>职责</h3>
|
* 负责 TCP 连接的建立、维持和恢复。频道逻辑(订阅/解析)全部委托给
|
* {@link OkxGridChannelHandler} 实现类。
|
*
|
* <h3>生命周期</h3>
|
* <pre>
|
* init() → connect() → login() → subscribe handlers → startHeartbeat()
|
* destroy() → unsubscribe 所有 handler → closeBlocking() → shutdown 线程池
|
* onClose() → reconnectWithBackoff() (最多 3 次,指数退避)
|
* </pre>
|
*
|
* @author Administrator
|
*/
|
@Slf4j
|
public class OkxGridWsClient {
|
|
private static final int HEARTBEAT_TIMEOUT = 10;
|
|
/** 模拟盘业务 WS 地址(K线等行情数据) */
|
private static final String WS_BUSINESS_URL_SIM = "wss://wspap.okx.com:8443/ws/v5/business";
|
/** 实盘业务 WS 地址(K线等行情数据) */
|
private static final String WS_BUSINESS_URL_PROD = "wss://ws.okx.com:8443/ws/v5/business";
|
/** 模拟盘私有 WS 地址 */
|
private static final String WS_PRIVATE_URL_SIM = "wss://wspap.okx.com:8443/ws/v5/private";
|
/** 实盘私有 WS 地址 */
|
private static final String WS_PRIVATE_URL_PROD = "wss://ws.okx.com:8443/ws/v5/private";
|
|
private final ExchangeInfoEnum account;
|
private final boolean isPublic;
|
private final String logPrefix;
|
private WebSocketClient webSocketClient;
|
private ScheduledExecutorService heartbeatExecutor;
|
private volatile ScheduledFuture<?> pongTimeoutFuture;
|
private final AtomicReference<Long> lastMessageTime = new AtomicReference<>(System.currentTimeMillis());
|
|
private final AtomicBoolean isConnected = new AtomicBoolean(false);
|
private final AtomicBoolean isConnecting = new AtomicBoolean(false);
|
private final AtomicBoolean isInitialized = new AtomicBoolean(false);
|
|
/** 频道处理器列表 */
|
private final List<OkxGridChannelHandler> channelHandlers = new ArrayList<>();
|
|
/** 共享线程池 */
|
private final ExecutorService sharedExecutor = Executors.newCachedThreadPool(r -> {
|
Thread t = new Thread(r, "okx-grid-ws-worker");
|
t.setDaemon(true);
|
return t;
|
});
|
|
public OkxGridWsClient(ExchangeInfoEnum account, boolean isPublic) {
|
this.account = account;
|
this.isPublic = isPublic;
|
this.logPrefix = isPublic ? "[OKX-Grid-WS-PUB]" : "[OKX-Grid-WS-PRI]";
|
}
|
|
public void addChannelHandler(OkxGridChannelHandler handler) {
|
channelHandlers.add(handler);
|
}
|
|
public void init() {
|
if (!isInitialized.compareAndSet(false, true)) {
|
log.warn("[{}] 已初始化过,跳过重复初始化", logPrefix);
|
return;
|
}
|
connect();
|
startHeartbeat();
|
}
|
|
public void destroy() {
|
log.info("[{}] 开始销毁...", logPrefix);
|
if (webSocketClient != null && webSocketClient.isOpen()) {
|
for (OkxGridChannelHandler handler : channelHandlers) {
|
handler.unsubscribe(webSocketClient);
|
}
|
try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
|
}
|
if (webSocketClient != null && webSocketClient.isOpen()) {
|
try {
|
webSocketClient.closeBlocking();
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
}
|
}
|
shutdownExecutorGracefully(heartbeatExecutor);
|
if (pongTimeoutFuture != null) pongTimeoutFuture.cancel(true);
|
shutdownExecutorGracefully(sharedExecutor);
|
log.info("[{}] 销毁完成", logPrefix);
|
}
|
|
private void connect() {
|
if (isConnecting.get() || !isConnecting.compareAndSet(false, true)) {
|
log.info("[{}] 连接进行中,跳过重复请求", logPrefix);
|
return;
|
}
|
try {
|
SSLConfig.configureSSL();
|
System.setProperty("https.protocols", "TLSv1.2,TLSv1.3");
|
String wsUrl;
|
if (account.isAccountType()) {
|
wsUrl = isPublic ? WS_BUSINESS_URL_PROD : WS_PRIVATE_URL_PROD;
|
} else {
|
wsUrl = isPublic ? WS_BUSINESS_URL_SIM : WS_PRIVATE_URL_SIM;
|
}
|
URI uri = new URI(wsUrl);
|
|
if (webSocketClient != null) {
|
try { webSocketClient.closeBlocking(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
|
}
|
|
webSocketClient = new WebSocketClient(uri) {
|
@Override
|
public void onOpen(ServerHandshake handshake) {
|
log.info("[{}] 连接成功", logPrefix);
|
isConnected.set(true);
|
isConnecting.set(false);
|
if (sharedExecutor != null && !sharedExecutor.isShutdown()) {
|
resetHeartbeatTimer();
|
if (isPublic) {
|
subscribeAllHandlers();
|
} else {
|
wsLogin();
|
}
|
}
|
}
|
|
@Override
|
public void onMessage(String message) {
|
lastMessageTime.set(System.currentTimeMillis());
|
handleMessage(message);
|
resetHeartbeatTimer();
|
}
|
|
@Override
|
public void onClose(int code, String reason, boolean remote) {
|
log.warn("[{}] 连接关闭, code:{}, reason:{}", logPrefix, code, reason);
|
isConnected.set(false);
|
isConnecting.set(false);
|
cancelPongTimeout();
|
if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) {
|
sharedExecutor.execute(() -> {
|
try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.error("[{}] 重连失败", logPrefix, e); }
|
});
|
}
|
}
|
|
@Override
|
public void onError(Exception ex) {
|
log.error("[{}] 发生错误", logPrefix, ex);
|
isConnected.set(false);
|
}
|
};
|
webSocketClient.connect();
|
} catch (URISyntaxException e) {
|
log.error("[{}] URI格式错误", logPrefix, e);
|
isConnecting.set(false);
|
}
|
}
|
|
/**
|
* WebSocket 登录认证
|
*/
|
private void wsLogin() {
|
try {
|
String timestamp = String.valueOf(System.currentTimeMillis() / 1000);
|
String sign = SignUtils.signWebsocket(timestamp, account.getSecretKey());
|
|
JSONObject msg = new JSONObject();
|
msg.put("op", "login");
|
com.alibaba.fastjson.JSONArray args = new com.alibaba.fastjson.JSONArray();
|
JSONObject loginArgs = new JSONObject();
|
loginArgs.put("apiKey", account.getApiKey());
|
loginArgs.put("passphrase", account.getPassphrase());
|
loginArgs.put("timestamp", timestamp);
|
loginArgs.put("sign", sign);
|
args.add(loginArgs);
|
msg.put("args", args);
|
webSocketClient.send(msg.toJSONString());
|
log.info("[{}] 发送登录请求", logPrefix);
|
} catch (Exception e) {
|
log.error("[{}] 登录请求构建失败", logPrefix, e);
|
}
|
}
|
|
private void subscribeAllHandlers() {
|
log.info("[{}] 开始订阅频道", logPrefix);
|
for (OkxGridChannelHandler handler : channelHandlers) {
|
handler.subscribe(webSocketClient);
|
}
|
}
|
|
private void handleMessage(String message) {
|
try {
|
if ("pong".equals(message)) {
|
log.debug("[{}] 收到 pong", logPrefix);
|
return;
|
}
|
JSONObject response = JSON.parseObject(message);
|
String event = response.getString("event");
|
String op = response.getString("op");
|
|
if ("login".equals(event) || ("login".equals(op))) {
|
log.info("[{}] 登录成功, 开始订阅频道", logPrefix);
|
subscribeAllHandlers();
|
return;
|
}
|
|
if ("subscribe".equals(event) || "unsubscribe".equals(event)) {
|
log.info("[{}] {}事件: {}", logPrefix, event, response.getString("arg"));
|
return;
|
}
|
|
if ("error".equals(event)) {
|
log.error("[{}] 错误: {}", logPrefix, message);
|
return;
|
}
|
|
for (OkxGridChannelHandler handler : channelHandlers) {
|
if (handler.handleMessage(response)) return;
|
}
|
} catch (Exception e) {
|
log.error("[{}] 处理消息失败: {}", logPrefix, message, e);
|
}
|
}
|
|
// ---- heartbeat ----
|
|
private void startHeartbeat() {
|
if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) heartbeatExecutor.shutdownNow();
|
heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
|
Thread t = new Thread(r, "okx-grid-ws-heartbeat");
|
t.setDaemon(true);
|
return t;
|
});
|
heartbeatExecutor.scheduleWithFixedDelay(this::checkHeartbeatTimeout, 25, 25, TimeUnit.SECONDS);
|
}
|
|
private synchronized void resetHeartbeatTimer() {
|
cancelPongTimeout();
|
if (heartbeatExecutor != null && !heartbeatExecutor.isShutdown()) {
|
pongTimeoutFuture = heartbeatExecutor.schedule(this::sendPing, HEARTBEAT_TIMEOUT, TimeUnit.SECONDS);
|
}
|
}
|
|
private void checkHeartbeatTimeout() {
|
if (!isConnected.get()) return;
|
if (System.currentTimeMillis() - lastMessageTime.get() >= HEARTBEAT_TIMEOUT * 1000L) sendPing();
|
}
|
|
private void sendPing() {
|
try {
|
if (webSocketClient != null && webSocketClient.isOpen()) {
|
webSocketClient.send("ping");
|
log.debug("[{}] 发送 ping", logPrefix);
|
}
|
} catch (Exception e) { log.warn("[{}] 发送 ping 失败", logPrefix, e); }
|
}
|
|
private synchronized void cancelPongTimeout() {
|
if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) pongTimeoutFuture.cancel(true);
|
}
|
|
private void reconnectWithBackoff() throws InterruptedException {
|
int attempt = 0, maxAttempts = 3;
|
long delayMs = 5000;
|
while (attempt < maxAttempts) {
|
try { Thread.sleep(delayMs); connect(); return; }
|
catch (Exception e) { log.warn("[{}] 第{}次重连失败", logPrefix, attempt + 1, e); delayMs *= 2; attempt++; }
|
}
|
log.error("[{}] 超过最大重试次数({}),放弃重连", logPrefix, maxAttempts);
|
}
|
|
private void shutdownExecutorGracefully(ExecutorService executor) {
|
if (executor == null || executor.isTerminated()) return;
|
try {
|
executor.shutdown();
|
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) executor.shutdownNow();
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
executor.shutdownNow();
|
}
|
}
|
}
|