Administrator
2026-06-02 8838c52d80aa4f728999e36e064e6d09c27ceeaa
src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxWebSocketClientManager.java
@@ -1,133 +1,169 @@
package com.xcong.excoin.modules.okxNewPrice;
import com.xcong.excoin.modules.okxNewPrice.celue.CaoZuoService;
import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.ExchangeInfoEnum;
import com.xcong.excoin.modules.okxNewPrice.okxWs.wanggeList.WangGeListService;
import com.xcong.excoin.modules.okxNewPrice.wangge.WangGeService;
import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService;
import com.xcong.excoin.utils.RedisUtils;
import com.xcong.excoin.modules.okxNewPrice.gridWs.OkxAlgoOrdersChannelHandler;
import com.xcong.excoin.modules.okxNewPrice.gridWs.OkxKlineChannelHandler;
import com.xcong.excoin.modules.okxNewPrice.gridWs.OkxPositionsChannelHandler;
import com.xcong.excoin.modules.okxNewPrice.okxpi.config.OKXAccount;
import com.xcong.excoin.modules.okxNewPrice.okxpi.config.enums.DefaultUrls;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.math.BigDecimal;
/**
 * 管理多个OKX WebSocket客户端实例,每个实例对应一个账号
 * OKX 网格交易系统 Spring 容器入口 — 组件组装 + 生命周期管理。
 *
 * <h3>组装顺序({@code @PostConstruct})</h3>
 * <ol>
 *   <li>{@link OkxConfig} — 构建配置(API 密钥、合约、策略参数)</li>
 *   <li>{@link OkxGridTradeService}.init() — 获取账户 → 清条件单 → 平仓 → 设杠杆</li>
 *   <li>{@link OkxGridWsClient} — 注册 3 个频道处理器 → init():建立 WS 连接并登录订阅</li>
 *   <li>{@code gridTradeService.startGrid()} — 状态重置,等待首根 K 线</li>
 * </ol>
 *
 * <h3>3 个频道处理器</h3>
 * <ol>
 *   <li>OkxKlineChannelHandler — candle1m,K线 → onKline()</li>
 *   <li>OkxPositionsChannelHandler — positions,持仓 → onPositionUpdate()</li>
 *   <li>OkxAlgoOrdersChannelHandler — orders-algo,条件单 → onOrderUpdate()</li>
 * </ol>
 *
 * <h3>销毁顺序({@code @PreDestroy})</h3>
 * <ol>
 *   <li>gridTradeService.stopGrid():取消所有条件单 → 关闭交易线程池</li>
 *   <li>gridWsClient.destroy():取消订阅 → 断开 WS → 关闭线程池</li>
 * </ol>
 *
 * @author Administrator
 */
