From 6a51f45e6a00b65a9e7b0b0707b453c11311f3ef Mon Sep 17 00:00:00 2001
From: Administrator <15274802129@163.com>
Date: Mon, 11 May 2026 22:38:13 +0800
Subject: [PATCH] feat(okxApi): 添加仓位模式配置和REST客户端功能

---
 src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java |  184 +++++++++++++++++++++++++---------------------
 1 files changed, 100 insertions(+), 84 deletions(-)

diff --git a/src/main/java/com/xcong/excoin/modules/gateApi/GateKlineWebSocketClient.java b/src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java
similarity index 65%
rename from src/main/java/com/xcong/excoin/modules/gateApi/GateKlineWebSocketClient.java
rename to src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java
index 5a5f7b0..014d2f5 100644
--- a/src/main/java/com/xcong/excoin/modules/gateApi/GateKlineWebSocketClient.java
+++ b/src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java
@@ -1,9 +1,9 @@
-package com.xcong.excoin.modules.gateApi;
+package com.xcong.excoin.modules.okxApi;
 
 import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
-import com.xcong.excoin.modules.gateApi.wsHandler.GateChannelHandler;
-import com.xcong.excoin.modules.okxNewPrice.utils.SSLConfig;
+import com.xcong.excoin.modules.okxApi.wsHandler.OkxChannelHandler;
 import lombok.extern.slf4j.Slf4j;
 import org.java_websocket.client.WebSocketClient;
 import org.java_websocket.handshake.ServerHandshake;
@@ -17,10 +17,10 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * Gate WebSocket 连接管理器。
+ * OKX WebSocket 连接管理器。
  *
  * <h3>职责</h3>
- * 负责 TCP 连接的建立、维持和恢复。频道逻辑(订阅/解析)全部委托给 {@link GateChannelHandler} 实现类。
+ * 负责 TCP 连接的建立、维持和恢复。频道逻辑(订阅/解析)全部委托给 {@link OkxChannelHandler} 实现类。
  *
  * <h3>生命周期</h3>
  * <pre>
@@ -32,73 +32,91 @@
  * <h3>消息路由</h3>
  * <pre>
  *   onMessage → handleMessage:
- *     1. futures.pong         → cancelPongTimeout
- *     2. subscribe/unsubscribe → 日志
- *     3. error                → 错误日志
- *     4. update/all           → 遍历 channelHandlers → handler.handleMessage(response)
+ *     1. pong                  → cancelPongTimeout
+ *     2. login/subscribe/error → 日志
+ *     3. order/batch-orders    → 下单结果日志
+ *     4. 数据推送              → 遍历 channelHandlers → handler.handleMessage(response)
  * </pre>
  *
  * <h3>心跳机制</h3>
- * 采用双重检测:TCP 层的 WebSocket ping/pong + 应用层 futures.ping/futures.pong。
- * 10 秒未收到任何消息 → 发送 futures.ping;25 秒周期检查。
- *
- * <h3>线程安全</h3>
- * 连接状态用 AtomicBoolean(isConnected, isConnecting, isInitialized)。
- * 消息时间戳用 AtomicReference。心跳任务用 synchronized 保护。
+ * 10 秒未收到任何消息 → 发送 ping;25 秒周期检查。
  *
  * @author Administrator
  */
 @SuppressWarnings("ALL")
 @Slf4j
