13 files deleted
1 files renamed
16 files added
49770 ■■■■ changed files
src/main/java/com/xcong/excoin/modules/gateApi/Example.java 76 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/gateApi/GateConfig.java 186 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/gateApi/GateGridTradeService.java 636 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/gateApi/GateTradeExecutor.java 209 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/gateApi/GateWebSocketClientMain.java 21 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/gateApi/GateWebSocketClientManager.java 100 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/gateApi/gate-websocket.txt 5011 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/gateApi/gateApi-logic.md 517 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/gateApi/wsHandler/AbstractPrivateChannelHandler.java 146 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/gateApi/wsHandler/GateChannelHandler.java 44 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/gateApi/wsHandler/handler/CandlestickChannelHandler.java 102 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/gateApi/wsHandler/handler/PositionClosesChannelHandler.java 63 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/gateApi/wsHandler/handler/PositionsChannelHandler.java 99 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxApi/OkxConfig.java 181 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxApi/OkxGridTradeService.java 416 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java 184 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxApi/OkxRestClient.java 125 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxApi/OkxTradeExecutor.java 296 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxApi/OkxWebSocketClientMain.java 23 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxApi/OkxWebSocketClientManager.java 118 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxApi/OkxWsUtil.java 132 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxApi/enums/OkxEnums.java 29 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxApi/okxApi-logic.md 724 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxApi/okx文档.txt 39846 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxApi/param/TradeRequestParam.java 41 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxApi/wsHandler/OkxChannelHandler.java 44 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxApi/wsHandler/handler/OkxAccountChannelHandler.java 90 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxApi/wsHandler/handler/OkxCandlestickChannelHandler.java 93 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxApi/wsHandler/handler/OkxOrderInfoChannelHandler.java 121 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/okxApi/wsHandler/handler/OkxPositionsChannelHandler.java 97 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/gateApi/Example.java
File was deleted
src/main/java/com/xcong/excoin/modules/gateApi/GateConfig.java
File was deleted
src/main/java/com/xcong/excoin/modules/gateApi/GateGridTradeService.java
File was deleted
src/main/java/com/xcong/excoin/modules/gateApi/GateTradeExecutor.java
File was deleted
src/main/java/com/xcong/excoin/modules/gateApi/GateWebSocketClientMain.java
File was deleted
src/main/java/com/xcong/excoin/modules/gateApi/GateWebSocketClientManager.java
File was deleted
src/main/java/com/xcong/excoin/modules/gateApi/gate-websocket.txt
File was deleted
src/main/java/com/xcong/excoin/modules/gateApi/gateApi-logic.md
File was deleted
src/main/java/com/xcong/excoin/modules/gateApi/wsHandler/AbstractPrivateChannelHandler.java
File was deleted
src/main/java/com/xcong/excoin/modules/gateApi/wsHandler/GateChannelHandler.java
File was deleted
src/main/java/com/xcong/excoin/modules/gateApi/wsHandler/handler/CandlestickChannelHandler.java
File was deleted
src/main/java/com/xcong/excoin/modules/gateApi/wsHandler/handler/PositionClosesChannelHandler.java
File was deleted
src/main/java/com/xcong/excoin/modules/gateApi/wsHandler/handler/PositionsChannelHandler.java
File was deleted
src/main/java/com/xcong/excoin/modules/okxApi/OkxConfig.java
New file
@@ -0,0 +1,181 @@
package com.xcong.excoin.modules.okxApi;
import java.math.BigDecimal;
/**
 * OKX 模块统一配置。
 *
 * <p>通过 Builder 模式集中管理所有运行参数,避免参数散落在多个文件中。
 * 提供 REST API 和 WebSocket 地址的自动环境切换(模拟盘/实盘)。
 *
 * <h3>使用示例</h3>
 * <pre>
 *   OkxConfig config = OkxConfig.builder()
 *       .apiKey("...")
 *       .secretKey("...")
 *       .passphrase("...")
 *       .contract("BTC-USDT-SWAP")
 *       .leverage("100")
 *       .gridRate(new BigDecimal("0.0035"))
 *       .contractMultiplier(new BigDecimal("1"))
 *       .isProduction(false)
 *       .build();
 *
 *   String wsKlineUrl = config.getWsKlineUrl();
 *   String wsPrivateUrl = config.getWsPrivateUrl();
 * </pre>
 *
 * <h3>默认值</h3>
 * <ul>
 *   <li>合约: BTC-USDT-SWAP, 杠杆: 100x, 全仓</li>
 *   <li>网格间距: 0.35%, 队列容量: 50, 保证金比例上限: 20%</li>
 *   <li>止盈: 5 USDT, 亏损上限: 15 USDT</li>
 *   <li>数量: 1 张, 合约乘数: 1, 环境: 模拟盘</li>
 * </ul>
 *
 * @author Administrator
 */