@Slf4j
@Component
@ConditionalOnProperty(prefix = "app", name = "quant", havingValue = "true")
public class OkxWebSocketClientManager {
    @Autowired
    private CaoZuoService caoZuoService;
    @Autowired
    private RedisUtils redisUtils;
    @Autowired
    private WangGeListService wangGeListService;
    // 存储所有OkxQuantWebSocketClient实例,key为账号类型名称
    private final Map<String, OkxQuantWebSocketClient> quantClientMap = new ConcurrentHashMap<>();
    // 存储OkxNewPriceWebSocketClient实例
    private OkxNewPriceWebSocketClient newPriceClient;
    /** 网格交易公共 WS 客户端(candle1m) */
    private OkxGridWsClient gridWsClientPublic;
    /** 网格交易私有 WS 客户端(positions / orders-algo) */
    private OkxGridWsClient gridWsClientPrivate;
    /** 网格交易策略服务 */
    private OkxGridTradeService gridTradeService;
    /** 统一配置 */
    private OkxConfig okxConfig;
    /**
     * 初始化方法,在Spring Bean构造完成后执行
     * 创建并初始化所有账号的WebSocket客户端实例
     */
    @PostConstruct
    public void init() {
        log.info("开始初始化OkxWebSocketClientManager");
        // 初始化价格WebSocket客户端
        log.info("[OKX-Manager] 开始初始化...");
        try {
            newPriceClient = new OkxNewPriceWebSocketClient(redisUtils, caoZuoService, this, wangGeListService);
            newPriceClient.init();
            log.info("已初始化OkxNewPriceWebSocketClient");
        } catch (Exception e) {
            log.error("初始化OkxNewPriceWebSocketClient失败", e);
        }
        // 获取所有ExchangeInfoEnum枚举值
        ExchangeInfoEnum[] accounts = ExchangeInfoEnum.values();
        // 为每个账号创建一个WebSocket客户端实例
        for (ExchangeInfoEnum account : accounts) {
            try {
                OkxQuantWebSocketClient client = new OkxQuantWebSocketClient(account, redisUtils);
                quantClientMap.put(account.name(), client);
                client.init();
                log.info("已初始化账号 {} 的WebSocket客户端", account.name());
            } catch (Exception e) {
                log.error("初始化账号 {} 的WebSocket客户端失败", account.name(), e);
            // 获取账户配置
            ExchangeInfoEnum[] accounts = ExchangeInfoEnum.values();
            if (accounts == null || accounts.length == 0) {
                log.error("[OKX-Manager] 无可用账户,初始化失败");
                return;
            }
            // 使用第一个账户
            ExchangeInfoEnum primaryAccount = accounts[0];
            log.info("[OKX-Manager] 使用主账户: {}", primaryAccount.name());
            // 1. 创建 OKXAccount(REST API 调用需要)
            String baseUrl = primaryAccount.isAccountType() ? DefaultUrls.USDM_PROD_URL : DefaultUrls.USDM_UAT_URL;
            OKXAccount okxAccount = new OKXAccount(
                    baseUrl,
                    primaryAccount.getApiKey(),
                    primaryAccount.getSecretKey(),
                    primaryAccount.getPassphrase(),
                    !primaryAccount.isAccountType()
            );
            // 2. 构建 OkxConfig
            // TODO: 参数可通过配置文件/数据库动态读取
            okxConfig = OkxConfig.builder()
                    .apiKey(primaryAccount.getApiKey())
                    .secretKey(primaryAccount.getSecretKey())
                    .passphrase(primaryAccount.getPassphrase())
                    .instId("BTC-USDT-SWAP")
                    .leverage("100")
                    .tdMode("cross")
                    .gridRate(new BigDecimal("0.0025"))
                    .expectedProfit(new BigDecimal("2"))
                    .maxLoss(new BigDecimal("15"))
                    .quantity("1")
                    .baseQuantity("10")
                    .priceScale(2)
                    .ctVal(new BigDecimal("0.1"))
                    .isSimulate(!primaryAccount.isAccountType())
                    .gridQueueSize(300)
                    .marginRatioLimit(new BigDecimal("0.2"))
                    .build();
            // 3. 初始化交易服务:查账户 → 清条件单 → 平已有仓位 → 设杠杆
            gridTradeService = new OkxGridTradeService(okxConfig, okxAccount);
            gridTradeService.init();
            // 4. 创建 WS 客户端并注册频道处理器
            // 公共 WS:candle1m(K线数据)
            gridWsClientPublic = new OkxGridWsClient(primaryAccount, true);
            gridWsClientPublic.addChannelHandler(new OkxKlineChannelHandler(okxConfig.getInstId(), gridTradeService));
            gridWsClientPublic.init();
            // 私有 WS:positions + orders-algo(持仓 + 条件单)
            gridWsClientPrivate = new OkxGridWsClient(primaryAccount, false);
            gridWsClientPrivate.addChannelHandler(new OkxPositionsChannelHandler(okxConfig.getInstId(), gridTradeService));
            gridWsClientPrivate.addChannelHandler(new OkxAlgoOrdersChannelHandler(okxConfig.getInstId(), gridTradeService));
            gridWsClientPrivate.init();
            log.info("[OKX-Manager] WS已连接, 公共: candle1m, 私有: positions/orders-algo");
            // 5. 激活策略,等待首根 K 线触发基底双开
            gridTradeService.startGrid();
            log.info("[OKX-Manager] 初始化完成");
        } catch (Exception e) {
            log.error("[OKX-Manager] 初始化失败", e);
        }
        log.info("OkxWebSocketClientManager初始化完成");
    }
    /**
     * 销毁方法,在Spring Bean销毁前执行
     * 关闭所有WebSocket客户端连接和相关资源
     * 销毁:停止策略 → 关闭交易线程池 → 取消 WS 订阅 → 断开连接 → 关闭 WS 线程池。
     */
    @PreDestroy
    public void destroy() {
        log.info("开始销毁OkxWebSocketClientManager");
        // 关闭价格WebSocket客户端
        if (newPriceClient != null) {
        log.info("[OKX-Manager] 开始销毁...");
        if (gridTradeService != null) {
            try {
                newPriceClient.destroy();
                log.info("已销毁OkxNewPriceWebSocketClient");
                gridTradeService.stopGrid();
            } catch (Exception e) {
                log.error("销毁OkxNewPriceWebSocketClient失败", e);
                log.error("[OKX-Manager] 停止策略失败", e);
            }
        }
        // 关闭所有量化交易WebSocket客户端实例
        for (Map.Entry<String, OkxQuantWebSocketClient> entry : quantClientMap.entrySet()) {
        if (gridWsClientPublic != null) {
            try {
                OkxQuantWebSocketClient client = entry.getValue();
                client.destroy();
                log.info("已销毁账号 {} 的WebSocket客户端", entry.getKey());
                gridWsClientPublic.destroy();
            } catch (Exception e) {
                log.error("销毁账号 {} 的WebSocket客户端失败", entry.getKey(), e);
                log.error("[OKX-Manager] 销毁公共WS客户端失败", e);
            }
        }
        // 清空客户端映射
        quantClientMap.clear();
        log.info("OkxWebSocketClientManager销毁完成");
        if (gridWsClientPrivate != null) {
            try {
                gridWsClientPrivate.destroy();
            } catch (Exception e) {
                log.error("[OKX-Manager] 销毁私有WS客户端失败", e);
            }
        }
        log.info("[OKX-Manager] 销毁完成");
    }
    /**
     * 获取指定账号的OkxQuantWebSocketClient实例
     * @param accountName 账号类型名称
     * @return WebSocket客户端实例
     */
    public OkxQuantWebSocketClient getClient(String accountName) {
        return quantClientMap.get(accountName);
    }
    /**
     * 获取所有OkxQuantWebSocketClient实例
     * @return 所有客户端实例的集合
     */
    public Collection<OkxQuantWebSocketClient> getAllClients() {
        return quantClientMap.values();
    }
    /**
     * 获取OkxNewPriceWebSocketClient实例
     * @return 价格WebSocket客户端实例
     */
    public OkxNewPriceWebSocketClient getNewPriceClient() {
        return newPriceClient;
    }
}
    /** @return 网格交易策略服务实例 */
    public OkxGridTradeService getGridTradeService() { return gridTradeService; }
    /** @return 网格交易公共 WS 客户端实例 */
    public OkxGridWsClient getGridWsClientPublic() { return gridWsClientPublic; }
    /** @return 网格交易私有 WS 客户端实例 */
    public OkxGridWsClient getGridWsClientPrivate() { return gridWsClientPrivate; }
    public OkxConfig getOkxConfig() { return okxConfig; }
}