From 6a51f45e6a00b65a9e7b0b0707b453c11311f3ef Mon Sep 17 00:00:00 2001
From: Administrator <15274802129@163.com>
Date: Mon, 11 May 2026 22:38:13 +0800
Subject: [PATCH] feat(okxApi): 添加仓位模式配置和REST客户端功能
---
src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java | 184 +++++++++++++++++++++++++---------------------
1 files changed, 100 insertions(+), 84 deletions(-)
diff --git a/src/main/java/com/xcong/excoin/modules/gateApi/GateKlineWebSocketClient.java b/src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java
similarity index 65%
rename from src/main/java/com/xcong/excoin/modules/gateApi/GateKlineWebSocketClient.java
rename to src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java
index 5a5f7b0..014d2f5 100644
--- a/src/main/java/com/xcong/excoin/modules/gateApi/GateKlineWebSocketClient.java
+++ b/src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java
@@ -1,9 +1,9 @@
-package com.xcong.excoin.modules.gateApi;
+package com.xcong.excoin.modules.okxApi;
import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
-import com.xcong.excoin.modules.gateApi.wsHandler.GateChannelHandler;
-import com.xcong.excoin.modules.okxNewPrice.utils.SSLConfig;
+import com.xcong.excoin.modules.okxApi.wsHandler.OkxChannelHandler;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
@@ -17,10 +17,10 @@
import java.util.concurrent.atomic.AtomicReference;
/**
- * Gate WebSocket 连接管理器。
+ * OKX WebSocket 连接管理器。
*
* <h3>职责</h3>
- * 负责 TCP 连接的建立、维持和恢复。频道逻辑(订阅/解析)全部委托给 {@link GateChannelHandler} 实现类。
+ * 负责 TCP 连接的建立、维持和恢复。频道逻辑(订阅/解析)全部委托给 {@link OkxChannelHandler} 实现类。
*
* <h3>生命周期</h3>
* <pre>
@@ -32,73 +32,91 @@
* <h3>消息路由</h3>
* <pre>
* onMessage → handleMessage:
- * 1. futures.pong → cancelPongTimeout
- * 2. subscribe/unsubscribe → 日志
- * 3. error → 错误日志
- * 4. update/all → 遍历 channelHandlers → handler.handleMessage(response)
+ * 1. pong → cancelPongTimeout
+ * 2. login/subscribe/error → 日志
+ * 3. order/batch-orders → 下单结果日志
+ * 4. 数据推送 → 遍历 channelHandlers → handler.handleMessage(response)
* </pre>
*
* <h3>心跳机制</h3>
- * 采用双重检测:TCP 层的 WebSocket ping/pong + 应用层 futures.ping/futures.pong。
- * 10 秒未收到任何消息 → 发送 futures.ping;25 秒周期检查。
- *
- * <h3>线程安全</h3>
- * 连接状态用 AtomicBoolean(isConnected, isConnecting, isInitialized)。
- * 消息时间戳用 AtomicReference。心跳任务用 synchronized 保护。
+ * 10 秒未收到任何消息 → 发送 ping;25 秒周期检查。
*
* @author Administrator
*/
@SuppressWarnings("ALL")
@Slf4j
-public class GateKlineWebSocketClient {
+public class OkxKlineWebSocketClient {
- private static final String FUTURES_PING = "futures.ping";
- private static final String FUTURES_PONG = "futures.pong";
private static final int HEARTBEAT_TIMEOUT = 10;
- /** WebSocket 地址,由 GateConfig 提供 */
private final String wsUrl;
+ private final boolean isPrivate;
+ private final String apiKey;
+ private final String secretKey;
+ private final String passphrase;
- /** Java-WebSocket 客户端实例 */
private WebSocketClient webSocketClient;
- /** 心跳检测调度器 */
private ScheduledExecutorService heartbeatExecutor;
- /** 心跳超时 Future */
private volatile ScheduledFuture<?> pongTimeoutFuture;
- /** 最后收到消息的时间戳(毫秒) */
private final AtomicReference<Long> lastMessageTime = new AtomicReference<>(System.currentTimeMillis());
- /** 连接状态 */
private final AtomicBoolean isConnected = new AtomicBoolean(false);
- /** 连接中标记,防重入 */
private final AtomicBoolean isConnecting = new AtomicBoolean(false);
- /** 初始化标记,防重复 init */
private final AtomicBoolean isInitialized = new AtomicBoolean(false);
- /** 频道处理器列表,通过 addChannelHandler 注册 */
- private final List<GateChannelHandler> channelHandlers = new ArrayList<>();
+ private final List<OkxChannelHandler> channelHandlers = new ArrayList<>();
- /** 重连等异步任务的缓存线程池(daemon 线程) */
+ public WebSocketClient getWebSocketClient() {
+ return webSocketClient;
+ }
+
private final ExecutorService sharedExecutor = Executors.newCachedThreadPool(r -> {
- Thread t = new Thread(r, "gate-ws-worker");
+ Thread t = new Thread(r, "okxApi-ws-worker");
t.setDaemon(true);
return t;
});
- public GateKlineWebSocketClient(String wsUrl) {
+ public OkxKlineWebSocketClient(String wsUrl) {
this.wsUrl = wsUrl;
+ this.isPrivate = false;
+ this.apiKey = null;
+ this.secretKey = null;
+ this.passphrase = null;
}
- /**
- * 注册频道处理器。需在 init() 前调用。
- */
- public void addChannelHandler(GateChannelHandler handler) {
+ public OkxKlineWebSocketClient(String wsUrl, String apiKey, String secretKey, String passphrase) {
+ this.wsUrl = wsUrl;
+ this.isPrivate = true;
+ this.apiKey = apiKey;
+ this.secretKey = secretKey;
+ this.passphrase = passphrase;
+ }
+
+ public void addChannelHandler(OkxChannelHandler handler) {
channelHandlers.add(handler);
}
- /**
- * 初始化:建立 WebSocket 连接 → 启动心跳。
- */
+ private void websocketLogin() {
+ try {
+ String timestamp = String.valueOf(System.currentTimeMillis() / 1000);
+ String sign = OkxWsUtil.signWebsocket(timestamp, secretKey);
+
+ JSONArray argsArray = new JSONArray();
+ JSONObject loginArgs = new JSONObject();
+ loginArgs.put("apiKey", apiKey);
+ loginArgs.put("passphrase", passphrase);
+ loginArgs.put("timestamp", timestamp);
+ loginArgs.put("sign", sign);
+ argsArray.add(loginArgs);
+
+ JSONObject login = OkxWsUtil.buildJsonObject(null, "login", argsArray);
+ webSocketClient.send(login.toJSONString());
+ log.info("[WS] 发送登录请求");
+ } catch (Exception e) {
+ log.error("[WS] 登录请求构建失败", e);
+ }
+ }
+
public void init() {
if (!isInitialized.compareAndSet(false, true)) {
log.warn("[WS] 已初始化过,跳过重复初始化");
@@ -108,16 +126,11 @@
startHeartbeat();
}
- /**
- * 销毁:取消订阅 → 关闭连接 → 关闭线程池。
- * <p>注意:先 closeBlocking 再 shutdown sharedExecutor,
- * 避免 onClose 回调中的 reconnectWithBackoff 访问已关闭的线程池。
- */
public void destroy() {
log.info("[WS] 开始销毁...");
if (webSocketClient != null && webSocketClient.isOpen()) {
- for (GateChannelHandler handler : channelHandlers) {
+ for (OkxChannelHandler handler : channelHandlers) {
handler.unsubscribe(webSocketClient);
}
try {
@@ -150,17 +163,13 @@
log.info("[WS] 销毁完成");
}
- /**
- * 建立 WebSocket 连接。使用 SSLContext 配置 TLS 协议。
- * 连接成功后依次订阅所有已注册的频道处理器。
- */
private void connect() {
if (isConnecting.get() || !isConnecting.compareAndSet(false, true)) {
log.info("[WS] 连接进行中,跳过重复请求");
return;
}
try {
- SSLConfig.configureSSL();
+ OkxWsUtil.configureSSL();
System.setProperty("https.protocols", "TLSv1.2,TLSv1.3");
URI uri = new URI(wsUrl);
if (webSocketClient != null) {
@@ -169,15 +178,19 @@
webSocketClient = new WebSocketClient(uri) {
@Override
public void onOpen(ServerHandshake handshake) {
- log.info("[WS] 连接成功");
+ log.info("[WS] 连接成功, isPrivate:{}", isPrivate);
isConnected.set(true);
isConnecting.set(false);
if (sharedExecutor != null && !sharedExecutor.isShutdown()) {
resetHeartbeatTimer();
- for (GateChannelHandler handler : channelHandlers) {
- handler.subscribe(webSocketClient);
+ if (isPrivate) {
+ websocketLogin();
+ } else {
+ for (OkxChannelHandler handler : channelHandlers) {
+ handler.subscribe(webSocketClient);
+ }
+ sendPing();
}
- sendPing();
} else {
log.warn("[WS] 应用正在关闭,忽略连接成功回调");
}
@@ -218,53 +231,61 @@
}
}
- /**
- * 消息分发:先处理系统事件(pong/subscribe/error),
- * 再把 update/all 事件路由到各 channelHandler。
- * <p>每个 handler 内部通过 channel 名称做二次匹配,匹配成功返回 true 则停止遍历。
- */
private void handleMessage(String message) {
try {
- JSONObject response = JSON.parseObject(message);
- String channel = response.getString("channel");
- String event = response.getString("event");
-
- if (FUTURES_PONG.equals(channel)) {
- log.debug("[WS] 收到 pong 响应");
+ if ("pong".equals(message)) {
+ log.debug("[WS] 收到心跳响应");
cancelPongTimeout();
return;
}
+ JSONObject response = JSON.parseObject(message);
+ String event = response.getString("event");
+
+ if ("login".equals(event)) {
+ String code = response.getString("code");
+ if ("0".equals(code)) {
+ log.info("[WS] WebSocket登录成功");
+ for (OkxChannelHandler handler : channelHandlers) {
+ handler.subscribe(webSocketClient);
+ }
+ sendPing();
+ } else {
+ log.error("[WS] WebSocket登录失败, code:{}, msg:{}", code, response.getString("msg"));
+ }
+ return;
+ }
if ("subscribe".equals(event)) {
- log.info("[WS] {} 订阅成功: {}", channel, response.getJSONObject("result"));
+ log.info("[WS] 订阅成功: {}", response.getJSONObject("arg"));
return;
}
if ("unsubscribe".equals(event)) {
- log.info("[WS] {} 取消订阅成功", channel);
+ log.info("[WS] 取消订阅成功: {}", response.getJSONObject("arg"));
return;
}
if ("error".equals(event)) {
- JSONObject error = response.getJSONObject("error");
- log.error("[WS] {} 错误, code:{}, msg:{}",
- channel,
- error != null ? error.getInteger("code") : "N/A",
- error != null ? error.getString("message") : response.getString("msg"));
+ log.error("[WS] 错误, code:{}, msg:{}",
+ response.getString("code"), response.getString("msg"));
return;
}
- if ("update".equals(event) || "all".equals(event)) {
- for (GateChannelHandler handler : channelHandlers) {
- if (handler.handleMessage(response)) return;
- }
+ if ("channel-conn-count".equals(event)) {
+ return;
+ }
+ String op = response.getString("op");
+ if ("order".equals(op) || "batch-orders".equals(op)) {
+ log.info("[WS] 收到下单推送结果: {}", JSON.toJSONString(response.get("data")));
+ return;
+ }
+ for (OkxChannelHandler handler : channelHandlers) {
+ if (handler.handleMessage(response)) return;
}
} catch (Exception e) {
log.error("[WS] 处理消息失败: {}", message, e);
}
}
- // ---- heartbeat ----
-
private void startHeartbeat() {
if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) heartbeatExecutor.shutdownNow();
- heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "gate-ws-heartbeat"); t.setDaemon(true); return t; });
+ heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "okxApi-heartbeat"); t.setDaemon(true); return t; });
heartbeatExecutor.scheduleWithFixedDelay(this::checkHeartbeatTimeout, 25, 25, TimeUnit.SECONDS);
}
@@ -283,10 +304,7 @@
private void sendPing() {
try {
if (webSocketClient != null && webSocketClient.isOpen()) {
- JSONObject pingMsg = new JSONObject();
- pingMsg.put("time", System.currentTimeMillis() / 1000);
- pingMsg.put("channel", FUTURES_PING);
- webSocketClient.send(pingMsg.toJSONString());
+ webSocketClient.send("ping");
log.debug("[WS] 发送 ping 请求");
}
} catch (Exception e) { log.warn("[WS] 发送 ping 失败", e); }
@@ -295,8 +313,6 @@
private synchronized void cancelPongTimeout() {
if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) pongTimeoutFuture.cancel(true);
}
-
- // ---- reconnect ----
private void reconnectWithBackoff() throws InterruptedException {
int attempt = 0, maxAttempts = 3;
--
Gitblit v1.9.1