public class OkxConfig {
    public enum PnLPriceMode {
        LAST_PRICE,
        MARK_PRICE
    }
    private final String apiKey;
    private final String secretKey;
    private final String passphrase;
    private final String contract;
    private final String leverage;
    private final String marginMode;
    private final String posMode;
    private final BigDecimal gridRate;
    private final BigDecimal overallTp;
    private final BigDecimal maxLoss;
    private final String quantity;
    private final boolean isProduction;
    private final int gridQueueSize;
    private final BigDecimal marginRatioLimit;
    private final BigDecimal contractMultiplier;
    private final PnLPriceMode unrealizedPnlPriceMode;
    private OkxConfig(Builder builder) {
        this.apiKey = builder.apiKey;
        this.secretKey = builder.secretKey;
        this.passphrase = builder.passphrase;
        this.contract = builder.contract;
        this.leverage = builder.leverage;
        this.marginMode = builder.marginMode;
        this.posMode = builder.posMode;
        this.gridRate = builder.gridRate;
        this.overallTp = builder.overallTp;
        this.maxLoss = builder.maxLoss;
        this.quantity = builder.quantity;
        this.isProduction = builder.isProduction;
        this.gridQueueSize = builder.gridQueueSize;
        this.marginRatioLimit = builder.marginRatioLimit;
        this.contractMultiplier = builder.contractMultiplier;
        this.unrealizedPnlPriceMode = builder.unrealizedPnlPriceMode;
    }
    // ==================== WS 地址 ====================
    public String getWsKlineUrl() {
        return isProduction
                ? "wss://ws.okx.com:8443/ws/v5/business"
                : "wss://wspap.okx.com:8443/ws/v5/business";
    }
    public String getWsPrivateUrl() {
        return isProduction
                ? "wss://ws.okx.com:8443/ws/v5/private"
                : "wss://wspap.okx.com:8443/ws/v5/private";
    }
    // ==================== REST 地址 ====================
    public String getRestBaseUrl() {
        return isProduction
                ? "https://www.okx.com"
                : "https://www.okx.cab";
    }
    // ==================== 认证信息 ====================
    public String getApiKey() { return apiKey; }
    public String getSecretKey() { return secretKey; }
    public String getPassphrase() { return passphrase; }
    // ==================== 交易标的 ====================
    public String getContract() { return contract; }
    public String getLeverage() { return leverage; }
    // ==================== 持仓配置 ====================
    public String getMarginMode() { return marginMode; }
    public String getPosMode() { return posMode; }
    // ==================== 策略参数 ====================
    public BigDecimal getGridRate() { return gridRate; }
    public BigDecimal getOverallTp() { return overallTp; }
    public BigDecimal getMaxLoss() { return maxLoss; }
    public String getQuantity() { return quantity; }
    public int getGridQueueSize() { return gridQueueSize; }
    // ==================== 风险控制 ====================
    public BigDecimal getMarginRatioLimit() { return marginRatioLimit; }
    // ==================== 盈亏计算 ====================
    public BigDecimal getContractMultiplier() { return contractMultiplier; }
    public PnLPriceMode getUnrealizedPnlPriceMode() { return unrealizedPnlPriceMode; }
    // ==================== 环境 ====================
    public boolean isProduction() { return isProduction; }
    public static Builder builder() {
        return new Builder();
    }
    public static class Builder {
        private String apiKey;
        private String secretKey;
        private String passphrase;
        private String contract = "BTC-USDT-SWAP";
        private String leverage = "100";
        private String marginMode = "cross";
        private String posMode = "long_short_mode";
        private BigDecimal gridRate = new BigDecimal("0.0035");
        private BigDecimal overallTp = new BigDecimal("5");
        private BigDecimal maxLoss = new BigDecimal("15");
        private String quantity = "1";
        private boolean isProduction = false;
        private int gridQueueSize = 50;
        private BigDecimal marginRatioLimit = new BigDecimal("0.2");
        private BigDecimal contractMultiplier = new BigDecimal("1");
        private PnLPriceMode unrealizedPnlPriceMode = PnLPriceMode.LAST_PRICE;
        public Builder apiKey(String apiKey) { this.apiKey = apiKey; return this; }
        public Builder secretKey(String secretKey) { this.secretKey = secretKey; return this; }
        public Builder passphrase(String passphrase) { this.passphrase = passphrase; return this; }
        public Builder contract(String contract) { this.contract = contract; return this; }
        public Builder leverage(String leverage) { this.leverage = leverage; return this; }
        public Builder marginMode(String marginMode) { this.marginMode = marginMode; return this; }
        public Builder posMode(String posMode) { this.posMode = posMode; return this; }
        public Builder gridRate(BigDecimal gridRate) { this.gridRate = gridRate; return this; }
        public Builder overallTp(BigDecimal overallTp) { this.overallTp = overallTp; return this; }
        public Builder maxLoss(BigDecimal maxLoss) { this.maxLoss = maxLoss; return this; }
        public Builder quantity(String quantity) { this.quantity = quantity; return this; }
        public Builder isProduction(boolean isProduction) { this.isProduction = isProduction; return this; }
        public Builder contractMultiplier(BigDecimal contractMultiplier) { this.contractMultiplier = contractMultiplier; return this; }
        public Builder unrealizedPnlPriceMode(PnLPriceMode mode) { this.unrealizedPnlPriceMode = mode; return this; }
        public OkxConfig build() {
            return new OkxConfig(this);
        }
    }
}
src/main/java/com/xcong/excoin/modules/okxApi/OkxGridTradeService.java
New file
@@ -0,0 +1,416 @@
package com.xcong.excoin.modules.okxApi;
import com.xcong.excoin.modules.okxApi.enums.OkxEnums;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@Slf4j
public class OkxGridTradeService {
    public enum StrategyState {
        WAITING_KLINE, OPENING, ACTIVE, STOPPED
    }
    private final String accountName;
    private final OkxConfig config;
    private final OkxTradeExecutor executor;
    private volatile StrategyState state = StrategyState.WAITING_KLINE;
    private final List<BigDecimal> shortPriceQueue = Collections.synchronizedList(new ArrayList<>());
    private final List<BigDecimal> longPriceQueue = Collections.synchronizedList(new ArrayList<>());
    private BigDecimal shortBaseEntryPrice;
    private BigDecimal longBaseEntryPrice;
    private volatile boolean baseLongOpened = false;
    private volatile boolean baseShortOpened = false;
    private volatile boolean shortActive = false;
    private volatile boolean longActive = false;
    private volatile BigDecimal lastKlinePrice;
    private volatile BigDecimal cumulativePnl = BigDecimal.ZERO;
    private volatile BigDecimal unrealizedPnl = BigDecimal.ZERO;
    private volatile BigDecimal longEntryPrice = BigDecimal.ZERO;
    private volatile BigDecimal shortEntryPrice = BigDecimal.ZERO;
    private volatile BigDecimal longPositionSize = BigDecimal.ZERO;
    private volatile BigDecimal shortPositionSize = BigDecimal.ZERO;
    public OkxGridTradeService(OkxConfig config, String accountName) {
        this.config = config;
        this.accountName = accountName;
        this.executor = new OkxTradeExecutor(config.getContract(), config.getMarginMode(), accountName);
    }
    public void setWebSocketClient(WebSocketClient wsClient) {
        this.executor.setWebSocketClient(wsClient);
    }
    public void startGrid() {
        if (state != StrategyState.WAITING_KLINE && state != StrategyState.STOPPED) {
            log.warn("[{}] 策略已在运行中, state:{}", config.getContract(), state);
            return;
        }
        state = StrategyState.WAITING_KLINE;
        cumulativePnl = BigDecimal.ZERO;
        unrealizedPnl = BigDecimal.ZERO;
        longEntryPrice = BigDecimal.ZERO;
        shortEntryPrice = BigDecimal.ZERO;
        longPositionSize = BigDecimal.ZERO;
        shortPositionSize = BigDecimal.ZERO;
        baseLongOpened = false;
        baseShortOpened = false;
        longActive = false;
        shortActive = false;
        shortPriceQueue.clear();
        longPriceQueue.clear();
        log.info("[{}] 网格策略已启动", config.getContract());
    }
    public void stopGrid() {
        state = StrategyState.STOPPED;
        executor.cancelAllPriceTriggeredOrders();
        executor.shutdown();
        log.info("[{}] 策略已停止, 累计盈亏: {}", config.getContract(), cumulativePnl);
    }
    public void onKline(BigDecimal closePrice) {
        lastKlinePrice = closePrice;
        updateUnrealizedPnl();
        if (state == StrategyState.STOPPED) {
            return;
        }
        if (state == StrategyState.WAITING_KLINE) {
            state = StrategyState.OPENING;
            log.info("[{}] 首根K线到达,开基底仓位...", config.getContract());
            executor.openLong(config.getQuantity(), () -> {
                log.info("[{}] 基底多单已提交", config.getContract());
            }, null);
            executor.openShort(config.getQuantity(), () -> {
                log.info("[{}] 基底空单已提交", config.getContract());
            }, null);
            return;
        }
        if (state != StrategyState.ACTIVE) {
            return;
        }
        processShortGrid(closePrice);
        processLongGrid(closePrice);
    }
    public void onPositionUpdate(String posSide, BigDecimal size, BigDecimal entryPrice) {
        if (state == StrategyState.STOPPED || state == StrategyState.WAITING_KLINE) {
            return;
        }
        boolean hasPosition = size.compareTo(BigDecimal.ZERO) > 0;
        if (OkxEnums.POSSIDE_LONG.equals(posSide)) {
            if (hasPosition) {
                longActive = true;
                longEntryPrice = entryPrice;
                if (!baseLongOpened) {
                    longPositionSize = size;
                    longBaseEntryPrice = entryPrice;
                    baseLongOpened = true;
                    log.info("[{}] 基底多成交价: {}", config.getContract(), longBaseEntryPrice);
                    tryGenerateQueues();
                } else if (size.compareTo(longPositionSize) > 0) {
                    longPositionSize = size;
                    if (longPriceQueue.isEmpty()) {
                        log.warn("[{}] 多仓队列为空,无法设止盈", config.getContract());
                    } else {
                        BigDecimal tpPrice = longPriceQueue.get(0);
                        executor.placeTakeProfit(tpPrice, OkxEnums.ORDER_TYPE_CLOSE_LONG, config.getQuantity());
                        log.info("[{}] 多单止盈已设, entry:{}, tp:{}, size:{}", config.getContract(), entryPrice, tpPrice, config.getQuantity());
                    }
                } else {
                    longPositionSize = size;
                }
            } else {
                longActive = false;
                longPositionSize = BigDecimal.ZERO;
            }
        } else if (OkxEnums.POSSIDE_SHORT.equals(posSide)) {
            if (hasPosition) {
                shortActive = true;
                shortEntryPrice = entryPrice;
                if (!baseShortOpened) {
                    shortPositionSize = size;
                    shortBaseEntryPrice = entryPrice;
                    baseShortOpened = true;
                    log.info("[{}] 基底空成交价: {}", config.getContract(), shortBaseEntryPrice);
                    tryGenerateQueues();
                } else if (size.compareTo(shortPositionSize) > 0) {
                    shortPositionSize = size;
                    if (shortPriceQueue.isEmpty()) {
                        log.warn("[{}] 空仓队列为空,无法设止盈", config.getContract());
                    } else {
                        BigDecimal tpPrice = shortPriceQueue.get(0);
                        executor.placeTakeProfit(tpPrice, OkxEnums.ORDER_TYPE_CLOSE_SHORT, config.getQuantity());
                        log.info("[{}] 空单止盈已设, entry:{}, tp:{}, size:{}", config.getContract(), entryPrice, tpPrice, config.getQuantity());
                    }
                } else {
                    shortPositionSize = size;
                }
            } else {
                shortActive = false;
                shortPositionSize = BigDecimal.ZERO;
            }
        }
    }
    public void onOrderFilled(String posSide, BigDecimal fillSz, BigDecimal pnl) {
        if (state == StrategyState.STOPPED) {
            return;
        }
        cumulativePnl = cumulativePnl.add(pnl);
        log.info("[{}] 订单成交盈亏: {}, 方向:{}, 累计:{}", config.getContract(), pnl, posSide, cumulativePnl);
        if (cumulativePnl.compareTo(config.getOverallTp()) >= 0) {
            log.info("[{}] 已达止盈目标 {}→已停止", config.getContract(), cumulativePnl);
            state = StrategyState.STOPPED;
        } else if (cumulativePnl.compareTo(config.getMaxLoss().negate()) <= 0) {
            log.info("[{}] 已达亏损上限 {}→已停止", config.getContract(), cumulativePnl);
            state = StrategyState.STOPPED;
        }
    }
    private void tryGenerateQueues() {
        if (baseLongOpened && baseShortOpened) {
            generateShortQueue();
            generateLongQueue();
            state = StrategyState.ACTIVE;
            log.info("[{}] 网格队列已生成, 空队首:{} → 尾:{}, 多队首:{} → 尾:{}, 已激活",
                    config.getContract(),
                    shortPriceQueue.isEmpty() ? "N/A" : shortPriceQueue.get(0),
                    shortPriceQueue.isEmpty() ? "N/A" : shortPriceQueue.get(shortPriceQueue.size() - 1),
                    longPriceQueue.isEmpty() ? "N/A" : longPriceQueue.get(0),
                    longPriceQueue.isEmpty() ? "N/A" : longPriceQueue.get(longPriceQueue.size() - 1));
        }
    }
    private void generateShortQueue() {
        shortPriceQueue.clear();
        BigDecimal fixedStep = shortBaseEntryPrice.multiply(config.getGridRate()).setScale(1, RoundingMode.HALF_UP);
        BigDecimal prev = shortBaseEntryPrice;
        for (int i = 0; i < config.getGridQueueSize(); i++) {
            prev = prev.subtract(fixedStep).setScale(1, RoundingMode.HALF_UP);
            shortPriceQueue.add(prev);
        }
        shortPriceQueue.sort((a, b) -> b.compareTo(a));
        log.info("[{}] 空队列:{} 步长:{}", config.getContract(), shortPriceQueue, fixedStep);
    }
    private void generateLongQueue() {
        longPriceQueue.clear();
        BigDecimal fixedStep = longBaseEntryPrice.multiply(config.getGridRate()).setScale(1, RoundingMode.HALF_UP);
        BigDecimal prev = longBaseEntryPrice;
        for (int i = 0; i < config.getGridQueueSize(); i++) {
            prev = prev.add(fixedStep).setScale(1, RoundingMode.HALF_UP);
            longPriceQueue.add(prev);
        }
        Collections.sort(longPriceQueue);
        log.info("[{}] 多队列:{} 步长:{}", config.getContract(), longPriceQueue, fixedStep);
    }
    /**
     * 空仓网格处理(价格跌破空仓队列中的高价)。
     *
     * <h3>匹配规则</h3>
     * 遍历空仓队列(降序),收集所有大于当前价的元素为 matched。
     * 队列为降序排列,一旦遇 price ≤ currentPrice 即停止遍历。
     *
     * <h3>执行流程</h3>
     * <ol>
     *   <li>匹配队列元素 → 为空则直接返回</li>
     *   <li>空仓队列:移除 matched 元素,尾部以固定步长(shortBasePrice × gridRate)递减补充新元素</li>
     *   <li>多仓队列:以多仓队列首元素(最小价)为种子,以多仓固定步长递减加入新元素</li>
     *   <li>保证金检查 → 安全则开空一次</li>
     *   <li>额外反向开多:若多仓均价 > 空仓均价 且 当前价夹在中间且远离多仓均价</li>
     * </ol>
     *
     * <h3>多仓队列转移过滤</h3>
     * 新增元素若与多仓持仓均价差距小于 gridRate,则跳过该元素(避免在持仓成本附近生成无效网格线)。
     */
    private void processShortGrid(BigDecimal currentPrice) {
        List<BigDecimal> matched = new ArrayList<>();
        synchronized (shortPriceQueue) {
            for (BigDecimal p : shortPriceQueue) {
                if (p.compareTo(currentPrice) > 0) {
                    matched.add(p);
                } else {
                    break;
                }
            }
        }
        log.info("[{}] 原空队列:{}", config.getContract(), shortPriceQueue);
        if (matched.isEmpty()) {
            log.info("[{}] 空仓队列未触发, 当前价:{}", config.getContract(), currentPrice);
            return;
        }
        log.info("[{}] 空仓队列触发, 匹配{}个元素, 当前价:{}", config.getContract(), matched.size(), currentPrice);
        BigDecimal shortFixedStep = shortBaseEntryPrice.multiply(config.getGridRate()).setScale(1, RoundingMode.HALF_UP);
        BigDecimal longFixedStep = longBaseEntryPrice.multiply(config.getGridRate()).setScale(1, RoundingMode.HALF_UP);
        replenishOwnQueue(shortPriceQueue, matched, shortFixedStep, true, "空");
        transferBetweenQueues(longPriceQueue, matched, matched.get(matched.size() - 1),
                longFixedStep, false, longEntryPrice, BigDecimal::compareTo, "多", currentPrice);
        if (!isMarginSafe()) {
            log.warn("[{}] 保证金超限,跳过空单开仓", config.getContract());
        } else {
            executor.openShort(config.getQuantity(), null, null);
            if (shortEntryPrice.compareTo(BigDecimal.ZERO) > 0
                    && longEntryPrice.compareTo(BigDecimal.ZERO) > 0
                    && longEntryPrice.compareTo(shortEntryPrice) > 0
                    && currentPrice.compareTo(shortEntryPrice) > 0
                    && currentPrice.compareTo(longEntryPrice.subtract(longFixedStep)) < 0) {
                executor.openLong(config.getQuantity(), null, null);
                log.info("[{}] 触发价在多/空持仓价之间且多>空且远离多仓均价, 额外开多一次, 当前价:{}", config.getContract(), currentPrice);
            }
        }
    }
    /**
     * 多仓网格处理(价格涨破多仓队列中的低价)。
     *
     * <h3>匹配规则</h3>
     * 遍历多仓队列(升序),收集所有小于当前价的元素为 matched。
     * 队列为升序排列,一旦遇 price ≥ currentPrice 即停止遍历。
     *
     * <h3>执行流程</h3>
     * <ol>
     *   <li>匹配队列元素 → 为空则直接返回</li>
     *   <li>多仓队列:移除 matched 元素,尾部以固定步长(longBasePrice × gridRate)递增补充新元素</li>
     *   <li>空仓队列:以空仓队列首元素(最高价)为种子,以空仓固定步长递增加入新元素</li>
     *   <li>保证金检查 → 安全则开多一次</li>
     *   <li>额外反向开空:若多仓均价 > 空仓均价 且 当前价夹在中间且远离空仓均价</li>
     * </ol>
     *
     * <h3>空仓队列转移过滤</h3>
     * 新增元素若与空仓持仓均价差距小于 gridRate,则跳过该元素(避免在持仓成本附近生成无效网格线)。
     */
    private void processLongGrid(BigDecimal currentPrice) {
        List<BigDecimal> matched = new ArrayList<>();
        synchronized (longPriceQueue) {
            for (BigDecimal p : longPriceQueue) {
                if (p.compareTo(currentPrice) < 0) {
                    matched.add(p);
                } else {
                    break;
                }
            }
        }
        log.info("[{}] 原多队列:{}", config.getContract(), longPriceQueue);
        if (matched.isEmpty()) {
            log.info("[{}] 多仓队列未触发, 当前价:{}", config.getContract(), currentPrice);
            return;
        }
        log.info("[{}] 多仓队列触发, 匹配{}个元素, 当前价:{}", config.getContract(), matched.size(), currentPrice);
        BigDecimal longFixedStep = longBaseEntryPrice.multiply(config.getGridRate()).setScale(1, RoundingMode.HALF_UP);
        BigDecimal shortFixedStep = shortBaseEntryPrice.multiply(config.getGridRate()).setScale(1, RoundingMode.HALF_UP);
        replenishOwnQueue(longPriceQueue, matched, longFixedStep, false, "多");
        transferBetweenQueues(shortPriceQueue, matched, matched.get(0),
                shortFixedStep, true, shortEntryPrice, (a, b) -> b.compareTo(a), "空", currentPrice);
        if (!isMarginSafe()) {
            log.warn("[{}] 保证金超限,跳过多单开仓", config.getContract());
        } else {
            executor.openLong(config.getQuantity(), null, null);
            if (shortEntryPrice.compareTo(BigDecimal.ZERO) > 0
                    && longEntryPrice.compareTo(BigDecimal.ZERO) > 0
                    && longEntryPrice.compareTo(shortEntryPrice) > 0
                    && currentPrice.compareTo(shortEntryPrice.add(shortFixedStep)) > 0
                    && currentPrice.compareTo(longEntryPrice) < 0) {
                executor.openShort(config.getQuantity(), null, null);
                log.info("[{}] 触发价在多/空持仓价之间且多>空且远离空仓均价, 额外开空一次, 当前价:{}", config.getContract(), currentPrice);
            }
        }
    }
    private void replenishOwnQueue(List<BigDecimal> queue, List<BigDecimal> matched, BigDecimal fixedStep,
                                    boolean isShort, String label) {
        synchronized (queue) {
            queue.removeAll(matched);
            BigDecimal tail = queue.isEmpty() ? matched.get(matched.size() - 1) : queue.get(queue.size() - 1);
            Comparator<BigDecimal> comparator = isShort ? (a, b) -> b.compareTo(a) : BigDecimal::compareTo;
            for (int i = 0; i < matched.size(); i++) {
                tail = isShort
                        ? tail.subtract(fixedStep).setScale(1, RoundingMode.HALF_UP)
                        : tail.add(fixedStep).setScale(1, RoundingMode.HALF_UP);
                queue.add(tail);
                log.info("[{}] {}队列增加:{}", config.getContract(), label, tail);
            }
            queue.sort(comparator);
            log.info("[{}] 现{}队列:{}", config.getContract(), label, queue);
        }
    }
    private void transferBetweenQueues(List<BigDecimal> targetQueue, List<BigDecimal> matched,
                                        BigDecimal firstFallback, BigDecimal fixedStep, boolean isShort,
                                        BigDecimal filterEntryPrice, Comparator<BigDecimal> comparator,
                                        String label, BigDecimal currentPrice) {
        synchronized (targetQueue) {
            BigDecimal first = targetQueue.isEmpty() ? firstFallback : targetQueue.get(0);
            for (int i = 1; i <= matched.size(); i++) {
                BigDecimal offset = fixedStep.multiply(BigDecimal.valueOf(i));
                BigDecimal elem = isShort
                        ? first.add(offset).setScale(1, RoundingMode.HALF_UP)
                        : first.subtract(offset).setScale(1, RoundingMode.HALF_UP);
                if (filterEntryPrice.compareTo(BigDecimal.ZERO) > 0
                        && currentPrice.subtract(filterEntryPrice).abs().compareTo(filterEntryPrice.multiply(config.getGridRate())) < 0) {
                    log.info("[{}] {}队列跳过(price≈entry):{}", config.getContract(), label, elem);
                    continue;
                }
                targetQueue.add(elem);
                log.info("[{}] {}队列增加:{}", config.getContract(), label, elem);
            }
            targetQueue.sort(comparator);
            while (targetQueue.size() > config.getGridQueueSize()) {
                targetQueue.remove(targetQueue.size() - 1);
            }
            log.info("[{}] 现{}队列:{}", config.getContract(), label, targetQueue);
        }
    }
    private boolean isMarginSafe() {
        return true;
    }
    private void updateUnrealizedPnl() {
        BigDecimal price = lastKlinePrice;
        if (price == null || price.compareTo(BigDecimal.ZERO) == 0) {
            return;
        }
        BigDecimal multiplier = config.getContractMultiplier();
        BigDecimal longPnl = BigDecimal.ZERO;
        BigDecimal shortPnl = BigDecimal.ZERO;
        if (longPositionSize.compareTo(BigDecimal.ZERO) > 0 && longEntryPrice.compareTo(BigDecimal.ZERO) > 0) {
            longPnl = longPositionSize.multiply(multiplier).multiply(price.subtract(longEntryPrice));
        }
        if (shortPositionSize.compareTo(BigDecimal.ZERO) > 0 && shortEntryPrice.compareTo(BigDecimal.ZERO) > 0) {
            shortPnl = shortPositionSize.multiply(multiplier).multiply(shortEntryPrice.subtract(price));
        }
        unrealizedPnl = longPnl.add(shortPnl);
        log.info("[{}] 未实现盈亏: {}", config.getContract(), unrealizedPnl);
    }
    public BigDecimal getLastKlinePrice() { return lastKlinePrice; }
    public boolean isStrategyActive() { return state != StrategyState.STOPPED && state != StrategyState.WAITING_KLINE; }
    public BigDecimal getCumulativePnl() { return cumulativePnl; }
    public BigDecimal getUnrealizedPnl() { return unrealizedPnl; }
    public StrategyState getState() { return state; }
    public String getAccountName() { return accountName; }
}
src/main/java/com/xcong/excoin/modules/okxApi/OkxKlineWebSocketClient.java
File was renamed from src/main/java/com/xcong/excoin/modules/gateApi/GateKlineWebSocketClient.java
@@ -1,9 +1,9 @@
package com.xcong.excoin.modules.gateApi;
package com.xcong.excoin.modules.okxApi;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.gateApi.wsHandler.GateChannelHandler;
import com.xcong.excoin.modules.okxNewPrice.utils.SSLConfig;
import com.xcong.excoin.modules.okxApi.wsHandler.OkxChannelHandler;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
@@ -17,10 +17,10 @@
import java.util.concurrent.atomic.AtomicReference;
/**
 * Gate WebSocket 连接管理器。
 * OKX WebSocket 连接管理器。
 *
 * <h3>职责</h3>
 * 负责 TCP 连接的建立、维持和恢复。频道逻辑(订阅/解析)全部委托给 {@link GateChannelHandler} 实现类。
 * 负责 TCP 连接的建立、维持和恢复。频道逻辑(订阅/解析)全部委托给 {@link OkxChannelHandler} 实现类。
 *
 * <h3>生命周期</h3>
 * <pre>
@@ -32,73 +32,91 @@
 * <h3>消息路由</h3>
 * <pre>
 *   onMessage → handleMessage:
 *     1. futures.pong         → cancelPongTimeout
 *     2. subscribe/unsubscribe → 日志
 *     3. error                → 错误日志
 *     4. update/all           → 遍历 channelHandlers → handler.handleMessage(response)
 *     1. pong                  → cancelPongTimeout
 *     2. login/subscribe/error → 日志
 *     3. order/batch-orders    → 下单结果日志
 *     4. 数据推送              → 遍历 channelHandlers → handler.handleMessage(response)
 * </pre>
 *
 * <h3>心跳机制</h3>
 * 采用双重检测:TCP 层的 WebSocket ping/pong + 应用层 futures.ping/futures.pong。
 * 10 秒未收到任何消息 → 发送 futures.ping;25 秒周期检查。
 *
 * <h3>线程安全</h3>
 * 连接状态用 AtomicBoolean(isConnected, isConnecting, isInitialized)。
 * 消息时间戳用 AtomicReference。心跳任务用 synchronized 保护。
 * 10 秒未收到任何消息 → 发送 ping;25 秒周期检查。
 *
 * @author Administrator
 */