-public class GateKlineWebSocketClient {
+public class OkxKlineWebSocketClient {
 
-    private static final String FUTURES_PING = "futures.ping";
-    private static final String FUTURES_PONG = "futures.pong";
     private static final int HEARTBEAT_TIMEOUT = 10;
 
-    /** WebSocket 地址,由 GateConfig 提供 */
     private final String wsUrl;
+    private final boolean isPrivate;
+    private final String apiKey;
+    private final String secretKey;
+    private final String passphrase;
 
-    /** Java-WebSocket 客户端实例 */
     private WebSocketClient webSocketClient;
-    /** 心跳检测调度器 */
     private ScheduledExecutorService heartbeatExecutor;
-    /** 心跳超时 Future */
     private volatile ScheduledFuture<?> pongTimeoutFuture;
-    /** 最后收到消息的时间戳(毫秒) */
     private final AtomicReference<Long> lastMessageTime = new AtomicReference<>(System.currentTimeMillis());
 
-    /** 连接状态 */
     private final AtomicBoolean isConnected = new AtomicBoolean(false);
-    /** 连接中标记,防重入 */
     private final AtomicBoolean isConnecting = new AtomicBoolean(false);
-    /** 初始化标记,防重复 init */
     private final AtomicBoolean isInitialized = new AtomicBoolean(false);
 
-    /** 频道处理器列表,通过 addChannelHandler 注册 */
-    private final List<GateChannelHandler> channelHandlers = new ArrayList<>();
+    private final List<OkxChannelHandler> channelHandlers = new ArrayList<>();
 
-    /** 重连等异步任务的缓存线程池(daemon 线程) */
+    public WebSocketClient getWebSocketClient() {
+        return webSocketClient;
+    }
+
     private final ExecutorService sharedExecutor = Executors.newCachedThreadPool(r -> {
-        Thread t = new Thread(r, "gate-ws-worker");
+        Thread t = new Thread(r, "okxApi-ws-worker");
         t.setDaemon(true);
         return t;
     });
 
-    public GateKlineWebSocketClient(String wsUrl) {
+    public OkxKlineWebSocketClient(String wsUrl) {
         this.wsUrl = wsUrl;
+        this.isPrivate = false;
+        this.apiKey = null;
+        this.secretKey = null;
+        this.passphrase = null;
     }
 
-    /**
-     * 注册频道处理器。需在 init() 前调用。
-     */
-    public void addChannelHandler(GateChannelHandler handler) {
+    public OkxKlineWebSocketClient(String wsUrl, String apiKey, String secretKey, String passphrase) {
+        this.wsUrl = wsUrl;
+        this.isPrivate = true;
+        this.apiKey = apiKey;
+        this.secretKey = secretKey;
+        this.passphrase = passphrase;
+    }
+
+    public void addChannelHandler(OkxChannelHandler handler) {
         channelHandlers.add(handler);
     }
 
-    /**
-     * 初始化:建立 WebSocket 连接 → 启动心跳。
-     */
+    private void websocketLogin() {
+        try {
+            String timestamp = String.valueOf(System.currentTimeMillis() / 1000);
+            String sign = OkxWsUtil.signWebsocket(timestamp, secretKey);
+
+            JSONArray argsArray = new JSONArray();
+            JSONObject loginArgs = new JSONObject();
+            loginArgs.put("apiKey", apiKey);
+            loginArgs.put("passphrase", passphrase);
+            loginArgs.put("timestamp", timestamp);
+            loginArgs.put("sign", sign);
+            argsArray.add(loginArgs);
+
+            JSONObject login = OkxWsUtil.buildJsonObject(null, "login", argsArray);
+            webSocketClient.send(login.toJSONString());
+            log.info("[WS] 发送登录请求");
+        } catch (Exception e) {
+            log.error("[WS] 登录请求构建失败", e);
+        }
+    }
+
     public void init() {
         if (!isInitialized.compareAndSet(false, true)) {
             log.warn("[WS] 已初始化过,跳过重复初始化");
@@ -108,16 +126,11 @@
         startHeartbeat();
     }
 
-    /**
-     * 销毁:取消订阅 → 关闭连接 → 关闭线程池。
-     * <p>注意:先 closeBlocking 再 shutdown sharedExecutor,
-     * 避免 onClose 回调中的 reconnectWithBackoff 访问已关闭的线程池。
-     */
     public void destroy() {
         log.info("[WS] 开始销毁...");
 
         if (webSocketClient != null && webSocketClient.isOpen()) {
-            for (GateChannelHandler handler : channelHandlers) {
+            for (OkxChannelHandler handler : channelHandlers) {
                 handler.unsubscribe(webSocketClient);
             }
             try {
@@ -150,17 +163,13 @@
         log.info("[WS] 销毁完成");
     }
 
-    /**
-     * 建立 WebSocket 连接。使用 SSLContext 配置 TLS 协议。
-     * 连接成功后依次订阅所有已注册的频道处理器。
-     */
     private void connect() {
         if (isConnecting.get() || !isConnecting.compareAndSet(false, true)) {
             log.info("[WS] 连接进行中,跳过重复请求");
             return;
         }
         try {
-            SSLConfig.configureSSL();
+            OkxWsUtil.configureSSL();
             System.setProperty("https.protocols", "TLSv1.2,TLSv1.3");
             URI uri = new URI(wsUrl);
             if (webSocketClient != null) {
@@ -169,15 +178,19 @@
             webSocketClient = new WebSocketClient(uri) {
                 @Override
                 public void onOpen(ServerHandshake handshake) {
-                    log.info("[WS] 连接成功");
+                    log.info("[WS] 连接成功, isPrivate:{}", isPrivate);
                     isConnected.set(true);
                     isConnecting.set(false);
                     if (sharedExecutor != null && !sharedExecutor.isShutdown()) {
                         resetHeartbeatTimer();
-                        for (GateChannelHandler handler : channelHandlers) {
-                            handler.subscribe(webSocketClient);
+                        if (isPrivate) {
+                            websocketLogin();
+                        } else {
+                            for (OkxChannelHandler handler : channelHandlers) {
+                                handler.subscribe(webSocketClient);
+                            }
+                            sendPing();
                         }
-                        sendPing();
                     } else {
                         log.warn("[WS] 应用正在关闭,忽略连接成功回调");
                     }
@@ -218,53 +231,61 @@
         }
     }
 
-    /**
-     * 消息分发:先处理系统事件(pong/subscribe/error),
-     * 再把 update/all 事件路由到各 channelHandler。
-     * <p>每个 handler 内部通过 channel 名称做二次匹配,匹配成功返回 true 则停止遍历。
-     */
     private void handleMessage(String message) {
         try {
-            JSONObject response = JSON.parseObject(message);
-            String channel = response.getString("channel");
-            String event = response.getString("event");
-
-            if (FUTURES_PONG.equals(channel)) {
-                log.debug("[WS] 收到 pong 响应");
+            if ("pong".equals(message)) {
+                log.debug("[WS] 收到心跳响应");
                 cancelPongTimeout();
                 return;
             }
+            JSONObject response = JSON.parseObject(message);
+            String event = response.getString("event");
+
+            if ("login".equals(event)) {
+                String code = response.getString("code");
+                if ("0".equals(code)) {
+                    log.info("[WS] WebSocket登录成功");
+                    for (OkxChannelHandler handler : channelHandlers) {
+                        handler.subscribe(webSocketClient);
+                    }
+                    sendPing();
+                } else {
+                    log.error("[WS] WebSocket登录失败, code:{}, msg:{}", code, response.getString("msg"));
+                }
+                return;
+            }
             if ("subscribe".equals(event)) {
-                log.info("[WS] {} 订阅成功: {}", channel, response.getJSONObject("result"));
+                log.info("[WS] 订阅成功: {}", response.getJSONObject("arg"));
                 return;
             }
             if ("unsubscribe".equals(event)) {
-                log.info("[WS] {} 取消订阅成功", channel);
+                log.info("[WS] 取消订阅成功: {}", response.getJSONObject("arg"));
                 return;
             }
             if ("error".equals(event)) {
-                JSONObject error = response.getJSONObject("error");
-                log.error("[WS] {} 错误, code:{}, msg:{}",
-                        channel,
-                        error != null ? error.getInteger("code") : "N/A",
-                        error != null ? error.getString("message") : response.getString("msg"));
+                log.error("[WS] 错误, code:{}, msg:{}",
+                        response.getString("code"), response.getString("msg"));
                 return;
             }
-            if ("update".equals(event) || "all".equals(event)) {
-                for (GateChannelHandler handler : channelHandlers) {
-                    if (handler.handleMessage(response)) return;
-                }
+            if ("channel-conn-count".equals(event)) {
+                return;
+            }
+            String op = response.getString("op");
+            if ("order".equals(op) || "batch-orders".equals(op)) {
+                log.info("[WS] 收到下单推送结果: {}", JSON.toJSONString(response.get("data")));
+                return;
+            }
+            for (OkxChannelHandler handler : channelHandlers) {
+                if (handler.handleMessage(response)) return;
             }
         } catch (Exception e) {
             log.error("[WS] 处理消息失败: {}", message, e);
         }
     }
 
-    // ---- heartbeat ----
-
     private void startHeartbeat() {
         if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) heartbeatExecutor.shutdownNow();
-        heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "gate-ws-heartbeat"); t.setDaemon(true); return t; });
+        heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "okxApi-heartbeat"); t.setDaemon(true); return t; });
         heartbeatExecutor.scheduleWithFixedDelay(this::checkHeartbeatTimeout, 25, 25, TimeUnit.SECONDS);
     }
 
@@ -283,10 +304,7 @@
     private void sendPing() {
         try {
             if (webSocketClient != null && webSocketClient.isOpen()) {
-                JSONObject pingMsg = new JSONObject();
-                pingMsg.put("time", System.currentTimeMillis() / 1000);
-                pingMsg.put("channel", FUTURES_PING);
-                webSocketClient.send(pingMsg.toJSONString());
+                webSocketClient.send("ping");
                 log.debug("[WS] 发送 ping 请求");
             }
         } catch (Exception e) { log.warn("[WS] 发送 ping 失败", e); }
@@ -295,8 +313,6 @@
     private synchronized void cancelPongTimeout() {
         if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) pongTimeoutFuture.cancel(true);
     }
-
-    // ---- reconnect ----
 
     private void reconnectWithBackoff() throws InterruptedException {
         int attempt = 0, maxAttempts = 3;

--
Gitblit v1.9.1