package com.xcong.excoin.modules.gateApi;
|
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSONObject;
|
import com.xcong.excoin.modules.gateApi.wsHandler.GateChannelHandler;
|
import com.xcong.excoin.modules.okxNewPrice.utils.SSLConfig;
|
import lombok.extern.slf4j.Slf4j;
|
import org.java_websocket.client.WebSocketClient;
|
import org.java_websocket.handshake.ServerHandshake;
|
|
import java.net.URI;
|
import java.net.URISyntaxException;
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.concurrent.*;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicReference;
|
|
/**
|
* Gate WebSocket 连接管理器。
|
*
|
* <h3>职责</h3>
|
* 负责 TCP 连接的建立、维持和恢复。频道逻辑(订阅/解析)全部委托给 {@link GateChannelHandler} 实现类。
|
*
|
* <h3>生命周期</h3>
|
* <pre>
|
* init() → connect() → startHeartbeat()
|
* destroy() → unsubscribe 所有 handler → closeBlocking() → shutdown 线程池
|
* onClose() → reconnectWithBackoff() (最多 3 次,指数退避)
|
* </pre>
|
*
|
* <h3>消息路由</h3>
|
* <pre>
|
* onMessage → handleMessage:
|
* 1. futures.pong → cancelPongTimeout
|
* 2. subscribe/unsubscribe → 日志
|
* 3. error → 错误日志
|
* 4. update/all → 遍历 channelHandlers → handler.handleMessage(response)
|
* </pre>
|
*
|
* <h3>心跳机制</h3>
|
* 采用双重检测:TCP 层的 WebSocket ping/pong + 应用层 futures.ping/futures.pong。
|
* 10 秒未收到任何消息 → 发送 futures.ping;25 秒周期检查。
|
*
|
* <h3>线程安全</h3>
|
* 连接状态用 AtomicBoolean(isConnected, isConnecting, isInitialized)。
|
* 消息时间戳用 AtomicReference。心跳任务用 synchronized 保护。
|
*
|
* @author Administrator
|
*/
|
@SuppressWarnings("ALL")
|
@Slf4j
|
public class GateKlineWebSocketClient {
|
|
private static final String FUTURES_PING = "futures.ping";
|
private static final String FUTURES_PONG = "futures.pong";
|
private static final int HEARTBEAT_TIMEOUT = 10;
|
|
/** WebSocket 地址,由 GateConfig 提供 */
|
private final String wsUrl;
|
|
/** Java-WebSocket 客户端实例 */
|
private WebSocketClient webSocketClient;
|
/** 心跳检测调度器 */
|
private ScheduledExecutorService heartbeatExecutor;
|
/** 心跳超时 Future */
|
private volatile ScheduledFuture<?> pongTimeoutFuture;
|
/** 最后收到消息的时间戳(毫秒) */
|
private final AtomicReference<Long> lastMessageTime = new AtomicReference<>(System.currentTimeMillis());
|
|
/** 连接状态 */
|
private final AtomicBoolean isConnected = new AtomicBoolean(false);
|
/** 连接中标记,防重入 */
|
private final AtomicBoolean isConnecting = new AtomicBoolean(false);
|
/** 初始化标记,防重复 init */
|
private final AtomicBoolean isInitialized = new AtomicBoolean(false);
|
|
/** 频道处理器列表,通过 addChannelHandler 注册 */
|
private final List<GateChannelHandler> channelHandlers = new ArrayList<>();
|
|
/** 重连等异步任务的缓存线程池(daemon 线程) */
|
private final ExecutorService sharedExecutor = Executors.newCachedThreadPool(r -> {
|
Thread t = new Thread(r, "gate-ws-worker");
|
t.setDaemon(true);
|
return t;
|
});
|
|
public GateKlineWebSocketClient(String wsUrl) {
|
this.wsUrl = wsUrl;
|
}
|
|
/**
|
* 注册频道处理器。需在 init() 前调用。
|
*
|
* @param handler 实现了 {@link GateChannelHandler} 接口的频道处理器
|
*/
|
public void addChannelHandler(GateChannelHandler handler) {
|
channelHandlers.add(handler);
|
}
|
|
/**
|
* 初始化:建立 WebSocket 连接 → 启动心跳检测。
|
* 使用 {@code AtomicBoolean} 防重入,同一实例只允许初始化一次。
|
*/
|
public void init() {
|
if (!isInitialized.compareAndSet(false, true)) {
|
log.warn("[WS] 已初始化过,跳过重复初始化");
|
return;
|
}
|
connect();
|
startHeartbeat();
|
}
|
|
/**
|
* 销毁:取消所有频道订阅 → 关闭 WebSocket 连接 → 关闭线程池。
|
*
|
* <h3>执行顺序</h3>
|
* 先取消订阅(等待 500ms 确保发送完成),再 closeBlocking 关闭连接,
|
* 最后 shutdown 线程池。先关连接再关线程池,避免 onClose 回调中的重连任务访问已关闭的线程池。
|
*/
|
public void destroy() {
|
log.info("[WS] 开始销毁...");
|
|
if (webSocketClient != null && webSocketClient.isOpen()) {
|
for (GateChannelHandler handler : channelHandlers) {
|
handler.unsubscribe(webSocketClient);
|
}
|
try {
|
Thread.sleep(500);
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
log.warn("[WS] 取消订阅等待被中断");
|
}
|
}
|
|
if (webSocketClient != null && webSocketClient.isOpen()) {
|
try {
|
webSocketClient.closeBlocking();
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
log.warn("[WS] 关闭连接时被中断");
|
}
|
}
|
|
if (sharedExecutor != null && !sharedExecutor.isShutdown()) {
|
sharedExecutor.shutdown();
|
}
|
|
shutdownExecutorGracefully(heartbeatExecutor);
|
if (pongTimeoutFuture != null) {
|
pongTimeoutFuture.cancel(true);
|
}
|
shutdownExecutorGracefully(sharedExecutor);
|
|
log.info("[WS] 销毁完成");
|
}
|
|
/**
|
* 建立 WebSocket 连接。使用 SSLContext 配置 TLS 协议。
|
*
|
* <h3>连接成功回调</h3>
|
* <ol>
|
* <li>设置 isConnected=true,isConnecting=false</li>
|
* <li>重置心跳计时器</li>
|
* <li>依次订阅所有已注册的频道处理器</li>
|
* <li>发送首次 ping</li>
|
* </ol>
|
*
|
* <h3>连接关闭回调</h3>
|
* 设置断连状态 → 取消心跳超时 → 异步触发指数退避重连(最多3次)。
|
*
|
* <h3>线程安全</h3>
|
* 使用 {@code AtomicBoolean.isConnecting} 防止并发重连。
|
*/
|
private void connect() {
|
if (isConnecting.get() || !isConnecting.compareAndSet(false, true)) {
|
log.info("[WS] 连接进行中,跳过重复请求");
|
return;
|
}
|
try {
|
SSLConfig.configureSSL();
|
System.setProperty("https.protocols", "TLSv1.2,TLSv1.3");
|
URI uri = new URI(wsUrl);
|
if (webSocketClient != null) {
|
try { webSocketClient.closeBlocking(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
|
}
|
webSocketClient = new WebSocketClient(uri) {
|
@Override
|
public void onOpen(ServerHandshake handshake) {
|
log.info("[WS] 连接成功");
|
isConnected.set(true);
|
isConnecting.set(false);
|
if (sharedExecutor != null && !sharedExecutor.isShutdown()) {
|
resetHeartbeatTimer();
|
for (GateChannelHandler handler : channelHandlers) {
|
handler.subscribe(webSocketClient);
|
}
|
sendPing();
|
} else {
|
log.warn("[WS] 应用正在关闭,忽略连接成功回调");
|
}
|
}
|
|
@Override
|
public void onMessage(String message) {
|
lastMessageTime.set(System.currentTimeMillis());
|
handleMessage(message);
|
resetHeartbeatTimer();
|
}
|
|
@Override
|
public void onClose(int code, String reason, boolean remote) {
|
log.warn("[WS] 连接关闭, code:{}, reason:{}", code, reason);
|
isConnected.set(false);
|
isConnecting.set(false);
|
cancelPongTimeout();
|
if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) {
|
sharedExecutor.execute(() -> {
|
try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.error("[WS] 重连失败", e); }
|
});
|
} else {
|
log.warn("[WS] 线程池已关闭,不执行重连");
|
}
|
}
|
|
@Override
|
public void onError(Exception ex) {
|
log.error("[WS] 发生错误", ex);
|
isConnected.set(false);
|
}
|
};
|
webSocketClient.connect();
|
} catch (URISyntaxException e) {
|
log.error("[WS] URI格式错误", e);
|
isConnecting.set(false);
|
}
|
}
|
|
/**
|
* 消息分发:先处理系统事件(pong/subscribe/error),
|
* 再把 update/all 事件路由到各 channelHandler。
|
* <p>每个 handler 内部通过 channel 名称做二次匹配,匹配成功返回 true 则停止遍历。
|
*/
|
private void handleMessage(String message) {
|
try {
|
JSONObject response = JSON.parseObject(message);
|
String channel = response.getString("channel");
|
String event = response.getString("event");
|
|
if (FUTURES_PONG.equals(channel)) {
|
log.debug("[WS] 收到 pong 响应");
|
cancelPongTimeout();
|
return;
|
}
|
if ("subscribe".equals(event)) {
|
log.info("[WS] {} 订阅成功: {}", channel, response.getJSONObject("result"));
|
return;
|
}
|
if ("unsubscribe".equals(event)) {
|
log.info("[WS] {} 取消订阅成功", channel);
|
return;
|
}
|
if ("error".equals(event)) {
|
JSONObject error = response.getJSONObject("error");
|
log.error("[WS] {} 错误, code:{}, msg:{}",
|
channel,
|
error != null ? error.getInteger("code") : "N/A",
|
error != null ? error.getString("message") : response.getString("msg"));
|
return;
|
}
|
if ("update".equals(event) || "all".equals(event)) {
|
for (GateChannelHandler handler : channelHandlers) {
|
if (handler.handleMessage(response)) return;
|
}
|
}
|
} catch (Exception e) {
|
log.error("[WS] 处理消息失败: {}", message, e);
|
}
|
}
|
|
// ---- heartbeat ----
|
|
/**
|
* 启动心跳检测器。
|
* 使用单线程 ScheduledExecutor,每 25 秒检查一次心跳超时。
|
*/
|
private void startHeartbeat() {
|
if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) heartbeatExecutor.shutdownNow();
|
heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "gate-ws-heartbeat"); t.setDaemon(true); return t; });
|
heartbeatExecutor.scheduleWithFixedDelay(this::checkHeartbeatTimeout, 25, 25, TimeUnit.SECONDS);
|
}
|
|
/**
|
* 重置心跳计时器:取消旧超时任务,提交新的 10 秒超时检测。
|
*/
|
private synchronized void resetHeartbeatTimer() {
|
cancelPongTimeout();
|
if (heartbeatExecutor != null && !heartbeatExecutor.isShutdown()) {
|
pongTimeoutFuture = heartbeatExecutor.schedule(this::checkHeartbeatTimeout, HEARTBEAT_TIMEOUT, TimeUnit.SECONDS);
|
}
|
}
|
|
/**
|
* 检查心跳超时:如果距离上次收到消息超过 10 秒,主动发送 futures.ping。
|
*/
|
private void checkHeartbeatTimeout() {
|
if (!isConnected.get()) return;
|
if (System.currentTimeMillis() - lastMessageTime.get() >= HEARTBEAT_TIMEOUT * 1000L) sendPing();
|
}
|
|
/**
|
* 发送应用层 futures.ping 消息。
|
*/
|
private void sendPing() {
|
try {
|
if (webSocketClient != null && webSocketClient.isOpen()) {
|
JSONObject pingMsg = new JSONObject();
|
pingMsg.put("time", System.currentTimeMillis() / 1000);
|
pingMsg.put("channel", FUTURES_PING);
|
webSocketClient.send(pingMsg.toJSONString());
|
log.debug("[WS] 发送 ping 请求");
|
}
|
} catch (Exception e) { log.warn("[WS] 发送 ping 失败", e); }
|
}
|
|
/**
|
* 取消心跳超时检测任务。
|
*/
|
private synchronized void cancelPongTimeout() {
|
if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) pongTimeoutFuture.cancel(true);
|
}
|
|
// ---- reconnect ----
|
|
/**
|
* 指数退避重连。初始延迟 5 秒,每次翻倍,最多重试 3 次。
|
*
|
* @throws InterruptedException 线程被中断
|
*/
|
private void reconnectWithBackoff() throws InterruptedException {
|
int attempt = 0, maxAttempts = 3;
|
long delayMs = 5000;
|
while (attempt < maxAttempts) {
|
try { Thread.sleep(delayMs); connect(); return; } catch (Exception e) { log.warn("[WS] 第{}次重连失败", attempt + 1, e); delayMs *= 2; attempt++; }
|
}
|
log.error("[WS] 超过最大重试次数({}),放弃重连", maxAttempts);
|
}
|
|
/**
|
* 优雅关闭线程池:先 shutdown,等待 5 秒,超时则 shutdownNow 强制中断。
|
*
|
* @param executor 需要关闭的线程池
|
*/
|
private void shutdownExecutorGracefully(ExecutorService executor) {
|
if (executor == null || executor.isTerminated()) return;
|
try { executor.shutdown(); if (!executor.awaitTermination(5, TimeUnit.SECONDS)) executor.shutdownNow(); }
|
catch (InterruptedException e) { Thread.currentThread().interrupt(); executor.shutdownNow(); }
|
}
|
}
|