@SuppressWarnings("ALL")
@Slf4j
public class GateKlineWebSocketClient {
public class OkxKlineWebSocketClient {
    private static final String FUTURES_PING = "futures.ping";
    private static final String FUTURES_PONG = "futures.pong";
    private static final int HEARTBEAT_TIMEOUT = 10;
    /** WebSocket 地址,由 GateConfig 提供 */
    private final String wsUrl;
    private final boolean isPrivate;
    private final String apiKey;
    private final String secretKey;
    private final String passphrase;
    /** Java-WebSocket 客户端实例 */
    private WebSocketClient webSocketClient;
    /** 心跳检测调度器 */
    private ScheduledExecutorService heartbeatExecutor;
    /** 心跳超时 Future */
    private volatile ScheduledFuture<?> pongTimeoutFuture;
    /** 最后收到消息的时间戳(毫秒) */
    private final AtomicReference<Long> lastMessageTime = new AtomicReference<>(System.currentTimeMillis());
    /** 连接状态 */
    private final AtomicBoolean isConnected = new AtomicBoolean(false);
    /** 连接中标记,防重入 */
    private final AtomicBoolean isConnecting = new AtomicBoolean(false);
    /** 初始化标记,防重复 init */
    private final AtomicBoolean isInitialized = new AtomicBoolean(false);
    /** 频道处理器列表,通过 addChannelHandler 注册 */
    private final List<GateChannelHandler> channelHandlers = new ArrayList<>();
    private final List<OkxChannelHandler> channelHandlers = new ArrayList<>();
    /** 重连等异步任务的缓存线程池(daemon 线程) */
    public WebSocketClient getWebSocketClient() {
        return webSocketClient;
    }
    private final ExecutorService sharedExecutor = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "gate-ws-worker");
        Thread t = new Thread(r, "okxApi-ws-worker");
        t.setDaemon(true);
        return t;
    });
    public GateKlineWebSocketClient(String wsUrl) {
    public OkxKlineWebSocketClient(String wsUrl) {
        this.wsUrl = wsUrl;
        this.isPrivate = false;
        this.apiKey = null;
        this.secretKey = null;
        this.passphrase = null;
    }
    /**
     * 注册频道处理器。需在 init() 前调用。
     */
    public void addChannelHandler(GateChannelHandler handler) {
    public OkxKlineWebSocketClient(String wsUrl, String apiKey, String secretKey, String passphrase) {
        this.wsUrl = wsUrl;
        this.isPrivate = true;
        this.apiKey = apiKey;
        this.secretKey = secretKey;
        this.passphrase = passphrase;
    }
    public void addChannelHandler(OkxChannelHandler handler) {
        channelHandlers.add(handler);
    }
    /**
     * 初始化:建立 WebSocket 连接 → 启动心跳。
     */
    private void websocketLogin() {
        try {
            String timestamp = String.valueOf(System.currentTimeMillis() / 1000);
            String sign = OkxWsUtil.signWebsocket(timestamp, secretKey);
            JSONArray argsArray = new JSONArray();
            JSONObject loginArgs = new JSONObject();
            loginArgs.put("apiKey", apiKey);
            loginArgs.put("passphrase", passphrase);
            loginArgs.put("timestamp", timestamp);
            loginArgs.put("sign", sign);
            argsArray.add(loginArgs);
            JSONObject login = OkxWsUtil.buildJsonObject(null, "login", argsArray);
            webSocketClient.send(login.toJSONString());
            log.info("[WS] 发送登录请求");
        } catch (Exception e) {
            log.error("[WS] 登录请求构建失败", e);
        }
    }
    public void init() {
        if (!isInitialized.compareAndSet(false, true)) {
            log.warn("[WS] 已初始化过,跳过重复初始化");
@@ -108,16 +126,11 @@
        startHeartbeat();
    }
    /**
     * 销毁:取消订阅 → 关闭连接 → 关闭线程池。
     * <p>注意:先 closeBlocking 再 shutdown sharedExecutor,
     * 避免 onClose 回调中的 reconnectWithBackoff 访问已关闭的线程池。
     */
    public void destroy() {
        log.info("[WS] 开始销毁...");
        if (webSocketClient != null && webSocketClient.isOpen()) {
            for (GateChannelHandler handler : channelHandlers) {
            for (OkxChannelHandler handler : channelHandlers) {
                handler.unsubscribe(webSocketClient);
            }
            try {
@@ -150,17 +163,13 @@
        log.info("[WS] 销毁完成");
    }
    /**
     * 建立 WebSocket 连接。使用 SSLContext 配置 TLS 协议。
     * 连接成功后依次订阅所有已注册的频道处理器。
     */
    private void connect() {
        if (isConnecting.get() || !isConnecting.compareAndSet(false, true)) {
            log.info("[WS] 连接进行中,跳过重复请求");
            return;
        }
        try {
            SSLConfig.configureSSL();
            OkxWsUtil.configureSSL();
            System.setProperty("https.protocols", "TLSv1.2,TLSv1.3");
            URI uri = new URI(wsUrl);
            if (webSocketClient != null) {
@@ -169,15 +178,19 @@
            webSocketClient = new WebSocketClient(uri) {
                @Override
                public void onOpen(ServerHandshake handshake) {
                    log.info("[WS] 连接成功");
                    log.info("[WS] 连接成功, isPrivate:{}", isPrivate);
                    isConnected.set(true);
                    isConnecting.set(false);
                    if (sharedExecutor != null && !sharedExecutor.isShutdown()) {
                        resetHeartbeatTimer();
                        for (GateChannelHandler handler : channelHandlers) {
                            handler.subscribe(webSocketClient);
                        if (isPrivate) {
                            websocketLogin();
                        } else {
                            for (OkxChannelHandler handler : channelHandlers) {
                                handler.subscribe(webSocketClient);
                            }
                            sendPing();
                        }
                        sendPing();
                    } else {
                        log.warn("[WS] 应用正在关闭,忽略连接成功回调");
                    }
@@ -218,53 +231,61 @@
        }
    }
    /**
     * 消息分发:先处理系统事件(pong/subscribe/error),
     * 再把 update/all 事件路由到各 channelHandler。
     * <p>每个 handler 内部通过 channel 名称做二次匹配,匹配成功返回 true 则停止遍历。
     */
    private void handleMessage(String message) {
        try {
            JSONObject response = JSON.parseObject(message);
            String channel = response.getString("channel");
            String event = response.getString("event");
            if (FUTURES_PONG.equals(channel)) {
                log.debug("[WS] 收到 pong 响应");
            if ("pong".equals(message)) {
                log.debug("[WS] 收到心跳响应");
                cancelPongTimeout();
                return;
            }
            JSONObject response = JSON.parseObject(message);
            String event = response.getString("event");
            if ("login".equals(event)) {
                String code = response.getString("code");
                if ("0".equals(code)) {
                    log.info("[WS] WebSocket登录成功");
                    for (OkxChannelHandler handler : channelHandlers) {
                        handler.subscribe(webSocketClient);
                    }
                    sendPing();
                } else {
                    log.error("[WS] WebSocket登录失败, code:{}, msg:{}", code, response.getString("msg"));
                }
                return;
            }
            if ("subscribe".equals(event)) {
                log.info("[WS] {} 订阅成功: {}", channel, response.getJSONObject("result"));
                log.info("[WS] 订阅成功: {}", response.getJSONObject("arg"));
                return;
            }
            if ("unsubscribe".equals(event)) {
                log.info("[WS] {} 取消订阅成功", channel);
                log.info("[WS] 取消订阅成功: {}", response.getJSONObject("arg"));
                return;
            }
            if ("error".equals(event)) {
                JSONObject error = response.getJSONObject("error");
                log.error("[WS] {} 错误, code:{}, msg:{}",
                        channel,
                        error != null ? error.getInteger("code") : "N/A",
                        error != null ? error.getString("message") : response.getString("msg"));
                log.error("[WS] 错误, code:{}, msg:{}",
                        response.getString("code"), response.getString("msg"));
                return;
            }
            if ("update".equals(event) || "all".equals(event)) {
                for (GateChannelHandler handler : channelHandlers) {
                    if (handler.handleMessage(response)) return;
                }
            if ("channel-conn-count".equals(event)) {
                return;
            }
            String op = response.getString("op");
            if ("order".equals(op) || "batch-orders".equals(op)) {
                log.info("[WS] 收到下单推送结果: {}", JSON.toJSONString(response.get("data")));
                return;
            }
            for (OkxChannelHandler handler : channelHandlers) {
                if (handler.handleMessage(response)) return;
            }
        } catch (Exception e) {
            log.error("[WS] 处理消息失败: {}", message, e);
        }
    }
    // ---- heartbeat ----
    private void startHeartbeat() {
        if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) heartbeatExecutor.shutdownNow();
        heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "gate-ws-heartbeat"); t.setDaemon(true); return t; });
        heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "okxApi-heartbeat"); t.setDaemon(true); return t; });
        heartbeatExecutor.scheduleWithFixedDelay(this::checkHeartbeatTimeout, 25, 25, TimeUnit.SECONDS);
    }
@@ -283,10 +304,7 @@
    private void sendPing() {
        try {
            if (webSocketClient != null && webSocketClient.isOpen()) {
                JSONObject pingMsg = new JSONObject();
                pingMsg.put("time", System.currentTimeMillis() / 1000);
                pingMsg.put("channel", FUTURES_PING);
                webSocketClient.send(pingMsg.toJSONString());
                webSocketClient.send("ping");
                log.debug("[WS] 发送 ping 请求");
            }
        } catch (Exception e) { log.warn("[WS] 发送 ping 失败", e); }
@@ -295,8 +313,6 @@
    private synchronized void cancelPongTimeout() {
        if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) pongTimeoutFuture.cancel(true);
    }
    // ---- reconnect ----
    private void reconnectWithBackoff() throws InterruptedException {
        int attempt = 0, maxAttempts = 3;
src/main/java/com/xcong/excoin/modules/okxApi/OkxRestClient.java
New file
@@ -0,0 +1,125 @@
package com.xcong.excoin.modules.okxApi;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
@Slf4j
public class OkxRestClient {
    private final String baseUrl;
    private final String apiKey;
    private final String secretKey;
    private final String passphrase;
    private final boolean isSimulate;
    public OkxRestClient(String baseUrl, String apiKey, String secretKey, String passphrase, boolean isSimulate) {
        this.baseUrl = baseUrl;
        this.apiKey = apiKey;
        this.secretKey = secretKey;
        this.passphrase = passphrase;
        this.isSimulate = isSimulate;
    }
    public boolean setPositionMode(String posMode) {
        JSONObject params = new JSONObject();
        params.put("posMode", posMode);
        JSONObject result = post("/api/v5/account/set-position-mode", params);
        return isSuccess(result, "设置持仓方式");
    }
    public boolean setLeverage(String instId, String lever, String mgnMode) {
        JSONObject params = new JSONObject();
        params.put("instId", instId);
        params.put("lever", lever);
        params.put("mgnMode", mgnMode);
        JSONObject result = post("/api/v5/account/set-leverage", params);
        return isSuccess(result, "设置杠杆");
    }
    private JSONObject post(String path, JSONObject body) {
        HttpURLConnection conn = null;
        try {
            String bodyStr = body.toJSONString();
            String timestamp = OkxWsUtil.getIso8601Timestamp();
            String sign = OkxWsUtil.signRest(timestamp, "POST", path, bodyStr, secretKey);
            URL url = new URL(baseUrl + path);
            conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("POST");
            conn.setDoOutput(true);
            conn.setConnectTimeout(15000);
            conn.setReadTimeout(15000);
            conn.setRequestProperty("Content-Type", "application/json");
            conn.setRequestProperty("OK-ACCESS-KEY", apiKey);
            conn.setRequestProperty("OK-ACCESS-SIGN", sign);
            conn.setRequestProperty("OK-ACCESS-TIMESTAMP", timestamp);
            conn.setRequestProperty("OK-ACCESS-PASSPHRASE", passphrase);
            if (isSimulate) {
                conn.setRequestProperty("x-simulated-trading", "1");
            }
            try (OutputStream os = conn.getOutputStream()) {
                os.write(bodyStr.getBytes(StandardCharsets.UTF_8));
                os.flush();
            }
            int code = conn.getResponseCode();
            String response = readResponse(conn);
            log.info("[REST] POST {} → HTTP {} body:{}", path, code, response);
            return JSON.parseObject(response);
        } catch (Exception e) {
            log.error("[REST] POST {} 失败: {}", path, e.getMessage());
            return null;
        } finally {
            if (conn != null) {
                conn.disconnect();
            }
        }
    }
    private String readResponse(HttpURLConnection conn) throws Exception {
        StringBuilder sb = new StringBuilder();
        String line;
        if (conn.getResponseCode() >= 200 && conn.getResponseCode() < 300) {
            try (BufferedReader br = new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
                while ((line = br.readLine()) != null) {
                    sb.append(line);
                }
            }
        } else {
            try (BufferedReader br = new BufferedReader(new InputStreamReader(conn.getErrorStream(), StandardCharsets.UTF_8))) {
                while ((line = br.readLine()) != null) {
                    sb.append(line);
                }
            }
        }
        return sb.toString();
    }
    private boolean isSuccess(JSONObject result, String label) {
        if (result == null) {
            log.error("[REST] {}失败: 无响应", label);
            return false;
        }
        String code = result.getString("code");
        if ("0".equals(code)) {
            log.info("[REST] {}成功", label);
            return true;
        }
        if ("59000".equals(code)) {
            log.info("[REST] {}已设置(59000:配置未变更)", label);
            return true;
        }
        log.error("[REST] {}失败, code:{}, msg:{}", label, code, result.getString("msg"));
        return false;
    }
}
src/main/java/com/xcong/excoin/modules/okxApi/OkxTradeExecutor.java
New file
@@ -0,0 +1,296 @@
package com.xcong.excoin.modules.okxApi;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.okxApi.enums.OkxEnums;
import com.xcong.excoin.modules.okxApi.param.TradeRequestParam;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import java.math.BigDecimal;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * OKX WebSocket 交易执行器。
 *
 * <h3>设计目的</h3>
 * WebSocket 消息在回调线程中处理。下单操作参数构建等逻辑
 * 提交到独立线程池异步执行,避免阻塞 WS 回调线程。
 *
 * <h3>回调设计</h3>
 * 每个下单方法接受 onSuccess/onFailure 两个 Runnable。
 * 基底开仓时 onSuccess 用于标记基底已开,网格触发时通常为 null(成交状态由仓位推送驱动)。
 *
 * <h3>线程模型</h3>
 * <ul>
 *   <li><b>单线程</b>:保证下单顺序(开多→开空→止盈单),避免并发竞争</li>
 *   <li><b>有界队列 64</b>:防止堆积。极端行情下最多累积 64 个任务</li>
 *   <li><b>CallerRunsPolicy</b>:队列满时由提交线程直接同步执行,形成自然背压</li>
 *   <li><b>allowCoreThreadTimeOut</b>:60s 空闲后线程回收,不浪费资源</li>
 * </ul>
 *
 * <h3>调用链</h3>
 * <pre>
 *   OkxGridTradeService.onKline → executor.openLong/openShort (基底双开 + 网格触发)
 *   OkxGridTradeService.onPositionUpdate → executor.placeTakeProfit (开仓成交后设止盈)
 *   OkxGridTradeService.stopGrid → executor.cancelAllPriceTriggeredOrders
 * </pre>
 *
 * @author Administrator
 */
@Slf4j
public class OkxTradeExecutor {
    private final String contract;
    private final String marginMode;
    private final String accountName;
    private volatile WebSocketClient wsClient;
    private final ExecutorService executor;
    public OkxTradeExecutor(String contract, String marginMode, String accountName) {
        this.contract = contract;
        this.marginMode = marginMode;
        this.accountName = accountName;
        this.executor = new ThreadPoolExecutor(
                1, 1,
                60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(64),
                r -> {
                    Thread t = new Thread(r, "okxApi-trade-worker");
                    t.setDaemon(true);
                    return t;
                },
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
        ((ThreadPoolExecutor) executor).allowCoreThreadTimeOut(true);
    }
    public void setWebSocketClient(WebSocketClient wsClient) {
        this.wsClient = wsClient;
    }
    public void shutdown() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            executor.shutdownNow();
        }
    }
    /**
     * 异步市价开多。quantity 为正数(如 "1")。
     *
     * @param quantity  开仓张数(正数)
     * @param onSuccess 成交成功回调(可为 null)
     * @param onFailure 成交失败回调(可为 null)
     */
    public void openLong(String quantity, Runnable onSuccess, Runnable onFailure) {
        openPosition(quantity, OkxEnums.POSSIDE_LONG, OkxEnums.SIDE_BUY, "开多", onSuccess, onFailure);
    }
    /**
     * 异步市价开空。quantity 为正数(如 "1")。
     *
     * @param quantity  开仓张数(正数)
     * @param onSuccess 成交成功回调(可为 null)
     * @param onFailure 成交失败回调(可为 null)
     */
    public void openShort(String quantity, Runnable onSuccess, Runnable onFailure) {
        openPosition(quantity, OkxEnums.POSSIDE_SHORT, OkxEnums.SIDE_SELL, "开空", onSuccess, onFailure);
    }
    private void openPosition(String sz, String posSide, String side, String label, Runnable onSuccess, Runnable onFailure) {
        executor.execute(() -> {
            try {
                TradeRequestParam param = buildParam(side, posSide, sz, OkxEnums.ORDTYPE_MARKET);
                sendOrder(param);
                log.info("[TradeExec] {}已发送, 方向:{}, 数量:{}", label, posSide, sz);
                if (onSuccess != null) {
                    onSuccess.run();
                }
            } catch (Exception e) {
                log.error("[TradeExec] {}发送失败", label, e);
                if (onFailure != null) {
                    onFailure.run();
                }
            }
        });
    }
    /**
     * 异步创建止盈条件单(仓位计划止盈止损)。
     *
     * <p>通过 OKX WebSocket 发送 algo order(conditional order),服务器监控价格,
     * 达到触发价后自动平指定张数。
     *
     * <h3>orderType 说明</h3>
     * <ul>
     *   <li>plan-close-long-position:平多仓,posSide=long, side=sell</li>
     *   <li>plan-close-short-position:平空仓,posSide=short, side=buy</li>
     * </ul>
     *
     * <p>止盈单创建失败时,立即市价平仓兜底(marketClose)。
     *
     * @param triggerPrice 触发价格
     * @param orderType    stop 类型(plan-close-long-position / plan-close-short-position)
     * @param size         平仓张数(正数)
     */
    public void placeTakeProfit(BigDecimal triggerPrice, String orderType, String size) {
        executor.execute(() -> {
            String posSide;
            String side;
            if (OkxEnums.ORDER_TYPE_CLOSE_LONG.equals(orderType)) {
                posSide = OkxEnums.POSSIDE_LONG;
                side = OkxEnums.SIDE_SELL;
            } else if (OkxEnums.ORDER_TYPE_CLOSE_SHORT.equals(orderType)) {
                posSide = OkxEnums.POSSIDE_SHORT;
                side = OkxEnums.SIDE_BUY;
            } else {
                log.error("[TradeExec] 未知止盈类型: {}", orderType);
                return;
            }
            try {
                if (wsClient == null || !wsClient.isOpen()) {
                    log.warn("[TradeExec] WS未连接,跳过止盈单");
                    return;
                }
                if (BigDecimal.ZERO.compareTo(new BigDecimal(size)) >= 0) {
                    log.warn("[TradeExec] 止盈数量<=0,跳过");
                    return;
                }
                JSONArray argsArray = new JSONArray();
                JSONObject args = new JSONObject();
                args.put("instId", contract);
                args.put("tdMode", marginMode);
                args.put("side", side);
                args.put("posSide", posSide);
                args.put("ordType", OkxEnums.ORDTYPE_CONDITIONAL);
                args.put("sz", size);
                args.put("tpTriggerPx", triggerPrice.toString());
                args.put("tpOrdPx", "-1");
                argsArray.add(args);
                String connId = OkxWsUtil.getOrderNum("algo");
                JSONObject msg = OkxWsUtil.buildJsonObject(connId, "order-algo", argsArray);
                wsClient.send(msg.toJSONString());
                log.info("[TradeExec] 止盈单已发送, 触发价:{}, 类型:{}, size:{}", triggerPrice, orderType, size);
            } catch (Exception e) {
                log.error("[TradeExec] 止盈单发送失败, 触发价:{}, size:{}, 立即市价止盈", triggerPrice, size, e);
                marketClose(side, posSide, size);
            }
        });
    }
    /**
     * 市价止盈:在止盈条件单创建失败时立即市价平仓。
     */
    private void marketClose(String side, String posSide, String size) {
        try {
            TradeRequestParam param = buildParam(side, posSide, size, OkxEnums.ORDTYPE_MARKET);
            param.setTradeType("3");
            sendOrder(param);
            log.info("[TradeExec] 市价止盈已发送, posSide:{}, size:{}", posSide, size);
        } catch (Exception e) {
            log.error("[TradeExec] 市价止盈也失败, posSide:{}, size:{}", posSide, size, e);
        }
    }
    /**
     * 异步清除指定合约的所有止盈止损条件单。
     */
    public void cancelAllPriceTriggeredOrders() {
        executor.execute(() -> {
            try {
                if (wsClient == null || !wsClient.isOpen()) {
                    log.warn("[TradeExec] WS未连接,跳过撤销条件单");
                    return;
                }
                JSONArray argsArray = new JSONArray();
                JSONObject args = new JSONObject();
                args.put("instId", contract);
                args.put("algoOrdType", "conditional");
                argsArray.add(args);
                String connId = OkxWsUtil.getOrderNum("cancel");
                JSONObject msg = OkxWsUtil.buildJsonObject(connId, "cancel-algos", argsArray);
                wsClient.send(msg.toJSONString());
                log.info("[TradeExec] 已发送撤销所有条件单请求");
            } catch (Exception e) {
                log.error("[TradeExec] 撤销条件单失败", e);
            }
        });
    }
    private TradeRequestParam buildParam(String side, String posSide, String sz, String ordType) {
        TradeRequestParam param = new TradeRequestParam();
        param.setAccountName(accountName);
        param.setInstId(contract);
        param.setTdMode(marginMode);
        param.setPosSide(posSide);
        param.setOrdType(ordType);
        param.setSide(side);
        param.setClOrdId(OkxWsUtil.getOrderNum(side));
        param.setSz(sz);
        param.setTradeType("1");
        return param;
    }
    private JSONObject buildOrderArgs(TradeRequestParam param) {
        JSONObject args = new JSONObject();
        args.put("instId", param.getInstId());
        args.put("tdMode", param.getTdMode());
        args.put("clOrdId", param.getClOrdId());
        args.put("side", param.getSide());
        args.put("posSide", param.getPosSide());
        args.put("ordType", param.getOrdType());
        args.put("sz", param.getSz());
        return args;
    }
    private void sendOrder(TradeRequestParam param) {
        if (wsClient == null || !wsClient.isOpen()) {
            log.warn("[TradeExec] WS未连接,跳过下单");
            return;
        }
        if (BigDecimal.ZERO.compareTo(new BigDecimal(param.getSz())) >= 0) {
            log.warn("[TradeExec] 下单数量<=0,跳过");
            return;
        }
        JSONArray argsArray = new JSONArray();
        argsArray.add(buildOrderArgs(param));
        String connId = OkxWsUtil.getOrderNum("order");
        JSONObject msg = OkxWsUtil.buildJsonObject(connId, "order", argsArray);
        wsClient.send(msg.toJSONString());
        log.info("[TradeExec] 发送下单: side={}, sz={}", param.getSide(), param.getSz());
    }
    private void sendBatchOrders(List<TradeRequestParam> params) {
        if (wsClient == null || !wsClient.isOpen() || params == null || params.isEmpty()) {
            log.warn("[TradeExec] WS未连接或参数为空,跳过批量下单");
            return;
        }
        JSONArray argsArray = new JSONArray();
        for (TradeRequestParam p : params) {
            argsArray.add(buildOrderArgs(p));
        }
        String connId = OkxWsUtil.getOrderNum(null);
        JSONObject msg = OkxWsUtil.buildJsonObject(connId, "batch-orders", argsArray);
        wsClient.send(msg.toJSONString());
        log.info("[TradeExec] 发送批量下单: {}条", params.size());
    }
}
src/main/java/com/xcong/excoin/modules/okxApi/OkxWebSocketClientMain.java
New file
@@ -0,0 +1,23 @@
package com.xcong.excoin.modules.okxApi;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * OKX 模块独立测试入口。
 *
 * <p>通过 Spring XML 上下文(applicationContext.xml)初始化管理器,
 * 运行极长时间后手动关闭。用于脱离 Web 容器独立调试策略。
 *
 * @author Administrator
 */
public class OkxWebSocketClientMain {
    public static void main(String[] args) throws InterruptedException {
        ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
        OkxWebSocketClientManager manager = context.getBean(OkxWebSocketClientManager.class);
        Thread.sleep(1200000000L);
        manager.destroy();
    }
}
src/main/java/com/xcong/excoin/modules/okxApi/OkxWebSocketClientManager.java
New file
@@ -0,0 +1,118 @@
package com.xcong.excoin.modules.okxApi;
import com.xcong.excoin.modules.okxApi.wsHandler.handler.OkxAccountChannelHandler;
import com.xcong.excoin.modules.okxApi.wsHandler.handler.OkxCandlestickChannelHandler;
import com.xcong.excoin.modules.okxApi.wsHandler.handler.OkxOrderInfoChannelHandler;
import com.xcong.excoin.modules.okxApi.wsHandler.handler.OkxPositionsChannelHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.math.BigDecimal;
@Slf4j
@Component
public class OkxWebSocketClientManager {
    private OkxKlineWebSocketClient wsKlineClient;
    private OkxKlineWebSocketClient wsPrivateClient;
    private OkxGridTradeService gridTradeService;
    private OkxConfig config;
    @PostConstruct
    public void init() {
        log.info("[管理器] 开始初始化...");
        try {
            config = OkxConfig.builder()
                    .apiKey("d90ca272391992b8e74f8f92cedb21ec")
                    .secretKey("1861e4f52de4bb53369ea3208d9ede38ece4777368030f96c77d27934c46c274")
                    .passphrase("Aa123123@")
                    .contract("BTC-USDT-SWAP")
                    .leverage("100")
                    .marginMode("cross")
                    .posMode("long_short_mode")
                    .gridRate(new BigDecimal("0.0015"))
                    .overallTp(new BigDecimal("5"))
                    .maxLoss(new BigDecimal("15"))
                    .quantity("1")
                    .contractMultiplier(new BigDecimal("1"))
                    .unrealizedPnlPriceMode(OkxConfig.PnLPriceMode.LAST_PRICE)
                    .isProduction(false)
                    .build();
            setupAccount();
            String accountName = "OKX_API";
            gridTradeService = new OkxGridTradeService(config, accountName);
            gridTradeService.startGrid();
            wsKlineClient = new OkxKlineWebSocketClient(config.getWsKlineUrl());
            wsKlineClient.addChannelHandler(new OkxCandlestickChannelHandler(config.getContract(), gridTradeService));
            wsKlineClient.init();
            log.info("[管理器] K线WS已连接, 已注册K线频道处理器");
            wsPrivateClient = new OkxKlineWebSocketClient(
                    config.getWsPrivateUrl(),
                    config.getApiKey(),
                    config.getSecretKey(),
                    config.getPassphrase());
            wsPrivateClient.addChannelHandler(new OkxPositionsChannelHandler(config.getContract(), gridTradeService));
            wsPrivateClient.addChannelHandler(new OkxAccountChannelHandler());
            wsPrivateClient.addChannelHandler(new OkxOrderInfoChannelHandler(config.getContract(), gridTradeService, config));
            wsPrivateClient.init();
            log.info("[管理器] 私有WS已连接, 已注册 3 个频道处理器");
            gridTradeService.setWebSocketClient(wsPrivateClient.getWebSocketClient());
        } catch (Exception e) {
            log.error("[管理器] 初始化失败", e);
        }
    }
    @PreDestroy
    public void destroy() {
        log.info("[管理器] 开始销毁...");
        if (gridTradeService != null) {
            gridTradeService.stopGrid();
        }
        if (wsKlineClient != null) {
            wsKlineClient.destroy();
        }
        if (wsPrivateClient != null) {
            wsPrivateClient.destroy();
        }
        log.info("[管理器] 销毁完成");
    }
    private void setupAccount() {
        log.info("[管理器] 开始配置账户...");
        OkxRestClient restClient = new OkxRestClient(
                config.getRestBaseUrl(),
                config.getApiKey(),
                config.getSecretKey(),
                config.getPassphrase(),
                !config.isProduction());
        boolean posModeOk = restClient.setPositionMode(config.getPosMode());
        if (!posModeOk) {
            log.error("[管理器] 设置持仓方式失败,策略可能无法正常运作");
        }
        boolean leverOk = restClient.setLeverage(
                config.getContract(), config.getLeverage(), config.getMarginMode());
        if (!leverOk) {
            log.error("[管理器] 设置杠杆倍数失败,策略可能无法正常运作");
        }
        log.info("[管理器] 账户配置完成, posMode:{}, leverage:{}, marginMode:{}",
                config.getPosMode(), config.getLeverage(), config.getMarginMode());
    }
    public OkxKlineWebSocketClient getKlineWebSocketClient() { return wsKlineClient; }
    public OkxKlineWebSocketClient getPrivateWebSocketClient() { return wsPrivateClient; }
    public OkxGridTradeService getGridTradeService() { return gridTradeService; }
}
src/main/java/com/xcong/excoin/modules/okxApi/OkxWsUtil.java
New file
@@ -0,0 +1,132 @@
package com.xcong.excoin.modules.okxApi;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import javax.net.ssl.SSLContext;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.text.SimpleDateFormat;
import java.util.Base64;
import java.util.Date;
import java.util.Random;
/**
 * OKX API 工具类:SSL配置、HMAC-SHA256签名、订单ID生成、日期格式化、JSON构建。
 *
 * @author Administrator
 */
@Slf4j
public final class OkxWsUtil {
    private OkxWsUtil() {}
    // ==================== SSL ====================
    public static void configureSSL() {
        try {
            TrustManager[] trustAllCerts = new TrustManager[]{
                    new X509TrustManager() {
                        @Override
                        public X509Certificate[] getAcceptedIssuers() { return new X509Certificate[0]; }
                        @Override
                        public void checkClientTrusted(X509Certificate[] certs, String authType) {}
                        @Override
                        public void checkServerTrusted(X509Certificate[] certs, String authType) {}
                    }
            };
            SSLContext sc = SSLContext.getInstance("TLS");
            sc.init(null, trustAllCerts, new SecureRandom());
            SSLContext.setDefault(sc);
        } catch (Exception e) {
            log.error("SSL配置失败", e);
        }
    }
    // ==================== HMAC-SHA256 签名 ====================
    public static String signWebsocket(String timestamp, String secretKey) {
        return sign(timestamp, "GET", "/users/self/verify", "", secretKey);
    }
    public static String signRest(String timestamp, String method, String path, String body, String secretKey) {
        return sign(timestamp, method.toUpperCase(), path, body, secretKey);
    }
    private static String sign(String timestamp, String method, String path, String body, String secretKey) {
        try {
            String message = String.format("%s%s%s%s", timestamp, method, path, body);
            Mac mac = Mac.getInstance("HmacSHA256");
            SecretKeySpec spec = new SecretKeySpec(secretKey.getBytes("UTF-8"), "HmacSHA256");
            mac.init(spec);
            byte[] hash = mac.doFinal(message.getBytes("UTF-8"));
            return Base64.getEncoder().encodeToString(hash);
        } catch (Exception e) {
            log.error("签名计算失败", e);
            return "";
        }
    }
    // ==================== 订单ID ====================
    private static final ThreadLocal<SimpleDateFormat> ORDER_ID_DF =
            ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyyMMddHHmmss"));
    private static final ThreadLocal<Random> RANDOM =
            ThreadLocal.withInitial(Random::new);
    public static String getOrderNum(String prefix) {
        String dd = ORDER_ID_DF.get().format(new Date());
        if (prefix != null && !prefix.isEmpty()) {
            return prefix + dd + getRandomNum(5);
        }
        return dd + getRandomNum(5);
    }
    private static String getRandomNum(int length) {
        String str = "0123456789";
        Random random = RANDOM.get();
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < length; ++i) {
            sb.append(str.charAt(random.nextInt(str.length())));
        }
        return sb.toString();
    }
    // ==================== JSON构建 ====================
    public static JSONObject buildJsonObject(String connId, String op, JSONArray args) {
        JSONObject jsonObject = new JSONObject();
        if (connId != null && !connId.isEmpty()) {
            jsonObject.put("id", connId);
        }
        jsonObject.put("op", op);
        jsonObject.put("args", args);
        return jsonObject;
    }
    // ==================== 日期格式化 ====================
    private static final ThreadLocal<SimpleDateFormat> DATE_TIME_DF =
            ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
    private static final ThreadLocal<SimpleDateFormat> ISO8601_DF =
            ThreadLocal.withInitial(() -> {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
                sdf.setTimeZone(java.util.TimeZone.getTimeZone("UTC"));
                return sdf;
            });
    public static String getIso8601Timestamp() {
        return ISO8601_DF.get().format(new Date());
    }
    public static String timestampToDateTime(long timestamp) {
        return DATE_TIME_DF.get().format(new Date(timestamp));
    }
}
src/main/java/com/xcong/excoin/modules/okxApi/enums/OkxEnums.java
New file
@@ -0,0 +1,29 @@
package com.xcong.excoin.modules.okxApi.enums;
/**
 * OKX API 模块枚举常量。
 *
 * @author Administrator
 */
public final class OkxEnums {
    private OkxEnums() {}
    public static final String POSSIDE_LONG = "long";
    public static final String POSSIDE_SHORT = "short";
    public static final String SIDE_BUY = "buy";
    public static final String SIDE_SELL = "sell";
    public static final String ORDTYPE_MARKET = "market";
    public static final String ORDTYPE_LIMIT = "limit";
    public static final String ORDTYPE_CONDITIONAL = "conditional";
    public static final String INSTTYPE_SWAP = "SWAP";
    public static final String MARGIN_CROSS = "cross";
    public static final String ORDER_TYPE_CLOSE_LONG = "plan-close-long-position";
    public static final String ORDER_TYPE_CLOSE_SHORT = "plan-close-short-position";
    public static final String CHANNEL_CANDLE = "candle1m";
    public static final String CHANNEL_POSITIONS = "positions";
    public static final String CHANNEL_ACCOUNT = "account";
    public static final String CHANNEL_ORDERS = "orders";
}
src/main/java/com/xcong/excoin/modules/okxApi/okxApi-logic.md
New file
@@ -0,0 +1,724 @@
# OKX API 网格交易策略 — 逻辑文档
---
## 目录
1. [整体架构](#1-整体架构)
2. [配置层:OkxConfig](#2-配置层okxconfig)
3. [基础设施层](#3-基础设施层)
   - [3.1 OkxEnums — 常量定义](#31-okxenums--常量定义)
   - [3.2 OkxWsUtil — WebSocket 工具类](#32-okxwsutil--websocket-工具类)
   - [3.3 TradeRequestParam — 下单参数](#33-traderequestparam--下单参数)
4. [WebSocket 通信层](#4-websocket-通信层)
   - [4.1 OkxWebSocketClientManager — 入口管理器](#41-okxwebsocketclientmanager--入口管理器)
   - [4.2 OkxKlineWebSocketClient — WS 连接客户端](#42-okxklinewebsocketclient--ws-连接客户端)
5. [频道处理器层](#5-频道处理器层)
   - [5.1 OkxChannelHandler — 处理器接口](#51-okxchannelhandler--处理器接口)
   - [5.2 OkxCandlestickChannelHandler — K线频道](#52-okxcandlestickchannelhandler--k线频道)
   - [5.3 OkxPositionsChannelHandler — 持仓频道](#53-okxpositionschannelhandler--持仓频道)
   - [5.4 OkxAccountChannelHandler — 账户频道](#54-okxaccountchannelhandler--账户频道)
   - [5.5 OkxOrderInfoChannelHandler — 订单成交频道](#55-okxorderinfochannelhandler--订单成交频道)
6. [策略执行层](#6-策略执行层)
   - [6.1 OkxTradeExecutor — 异步下单执行器](#61-okxtradeexecutor--异步下单执行器)
   - [6.2 OkxGridTradeService — 网格策略核心](#62-okxgridtradeservice--网格策略核心)
7. [独立启动类:OkxWebSocketClientMain](#7-独立启动类okxwebsocketclientmain)
8. [完整调用链](#8-完整调用链)
9. [与 Gate API 的差异对比](#9-与-gate-api-的差异对比)
---
## 1. 整体架构
```
┌──────────────────────────────────────────────────────────────┐
│  OkxWebSocketClientManager (Spring @Component)               │
│    · 读取配置 · 组装所有组件 · 启动 WS 连接 · 生命周期管理    │
└──────────────────────┬───────────────────────────────────────┘
                       │
         ┌─────────────┼─────────────┐
         ▼             ▼             ▼
   OkxConfig      OkxGridTradeService   OkxKlineWebSocketClient
   (Builder配置)    (策略核心)            (WS连接客户端)
                       │                      │
                       │              ┌────────┴────────┐
                       │              │  4 个频道处理器   │
                       │              ├─────────────────┤
                       │              │ K线 | 持仓       │
                       │              │ 账户 | 订单成交   │
                       │              └────────┬────────┘
                       │                       │
                       ▼                       │
              OkxTradeExecutor                 │
              (异步下单线程池) ◄────────────────┘
                       │
                       ▼
              通过 WS 发送下单 JSON
```
**核心数据流**:
```
K线推送 → OkxGridTradeService.onKline() → 匹配网格队列 → OkxTradeExecutor 异步下单
持仓推送 → OkxGridTradeService.onPositionUpdate() → 识别基底成交 → 设置止盈单 → 队列就绪 → 激活策略
订单推送 → OkxGridTradeService.onOrderFilled() → 累计盈亏跟踪 → 达标/超限自动停止
```
**设计原则**:
- **包自包含**:`okxApi` 包不依赖任何其他业务包(`okxNewPrice`、`gateApi`、`newPrice`、`blackchain` 等)
- **WS 回调不阻塞**:所有下单操作通过 `OkxTradeExecutor` 单线程池异步执行
- **状态机驱动**:策略状态(`WAITING_KLINE → OPENING → ACTIVE → STOPPED`)严格控制执行流程
---
## 2. 配置层:OkxConfig
```
文件:OkxConfig.java
```
使用 **Builder 模式**构造配置对象,不可变设计,所有字段 `private final`。
### 配置字段
| 分组 | 字段 | 说明 |
|------|------|------|
| **API 密钥** | `apiKey` | OKX API Key |
| | `secretKey` | OKX Secret Key |
| | `passphrase` | OKX Passphrase |
| **合约参数** | `contract` | 合约品种(如 `BTC-USDT-SWAP`) |
| | `marginMode` | 保证金模式(`cross` 全仓 / `isolated` 逐仓) |
| | `tickSz` | 价格精度 |
| | `contractMultiplier` | 合约乘数(用于盈亏计算) |
| | `leverage` | 杠杆倍数 |
| **策略参数** | `quantity` | 每次开仓张数 |
| | `gridRate` | 网格间距比例(如 0.01 = 1%) |
| | `gridQueueSize` | 网格队列长度 |
| | `marginRatioLimit` | 保证金占用比例上限 |
| | `overallTp` | 全局止盈目标(累计盈亏 ≥ 此值停止) |
| | `maxLoss` | 最大亏损限制(累计盈亏 ≤ 此值停止) |
| **环境** | `isProduction` | 是否生产环境(决定 WS URL 域名) |
### Builder 方法链
```java
OkxConfig config = OkxConfig.builder()
    .apiKey("xxx")
    .secretKey("xxx")
    .passphrase("xxx")
    .contract("BTC-USDT-SWAP")
    .marginMode("cross")
    .leverage(1)
    .quantity("1")
    .gridRate(new BigDecimal("0.01"))
    .gridQueueSize(10)
    .overallTp(new BigDecimal("100"))
    .isProduction(false)
    .build();
```
---
## 3. 基础设施层
### 3.1 OkxEnums — 常量定义
```
文件:enums/OkxEnums.java
```
集中管理所有 OKX API 相关字符串常量,替代外部 `CoinEnums` 依赖。
| 常量 | 值 | 用途 |
|------|-----|------|
| `INSTTYPE_SPOT` | `SPOT` | 现货 |
| `INSTTYPE_SWAP` | `SWAP` | 永续合约 |
| `POSSIDE_LONG` | `long` | 多仓方向 |
| `POSSIDE_SHORT` | `short` | 空仓方向 |
| `SIDE_BUY` | `buy` | 买入 |
| `SIDE_SELL` | `sell` | 卖出 |
| `ORDTYPE_MARKET` | `market` | 市价单 |
| `ORDTYPE_LIMIT` | `limit` | 限价单 |
| `CHANNEL_POSITIONS` | `positions` | 持仓频道 |
| `CHANNEL_CANDLE` | `candle` + 周期 | K线频道 |
| `CHANNEL_ACCOUNT` | `account` | 账户频道 |
| `CHANNEL_ORDERS` | `orders` | 订单频道 |
| `CHANNEL_ORDERS_ALGO` | `orders-algo` | 策略委托频道 |
---
### 3.2 OkxWsUtil — WebSocket 工具类
```
文件:OkxWsUtil.java
```
替代外部 `SSLConfig`、`SignUtils`、`WsParamBuild`、`DateUtil` 等依赖,提供以下静态方法:
| 方法 | 用途 |
|------|------|
| `configureSSL(wsClient)` | 为 `WebSocketClient` 配置 SSL(跳过证书验证,仅测试环境) |
| `generateSignature(timestamp, method, requestPath, body, secretKey)` | OKX 签名算法:HMAC-SHA256 + Base64 |
| `getOrderNum(side)` | 生成唯一订单 ID(时间戳 + 随机数 + side) |
| `timestampToDateTime(timestamp)` | 毫秒时间戳 → `yyyy/MM/dd HH:mm:ss` 格式 |
| `timestampToDateToString(timestamp)` | 毫秒时间戳 → `yyyy/MM/dd` 格式 |
| `buildJsonObject(connId, channel, args)` | 构建 WS 请求 JSON 对象 |
| `buildLoginParam(okxConfig)` | 构建登录认证参数(sign + timestamp) |
**签名算法**:
```
sign = Base64(HMAC-SHA256(timestamp + "GET" + requestPath + body, secretKey))
```
---
### 3.3 TradeRequestParam — 下单参数
```
文件:param/TradeRequestParam.java
```
纯 POJO,替代外部 `TradeRequestParam` 依赖。
| 字段 | 说明 |
|------|------|
| `accountName` | 账户标识 |
| `instId` | 合约 ID |
| `tdMode` | 保证金模式(cross/isolated) |
| `posSide` | 持仓方向(long/short) |
| `ordType` | 订单类型(market/limit) |
| `side` | 买卖方向(buy/sell) |
| `clOrdId` | 客户端订单 ID(唯一) |
| `sz` | 下单数量 |
| `markPx` | 标记价格(限价单用) |
| `tradeType` | 交易类型(1=开仓,3=平仓) |
---
## 4. WebSocket 通信层
### 4.1 OkxWebSocketClientManager — 入口管理器
```
文件:OkxWebSocketClientManager.java
```
**Spring `@Component`**,管理完整的 WS 生命周期。
#### 职责
1. **组件组装**:创建 `OkxConfig` → `OkxGridTradeService` → `OkxTradeExecutor`(注入 WS Client)→ 4 个频道处理器 → `OkxKlineWebSocketClient`
2. **生命周期管理**:
   - `@PostConstruct init()`:初始化和连接(生产环境)或注册 MBean(测试环境)
   - `@PreDestroy close()`:优雅关闭(停止策略 → 取消条件单 → 关闭 WS)
#### 初始化流程
```
init()
  ├── configMap — 从本地缓存读取账户配置
  ├── OkxGridTradeService.startGrid()
  ├── OkxTradeExecutor.setWebSocketClient(wsClient)
  ├── 创建 4 个频道处理器
  ├── OkxKlineWebSocketClient.connect()
  │     ├── 连接 OSKL 公开 WS(K线频道不需要登录)
  │     ├── 登录私有 WS → 订阅 持仓/账户/订单/策略委托频道
  │     └── 启动心跳定时器(30s ping/pong)
  └── isProduction ? 直接启动 : MBean 注册(JMX 手动控制)
```
---
### 4.2 OkxKlineWebSocketClient — WS 连接客户端
```
文件:OkxKlineWebSocketClient.java
```
封装 `java-websocket` 客户端,管理物理连接。
#### 连接架构
- **公开频道 WS**(`wss://ws.okx.com:8443/ws/v5/public` 或模拟盘域名):K线推送不需要登录
- **私有频道 WS**(`wss://ws.okx.com:8443/ws/v5/private` 或模拟盘域名):需要登录认证,订阅持仓/账户/订单/策略委托频道
#### 连接流程
```
connect()
  ├── 1. 创建公开 WS Client → connect()
  │      └── onOpen → subscribePublicChannels() → 订阅 K线频道
  ├── 2. 创建私有 WS Client → connect()
  │      └── onOpen → login() → onLoginSuccess → subscribePrivateChannels()
  │             ├── 订阅 positions 频道
  │             ├── 订阅 account 频道
  │             ├── 订阅 orders 频道
  │             └── 订阅 orders-algo 频道
  └── 3. 启动心跳定时器(30s 间隔 ping/pong,60s 超时检测)
```
#### 断线重连机制
- 最多重连 `MAX_RECONNECT_ATTEMPTS` 次(默认 30)
- 重连延迟 `RECONNECT_DELAY_MS`(默认 5000ms)
- 重连成功后重新执行登录 + 订阅流程
- 兜底机制:重连失败后尝试通过 MBean 重启整个 WS 客户端
#### 消息路由
`onMessage` → 遍历所有 `OkxChannelHandler` → 调用 `handleMessage(response)` 分发到具体处理器
---
## 5. 频道处理器层
### 5.1 OkxChannelHandler — 处理器接口
```
文件:wsHandler/OkxChannelHandler.java
```
统一接口,所有频道处理器实现此接口:
```java
public interface OkxChannelHandler {
    String getChannelName();                      // 频道名称
    void subscribe(WebSocketClient ws);           // 订阅
    void unsubscribe(WebSocketClient ws);         // 取消订阅
    boolean handleMessage(JSONObject response);   // 处理推送消息
}
```
### 5.2 OkxCandlestickChannelHandler — K线频道
```
文件:wsHandler/handler/OkxCandlestickChannelHandler.java
```
#### 订阅参数
| 参数 | 值 |
|------|-----|
| `channel` | `candle{period}`(如 `candle1H`) |
| `instId` | 合约 ID(如 `BTC-USDT-SWAP`) |
#### 数据处理
- 解析 `data[0]` → `[ts, o, h, l, c, vol, volCcy, ...]`
- 提取**收盘价** `c` → 调用 `gridTradeService.onKline(closePrice)`
- K线为 `candle1H` 时打印整点日志
#### 调用链
```
onKline(closePrice)
  ├── WAITING_KLINE → 进入 OPENING 状态,开基底多+空
  ├── ACTIVE → processShortGrid(closePrice) + processLongGrid(closePrice)
  └── STOPPED → 仅更新未实现盈亏
```
### 5.3 OkxPositionsChannelHandler — 持仓频道
```
文件:wsHandler/handler/OkxPositionsChannelHandler.java
```
#### 订阅参数
| 参数 | 值 |
|------|-----|
| `channel` | `positions` |
| `instType` | `SWAP` |
| `instId` | 合约 ID |
#### 数据处理
解析 `data[]` 数组 → 提取 `posSide`、`pos`(数量)、`avgPx`(均价)→ 调用 `gridTradeService.onPositionUpdate(posSide, size, avgPx)`
#### 在策略中的作用
```
onPositionUpdate() → 区分 3 种场景:
  1. 仓位从无到有(基底开仓成交)→ 标记 baseOpened → 双基底都成后生成网格队列
  2. 仓位量增加(网格触发开仓成交)→ 取队列首元素做止盈价 → 设止盈条件单
  3. 仓位归零(止盈平仓完成)→ 标记 active=false
```
### 5.4 OkxAccountChannelHandler — 账户频道
```
文件:wsHandler/handler/OkxAccountChannelHandler.java
```
#### 订阅参数
| 参数 | 值 |
|------|-----|
| `channel` | `account` |
#### 数据处理
解析 `data[]` → 提取 `availBal`(可用余额)、`cashBal`(现金余额)、`eq`(权益)、`upl`(未实现盈亏)、`imr`(保证金占用)
**当前版本**:仅做日志输出,不做业务判断。后续可扩展保证金安全阀功能。
### 5.5 OkxOrderInfoChannelHandler — 订单成交频道
```
文件:wsHandler/handler/OkxOrderInfoChannelHandler.java
```
#### 订阅参数
| 参数 | 值 |
|------|-----|
| `channel` | `orders` |
| `instType` | `SWAP` |
| `instId` | 合约 ID |
#### 数据处理
- 过滤 `state=filled` 且 `accFillSz>0` 的订单
- 提取 `posSide`、`accFillSz`(成交数量)、`fillPnl`(已实现盈亏)
- 调用 `gridTradeService.onOrderFilled(posSide, accFillSz, fillPnl)`
#### 在策略中的作用
```
onOrderFilled()
  ├── 累计盈亏 += fillPnl
  ├── 累计盈亏 ≥ overallTp → 策略停止(止盈达标)
  └── 累计盈亏 ≤ -maxLoss → 策略停止(亏损超限)
```
---
## 6. 策略执行层
### 6.1 OkxTradeExecutor — 异步下单执行器
```
文件:OkxTradeExecutor.java
```
#### 设计目的
WS 消息在回调线程处理,下单操作提交到**独立线程池异步执行**,避免阻塞 WS 回调线程。
#### 线程模型
| 参数 | 值 |
|------|-----|
| 核心线程 | 1 |
| 最大线程 | 1 |
| 空闲超时 | 60s(`allowCoreThreadTimeOut`) |
| 队列类型 | `LinkedBlockingQueue` |
| 队列容量 | 64 |
| 拒绝策略 | `CallerRunsPolicy`(队列满时由提交线程直接同步执行,形成自然背压) |
| 守护线程 | 是 |
**单线程的作用**:保证下单顺序(开多 → 开空 → 止盈单),避免并发竞争。
#### 公开方法
| 方法 | 说明 |
|------|------|
| `openLong(quantity, onSuccess, onFailure)` | 异步市价开多 |
| `openShort(quantity, onSuccess, onFailure)` | 异步市价开空 |
| `placeTakeProfit(triggerPrice, orderType, size)` | 异步创建止盈条件单(通过 `batch-orders` 发送 algo 委托) |
| `cancelAllPriceTriggeredOrders()` | 撤销所有条件单(`cancel-algos`) |
| `setWebSocketClient(wsClient)` | 注入 WS 客户端引用 |
| `shutdown()` | 优雅关闭(等待 10s,超时强制中断) |
#### 止盈兜底机制
```
placeTakeProfit()
  ├── 成功 → 发送 batch-orders(algo 条件单)
  └── 失败 → marketClose() 立即市价平仓兜底
```
#### 下单方式
所有下单通过 **WebSocket** 发送 JSON(非 REST API),`sendOrder` 构建如下消息:
```json
{
  "id": "order_1712345678901_a1b2c3",
  "op": "order",
  "args": [{
    "instId": "BTC-USDT-SWAP",
    "tdMode": "cross",
    "clOrdId": "buy_1712345678901_d4e5f6",
    "side": "buy",
    "posSide": "long",
    "ordType": "market",
    "sz": "1"
  }]
}
```
#### 调用链
```
OkxGridTradeService.onKline
  └── executor.openLong() / openShort()     ← 基底双开 + 网格触发
OkxGridTradeService.onPositionUpdate
  └── executor.placeTakeProfit()             ← 仓位成交后设止盈
OkxGridTradeService.stopGrid
  └── executor.cancelAllPriceTriggeredOrders()  + shutdown()
```
---
### 6.2 OkxGridTradeService — 网格策略核心
```
文件:OkxGridTradeService.java
```
这是整个包的核心,实现了**多空双开网格交易策略**的所有状态机和网格队列逻辑。
#### 策略状态机
```
WAITING_KLINE ──(首根K线到达)──▶ OPENING ──(双基底成交)──▶ ACTIVE ──(止盈/止损达标)──▶ STOPPED
                        ▲                                                         │
                        └──────────────────(startGrid() 重新启动)──────────────────┘
```
| 状态 | 含义 |
|------|------|
| `WAITING_KLINE` | 等待首根 K 线到达 |
| `OPENING` | 已收到 K 线,正在开基底仓位(多+空各一单) |
| `ACTIVE` | 双基底已成交,网格队列已生成,正常交易中 |
| `STOPPED` | 已停止(止盈达标 / 亏损超限 / 手动停止) |
#### 数据结构
| 字段 | 类型 | 说明 |
|------|------|------|
| `shortPriceQueue` | `List<BigDecimal>` | 空仓价格队列(**降序**) |
| `longPriceQueue` | `List<BigDecimal>` | 多仓价格队列(**升序**) |
| `shortBaseEntryPrice` | `BigDecimal` | 空仓基底成交价 |
| `longBaseEntryPrice` | `BigDecimal` | 多仓基底成交价 |
| `baseLongOpened` | `boolean` | 多仓基底是否已开 |
| `baseShortOpened` | `boolean` | 空仓基底是否已开 |
| `cumulativePnl` | `BigDecimal` | 累计已实现盈亏 |
#### 策略完整生命周期
```
1. startGrid()
   └── 重置所有状态 → WAITING_KLINE
2. onKline(closePrice) → WAITING_KLINE
   └── 转为 OPENING → 发送基底多单 + 基底空单
3. onPositionUpdate(posSide, size, entryPrice)
   ├── 仓位从无到有
   │   ├── 标记 baseLongOpened / baseShortOpened
   │   ├── 记录 entryPrice
   │   ├── 双基底都成交 → tryGenerateQueues() → ACTIVE
   │   └── 生成网格队列:
   │       · 空仓队列:entryPrice × (1 - gridRate×1), (1 - gridRate×2), ... 降序排列
   │       · 多仓队列:entryPrice × (1 + gridRate×1), (1 + gridRate×2), ... 升序排列
   ├── 仓位量增加(网格触发开仓成交)
   │   └── 检查队列非空 → 取队列首元素做止盈价 → executor.placeTakeProfit()
   └── 仓位归零
       └── 标记 active=false
4. onKline(closePrice) → ACTIVE
   ├── processShortGrid(closePrice)  ← 空仓网格处理
   │   ├── 匹配队列中 > closePrice 的元素
   │   ├── 移除已匹配 → 尾部补充新元素(递减)
   │   ├── 多仓队列转移(以对方队列首元素为种子生成递减元素)
   │   ├── 贴近持仓均价过滤(skip)
   │   ├── 保证金安全检查
   │   ├── 开空一次
   │   └── 额外反向开多(价格夹在多/空均价之间且多>空倒挂时)
   └── processLongGrid(closePrice)   ← 多仓网格处理
       └── (对称逻辑,方向反转)
5. onOrderFilled(posSide, fillSz, pnl)
   ├── cumulativePnl += pnl
   ├── cumulativePnl ≥ overallTp → STOPPED
   └── cumulativePnl ≤ -maxLoss → STOPPED
6. stopGrid()
   ├── 状态 → STOPPED
   ├── cancelAllPriceTriggeredOrders()
   └── executor.shutdown()
```
#### 空仓网格处理 `processShortGrid(currentPrice)` 详解
```
1. 匹配队列元素(空仓队列降序遍历,收集 > currentPrice 的元素)
   └── 为空 → 直接返回
2. 空仓队列更新
   ├── 移除 matched 元素
   └── 尾部补充新元素(尾价 × (1 - gridRate) 循环递减)→ 降序排序
3. 多仓队列转移
   ├── 以多仓队列首元素(最小价)为种子
   ├── 生成 matched.size() 个递减元素加入多仓队列
   ├── 贴近持仓均价过滤:元素与多仓均价差距 < gridRate → skip
   └── 升序排序,超长截断
4. 保证金安全检查
   └── 超限 → warn 跳过开仓
5. 开空一次 → executor.openShort()
6. 额外反向开多条件(同时满足):
   ├── longEntryPrice > shortEntryPrice(多>空倒挂)
   ├── currentPrice > shortEntryPrice(当前价在空仓均价上方)
   └── currentPrice < longEntryPrice × (1 - gridRate)(远离多仓均价)
   └── 满足 → executor.openLong() 额外开多一次
```
#### 多仓网格处理 `processLongGrid(currentPrice)` 详解
对称逻辑,方向反转。
---
## 7. 独立启动类:OkxWebSocketClientMain
```
文件:OkxWebSocketClientMain.java
```
**纯 main 方法启动**,不依赖 Spring 容器。
### 用途
用于**本地测试和调试**,无需启动整个 Spring 应用。
### 启动流程
```java
public static void main(String[] args) {
    OkxConfig config = OkxConfig.builder()
        .apiKey("xxx")
        .secretKey("xxx")
        .passphrase("xxx")
        .contract("BTC-USDT-SWAP")
        .marginMode("cross")
        .leverage(1)
        .quantity("1")
        .gridRate(new BigDecimal("0.01"))
        .gridQueueSize(10)
        .overallTp(new BigDecimal("100"))
        .isProduction(false)
        .build();
    OkxGridTradeService service = new OkxGridTradeService(config, "test-account");
    service.startGrid();
    OkxKlineWebSocketClient wsClient = new OkxKlineWebSocketClient(config, service, ...);
    wsClient.connect();
    // 注册 JVM 关闭钩子
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        service.stopGrid();
        wsClient.close();
    }));
}
```
---
## 8. 完整调用链
```
┌─────────────────────────────────────────────────────────────┐
│ 1. 启动阶段                                                   │
├─────────────────────────────────────────────────────────────┤
│ OkxWebSocketClientManager.init()                             │
│   ├── new OkxConfig.Builder()...build()                      │
│   ├── new OkxGridTradeService(config, accountName)           │
│   │     └── new OkxTradeExecutor(contract, marginMode, name) │
│   ├── gridTradeService.startGrid() → WAITING_KLINE           │
│   ├── new OkxCandlestickChannelHandler(instId, candlePeriod, │
│   │        gridTradeService, config)                          │
│   ├── new OkxPositionsChannelHandler(instId, gridTradeService)│
│   ├── new OkxAccountChannelHandler()                         │
│   ├── new OkxOrderInfoChannelHandler(instId, gridTradeService,│
│   │        config)                                            │
│   ├── new OkxKlineWebSocketClient(config, handlers, ...)     │
│   ├── wsClient.connect()                                     │
│   │     ├── 连接公开 WS → 订阅 K线频道                        │
│   │     ├── 连接私有 WS → 登录 → 订阅 持仓/账户/订单/策略委托  │
│   │     └── 启动心跳定时器                                    │
│   └── executor.setWebSocketClient(privateWsClient)           │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 2. 运行时数据流                                               │
├─────────────────────────────────────────────────────────────┤
│ [K线推送]                                                    │
│ OkxCandlestickChannelHandler.handleMessage()                 │
│   └── gridTradeService.onKline(closePrice)                   │
│         ├── WAITING_KLINE → OPENING                          │
│         │     ├── executor.openLong(quantity, onSuccess,     │
│         │     │        onFailure)                             │
│         │     └── executor.openShort(quantity, onSuccess,    │
│         │              onFailure)                             │
│         └── ACTIVE                                           │
│               ├── processShortGrid(closePrice)               │
│               │     ├── 匹配队列 → 更新队列 → 转移对方队列    │
│               │     └── executor.openShort()                  │
│               └── processLongGrid(closePrice)                │
│                     └──(对称逻辑)                            │
│                                                              │
│ [持仓推送]                                                    │
│ OkxPositionsChannelHandler.handleMessage()                   │
│   └── gridTradeService.onPositionUpdate(posSide, size, avgPx)│
│         ├── 基底成交 → 标记 baseOpened → tryGenerateQueues() │
│         └── 增量成交 → executor.placeTakeProfit(tp, type, sz)│
│                                                              │
│ [订单成交推送]                                                │
│ OkxOrderInfoChannelHandler.handleMessage()                   │
│   └── gridTradeService.onOrderFilled(posSide, fillSz, pnl)   │
│         └── cumulativePnl 累加 → 达标则停止                   │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 3. 停止阶段                                                   │
├─────────────────────────────────────────────────────────────┤
│ OkxWebSocketClientManager.close()                            │
│   ├── gridTradeService.stopGrid()                            │
│   │     ├── state = STOPPED                                  │
│   │     ├── executor.cancelAllPriceTriggeredOrders()         │
│   │     │     └── wsClient.send(cancel-algos)               │
│   │     └── executor.shutdown()                              │
│   └── wsClient.close()                                       │
│         └── 关闭公开WS + 私有WS                               │
└─────────────────────────────────────────────────────────────┘
```
---
## 9. 与 Gate API 的差异对比
| 方面 | Gate API (gateApi) | OKX API (okxApi) |
|------|-------------------|-----------------|
| **下单方式** | REST API (`FuturesApi.createFuturesOrder`) | WebSocket JSON 消息 (`op: "order"`) |
| **止盈单** | REST API (`createPriceTriggeredOrder`),`plan-close-*-position` | WS 消息 (`op: "batch-orders"`),`ordType: limit` + `px` 触发价 |
| **仓位方向** | 正数=开多、负数=开空(size 带符号) | `posSide: long/short` 显式区分,`sz` 始终正数 |
| **保证金模式** | 无(Gate API 隐含) | `tdMode: cross/isolated` 显式指定 |
| **客户端订单 ID** | 自动生成(Gate API 隐式处理) | `clOrdId` 显式生成和传入 |
| **取消条件单** | REST API (`cancelPriceTriggeredOrderList`) | WS 消息 (`op: "cancel-algos"`) |
| **止损失败兜底** | REST `createFuturesOrder` IOC 市价平仓 | WS 消息 `marketClose()`(`tradeType: "3"`) |
| **成交识别** | 通过 WS `FuturesOrderBookTicker` 或 REST 查询 | WS `orders` 频道推送 `state=filled` |
| **API 认证** | HMAC-SHA256 请求头签名(Gate SDK 封装) | HMAC-SHA256 + Base64 WS 登录消息 |
| **WS 连接** | 单一连接,频道订阅混合 | 双连接:公开 WS(K线)+ 私有 WS(持仓/账户/订单) |
| **`placeTakeProfit` 签名** | `(triggerPrice, rule, orderType, size)` 多一个 `rule` 参数 | `(triggerPrice, orderType, size)` (OKX 无 rule 概念) |
| **止盈单下单** | 单条 `FuturesPriceTriggeredOrder` | `batch-orders` 包装(List 格式,但实际传 1 条) |
| **心跳机制** | 应用层 ping/pong JSON 消息 | `java-websocket` 自带 ping/pong + 60s 超时重连 |
| **包依赖** | 依赖 Gate SDK (`io.gate.gateapi`) | **完全自包含**,仅依赖 `java-websocket` + `fastjson` + `lombok` |
src/main/java/com/xcong/excoin/modules/okxApi/okx文档.txt
New file
Diff too large
src/main/java/com/xcong/excoin/modules/okxApi/param/TradeRequestParam.java
New file
@@ -0,0 +1,41 @@
package com.xcong.excoin.modules.okxApi.param;
/**
 * 交易请求参数。
 *
 * @author Administrator
 */
public class TradeRequestParam {
    private String accountName;
    private String markPx;
    private String instId;
    private String tdMode;
    private String posSide;
    private String ordType;
    private String tradeType;
    private String side;
    private String clOrdId;
    private String sz;
    public String getAccountName() { return accountName; }
    public void setAccountName(String accountName) { this.accountName = accountName; }
    public String getMarkPx() { return markPx; }
    public void setMarkPx(String markPx) { this.markPx = markPx; }
    public String getInstId() { return instId; }
    public void setInstId(String instId) { this.instId = instId; }
    public String getTdMode() { return tdMode; }
    public void setTdMode(String tdMode) { this.tdMode = tdMode; }
    public String getPosSide() { return posSide; }
    public void setPosSide(String posSide) { this.posSide = posSide; }
    public String getOrdType() { return ordType; }
    public void setOrdType(String ordType) { this.ordType = ordType; }
    public String getTradeType() { return tradeType; }
    public void setTradeType(String tradeType) { this.tradeType = tradeType; }
    public String getSide() { return side; }
    public void setSide(String side) { this.side = side; }
    public String getClOrdId() { return clOrdId; }
    public void setClOrdId(String clOrdId) { this.clOrdId = clOrdId; }
    public String getSz() { return sz; }
    public void setSz(String sz) { this.sz = sz; }
}
src/main/java/com/xcong/excoin/modules/okxApi/wsHandler/OkxChannelHandler.java
New file
@@ -0,0 +1,44 @@
package com.xcong.excoin.modules.okxApi.wsHandler;
import com.alibaba.fastjson.JSONObject;
import org.java_websocket.client.WebSocketClient;
/**
 * OKX WebSocket 频道处理器接口。
 *
 * <p>每个 OKX 频道对应一个实现类。新增频道只需实现此接口,
 * 然后通过 {@code OkxKlineWebSocketClient.addChannelHandler()} 注册即可。
 *
 * <h3>实现类</h3>
 * <ul>
 *   <li>{@code OkxCandlestickChannelHandler} — K 线数据</li>
 *   <li>{@code OkxPositionsChannelHandler} — 持仓更新</li>
 *   <li>{@code OkxAccountChannelHandler} — 账户信息</li>
 *   <li>{@code OkxOrderInfoChannelHandler} — 订单信息(止盈止损、PnL跟踪)</li>
 * </ul>
 *
 * <h3>路由机制</h3>
 * {@code handleMessage()} 返回 {@code true} 表示消息已被该 handler 处理,
 * 路由循环会停止遍历。返回 {@code false} 表示不匹配(channel 名不相等)。
 *
 * @author Administrator
 */
public interface OkxChannelHandler {
    /** 频道名称,如 {@code "candle1m"} */
    String getChannelName();
    /** 发送订阅请求 */
    void subscribe(WebSocketClient ws);
    /** 发送取消订阅请求 */
    void unsubscribe(WebSocketClient ws);
    /**
     * 处理频道推送消息。
     *
     * @param response WebSocket 推送的完整 JSON
     * @return true 表示已处理(循环停止),false 表示频道不匹配(继续遍历下一个 handler)
     */
    boolean handleMessage(JSONObject response);
}
src/main/java/com/xcong/excoin/modules/okxApi/wsHandler/handler/OkxAccountChannelHandler.java
New file
@@ -0,0 +1,90 @@
package com.xcong.excoin.modules.okxApi.wsHandler.handler;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.okxApi.wsHandler.OkxChannelHandler;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
/**
 * OKX 账户频道处理器(account)。
 *
 * <h3>数据用途</h3>
 * 监控账户余额和保证金信息。用于保证金安全阀检查。
 *
 * <h3>推送字段</h3>
 * ccy, availBal, cashBal, eq, upl, imr, ordFroz
 *
 * <h3>注意</h3>
 * 当前版本仅做日志输出,后续可扩展保证金安全阀功能。
 *
 * @author Administrator
 */
@Slf4j
public class OkxAccountChannelHandler implements OkxChannelHandler {
    private static final String CHANNEL_NAME = "account";
    private static final String CHANNEL = "account";
    public OkxAccountChannelHandler() {
    }
    @Override
    public String getChannelName() {
        return CHANNEL_NAME;
    }
    @Override
    public void subscribe(WebSocketClient ws) {
        JSONObject msg = new JSONObject();
        msg.put("op", "subscribe");
        JSONArray args = new JSONArray();
        JSONObject arg = new JSONObject();
        arg.put("channel", CHANNEL);
        args.add(arg);
        msg.put("args", args);
        ws.send(msg.toJSONString());
        log.info("[{}] 订阅成功", CHANNEL_NAME);
    }
    @Override
    public void unsubscribe(WebSocketClient ws) {
        JSONObject msg = new JSONObject();
        msg.put("op", "unsubscribe");
        JSONArray args = new JSONArray();
        JSONObject arg = new JSONObject();
        arg.put("channel", CHANNEL);
        args.add(arg);
        msg.put("args", args);
        ws.send(msg.toJSONString());
        log.info("[{}] 取消订阅成功", CHANNEL_NAME);
    }
    @Override
    public boolean handleMessage(JSONObject response) {
        JSONObject argObj = response.getJSONObject("arg");
        if (argObj == null) {
            return false;
        }
        String channel = argObj.getString("channel");
        if (!CHANNEL.equals(channel)) {
            return false;
        }
        try {
            JSONArray dataArray = response.getJSONArray("data");
            if (dataArray == null || dataArray.isEmpty()) {
                return true;
            }
            for (int i = 0; i < dataArray.size(); i++) {
                JSONObject acct = dataArray.getJSONObject(i);
                log.info("[{}] 账户更新, 可用余额:{}, 现金余额:{}, 权益:{}, 未实现盈亏:{}, 保证金:{}",
                        CHANNEL_NAME,
                        acct.get("availBal"), acct.get("cashBal"),
                        acct.get("eq"), acct.get("upl"), acct.get("imr"));
            }
        } catch (Exception e) {
            log.error("[{}] 处理数据失败", CHANNEL_NAME, e);
        }
        return true;
    }
}
src/main/java/com/xcong/excoin/modules/okxApi/wsHandler/handler/OkxCandlestickChannelHandler.java
New file
@@ -0,0 +1,93 @@
package com.xcong.excoin.modules.okxApi.wsHandler.handler;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.okxApi.OkxGridTradeService;
import com.xcong.excoin.modules.okxApi.OkxWsUtil;
import com.xcong.excoin.modules.okxApi.enums.OkxEnums;
import com.xcong.excoin.modules.okxApi.wsHandler.OkxChannelHandler;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import java.math.BigDecimal;
@Slf4j
public class OkxCandlestickChannelHandler implements OkxChannelHandler {
    private final String instId;
    private final OkxGridTradeService gridTradeService;
    public OkxCandlestickChannelHandler(String instId, OkxGridTradeService gridTradeService) {
        this.instId = instId;
        this.gridTradeService = gridTradeService;
    }
    @Override
    public String getChannelName() {
        return OkxEnums.CHANNEL_CANDLE;
    }
    @Override
    public void subscribe(WebSocketClient ws) {
        JSONObject msg = new JSONObject();
        msg.put("op", "subscribe");
        JSONArray args = new JSONArray();
        JSONObject arg = new JSONObject();
        arg.put("channel", OkxEnums.CHANNEL_CANDLE);
        arg.put("instId", instId);
        args.add(arg);
        msg.put("args", args);
        ws.send(msg.toJSONString());
        log.info("[{}] 订阅成功, 合约:{}, 周期:1m", OkxEnums.CHANNEL_CANDLE, instId);
    }
    @Override
    public void unsubscribe(WebSocketClient ws) {
        JSONObject msg = new JSONObject();
        msg.put("op", "unsubscribe");
        JSONArray args = new JSONArray();
        JSONObject arg = new JSONObject();
        arg.put("channel", OkxEnums.CHANNEL_CANDLE);
        arg.put("instId", instId);
        args.add(arg);
        msg.put("args", args);
        ws.send(msg.toJSONString());
        log.info("[{}] 取消订阅成功", OkxEnums.CHANNEL_CANDLE);
    }
    @Override
    public boolean handleMessage(JSONObject response) {
        JSONObject argObj = response.getJSONObject("arg");
        if (argObj == null) {
            return false;
        }
        String channel = argObj.getString("channel");
        if (!OkxEnums.CHANNEL_CANDLE.equals(channel)) {
            return false;
        }
        String msgInstId = argObj.getString("instId");
        if (!instId.equals(msgInstId)) {
            return false;
        }
        try {
            JSONArray dataArray = response.getJSONArray("data");
            if (dataArray == null || dataArray.isEmpty()) {
                log.warn("[{}] 数据为空", OkxEnums.CHANNEL_CANDLE);
                return true;
            }
            JSONArray data = dataArray.getJSONArray(0);
            BigDecimal closePx = new BigDecimal(data.getString(4));
            String time = OkxWsUtil.timestampToDateTime(Long.parseLong(data.getString(0)));
            String confirm = data.getString(8);
            log.info("[{}] 收盘:{}, 时间:{}, 完结:{}", OkxEnums.CHANNEL_CANDLE, closePx, time, "1".equals(confirm));
            if (gridTradeService != null) {
                gridTradeService.onKline(closePx);
            }
        } catch (Exception e) {
            log.error("[{}] 处理数据失败", OkxEnums.CHANNEL_CANDLE, e);
        }
        return true;
    }
}
src/main/java/com/xcong/excoin/modules/okxApi/wsHandler/handler/OkxOrderInfoChannelHandler.java
New file
@@ -0,0 +1,121 @@
package com.xcong.excoin.modules.okxApi.wsHandler.handler;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.okxApi.OkxConfig;
import com.xcong.excoin.modules.okxApi.OkxGridTradeService;
import com.xcong.excoin.modules.okxApi.wsHandler.OkxChannelHandler;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import java.math.BigDecimal;
/**
 * OKX 订单信息频道处理器(orders)。
 *
 * <h3>数据用途</h3>
 * 监控订单成交(filled)状态,跟踪已实现盈亏(fillPnl)。
 * 成交后通过 batch-orders 频道设置止盈止损限价单。
 *
 * <h3>推送字段</h3>
 * instId, ordId, clOrdId, side, posSide, state, accFillSz, avgPx, fillPnl, fillFee
 *
 * <h3>数据处理</h3>
 * state=filled 且 accFillSz>0 → 成交 → 调用 gridTradeService.onOrderFilled() 跟踪盈亏
 *
 * @author Administrator
 */
@Slf4j
public class OkxOrderInfoChannelHandler implements OkxChannelHandler {
    private static final String CHANNEL_NAME = "orders";
    private static final String CHANNEL = "orders";
    private final String instId;
    private final OkxGridTradeService gridTradeService;
    private final OkxConfig config;
    public OkxOrderInfoChannelHandler(String instId, OkxGridTradeService gridTradeService, OkxConfig config) {
        this.instId = instId;
        this.gridTradeService = gridTradeService;
        this.config = config;
    }
    @Override
    public String getChannelName() {
        return CHANNEL_NAME;
    }
    @Override
    public void subscribe(WebSocketClient ws) {
        JSONObject msg = new JSONObject();
        msg.put("op", "subscribe");
        JSONArray args = new JSONArray();
        JSONObject arg = new JSONObject();
        arg.put("channel", CHANNEL);
        arg.put("instType", "SWAP");
        arg.put("instId", instId);
        args.add(arg);
        msg.put("args", args);
        ws.send(msg.toJSONString());
        log.info("[{}] 订阅成功, 合约:{}", CHANNEL_NAME, instId);
    }
    @Override
    public void unsubscribe(WebSocketClient ws) {
        JSONObject msg = new JSONObject();
        msg.put("op", "unsubscribe");
        JSONArray args = new JSONArray();
        JSONObject arg = new JSONObject();
        arg.put("channel", CHANNEL);
        arg.put("instType", "SWAP");
        arg.put("instId", instId);
        args.add(arg);
        msg.put("args", args);
        ws.send(msg.toJSONString());
        log.info("[{}] 取消订阅成功", CHANNEL_NAME);
    }
    @Override
    public boolean handleMessage(JSONObject response) {
        JSONObject argObj = response.getJSONObject("arg");
        if (argObj == null) {
            return false;
        }
        String channel = argObj.getString("channel");
        if (!CHANNEL.equals(channel)) {
            return false;
        }
        try {
            JSONArray dataArray = response.getJSONArray("data");
            if (dataArray == null || dataArray.isEmpty()) {
                return true;
            }
            for (int i = 0; i < dataArray.size(); i++) {
                JSONObject detail = dataArray.getJSONObject(i);
                if (!instId.equals(detail.getString("instId"))) {
                    continue;
                }
                String state = detail.getString("state");
                String accFillSz = detail.getString("accFillSz");
                String pnl = detail.getString("pnl");
                String posSide = detail.getString("posSide");
                String avgPx = detail.getString("avgPx");
                String clOrdId = detail.getString("clOrdId");
                log.info("[{}] 订单, 方向:{}, 状态:{}, 成交量:{}, 均价:{}, 盈亏:{}, 编号:{}",
                        CHANNEL_NAME, posSide, state, accFillSz, avgPx, pnl, clOrdId);
                if ("filled".equals(state) && accFillSz != null && new BigDecimal(accFillSz).compareTo(BigDecimal.ZERO) > 0) {
                    if (gridTradeService != null) {
                        BigDecimal pnlVal = pnl != null ? new BigDecimal(pnl) : BigDecimal.ZERO;
                        gridTradeService.onOrderFilled(posSide, new BigDecimal(accFillSz), pnlVal);
                    }
                }
            }
        } catch (Exception e) {
            log.error("[{}] 处理数据失败", CHANNEL_NAME, e);
        }
        return true;
    }
}
src/main/java/com/xcong/excoin/modules/okxApi/wsHandler/handler/OkxPositionsChannelHandler.java
New file
@@ -0,0 +1,97 @@
package com.xcong.excoin.modules.okxApi.wsHandler.handler;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.okxApi.OkxGridTradeService;
import com.xcong.excoin.modules.okxApi.enums.OkxEnums;
import com.xcong.excoin.modules.okxApi.wsHandler.OkxChannelHandler;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import java.math.BigDecimal;
@Slf4j
public class OkxPositionsChannelHandler implements OkxChannelHandler {
    private final String instId;
    private final OkxGridTradeService gridTradeService;
    public OkxPositionsChannelHandler(String instId, OkxGridTradeService gridTradeService) {
        this.instId = instId;
        this.gridTradeService = gridTradeService;
    }
    @Override
    public String getChannelName() {
        return OkxEnums.CHANNEL_POSITIONS;
    }
    @Override
    public void subscribe(WebSocketClient ws) {
        JSONObject msg = new JSONObject();
        msg.put("op", "subscribe");
        JSONArray args = new JSONArray();
        JSONObject arg = new JSONObject();
        arg.put("channel", OkxEnums.CHANNEL_POSITIONS);
        arg.put("instType", OkxEnums.INSTTYPE_SWAP);
        arg.put("instId", instId);
        args.add(arg);
        msg.put("args", args);
        ws.send(msg.toJSONString());
        log.info("[{}] 订阅成功, 合约:{}", OkxEnums.CHANNEL_POSITIONS, instId);
    }
    @Override
    public void unsubscribe(WebSocketClient ws) {
        JSONObject msg = new JSONObject();
        msg.put("op", "unsubscribe");
        JSONArray args = new JSONArray();
        JSONObject arg = new JSONObject();
        arg.put("channel", OkxEnums.CHANNEL_POSITIONS);
        arg.put("instType", OkxEnums.INSTTYPE_SWAP);
        arg.put("instId", instId);
        args.add(arg);
        msg.put("args", args);
        ws.send(msg.toJSONString());
        log.info("[{}] 取消订阅成功", OkxEnums.CHANNEL_POSITIONS);
    }
    @Override
    public boolean handleMessage(JSONObject response) {
        JSONObject argObj = response.getJSONObject("arg");
        if (argObj == null) {
            return false;
        }
        String channel = argObj.getString("channel");
        if (!OkxEnums.CHANNEL_POSITIONS.equals(channel)) {
            return false;
        }
        try {
            JSONArray dataArray = response.getJSONArray("data");
            if (dataArray == null || dataArray.isEmpty()) {
                return true;
            }
            for (int i = 0; i < dataArray.size(); i++) {
                JSONObject pos = dataArray.getJSONObject(i);
                if (!instId.equals(pos.getString("instId"))) {
                    continue;
                }
                String posSide = pos.getString("posSide");
                BigDecimal size = new BigDecimal(pos.getString("pos"));
                BigDecimal avgPx = pos.containsKey("avgPx") && pos.getString("avgPx") != null
                        ? new BigDecimal(pos.getString("avgPx")) : BigDecimal.ZERO;
                log.info("[{}] 持仓更新, 方向:{}, 数量:{}, 均价:{}, 未实现盈亏:{}, 保证金:{}",
                        OkxEnums.CHANNEL_POSITIONS, posSide, size, avgPx,
                        pos.get("upl"), pos.get("imr"));
                if (gridTradeService != null) {
                    gridTradeService.onPositionUpdate(posSide, size, avgPx);
                }
            }
        } catch (Exception e) {
            log.error("[{}] 处理数据失败", OkxEnums.CHANNEL_POSITIONS, e);
        }
        return true;
    }
}