| 4 days ago | Administrator | ![]() |
| 4 days ago | Administrator | ![]() |
| 4 days ago | Administrator | ![]() |
| 4 days ago | Administrator | ![]() |
| 4 days ago | Administrator | ![]() |
| 4 days ago | Administrator | ![]() |
| 4 days ago | Administrator | ![]() |
| 4 days ago | Administrator | ![]() |
| 4 days ago | Administrator | ![]() |
| 4 days ago | Administrator | ![]() |
| 4 days ago | Administrator | ![]() |
| 4 days ago | Administrator | ![]() |
| 4 days ago | Administrator | ![]() |
| 4 days ago | Administrator | ![]() |
| 4 days ago | Administrator | ![]() |
src/main/java/com/xcong/excoin/modules/gateApi/Example.java
@@ -51,16 +51,15 @@ } /** * 设置杠杆倍数 * 设置杠杆倍数(双向持仓模式专用) * 设置合理的杠杆倍数,不能为0 */ String leverage = "25"; futuresApi.updateContractPositionLeverageCall( futuresApi.updateDualModePositionLeverageCall( settle, contract, leverage, marginMode, position_mode, null); } catch (GateApiException e) { System.err.println(String.format("Gate api exception, label: %s, message: %s", e.getErrorLabel(), e.getMessage())); src/main/java/com/xcong/excoin/modules/gateApi/GateConfig.java
New file @@ -0,0 +1,186 @@ package com.xcong.excoin.modules.gateApi; import java.math.BigDecimal; /** * Gate 模块统一配置。 * * <p>通过 Builder 模式集中管理所有运行参数,避免参数散落在多个文件中。 * 提供 REST API 和 WebSocket 地址的自动环境切换(测试网/生产网)。 * * <h3>使用示例</h3> * <pre> * GateConfig config = GateConfig.builder() * .apiKey("...") * .apiSecret("...") * .contract("XAU_USDT") * .leverage("100") * .gridRate(new BigDecimal("0.0035")) * .contractMultiplier("0.001") * .isProduction(false) * .build(); * * String restUrl = config.getRestBasePath(); // 自动返回测试网或生产网地址 * String wsUrl = config.getWsUrl(); * </pre> * * <h3>默认值</h3> * <ul> * <li>合约: BTC_USDT, 杠杆: 10x, 全仓, 双向持仓</li> * <li>网格间距: 0.35%, 队列容量: 50, 保证金比例上限: 20%</li> * <li>止盈: 0.5 USDT, 亏损上限: 7.5 USDT</li> * <li>数量: 1 张, 合约乘数: 0.001, 环境: 测试网</li> * </ul> * * @author Administrator */ public class GateConfig { /** 未实现盈亏计价模式 */ public enum PnLPriceMode { /** 按最新成交价计算 */ LAST_PRICE, /** 按标记价格计算 */ MARK_PRICE } /** Gate API v4 密钥 */ private final String apiKey; /** Gate API v4 签名密钥 */ private final String apiSecret; /** 合约名称(如 XAU_USDT) */ private final String contract; /** 杠杆倍数 */ private final String leverage; /** 保证金模式(cross / isolated) */ private final String marginMode; /** 持仓模式(single / dual / dual_plus) */ private final String positionMode; /** 网格间距比例(如 0.0035 表示 0.35%) */ private final BigDecimal gridRate; /** 整体止盈阈值(USDT) */ private final BigDecimal overallTp; /** 最大亏损阈值(USDT) */ private final BigDecimal maxLoss; /** 下单数量(合约张数) */ private final String quantity; /** 是否为生产环境 */ private final boolean isProduction; /** 补仓最大重试次数 */ private final int reopenMaxRetries; /** 网格队列容量 */ private final int gridQueueSize; /** 保证金占初始本金比例上限 */ private final BigDecimal marginRatioLimit; /** 合约乘数(单张合约代表的基础资产数量,如 BTC_USDT=0.001, ETH_USDT=0.01) */ private final BigDecimal contractMultiplier; /** 未实现盈亏计价模式:最新价 / 标记价格 */ private final PnLPriceMode unrealizedPnlPriceMode; private GateConfig(Builder builder) { this.apiKey = builder.apiKey; this.apiSecret = builder.apiSecret; this.contract = builder.contract; this.leverage = builder.leverage; this.marginMode = builder.marginMode; this.positionMode = builder.positionMode; this.gridRate = builder.gridRate; this.overallTp = builder.overallTp; this.maxLoss = builder.maxLoss; this.quantity = builder.quantity; this.isProduction = builder.isProduction; this.reopenMaxRetries = builder.reopenMaxRetries; this.gridQueueSize = builder.gridQueueSize; this.marginRatioLimit = builder.marginRatioLimit; this.contractMultiplier = builder.contractMultiplier; this.unrealizedPnlPriceMode = builder.unrealizedPnlPriceMode; } /** * 根据环境返回 REST API 基础路径。 * <ul> * <li>测试网: {@code https://api-testnet.gateapi.io/api/v4}</li> * <li>生产网: {@code https://api.gateio.ws/api/v4}</li> * </ul> */ public String getRestBasePath() { return isProduction ? "https://api.gateio.ws/api/v4" : "https://api-testnet.gateapi.io/api/v4"; } /** * 根据环境返回 WebSocket 地址。 * <ul> * <li>测试网: {@code wss://ws-testnet.gate.com/v4/ws/futures/usdt}</li> * <li>生产网: {@code wss://fx-ws.gateio.ws/v4/ws/usdt}</li> * </ul> */ public String getWsUrl() { return isProduction ? "wss://fx-ws.gateio.ws/v4/ws/usdt" : "wss://ws-testnet.gate.com/v4/ws/futures/usdt"; } public String getApiKey() { return apiKey; } public String getApiSecret() { return apiSecret; } public String getContract() { return contract; } public String getLeverage() { return leverage; } public String getMarginMode() { return marginMode; } public String getPositionMode() { return positionMode; } public BigDecimal getGridRate() { return gridRate; } public BigDecimal getOverallTp() { return overallTp; } public BigDecimal getMaxLoss() { return maxLoss; } public String getQuantity() { return quantity; } public boolean isProduction() { return isProduction; } public int getReopenMaxRetries() { return reopenMaxRetries; } public int getGridQueueSize() { return gridQueueSize; } public BigDecimal getMarginRatioLimit() { return marginRatioLimit; } public BigDecimal getContractMultiplier() { return contractMultiplier; } public PnLPriceMode getUnrealizedPnlPriceMode() { return unrealizedPnlPriceMode; } public static Builder builder() { return new Builder(); } /** * GateConfig 的流式构造器,提供合理的默认值。 */ public static class Builder { private String apiKey; private String apiSecret; private String contract = "BTC_USDT"; private String leverage = "10"; private String marginMode = "cross"; private String positionMode = "dual"; private BigDecimal gridRate = new BigDecimal("0.0035"); private BigDecimal overallTp = new BigDecimal("0.5"); private BigDecimal maxLoss = new BigDecimal("7.5"); private String quantity = "1"; private boolean isProduction = false; private int reopenMaxRetries = 3; private int gridQueueSize = 50; private BigDecimal marginRatioLimit = new BigDecimal("0.2"); private BigDecimal contractMultiplier = new BigDecimal("0.001"); private PnLPriceMode unrealizedPnlPriceMode = PnLPriceMode.LAST_PRICE; public Builder apiKey(String apiKey) { this.apiKey = apiKey; return this; } public Builder apiSecret(String apiSecret) { this.apiSecret = apiSecret; return this; } public Builder contract(String contract) { this.contract = contract; return this; } public Builder leverage(String leverage) { this.leverage = leverage; return this; } public Builder marginMode(String marginMode) { this.marginMode = marginMode; return this; } public Builder positionMode(String positionMode) { this.positionMode = positionMode; return this; } public Builder gridRate(BigDecimal gridRate) { this.gridRate = gridRate; return this; } public Builder overallTp(BigDecimal overallTp) { this.overallTp = overallTp; return this; } public Builder maxLoss(BigDecimal maxLoss) { this.maxLoss = maxLoss; return this; } public Builder quantity(String quantity) { this.quantity = quantity; return this; } public Builder isProduction(boolean isProduction) { this.isProduction = isProduction; return this; } public Builder reopenMaxRetries(int reopenMaxRetries) { this.reopenMaxRetries = reopenMaxRetries; return this; } public Builder contractMultiplier(BigDecimal contractMultiplier) { this.contractMultiplier = contractMultiplier; return this; } public Builder unrealizedPnlPriceMode(PnLPriceMode mode) { this.unrealizedPnlPriceMode = mode; return this; } public GateConfig build() { return new GateConfig(this); } } } src/main/java/com/xcong/excoin/modules/gateApi/GateGridTradeService.java
@@ -3,415 +3,500 @@ import io.gate.gateapi.ApiClient; import io.gate.gateapi.ApiException; import io.gate.gateapi.GateApiException; import io.gate.gateapi.api.AccountApi; import io.gate.gateapi.api.FuturesApi; import io.gate.gateapi.models.AccountDetail; import io.gate.gateapi.models.FuturesAccount; import io.gate.gateapi.models.FuturesInitialOrder; import io.gate.gateapi.models.FuturesOrder; import io.gate.gateapi.models.FuturesPriceTrigger; import io.gate.gateapi.models.FuturesPriceTriggeredOrder; import io.gate.gateapi.models.TriggerOrderResponse; import io.gate.gateapi.models.Position; import lombok.extern.slf4j.Slf4j; import java.math.BigDecimal; import java.math.RoundingMode; import java.util.ArrayList; import java.util.Collections; import java.util.List; /** * Gate 网格交易服务类,使用 gate-api SDK 进行合约下单。 * 策略:多空双开 → 设置止盈止损点位 → 网格循环交易 * Gate 网格交易服务 — 策略核心。 * * 测试参数: * 品种: XAU_USDT(黄金) * 杠杆: 100x(全仓) * 数量: 0.01 XAU * 网格: 0.0035(千分之三点五) * 整体止盈: 0.5 USDT * 循环次数: 3 * 报警: 本金亏损 15%(初始本金 50 USDT) * <h3>策略</h3> * 多空双开基底 → 生成价格网格队列 → K线触达网格线 → 开仓+设止盈 → 队列动态转移。 * 每根 K 线更新 {@code unrealizedPnl}(浮动盈亏),平仓后累加到 {@code cumulativePnl}(已实现盈亏)。 * * <h3>未实现盈亏公式(正向合约)</h3> * <pre> * 多仓: 持仓量 × 合约乘数 × (计价价格 − 开仓均价) * 空仓: 持仓量 × 合约乘数 × (开仓均价 − 计价价格) * </pre> * 计价价格支持切换:{@link GateConfig.PnLPriceMode#LAST_PRICE 最新成交价} 或 * {@link GateConfig.PnLPriceMode#MARK_PRICE 标记价格}(通过 {@link #setMarkPrice(BigDecimal)} 注入)。 * 入场价和持仓量由 {@link #onPositionUpdate(String, Position.ModeEnum, BigDecimal, BigDecimal)} 实时更新。 * * <h3>状态机</h3> * <pre> * WAITING_KLINE → (首K线) → 异步双开基底 * * 仓位推送(dual_long/dual_short) → 基底成交 → 记录入场价 → 双基底都成交 → 生成队列 → ACTIVE * * ACTIVE: * ├─ 每根K线 → 更新 unrealizedPnl + processShortGrid + processLongGrid * │ ├─ 当前价 < 空仓队列元素 → 匹配 → 开空 + 队列元素转移到多仓队列 * │ └─ 当前价 > 多仓队列元素 → 匹配 → 开多 + 队列元素转移到空仓队列 * ├─ 仓位推送(非基底) → 设止盈条件单 entry × (1±gridRate) * ├─ 保证金≥初始本金 marginRatioLimit → 跳过开仓,队列照常更新 * └─ cumulativePnl ≥ overallTp 或 ≤ -maxLoss → STOPPED * </pre> * * @author Administrator */ @Slf4j public class GateGridTradeService { private final ApiClient apiClient; public enum StrategyState { WAITING_KLINE, OPENING, ACTIVE, STOPPED } private static final String AUTO_SIZE_LONG = "close_long"; private static final String AUTO_SIZE_SHORT = "close_short"; private static final String ORDER_TYPE_CLOSE_LONG = "close-long-position"; private static final String ORDER_TYPE_CLOSE_SHORT = "close-short-position"; private final GateConfig config; private final GateTradeExecutor executor; private final FuturesApi futuresApi; private static final String SETTLE = "usdt"; private final String contract; private final String leverage; private final String marginMode; private final BigDecimal gridRate; private final BigDecimal overallTp; private final int maxCycles; private final BigDecimal maxLoss; private final String quantity; private final String positionMode; private volatile StrategyState state = StrategyState.WAITING_KLINE; private volatile boolean strategyActive = false; private int currentCycle = 0; private BigDecimal totalProfit = BigDecimal.ZERO; private BigDecimal longEntryPrice; private BigDecimal shortEntryPrice; private Long longOrderId; private Long shortOrderId; /** 空仓价格队列,降序排列(大→小),容量 gridQueueSize */ private final List<BigDecimal> shortPriceQueue = Collections.synchronizedList(new ArrayList<>()); /** 多仓价格队列,升序排列(小→大),容量 gridQueueSize */ private final List<BigDecimal> longPriceQueue = Collections.synchronizedList(new ArrayList<>()); private volatile BigDecimal lastClosePrice; /** 基底空头入场价 */ private BigDecimal shortBaseEntryPrice; /** 基底多头入场价 */ private BigDecimal longBaseEntryPrice; /** 基底多头是否已开 */ private volatile boolean baseLongOpened = false; /** 基底空头是否已开 */ private volatile boolean baseShortOpened = false; public GateGridTradeService(String apiKey, String apiSecret, String contract, String leverage, String marginMode,String positionMode, BigDecimal gridRate, BigDecimal overallTp, int maxCycles, BigDecimal maxLoss, String quantity) { this.contract = contract; this.leverage = leverage; this.marginMode = marginMode; this.gridRate = gridRate; this.overallTp = overallTp; this.maxCycles = maxCycles; this.maxLoss = maxLoss; this.quantity = quantity; this.positionMode = positionMode; /** 空头是否活跃(有仓位) */ private volatile boolean shortActive = false; /** 多头是否活跃(有仓位) */ private volatile boolean longActive = false; this.apiClient = new ApiClient(); this.apiClient.setBasePath("https://api-testnet.gateapi.io/api/v4"); this.apiClient.setApiKeySecret(apiKey, apiSecret); private volatile BigDecimal lastKlinePrice; private volatile BigDecimal markPrice = BigDecimal.ZERO; private volatile BigDecimal cumulativePnl = BigDecimal.ZERO; private volatile BigDecimal unrealizedPnl = BigDecimal.ZERO; private volatile BigDecimal longEntryPrice = BigDecimal.ZERO; private volatile BigDecimal shortEntryPrice = BigDecimal.ZERO; private volatile BigDecimal longPositionSize = BigDecimal.ZERO; private volatile BigDecimal shortPositionSize = BigDecimal.ZERO; private Long userId; private volatile BigDecimal initialPrincipal = BigDecimal.ZERO; public GateGridTradeService(GateConfig config) { this.config = config; ApiClient apiClient = new ApiClient(); apiClient.setBasePath(config.getRestBasePath()); apiClient.setApiKeySecret(config.getApiKey(), config.getApiSecret()); this.futuresApi = new FuturesApi(apiClient); this.executor = new GateTradeExecutor(apiClient, config.getContract()); } /** * 初始化账户:设置持仓模式 + 杠杆 */ // ---- 初始化 ---- public void init() { try { futuresApi.updateContractPositionLeverageCall( SETTLE, contract, leverage, marginMode, positionMode, null); log.info("[GateGrid] 已设置杠杆: {}x, 保证金模式: {}", leverage, marginMode); ApiClient detailClient = new ApiClient(); detailClient.setBasePath(config.getRestBasePath()); detailClient.setApiKeySecret(config.getApiKey(), config.getApiSecret()); AccountDetail detail = new AccountApi(detailClient).getAccountDetail(); this.userId = detail.getUserId(); log.info("[Gate] 用户ID: {}", userId); FuturesAccount account = futuresApi.listFuturesAccounts(SETTLE); log.info("[GateGrid] 账户可用余额: {}, 总资产: {}", account.getAvailable(), account.getTotal()); String positionModeSet = account.getPositionMode(); if (!positionMode.equals(positionModeSet)){ futuresApi.setPositionMode(SETTLE, positionMode); this.initialPrincipal = new BigDecimal(account.getTotal()); log.info("[Gate] 初始本金: {} USDT", initialPrincipal); if (!config.getPositionMode().equals(account.getPositionMode())) { futuresApi.setPositionMode(SETTLE, config.getPositionMode()); } log.info("[GateGrid] 已设置双向持仓模式"); log.info("[Gate] 持仓模式: {} 余额: {}", config.getPositionMode(), account.getAvailable()); futuresApi.cancelPriceTriggeredOrderList(SETTLE, config.getContract()); log.info("[Gate] 旧条件单已清除"); closeExistingPositions(); futuresApi.updateDualModePositionLeverageCall( SETTLE, config.getContract(), config.getLeverage(), config.getMarginMode(), null); log.info("[Gate] 杠杆: {}x {}", config.getLeverage(), config.getMarginMode()); } catch (GateApiException e) { log.error("[GateGrid] 初始化失败, label: {}, msg: {}", e.getErrorLabel(), e.getMessage()); log.error("[Gate] 初始化失败, label:{}, msg:{}", e.getErrorLabel(), e.getMessage()); } catch (ApiException e) { log.error("[GateGrid] 初始化API调用失败, code: {}", e.getCode()); log.error("[Gate] 初始化失败, code:{}", e.getCode()); } } /** * 启动网格策略 */ public void startGrid() { if (strategyActive) { log.warn("[GateGrid] 策略已在运行中"); return; } strategyActive = true; currentCycle = 0; totalProfit = BigDecimal.ZERO; log.info("[GateGrid] 网格策略启动, cycle: {}", currentCycle + 1); dualOpenPositions(); } /** * 停止网格策略 */ public void stopGrid() { strategyActive = false; closeAllPositions(); log.info("[GateGrid] 网格策略已停止, 总盈亏: {}, 循环: {}", totalProfit, currentCycle); } /** * K线回调:收到新的收盘价 */ public void onKline(BigDecimal closePrice) { lastClosePrice = closePrice; if (!strategyActive) { return; } checkPositions(closePrice); } /** * 多空双开 */ private void dualOpenPositions() { private void closeExistingPositions() { try { FuturesOrder longOrder = new FuturesOrder(); longOrder.setContract(contract); longOrder.setSize(quantity); longOrder.setPrice("0"); longOrder.setTif(FuturesOrder.TifEnum.IOC); longOrder.setText("t-grid-long-" + (currentCycle + 1)); FuturesOrder longResult = futuresApi.createFuturesOrder(SETTLE, longOrder, null); longOrderId = longResult.getId(); longEntryPrice = safeDecimal(longResult.getFillPrice()); log.info("[GateGrid] 开多成功, price: {}, id: {}", longEntryPrice, longOrderId); placeLongTpSl(longEntryPrice); FuturesOrder shortOrder = new FuturesOrder(); shortOrder.setContract(contract); shortOrder.setSize(negateQuantity(quantity)); shortOrder.setPrice("0"); shortOrder.setTif(FuturesOrder.TifEnum.IOC); shortOrder.setText("t-grid-short-" + (currentCycle + 1)); FuturesOrder shortResult = futuresApi.createFuturesOrder(SETTLE, shortOrder, null); shortOrderId = shortResult.getId(); shortEntryPrice = safeDecimal(shortResult.getFillPrice()); log.info("[GateGrid] 开空成功, price: {}, id: {}", shortEntryPrice, shortOrderId); placeShortTpSl(shortEntryPrice); printGridInfo(); List<Position> positions = futuresApi.listPositions(SETTLE).execute(); if (positions == null || positions.isEmpty()) { log.info("[Gate] 无已有仓位"); return; } for (Position pos : positions) { if (!config.getContract().equals(pos.getContract())) { continue; } String sizeStr = pos.getSize(); long size = Long.parseLong(sizeStr); if (size == 0) { continue; } String closeSize = size > 0 ? String.valueOf(-size) : String.valueOf(Math.abs(size)); Position.ModeEnum mode = pos.getMode(); FuturesOrder closeOrder = new FuturesOrder(); closeOrder.setContract(config.getContract()); closeOrder.setPrice("0"); closeOrder.setTif(FuturesOrder.TifEnum.IOC); closeOrder.setReduceOnly(true); if (mode != null && mode.getValue() != null && mode.getValue().contains("dual")) { closeOrder.setSize("0"); closeOrder.setClose(false); closeOrder.setAutoSize(size > 0 ? FuturesOrder.AutoSizeEnum.LONG : FuturesOrder.AutoSizeEnum.SHORT); } else { closeOrder.setSize(closeSize); } closeOrder.setText("t-grid-init-close"); futuresApi.createFuturesOrder(SETTLE, closeOrder, null); log.info("[Gate] 平已有仓位, 方向:{}, size:{}, mode:{}", size > 0 ? "多" : "空", sizeStr, mode); } } catch (GateApiException e) { log.error("[GateGrid] 双开失败, label: {}, msg: {}", e.getErrorLabel(), e.getMessage()); strategyActive = false; log.warn("[Gate] 平仓位失败, label:{}, msg:{}", e.getErrorLabel(), e.getMessage()); } catch (Exception e) { log.error("[GateGrid] 双开异常", e); strategyActive = false; log.warn("[Gate] 平仓位异常", e); } } private void placeLongTpSl(BigDecimal entryPrice) { BigDecimal tpPrice = entryPrice.multiply(BigDecimal.ONE.add(gridRate)).setScale(1, RoundingMode.HALF_UP); placePriceTriggeredOrder(tpPrice, FuturesPriceTrigger.RuleEnum.NUMBER_1, "close-long-position", "close_long"); log.info("[GateGrid] 多头止盈已设置, TP:{}", tpPrice); // ---- 启动/停止 ---- public void startGrid() { if (state != StrategyState.WAITING_KLINE && state != StrategyState.STOPPED) { log.warn("[Gate] 策略已在运行中, state:{}", state); return; } state = StrategyState.WAITING_KLINE; cumulativePnl = BigDecimal.ZERO; unrealizedPnl = BigDecimal.ZERO; markPrice = BigDecimal.ZERO; longEntryPrice = BigDecimal.ZERO; shortEntryPrice = BigDecimal.ZERO; longPositionSize = BigDecimal.ZERO; shortPositionSize = BigDecimal.ZERO; baseLongOpened = false; baseShortOpened = false; longActive = false; shortActive = false; shortPriceQueue.clear(); longPriceQueue.clear(); log.info("[Gate] 网格策略已启动"); } private void placeShortTpSl(BigDecimal entryPrice) { BigDecimal tpPrice = entryPrice.multiply(BigDecimal.ONE.subtract(gridRate)).setScale(1, RoundingMode.HALF_UP); placePriceTriggeredOrder(tpPrice, FuturesPriceTrigger.RuleEnum.NUMBER_2, "close-short-position", "close_short"); log.info("[GateGrid] 空头止盈已设置, TP:{}", tpPrice); public void stopGrid() { state = StrategyState.STOPPED; executor.cancelAllPriceTriggeredOrders(); executor.shutdown(); log.info("[Gate] 策略已停止, 累计盈亏: {}", cumulativePnl); } private void placePriceTriggeredOrder(BigDecimal triggerPrice, FuturesPriceTrigger.RuleEnum rule, String orderType, String autoSize) { // ---- K线回调 ---- public void onKline(BigDecimal closePrice) { lastKlinePrice = closePrice; updateUnrealizedPnl(); if (state == StrategyState.STOPPED) { return; } if (state == StrategyState.WAITING_KLINE) { state = StrategyState.OPENING; log.info("[Gate] 首根K线到达,开基底仓位..."); executor.openLong(config.getQuantity(), () -> { log.info("[Gate] 基底多单已提交"); }, null); executor.openShort(negate(config.getQuantity()), () -> { log.info("[Gate] 基底空单已提交"); }, null); return; } if (state != StrategyState.ACTIVE) { return; } processShortGrid(closePrice); processLongGrid(closePrice); } // ---- 仓位推送回调 ---- public void onPositionUpdate(String contract, Position.ModeEnum mode, BigDecimal size, BigDecimal entryPrice) { if (state == StrategyState.STOPPED || state == StrategyState.WAITING_KLINE) { return; } boolean hasPosition = size.abs().compareTo(BigDecimal.ZERO) > 0; if (Position.ModeEnum.DUAL_LONG == mode) { if (hasPosition) { longActive = true; longEntryPrice = entryPrice; longPositionSize = size; if (!baseLongOpened) { longBaseEntryPrice = entryPrice; baseLongOpened = true; log.info("[Gate] 基底多成交价: {}", longBaseEntryPrice); tryGenerateQueues(); } else { BigDecimal tpPrice = entryPrice.multiply(BigDecimal.ONE.add(config.getGridRate())).setScale(1, RoundingMode.HALF_UP); executor.placeTakeProfit(tpPrice, FuturesPriceTrigger.RuleEnum.NUMBER_1, ORDER_TYPE_CLOSE_LONG, AUTO_SIZE_LONG); log.info("[Gate] 多单止盈已设, entry:{}, tp:{}", entryPrice, tpPrice); } } else { longActive = false; longPositionSize = BigDecimal.ZERO; } } else if (Position.ModeEnum.DUAL_SHORT == mode) { if (hasPosition) { shortActive = true; shortEntryPrice = entryPrice; shortPositionSize = size.abs(); if (!baseShortOpened) { shortBaseEntryPrice = entryPrice; baseShortOpened = true; log.info("[Gate] 基底空成交价: {}", shortBaseEntryPrice); tryGenerateQueues(); } else { BigDecimal tpPrice = entryPrice.multiply(BigDecimal.ONE.subtract(config.getGridRate())).setScale(1, RoundingMode.HALF_UP); executor.placeTakeProfit(tpPrice, FuturesPriceTrigger.RuleEnum.NUMBER_2, ORDER_TYPE_CLOSE_SHORT, AUTO_SIZE_SHORT); log.info("[Gate] 空单止盈已设, entry:{}, tp:{}", entryPrice, tpPrice); } } else { shortActive = false; shortPositionSize = BigDecimal.ZERO; } } } // ---- 平仓推送回调 ---- public void onPositionClose(String contract, String side, BigDecimal pnl) { if (state == StrategyState.STOPPED) { return; } cumulativePnl = cumulativePnl.add(pnl); log.info("[Gate] 盈亏累加:{}, 方向:{}, 累计:{}", pnl, side, cumulativePnl); if (cumulativePnl.compareTo(config.getOverallTp()) >= 0) { log.info("[Gate] 已达止盈目标 {}→已停止", cumulativePnl); state = StrategyState.STOPPED; } else if (cumulativePnl.compareTo(config.getMaxLoss().negate()) <= 0) { log.info("[Gate] 已达亏损上限 {}→已停止", cumulativePnl); state = StrategyState.STOPPED; } } // ---- 网格队列处理 ---- private void tryGenerateQueues() { if (baseLongOpened && baseShortOpened) { generateShortQueue(); generateLongQueue(); state = StrategyState.ACTIVE; log.info("[Gate] 网格队列已生成, 空队首:{} → 尾:{}, 多队首:{} → 尾:{}, 已激活", shortPriceQueue.get(0), shortPriceQueue.get(shortPriceQueue.size() - 1), longPriceQueue.get(0), longPriceQueue.get(longPriceQueue.size() - 1)); } } private void generateShortQueue() { shortPriceQueue.clear(); BigDecimal step = config.getGridRate(); for (int i = 1; i <= config.getGridQueueSize(); i++) { shortPriceQueue.add(shortBaseEntryPrice.multiply(BigDecimal.ONE.subtract(step.multiply(BigDecimal.valueOf(i)))).setScale(1, RoundingMode.HALF_UP)); } shortPriceQueue.sort((a, b) -> b.compareTo(a)); //输出队列:shortPriceQueue; log.info("[Gate] 空队列:{}", shortPriceQueue); } private void generateLongQueue() { longPriceQueue.clear(); BigDecimal step = config.getGridRate(); for (int i = 1; i <= config.getGridQueueSize(); i++) { longPriceQueue.add(longBaseEntryPrice.multiply(BigDecimal.ONE.add(step.multiply(BigDecimal.valueOf(i)))).setScale(1, RoundingMode.HALF_UP)); } longPriceQueue.sort(BigDecimal::compareTo); log.info("[Gate] 多队列:{}", longPriceQueue); } private void processShortGrid(BigDecimal currentPrice) { List<BigDecimal> matched = new ArrayList<>(); synchronized (shortPriceQueue) { for (BigDecimal p : shortPriceQueue) { if (p.compareTo(currentPrice) > 0) { matched.add(p); } else { break; } } } if (matched.isEmpty()) { return; } log.info("[Gate] 空仓队列触发, 匹配{}个元素, 当前价:{}", matched.size(), currentPrice); if (!isMarginSafe()) { log.warn("[Gate] 保证金超限,跳过空单开仓"); } else { executor.openShort(negate(config.getQuantity()), null, null); } synchronized (shortPriceQueue) { shortPriceQueue.removeAll(matched); BigDecimal min = shortPriceQueue.isEmpty() ? matched.get(matched.size() - 1) : shortPriceQueue.get(shortPriceQueue.size() - 1); BigDecimal step = config.getGridRate(); for (int i = 0; i < matched.size(); i++) { min = min.multiply(BigDecimal.ONE.subtract(step)).setScale(1, RoundingMode.HALF_UP); shortPriceQueue.add(min); } shortPriceQueue.sort((a, b) -> b.compareTo(a)); } synchronized (longPriceQueue) { longPriceQueue.addAll(matched); longPriceQueue.sort(BigDecimal::compareTo); while (longPriceQueue.size() > config.getGridQueueSize()) { longPriceQueue.remove(longPriceQueue.size() - 1); } } } private void processLongGrid(BigDecimal currentPrice) { List<BigDecimal> matched = new ArrayList<>(); synchronized (longPriceQueue) { for (BigDecimal p : longPriceQueue) { if (p.compareTo(currentPrice) < 0) { matched.add(p); } else { break; } } } if (matched.isEmpty()) { return; } log.info("[Gate] 多仓队列触发, 匹配{}个元素, 当前价:{}", matched.size(), currentPrice); if (!isMarginSafe()) { log.warn("[Gate] 保证金超限,跳过多单开仓"); } else { executor.openLong(config.getQuantity(), null, null); } synchronized (longPriceQueue) { longPriceQueue.removeAll(matched); BigDecimal max = longPriceQueue.isEmpty() ? matched.get(matched.size() - 1) : longPriceQueue.get(longPriceQueue.size() - 1); BigDecimal step = config.getGridRate(); for (int i = 0; i < matched.size(); i++) { max = max.multiply(BigDecimal.ONE.add(step)).setScale(1, RoundingMode.HALF_UP); longPriceQueue.add(max); } longPriceQueue.sort(BigDecimal::compareTo); } synchronized (shortPriceQueue) { shortPriceQueue.addAll(matched); shortPriceQueue.sort((a, b) -> b.compareTo(a)); while (shortPriceQueue.size() > config.getGridQueueSize()) { shortPriceQueue.remove(shortPriceQueue.size() - 1); } } } // ---- 保证金安全阀 ---- private boolean isMarginSafe() { try { FuturesPriceTrigger trigger = new FuturesPriceTrigger(); trigger.setStrategyType(FuturesPriceTrigger.StrategyTypeEnum.NUMBER_0); trigger.setPriceType(FuturesPriceTrigger.PriceTypeEnum.NUMBER_0); trigger.setPrice(triggerPrice.toString()); trigger.setRule(rule); trigger.setExpiration(0); FuturesInitialOrder initial = new FuturesInitialOrder(); initial.setContract(contract); initial.setSize(0L); initial.setPrice("0"); initial.setTif(FuturesInitialOrder.TifEnum.IOC); initial.setReduceOnly(true); initial.setAutoSize(autoSize); FuturesPriceTriggeredOrder order = new FuturesPriceTriggeredOrder(); order.setTrigger(trigger); order.setInitial(initial); order.setOrderType(orderType); TriggerOrderResponse response = futuresApi.createPriceTriggeredOrder(SETTLE, order); log.info("[GateGrid] 止盈条件单已创建, triggerPrice:{}, rule:{}, orderType:{}, autoSize:{}, id:{}", triggerPrice, rule, orderType, autoSize, response.getId()); FuturesAccount account = futuresApi.listFuturesAccounts(SETTLE); BigDecimal margin = new BigDecimal(account.getPositionInitialMargin()); BigDecimal ratio = margin.divide(initialPrincipal, 4, RoundingMode.HALF_UP); log.debug("[Gate] 保证金比例: {}/{}={}", margin, initialPrincipal, ratio); return ratio.compareTo(config.getMarginRatioLimit()) < 0; } catch (Exception e) { log.error("[GateGrid] 止盈条件单创建失败, triggerPrice:{}, rule:{}, orderType:{}, autoSize:{}", triggerPrice, rule, orderType, autoSize, e); log.warn("[Gate] 查保证金失败,默认放行", e); return true; } } // ---- 工具 ---- private String negate(String qty) { return qty.startsWith("-") ? qty.substring(1) : "-" + qty; } /** * 检查多空仓位是否触及止盈止损 * 根据持仓和当前价格计算未实现盈亏。 * * <h3>正向合约公式</h3> * <pre> * 多仓: 持仓量 × 合约乘数 × (计价价格 − 开仓均价) * 空仓: 持仓量 × 合约乘数 × (开仓均价 − 计价价格) * </pre> * 计价价格由 {@link GateConfig.PnLPriceMode} 决定:LAST_PRICE 用最新成交价,MARK_PRICE 用标记价格。 */ private void checkPositions(BigDecimal currentPrice) { if (longEntryPrice == null || shortEntryPrice == null) { private void updateUnrealizedPnl() { BigDecimal price = resolvePnlPrice(); if (price == null || price.compareTo(BigDecimal.ZERO) == 0) { return; } BigDecimal multiplier = config.getContractMultiplier(); BigDecimal longPnl = BigDecimal.ZERO; BigDecimal shortPnl = BigDecimal.ZERO; if (longPositionSize.compareTo(BigDecimal.ZERO) > 0 && longEntryPrice.compareTo(BigDecimal.ZERO) > 0) { longPnl = longPositionSize.multiply(multiplier).multiply(price.subtract(longEntryPrice)); } if (shortPositionSize.compareTo(BigDecimal.ZERO) > 0 && shortEntryPrice.compareTo(BigDecimal.ZERO) > 0) { shortPnl = shortPositionSize.multiply(multiplier).multiply(shortEntryPrice.subtract(price)); } unrealizedPnl = longPnl.add(shortPnl); BigDecimal longTp = longEntryPrice.multiply(BigDecimal.ONE.add(gridRate)).setScale(1, RoundingMode.HALF_UP); BigDecimal longSl = longEntryPrice.multiply(BigDecimal.ONE.subtract(gridRate)).setScale(1, RoundingMode.HALF_UP); BigDecimal shortTp = shortEntryPrice.multiply(BigDecimal.ONE.subtract(gridRate)).setScale(1, RoundingMode.HALF_UP); BigDecimal shortSl = shortEntryPrice.multiply(BigDecimal.ONE.add(gridRate)).setScale(1, RoundingMode.HALF_UP); System.out.println("========== Gate 网格状态 =========="); System.out.println("当前价格: " + currentPrice); System.out.println("多头入场: " + longEntryPrice + " TP: " + longTp + " SL: " + longSl); System.out.println("空头入场: " + shortEntryPrice + " TP: " + shortTp + " SL: " + shortSl); System.out.println("累计盈亏: " + totalProfit + " 循环: " + currentCycle + "/" + maxCycles); System.out.println("==================================="); // 多头止盈 if (currentPrice.compareTo(longTp) >= 0) { log.info("[GateGrid] 多头止盈触发! entry:{}, current:{}", longEntryPrice, currentPrice); BigDecimal cnt = new BigDecimal(quantity); BigDecimal profit = currentPrice.subtract(longEntryPrice).multiply(cnt); totalProfit = totalProfit.add(profit); log.info("[GateGrid] 多头止盈 profit:{}, totalProfit:{}", profit, totalProfit); closeLongPosition(); closeShortPosition(); currentCycle++; checkStopConditions(); return; } // 多头止损 if (currentPrice.compareTo(longSl) <= 0) { log.info("[GateGrid] 多头止损触发! entry:{}, current:{}", longEntryPrice, currentPrice); BigDecimal cnt = new BigDecimal(quantity); BigDecimal loss = longEntryPrice.subtract(currentPrice).multiply(cnt); totalProfit = totalProfit.subtract(loss); log.info("[GateGrid] 多头止损 loss:{}, totalProfit:{}", loss, totalProfit); closeLongPosition(); currentCycle++; checkStopConditions(); return; } // 空头止盈 if (currentPrice.compareTo(shortTp) <= 0) { log.info("[GateGrid] 空头止盈触发! entry:{}, current:{}", shortEntryPrice, currentPrice); BigDecimal cnt = new BigDecimal(quantity); BigDecimal profit = shortEntryPrice.subtract(currentPrice).multiply(cnt); totalProfit = totalProfit.add(profit); log.info("[GateGrid] 空头止盈 profit:{}, totalProfit:{}", profit, totalProfit); closeShortPosition(); closeLongPosition(); currentCycle++; checkStopConditions(); return; } // 空头止损 if (currentPrice.compareTo(shortSl) >= 0) { log.info("[GateGrid] 空头止损触发! entry:{}, current:{}", shortEntryPrice, currentPrice); BigDecimal cnt = new BigDecimal(quantity); BigDecimal loss = currentPrice.subtract(shortEntryPrice).multiply(cnt); totalProfit = totalProfit.subtract(loss); log.info("[GateGrid] 空头止损 loss:{}, totalProfit:{}", loss, totalProfit); closeShortPosition(); currentCycle++; checkStopConditions(); } log.info("[Gate] 未实现盈亏: {}", unrealizedPnl); } private void checkStopConditions() { if (totalProfit.compareTo(overallTp) >= 0) { log.info("[GateGrid] 达到整体止盈 {} USDT,停止策略", overallTp); strategyActive = false; return; /** * 根据配置的 PnLPriceMode 返回计价价格。 */ private BigDecimal resolvePnlPrice() { if (config.getUnrealizedPnlPriceMode() == GateConfig.PnLPriceMode.MARK_PRICE && markPrice.compareTo(BigDecimal.ZERO) > 0) { return markPrice; } if (BigDecimal.ZERO.subtract(totalProfit).compareTo(maxLoss) >= 0) { log.info("[GateGrid] 亏损 {} 达到上限 {} USDT,停止策略", totalProfit.negate(), maxLoss); strategyActive = false; return; } if (currentCycle >= maxCycles) { log.info("[GateGrid] 达到最大循环次数 {},停止策略", maxCycles); strategyActive = false; return; } log.info("[GateGrid] 进入下一轮循环: {}", currentCycle + 1); dualOpenPositions(); return lastKlinePrice; } private void closeLongPosition() { if (longEntryPrice == null) { return; } try { FuturesOrder closeOrder = new FuturesOrder(); closeOrder.setContract(contract); closeOrder.setSize(negateQuantity(quantity)); closeOrder.setPrice("0"); closeOrder.setTif(FuturesOrder.TifEnum.IOC); closeOrder.setReduceOnly(true); closeOrder.setText("t-grid-close-long"); FuturesOrder result = futuresApi.createFuturesOrder(SETTLE, closeOrder, null); log.info("[GateGrid] 平多成功, id: {}, fillPrice: {}", result.getId(), result.getFillPrice()); } catch (Exception e) { log.error("[GateGrid] 平多失败", e); } longEntryPrice = null; longOrderId = null; } private void closeShortPosition() { if (shortEntryPrice == null) { return; } try { FuturesOrder closeOrder = new FuturesOrder(); closeOrder.setContract(contract); closeOrder.setSize(quantity); closeOrder.setPrice("0"); closeOrder.setTif(FuturesOrder.TifEnum.IOC); closeOrder.setReduceOnly(true); closeOrder.setText("t-grid-close-short"); FuturesOrder result = futuresApi.createFuturesOrder(SETTLE, closeOrder, null); log.info("[GateGrid] 平空成功, id: {}, fillPrice: {}", result.getId(), result.getFillPrice()); } catch (Exception e) { log.error("[GateGrid] 平空失败", e); } shortEntryPrice = null; shortOrderId = null; } private void closeAllPositions() { closeLongPosition(); closeShortPosition(); } private void printGridInfo() { BigDecimal longTp = BigDecimal.ZERO; BigDecimal longSl = BigDecimal.ZERO; BigDecimal shortTp = BigDecimal.ZERO; BigDecimal shortSl = BigDecimal.ZERO; if (longEntryPrice != null) { longTp = longEntryPrice.multiply(BigDecimal.ONE.add(gridRate)).setScale(1, RoundingMode.HALF_UP); longSl = longEntryPrice.multiply(BigDecimal.ONE.subtract(gridRate)).setScale(1, RoundingMode.HALF_UP); } if (shortEntryPrice != null) { shortTp = shortEntryPrice.multiply(BigDecimal.ONE.subtract(gridRate)).setScale(1, RoundingMode.HALF_UP); shortSl = shortEntryPrice.multiply(BigDecimal.ONE.add(gridRate)).setScale(1, RoundingMode.HALF_UP); } System.out.println("========== Gate 网格开仓 =========="); System.out.println("合约: " + contract + " 杠杆: " + leverage + "x " + marginMode); System.out.println("多头入场: " + longEntryPrice + " TP: " + longTp + " SL: " + longSl); System.out.println("空头入场: " + shortEntryPrice + " TP: " + shortTp + " SL: " + shortSl); System.out.println("数量: " + quantity + " 网格间距: " + gridRate.multiply(new BigDecimal("100")) + "%"); System.out.println("整体止盈: " + overallTp + " USDT 最大循环: " + maxCycles); System.out.println("最大亏损: " + maxLoss + " USDT"); System.out.println("====================================="); } private String negateQuantity(String qty) { if (qty.startsWith("-")) { return qty.substring(1); } return "-" + qty; } private BigDecimal safeDecimal(String val) { if (val == null || val.isEmpty()) { return BigDecimal.ZERO; } return new BigDecimal(val); } public BigDecimal getLastClosePrice() { return lastClosePrice; } public boolean isStrategyActive() { return strategyActive; } public BigDecimal getTotalProfit() { return totalProfit; } public int getCurrentCycle() { return currentCycle; } public BigDecimal getLastKlinePrice() { return lastKlinePrice; } public void setMarkPrice(BigDecimal markPrice) { this.markPrice = markPrice; } public boolean isStrategyActive() { return state != StrategyState.STOPPED && state != StrategyState.WAITING_KLINE; } public BigDecimal getCumulativePnl() { return cumulativePnl; } public BigDecimal getUnrealizedPnl() { return unrealizedPnl; } public Long getUserId() { return userId; } public StrategyState getState() { return state; } } src/main/java/com/xcong/excoin/modules/gateApi/GateKlineWebSocketClient.java
@@ -1,94 +1,107 @@ package com.xcong.excoin.modules.gateApi; import cn.hutool.core.collection.CollUtil; import cn.hutool.json.JSONException; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.blackchain.service.DateUtil; import com.xcong.excoin.modules.okxNewPrice.celue.CaoZuoService; import com.xcong.excoin.modules.okxNewPrice.indicator.macdAndMatrategy.MacdEmaStrategy; import com.xcong.excoin.modules.okxNewPrice.indicator.macdAndMatrategy.MacdMaStrategy; import com.xcong.excoin.modules.okxNewPrice.okxWs.InstrumentsWs; import com.xcong.excoin.modules.okxNewPrice.okxWs.TradeOrderWs; import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.CoinEnums; import com.xcong.excoin.modules.okxNewPrice.okxWs.param.Kline; import com.xcong.excoin.modules.okxNewPrice.okxWs.param.TradeRequestParam; import com.xcong.excoin.modules.okxNewPrice.okxWs.wanggeList.WangGeListService; import com.xcong.excoin.modules.okxNewPrice.okxpi.config.ExchangeInfoEnum; import com.xcong.excoin.modules.okxNewPrice.okxpi.config.ExchangeLoginService; import com.xcong.excoin.modules.gateApi.wsHandler.GateChannelHandler; import com.xcong.excoin.modules.okxNewPrice.utils.SSLConfig; import com.xcong.excoin.modules.okxNewPrice.utils.WsParamBuild; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import java.math.BigDecimal; import java.net.URI; import java.net.URISyntaxException; import java.util.*; import java.util.ArrayList; import java.util.List; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** * Gate K线 WebSocket 客户端类,用于连接 Gate 的 WebSocket 接口, * 实时获取并处理 K线(candlestick)数据。 * 同时支持心跳检测、自动重连以及异常恢复机制。 * Gate WebSocket 连接管理器。 * * <h3>职责</h3> * 负责 TCP 连接的建立、维持和恢复。频道逻辑(订阅/解析)全部委托给 {@link GateChannelHandler} 实现类。 * * <h3>生命周期</h3> * <pre> * init() → connect() → startHeartbeat() * destroy() → unsubscribe 所有 handler → closeBlocking() → shutdown 线程池 * onClose() → reconnectWithBackoff() (最多 3 次,指数退避) * </pre> * * <h3>消息路由</h3> * <pre> * onMessage → handleMessage: * 1. futures.pong → cancelPongTimeout * 2. subscribe/unsubscribe → 日志 * 3. error → 错误日志 * 4. update/all → 遍历 channelHandlers → handler.handleMessage(response) * </pre> * * <h3>心跳机制</h3> * 采用双重检测:TCP 层的 WebSocket ping/pong + 应用层 futures.ping/futures.pong。 * 10 秒未收到任何消息 → 发送 futures.ping;25 秒周期检查。 * * <h3>线程安全</h3> * 连接状态用 AtomicBoolean(isConnected, isConnecting, isInitialized)。 * 消息时间戳用 AtomicReference。心跳任务用 synchronized 保护。 * * @author Administrator */ @SuppressWarnings("ALL") @Slf4j public class GateKlineWebSocketClient { private final CaoZuoService caoZuoService; private final GateWebSocketClientManager clientManager; private final WangGeListService wangGeListService; private WebSocketClient webSocketClient; private ScheduledExecutorService heartbeatExecutor; private volatile ScheduledFuture<?> pongTimeoutFuture; private final AtomicReference<Long> lastMessageTime = new AtomicReference<>(System.currentTimeMillis()); // 连接状态标志 private final AtomicBoolean isConnected = new AtomicBoolean(false); private final AtomicBoolean isConnecting = new AtomicBoolean(false); private final AtomicBoolean isInitialized = new AtomicBoolean(false); private static final String CHANNEL = "futures.candlesticks"; private static final String FUTURES_PING = "futures.ping"; private static final String FUTURES_PONG = "futures.pong"; private static final String GATE_INTERVAL = "1m"; private static final String GATE_CONTRACT = "XAUT_USDT"; private GateGridTradeService gridTradeService; // 心跳超时时间(秒),小于30秒 private static final int HEARTBEAT_TIMEOUT = 10; // 共享线程池用于重连等异步任务 /** WebSocket 地址,由 GateConfig 提供 */ private final String wsUrl; /** Java-WebSocket 客户端实例 */ private WebSocketClient webSocketClient; /** 心跳检测调度器 */ private ScheduledExecutorService heartbeatExecutor; /** 心跳超时 Future */ private volatile ScheduledFuture<?> pongTimeoutFuture; /** 最后收到消息的时间戳(毫秒) */ private final AtomicReference<Long> lastMessageTime = new AtomicReference<>(System.currentTimeMillis()); /** 连接状态 */ private final AtomicBoolean isConnected = new AtomicBoolean(false); /** 连接中标记,防重入 */ private final AtomicBoolean isConnecting = new AtomicBoolean(false); /** 初始化标记,防重复 init */ private final AtomicBoolean isInitialized = new AtomicBoolean(false); /** 频道处理器列表,通过 addChannelHandler 注册 */ private final List<GateChannelHandler> channelHandlers = new ArrayList<>(); /** 重连等异步任务的缓存线程池(daemon 线程) */ private final ExecutorService sharedExecutor = Executors.newCachedThreadPool(r -> { Thread t = new Thread(r, "gate-kline-worker"); Thread t = new Thread(r, "gate-ws-worker"); t.setDaemon(true); return t; }); public GateKlineWebSocketClient(CaoZuoService caoZuoService, GateWebSocketClientManager clientManager, WangGeListService wangGeListService, GateGridTradeService gridTradeService ) { this.caoZuoService = caoZuoService; this.clientManager = clientManager; this.wangGeListService = wangGeListService; this.gridTradeService = gridTradeService; public GateKlineWebSocketClient(String wsUrl) { this.wsUrl = wsUrl; } /** * 初始化方法,创建并初始化WebSocket客户端实例 * 注册频道处理器。需在 init() 前调用。 */ public void addChannelHandler(GateChannelHandler handler) { channelHandlers.add(handler); } /** * 初始化:建立 WebSocket 连接 → 启动心跳。 */ public void init() { if (!isInitialized.compareAndSet(false, true)) { log.warn("GateKlineWebSocketClient 已经初始化过,跳过重复初始化"); log.warn("[WS] 已初始化过,跳过重复初始化"); return; } connect(); @@ -96,23 +109,23 @@ } /** * 销毁方法,关闭WebSocket连接和相关资源 * 销毁:取消订阅 → 关闭连接 → 关闭线程池。 * <p>注意:先 closeBlocking 再 shutdown sharedExecutor, * 避免 onClose 回调中的 reconnectWithBackoff 访问已关闭的线程池。 */ public void destroy() { log.info("开始销毁GateKlineWebSocketClient"); log.info("[WS] 开始销毁..."); if (webSocketClient != null && webSocketClient.isOpen()) { unsubscribeKlineChannels(); for (GateChannelHandler handler : channelHandlers) { handler.unsubscribe(webSocketClient); } try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("取消订阅等待被中断"); log.warn("[WS] 取消订阅等待被中断"); } } if (sharedExecutor != null && !sharedExecutor.isShutdown()) { sharedExecutor.shutdown(); } if (webSocketClient != null && webSocketClient.isOpen()) { @@ -120,8 +133,12 @@ webSocketClient.closeBlocking(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("关闭WebSocket连接时被中断"); log.warn("[WS] 关闭连接时被中断"); } } if (sharedExecutor != null && !sharedExecutor.isShutdown()) { sharedExecutor.shutdown(); } shutdownExecutorGracefully(heartbeatExecutor); @@ -130,308 +147,139 @@ } shutdownExecutorGracefully(sharedExecutor); log.info("GateKlineWebSocketClient销毁完成"); log.info("[WS] 销毁完成"); } private static final String WS_URL_MONIPAN = "wss://ws-testnet.gate.com/v4/ws/futures/usdt"; private static final String WS_URL_SHIPAN = "wss://fx-ws.gateio.ws/v4/ws/usdt"; private static final boolean isAccountType = false; /** * 建立与 Gate WebSocket 服务器的连接。 * 设置回调函数以监听连接打开、接收消息、关闭和错误事件。 * 建立 WebSocket 连接。使用 SSLContext 配置 TLS 协议。 * 连接成功后依次订阅所有已注册的频道处理器。 */ private void connect() { // 避免重复连接 if (isConnecting.get()) { log.info("连接已在进行中,跳过重复连接请求"); if (isConnecting.get() || !isConnecting.compareAndSet(false, true)) { log.info("[WS] 连接进行中,跳过重复请求"); return; } if (!isConnecting.compareAndSet(false, true)) { log.info("连接已在进行中,跳过重复连接请求"); return; } try { SSLConfig.configureSSL(); System.setProperty("https.protocols", "TLSv1.2,TLSv1.3"); String WS_URL = WS_URL_MONIPAN; if (isAccountType){ WS_URL = WS_URL_SHIPAN; } URI uri = new URI(WS_URL); // 关闭之前的连接(如果存在) URI uri = new URI(wsUrl); if (webSocketClient != null) { try { webSocketClient.closeBlocking(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.warn("关闭之前连接时被中断"); } try { webSocketClient.closeBlocking(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } webSocketClient = new WebSocketClient(uri) { @Override public void onOpen(ServerHandshake handshake) { log.info("Gate Kline WebSocket连接成功"); log.info("[WS] 连接成功"); isConnected.set(true); isConnecting.set(false); // 检查应用是否正在关闭 if (sharedExecutor != null && !sharedExecutor.isShutdown()) { resetHeartbeatTimer(); subscribeKlineChannels(); subscribePingChannels(); for (GateChannelHandler handler : channelHandlers) { handler.subscribe(webSocketClient); } sendPing(); } else { log.warn("应用正在关闭,忽略WebSocket连接成功回调"); log.warn("[WS] 应用正在关闭,忽略连接成功回调"); } } @Override public void onMessage(String message) { lastMessageTime.set(System.currentTimeMillis()); handleWebSocketMessage(message); handleMessage(message); resetHeartbeatTimer(); } @Override public void onClose(int code, String reason, boolean remote) { log.warn("Gate Kline WebSocket连接关闭: code={}, reason={}", code, reason); log.warn("[WS] 连接关闭, code:{}, reason:{}", code, reason); isConnected.set(false); isConnecting.set(false); cancelPongTimeout(); if (sharedExecutor != null && !sharedExecutor.isShutdown() && !sharedExecutor.isTerminated()) { sharedExecutor.execute(() -> { try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("重连线程被中断", e); } catch (Exception e) { log.error("重连失败", e); } try { reconnectWithBackoff(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (Exception e) { log.error("[WS] 重连失败", e); } }); } else { log.warn("共享线程池已关闭,无法执行重连任务"); log.warn("[WS] 线程池已关闭,不执行重连"); } } @Override public void onError(Exception ex) { log.error("Gate Kline WebSocket发生错误", ex); log.error("[WS] 发生错误", ex); isConnected.set(false); } }; webSocketClient.connect(); } catch (URISyntaxException e) { log.error("WebSocket URI格式错误", e); log.error("[WS] URI格式错误", e); isConnecting.set(false); } } /** * 订阅K线频道。 * 构造 Gate 格式的订阅请求并发送给服务端。 * payload: ["1m", "BTC_USDT"] (间隔, 合约名) * 消息分发:先处理系统事件(pong/subscribe/error), * 再把 update/all 事件路由到各 channelHandler。 * <p>每个 handler 内部通过 channel 名称做二次匹配,匹配成功返回 true 则停止遍历。 */ private void subscribeKlineChannels() { JSONObject subscribeMsg = new JSONObject(); subscribeMsg.put("time", System.currentTimeMillis() / 1000); subscribeMsg.put("channel", CHANNEL); subscribeMsg.put("event", "subscribe"); JSONArray payload = new JSONArray(); payload.add(GATE_INTERVAL); payload.add(GATE_CONTRACT); subscribeMsg.put("payload", payload); webSocketClient.send(subscribeMsg.toJSONString()); log.info("已发送 K线频道订阅请求,合约: {}, 周期: {}", GATE_CONTRACT, GATE_INTERVAL); } private void subscribePingChannels() { JSONObject subscribeMsg = new JSONObject(); subscribeMsg.put("time", System.currentTimeMillis() / 1000); subscribeMsg.put("channel", FUTURES_PING); webSocketClient.send(subscribeMsg.toJSONString()); log.info("已发送 futures.ping"); } private void unsubscribeKlineChannels() { JSONObject unsubscribeMsg = new JSONObject(); unsubscribeMsg.put("time", System.currentTimeMillis() / 1000); unsubscribeMsg.put("channel", CHANNEL); unsubscribeMsg.put("event", "unsubscribe"); JSONArray payload = new JSONArray(); payload.add(GATE_INTERVAL); payload.add(GATE_CONTRACT); unsubscribeMsg.put("payload", payload); webSocketClient.send(unsubscribeMsg.toJSONString()); log.info("已发送 K线频道取消订阅请求,合约: {}, 周期: {}", GATE_CONTRACT, GATE_INTERVAL); } /** * 处理从 WebSocket 收到的消息。 * 包括订阅确认、错误响应、心跳响应以及实际的数据推送。 * * @param message 来自 WebSocket 的原始字符串消息 */ private void handleWebSocketMessage(String message) { private void handleMessage(String message) { try { JSONObject response = JSON.parseObject(message); String channel = response.getString("channel"); String event = response.getString("event"); if (FUTURES_PONG.equals(channel)) { log.debug("收到futures.pong响应"); log.debug("[WS] 收到 pong 响应"); cancelPongTimeout(); } else if ("subscribe".equals(event)) { log.info("{} 频道订阅成功: {}", channel, response.getJSONObject("result")); } else if ("unsubscribe".equals(event)) { log.info("{} 频道取消订阅成功", channel); } else if ("error".equals(event)) { return; } if ("subscribe".equals(event)) { log.info("[WS] {} 订阅成功: {}", channel, response.getJSONObject("result")); return; } if ("unsubscribe".equals(event)) { log.info("[WS] {} 取消订阅成功", channel); return; } if ("error".equals(event)) { JSONObject error = response.getJSONObject("error"); log.error("{} 频道错误: code={}, msg={}", log.error("[WS] {} 错误, code:{}, msg:{}", channel, error != null ? error.getInteger("code") : "N/A", error != null ? error.getString("message") : response.getString("msg")); } else if ("update".equals(event) || "all".equals(event)) { processPushDataV2(response); return; } if ("update".equals(event) || "all".equals(event)) { for (GateChannelHandler handler : channelHandlers) { if (handler.handleMessage(response)) return; } } } catch (Exception e) { log.error("处理WebSocket消息失败: {}", message, e); log.error("[WS] 处理消息失败: {}", message, e); } } /** * 解析并处理K线推送数据。 * 控制台输出K线数据,并在K线完结时触发策略和量化操作。 * * @param response 包含K线数据的 JSON 对象 */ private void processPushDataV2(JSONObject response) { try { /** * Gate K线推送格式: * { * "time": 1542162490, * "time_ms": 1542162490123, * "channel": "futures.candlesticks", * "event": "update", * "result": [ * { * "t": 1545129300, * "v": "27525555", * "c": "95.4", * "h": "96.9", * "l": "89.5", * "o": "94.3", * "n": "1m_BTC_USD", * "a": "314732.87412", * "w": false * } * ] * } */ String channel = response.getString("channel"); if (!CHANNEL.equals(channel)) { return; } // ---- heartbeat ---- JSONArray resultArray = response.getJSONArray("result"); if (resultArray == null || resultArray.isEmpty()) { log.warn("K线频道数据为空"); return; } JSONObject data = resultArray.getJSONObject(0); BigDecimal openPx = new BigDecimal(data.getString("o")); BigDecimal highPx = new BigDecimal(data.getString("h")); BigDecimal lowPx = new BigDecimal(data.getString("l")); BigDecimal closePx = new BigDecimal(data.getString("c")); BigDecimal vol = new BigDecimal(data.getString("v")); BigDecimal baseVol = new BigDecimal(data.getString("a")); String name = data.getString("n"); long t = data.getLong("t"); boolean windowClosed = data.getBooleanValue("w"); String time = DateUtil.TimeStampToDateTime(t); log.info("========== Gate K线数据 =========="); log.info("名称(n): " + name); log.info("时间 : " + time); log.info("开盘(o): " + openPx); log.info("最高(h): " + highPx); log.info("最低(l): " + lowPx); log.info("收盘(c): " + closePx); log.info("成交量(v): " + vol); log.info("成交额(a): " + baseVol); log.info("K线完结(w): " + windowClosed); log.info("=================================="); if (gridTradeService != null) { gridTradeService.onKline(closePx); } } catch (Exception e) { log.error("处理 K线频道推送数据失败", e); } } /** * 启动心跳检测任务。 * 使用 ScheduledExecutorService 定期检查是否需要发送 ping 请求来维持连接。 */ private void startHeartbeat() { if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) { heartbeatExecutor.shutdownNow(); } heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "gate-kline-heartbeat"); t.setDaemon(true); return t; }); if (heartbeatExecutor != null && !heartbeatExecutor.isTerminated()) heartbeatExecutor.shutdownNow(); heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(r -> { Thread t = new Thread(r, "gate-ws-heartbeat"); t.setDaemon(true); return t; }); heartbeatExecutor.scheduleWithFixedDelay(this::checkHeartbeatTimeout, 25, 25, TimeUnit.SECONDS); } /** * 重置心跳计时器。 * 当收到新消息或发送 ping 后取消当前超时任务并重新安排下一次超时检查。 */ private synchronized void resetHeartbeatTimer() { cancelPongTimeout(); if (heartbeatExecutor != null && !heartbeatExecutor.isShutdown()) { pongTimeoutFuture = heartbeatExecutor.schedule(this::checkHeartbeatTimeout, HEARTBEAT_TIMEOUT, TimeUnit.SECONDS); pongTimeoutFuture = heartbeatExecutor.schedule(this::checkHeartbeatTimeout, HEARTBEAT_TIMEOUT, TimeUnit.SECONDS); } } /** * 检查心跳超时情况。 * 若长时间未收到任何消息则主动发送 ping 请求保持连接活跃。 */ private void checkHeartbeatTimeout() { // 只有在连接状态下才检查心跳 if (!isConnected.get()) { return; } long currentTime = System.currentTimeMillis(); long lastTime = lastMessageTime.get(); if (currentTime - lastTime >= HEARTBEAT_TIMEOUT * 1000L) { sendPing(); } if (!isConnected.get()) return; if (System.currentTimeMillis() - lastMessageTime.get() >= HEARTBEAT_TIMEOUT * 1000L) sendPing(); } /** * 发送 ping 请求至 WebSocket 服务端。 * 用于维持长连接有效性。 */ private void sendPing() { try { if (webSocketClient != null && webSocketClient.isOpen()) { @@ -439,62 +287,29 @@ pingMsg.put("time", System.currentTimeMillis() / 1000); pingMsg.put("channel", FUTURES_PING); webSocketClient.send(pingMsg.toJSONString()); log.debug("发送futures.ping请求"); log.debug("[WS] 发送 ping 请求"); } } catch (Exception e) { log.warn("发送ping失败", e); } } catch (Exception e) { log.warn("[WS] 发送 ping 失败", e); } } /** * 取消当前的心跳超时任务。 * 在收到 pong 或其他有效消息时调用此方法避免不必要的断开重连。 */ private synchronized void cancelPongTimeout() { if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) { pongTimeoutFuture.cancel(true); } if (pongTimeoutFuture != null && !pongTimeoutFuture.isDone()) pongTimeoutFuture.cancel(true); } /** * 执行 WebSocket 重连操作。 * 在连接意外中断后尝试重新建立连接。 */ // ---- reconnect ---- private void reconnectWithBackoff() throws InterruptedException { int attempt = 0; int maxAttempts = 3; int attempt = 0, maxAttempts = 3; long delayMs = 5000; while (attempt < maxAttempts) { try { Thread.sleep(delayMs); connect(); return; } catch (Exception e) { log.warn("第{}次重连失败", attempt + 1, e); delayMs *= 2; attempt++; } try { Thread.sleep(delayMs); connect(); return; } catch (Exception e) { log.warn("[WS] 第{}次重连失败", attempt + 1, e); delayMs *= 2; attempt++; } } log.error("超过最大重试次数({})仍未连接成功", maxAttempts); log.error("[WS] 超过最大重试次数({}),放弃重连", maxAttempts); } /** * 优雅关闭线程池 */ private void shutdownExecutorGracefully(ExecutorService executor) { if (executor == null || executor.isTerminated()) { return; } try { executor.shutdown(); if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { executor.shutdownNow(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); executor.shutdownNow(); } if (executor == null || executor.isTerminated()) return; try { executor.shutdown(); if (!executor.awaitTermination(5, TimeUnit.SECONDS)) executor.shutdownNow(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); executor.shutdownNow(); } } } } src/main/java/com/xcong/excoin/modules/gateApi/GateTradeExecutor.java
New file @@ -0,0 +1,209 @@ package com.xcong.excoin.modules.gateApi; import io.gate.gateapi.ApiClient; import io.gate.gateapi.GateApiException; import io.gate.gateapi.api.FuturesApi; import io.gate.gateapi.models.FuturesInitialOrder; import io.gate.gateapi.models.FuturesOrder; import io.gate.gateapi.models.FuturesPriceTrigger; import io.gate.gateapi.models.FuturesPriceTriggeredOrder; import io.gate.gateapi.models.TriggerOrderResponse; import lombok.extern.slf4j.Slf4j; import java.math.BigDecimal; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * Gate REST API 执行器。 * * <h3>设计目的</h3> * WebSocket 消息在回调线程中处理(如 {@code WebSocketClient} 的 {@code onMessage} 线程)。 * 下单 REST API 调用可能耗时数百毫秒,若同步执行会阻塞 WS 回调线程,导致心跳超时误判。 * 本类将所有 REST 调用提交到独立线程池异步执行。 * * <h3>回调设计</h3> * 每个下单方法接受 onSuccess/onFailure 两个 Runnable。 * 基底开仓时 onSuccess 用于标记基底已开,网格触发时通常为 null(成交状态由仓位推送驱动)。 * * <h3>线程模型</h3> * <ul> * <li><b>单线程</b>:保证下单顺序(开多→开空→止盈单),避免并发竞争</li> * <li><b>有界队列 64</b>:防止堆积。极端行情下最多累积 64 个任务</li> * <li><b>CallerRunsPolicy</b>:队列满时由提交线程直接同步执行,形成自然背压</li> * <li><b>allowCoreThreadTimeOut</b>:60s 空闲后线程回收,不浪费资源</li> * </ul> * * <h3>调用链</h3> * <pre> * GateGridTradeService.onKline → executor.openLong/openShort (基底双开 + 网格触发) * GateGridTradeService.onPositionUpdate → executor.placeTakeProfit (开仓成交后设止盈) * GateGridTradeService.stopGrid → executor.cancelAllPriceTriggeredOrders * </pre> * * @author Administrator */ @Slf4j public class GateTradeExecutor { private static final String SETTLE = "usdt"; private final FuturesApi futuresApi; private final String contract; /** 交易线程池:单线程 + 有界队列 + 背压策略 */ private final ExecutorService executor; public GateTradeExecutor(ApiClient apiClient, String contract) { this.futuresApi = new FuturesApi(apiClient); this.contract = contract; this.executor = new ThreadPoolExecutor( 1, 1, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(64), r -> { Thread t = new Thread(r, "gate-trade-worker"); t.setDaemon(true); return t; }, new ThreadPoolExecutor.CallerRunsPolicy() ); ((ThreadPoolExecutor) executor).allowCoreThreadTimeOut(true); } /** * 优雅关闭:等待 10 秒,超时则强制中断。 */ public void shutdown() { executor.shutdown(); try { executor.awaitTermination(10, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); executor.shutdownNow(); } } /** * 异步市价开多。quantity 为正数(如 "10")。 */ public void openLong(String quantity, Runnable onSuccess, Runnable onFailure) { openPosition(quantity, "t-grid-long", "开多", onSuccess, onFailure); } /** * 异步市价开空。quantity 为负数(如 "-10")。 */ public void openShort(String quantity, Runnable onSuccess, Runnable onFailure) { openPosition(quantity, "t-grid-short", "开空", onSuccess, onFailure); } private void openPosition(String size, String text, String label, Runnable onSuccess, Runnable onFailure) { executor.execute(() -> { try { FuturesOrder order = new FuturesOrder(); order.setContract(contract); order.setSize(size); order.setPrice("0"); order.setTif(FuturesOrder.TifEnum.IOC); order.setText(text); FuturesOrder result = futuresApi.createFuturesOrder(SETTLE, order, null); log.info("[TradeExec] {}成功, 价格:{}, id:{}", label, result.getFillPrice(), result.getId()); if (onSuccess != null) { onSuccess.run(); } } catch (Exception e) { log.error("[TradeExec] {}失败", label, e); if (onFailure != null) { onFailure.run(); } } }); } /** * 异步创建止盈条件单。 * <p>使用 Gate 的 PriceTriggeredOrder:服务器监控价格,达到触发价后自动平仓。 * 如果账户已有同方向同规则的条件单(label=UNIQUE),自动清除后重试一次。 * * @param triggerPrice 触发价格 * @param rule 触发规则(NUMBER_1: ≥ 触发价,NUMBER_2: ≤ 触发价) * @param orderType stop 类型(close-long-position / close-short-position) * @param autoSize 双仓平仓方向(close_long / close_short) */ public void placeTakeProfit(BigDecimal triggerPrice, FuturesPriceTrigger.RuleEnum rule, String orderType, String autoSize) { executor.execute(() -> { FuturesPriceTriggeredOrder order = buildTriggeredOrder(triggerPrice, rule, orderType, autoSize); try { TriggerOrderResponse response = futuresApi.createPriceTriggeredOrder(SETTLE, order); log.info("[TradeExec] 止盈单已创建, 触发价:{}, 类型:{}, id:{}", triggerPrice, orderType, response.getId()); } catch (GateApiException e) { if ("AUTO_USER_EXIST_POSITION_ORDER".equals(e.getErrorLabel())) { log.warn("[TradeExec] 止盈单已存在,清除旧单后重试"); try { futuresApi.cancelPriceTriggeredOrderList(SETTLE, contract); TriggerOrderResponse response = futuresApi.createPriceTriggeredOrder(SETTLE, order); log.info("[TradeExec] 止盈单重试成功, 触发价:{}, id:{}", triggerPrice, response.getId()); } catch (Exception retryEx) { log.error("[TradeExec] 止盈单重试失败", retryEx); } } else { log.error("[TradeExec] 止盈单创建失败, 触发价:{}", triggerPrice, e); } } catch (Exception e) { log.error("[TradeExec] 止盈单创建失败, 触发价:{}", triggerPrice, e); } }); } /** * 异步清除指定合约的所有止盈止损条件单。 */ public void cancelAllPriceTriggeredOrders() { executor.execute(() -> { try { futuresApi.cancelPriceTriggeredOrderList(SETTLE, contract); log.info("[TradeExec] 已清除所有止盈止损条件单"); } catch (Exception e) { log.error("[TradeExec] 清除止盈止损条件单失败", e); } }); } /** * 构建 FuturesPriceTriggeredOrder 对象。 * <p>策略=0(价格触发),price_type=0(最新价),expiration=0(永不过期), * tif=IOC(立即成交或取消),reduce_only=true(只减仓不开新仓)。 */ private FuturesPriceTriggeredOrder buildTriggeredOrder(BigDecimal triggerPrice, FuturesPriceTrigger.RuleEnum rule, String orderType, String autoSize) { FuturesPriceTrigger trigger = new FuturesPriceTrigger(); trigger.setStrategyType(FuturesPriceTrigger.StrategyTypeEnum.NUMBER_0); trigger.setPriceType(FuturesPriceTrigger.PriceTypeEnum.NUMBER_0); trigger.setPrice(triggerPrice.toString()); trigger.setRule(rule); trigger.setExpiration(0); FuturesInitialOrder initial = new FuturesInitialOrder(); initial.setContract(contract); initial.setSize(0L); initial.setPrice("0"); initial.setTif(FuturesInitialOrder.TifEnum.IOC); initial.setReduceOnly(true); initial.setAutoSize(autoSize); FuturesPriceTriggeredOrder order = new FuturesPriceTriggeredOrder(); order.setTrigger(trigger); order.setInitial(initial); order.setOrderType(orderType); return order; } } src/main/java/com/xcong/excoin/modules/gateApi/GateWebSocketClientMain.java
@@ -1,19 +1,21 @@ package com.xcong.excoin.modules.gateApi; import com.xcong.excoin.modules.okxNewPrice.OkxWebSocketClientManager; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * Gate 网格交易的独立测试入口(main 方法)。 * 通过 Spring XML 上下文初始化管理器,运行一段时间后手动关闭。 * * @author Administrator */ public class GateWebSocketClientMain { public static void main(String[] args) throws InterruptedException { // 使用Spring上下文初始化管理器 ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml"); GateWebSocketClientManager manager = context.getBean(GateWebSocketClientManager.class); // 运行一段时间以观察结果 Thread.sleep(1200000000L); // 运行一小时 // 关闭连接 Thread.sleep(1200000000L); manager.destroy(); } } } src/main/java/com/xcong/excoin/modules/gateApi/GateWebSocketClientManager.java
@@ -1,10 +1,9 @@ package com.xcong.excoin.modules.gateApi; import com.xcong.excoin.modules.okxNewPrice.celue.CaoZuoService; import com.xcong.excoin.modules.okxNewPrice.okxWs.wanggeList.WangGeListService; import com.xcong.excoin.utils.RedisUtils; import com.xcong.excoin.modules.gateApi.wsHandler.handler.CandlestickChannelHandler; import com.xcong.excoin.modules.gateApi.wsHandler.handler.PositionClosesChannelHandler; import com.xcong.excoin.modules.gateApi.wsHandler.handler.PositionsChannelHandler; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @@ -12,75 +11,90 @@ import java.math.BigDecimal; /** * 管理 Gate WebSocket 客户端和网格交易服务实例 * Gate 模块 Spring 入口,组装所有组件并管理生命周期。 * * <h3>启动流程 ({@code @PostConstruct})</h3> * <ol> * <li>构建 {@link GateConfig}(Builder 模式,含 API 密钥、合约、策略参数)</li> * <li>创建 {@link GateGridTradeService} → init():切持仓模式、清旧条件单、设杠杆</li> * <li>创建 {@link GateKlineWebSocketClient} → 注册 3 个 Handler → init():建立 WS 连接</li> * <li>gridTradeService.startGrid():激活策略,等待 K 线触发首次双开</li> * </ol> * * <h3>销毁流程 ({@code @PreDestroy})</h3> * <ol> * <li>gridTradeService.stopGrid():取消条件单 → 关闭交易线程池</li> * <li>wsClient.destroy():取消订阅 → 断开 WS → 关闭线程池</li> * </ol> * * <h3>配置</h3> * 当前在代码中硬编码测试网参数。切换到生产网只需改为 {@code .isProduction(true)}。 * * @author Administrator */ @Slf4j @Component public class GateWebSocketClientManager { @Autowired private CaoZuoService caoZuoService; @Autowired private WangGeListService wangGeListService; private GateKlineWebSocketClient klinePriceClient; /** WebSocket 连接管理器 */ private GateKlineWebSocketClient wsClient; /** 网格交易策略服务 */ private GateGridTradeService gridTradeService; private static final String API_KEY = "d90ca272391992b8e74f8f92cedb21ec"; private static final String API_SECRET = "1861e4f52de4bb53369ea3208d9ede38ece4777368030f96c77d27934c46c274"; /** 统一配置 */ private GateConfig config; @PostConstruct public void init() { log.info("开始初始化GateWebSocketClientManager"); log.info("[管理器] 开始初始化..."); try { gridTradeService = new GateGridTradeService( API_KEY, API_SECRET, "XAUT_USDT", "30", "cross", "dual", new BigDecimal("0.0035"), new BigDecimal("0.5"), 3, new BigDecimal("7.5"), "10" ); config = GateConfig.builder() .apiKey("d90ca272391992b8e74f8f92cedb21ec") .apiSecret("1861e4f52de4bb53369ea3208d9ede38ece4777368030f96c77d27934c46c274") .contract("ETH_USDT") .leverage("100") .marginMode("cross") .positionMode("dual") .gridRate(new BigDecimal("0.004")) .overallTp(new BigDecimal("0.5")) .maxLoss(new BigDecimal("7.5")) .quantity("1") .contractMultiplier(new BigDecimal("0.01")) .unrealizedPnlPriceMode(GateConfig.PnLPriceMode.LAST_PRICE) .isProduction(false) .reopenMaxRetries(3) .build(); gridTradeService = new GateGridTradeService(config); gridTradeService.init(); klinePriceClient = new GateKlineWebSocketClient(caoZuoService, this, wangGeListService,gridTradeService); klinePriceClient.init(); log.info("已初始化GateKlineWebSocketClient"); wsClient = new GateKlineWebSocketClient(config.getWsUrl()); wsClient.addChannelHandler(new CandlestickChannelHandler(config.getContract(), gridTradeService)); wsClient.addChannelHandler(new PositionsChannelHandler( config.getApiKey(), config.getApiSecret(), config.getContract(), gridTradeService)); wsClient.addChannelHandler(new PositionClosesChannelHandler( config.getApiKey(), config.getApiSecret(), config.getContract(), gridTradeService)); wsClient.init(); log.info("[管理器] WS已连接, 已注册 3 个频道处理器"); gridTradeService.startGrid(); } catch (Exception e) { log.error("初始化GateWebSocketClientManager失败", e); log.error("[管理器] 初始化失败", e); } } @PreDestroy public void destroy() { log.info("开始销毁GateWebSocketClientManager"); log.info("[管理器] 开始销毁..."); if (gridTradeService != null) { gridTradeService.stopGrid(); } if (klinePriceClient != null) { try { klinePriceClient.destroy(); log.info("已销毁GateKlineWebSocketClient"); } catch (Exception e) { log.error("销毁GateKlineWebSocketClient失败", e); } if (wsClient != null) { wsClient.destroy(); } log.info("GateWebSocketClientManager销毁完成"); log.info("[管理器] 销毁完成"); } public GateKlineWebSocketClient getKlineWebSocketClient() { return klinePriceClient; } public GateGridTradeService getGridTradeService() { return gridTradeService; } } public GateKlineWebSocketClient getKlineWebSocketClient() { return wsClient; } public GateGridTradeService getGridTradeService() { return gridTradeService; } } src/main/java/com/xcong/excoin/modules/gateApi/gateApi-logic.md
New file @@ -0,0 +1,403 @@ # Gate Api 模块 — 网格交易系统 ## 文件列表 | 文件 | 类型 | 说明 | |------|------|------| | [GateWebSocketClientManager](#gatewebsocketclientmanager) | `@Component` | Spring 启动入口,组装组件 + 生命周期 | | [GateConfig](#gateconfig) | 配置 | Builder 模式:API 密钥、合约、策略参数、环境切换 | | [GateKlineWebSocketClient](#gateklinewebsocketclient) | WS 连接管理 | 连接/心跳/重连/消息路由 | | [GateGridTradeService](#gategridtradeservice) | 交易服务 | 网格队列策略 + 保证金安全阀 + 盈亏管理 | | [GateTradeExecutor](#gatetradeexecutor) | 异步执行器 | 独立线程池执行 REST 下单,成功/失败双回调 | | [GateWebSocketClientMain](#gatewebsocketclientmain) | main 入口 | 独立测试启动 | | [Example.java](#examplejava) | 示例 | Gate SDK 用法参考 | ### wsHandler 子包 | 文件 | 类型 | 说明 | |------|------|------| | `wsHandler/GateChannelHandler.java` | **接口** | subscribe / unsubscribe / handleMessage / getChannelName | | `wsHandler/AbstractPrivateChannelHandler.java` | **抽象类** | 私有频道基类:HMAC-SHA512 签名 + 认证请求 | | `wsHandler/handler/CandlestickChannelHandler.java` | 公开频道 | K 线解析 → `onKline()` | | `wsHandler/handler/PositionsChannelHandler.java` | 私有频道 | 仓位推送 → `onPositionUpdate()`(传 `Position.ModeEnum`) | | `wsHandler/handler/PositionClosesChannelHandler.java` | 私有频道 | 平仓推送 → `onPositionClose()` | --- ## 架构总览 ``` ┌──────────────────────────────────────────────────────────────────┐ │ GateWebSocketClientManager │ │ (Spring @Component) │ │ │ │ GateConfig.builder() → GateGridTradeService + WS Client │ │ │ │ │ │ │ │ REST BasePath │ GateTradeExecutor │ WS URL │ │ ▼ ▼ ▼ │ │ Gate API (REST) 独立线程池 (async) Gate WebSocket │ │ onSuccess/onFailure双回调 ┌──────────────┐ │ │ │Candlestick H │ │ │ │Positions H │ │ │ │PosCloses H │ │ │ └──────────────┘ │ └──────────────────────────────────────────────────────────────────┘ ``` --- ## 数据流 ``` WebSocket → GateKlineWebSocketClient.handleMessage → 路由 dispatch │ ├─ futures.pong → cancelPongTimeout ├─ subscribe / unsubscribe / error → log │ ├─ futures.candlesticks (公开) │ └─ CandlestickChannelHandler │ └─ gridTradeService.onKline(closePx) │ ├─ 更新 unrealizedPnl(浮动盈亏) │ ├─ state=WAITING_KLINE → 异步双开基底仓位 │ └─ state=ACTIVE → processShortGrid/processLongGrid 网格触发 │ ├─ futures.positions (私有, HMAC-SHA512) │ └─ PositionsChannelHandler │ ├─ 解析 mode → Position.ModeEnum(DUAL_LONG / DUAL_SHORT) │ └─ gridTradeService.onPositionUpdate(contract, mode, size, entryPrice) │ ├─ 有仓位: 标记活跃,首次成交记录入场价并设止盈 │ │ ├─ 基底开仓 → 首次成交 → 记录基底入场价 → 双基底都成交后生成网格队列 │ │ └─ 网格开仓 → 成交后立即设止盈单 │ └─ 无仓位(size=0): 标记不活跃 │ ├─ futures.position_closes (私有, HMAC-SHA512) │ └─ PositionClosesChannelHandler │ └─ gridTradeService.onPositionClose(contract, side, pnl) │ └─ cumulativePnl += pnl → checkStopConditions() │ └─ 所有下单操作 └─ GateTradeExecutor (单线程 + 64队列 + CallerRunsPolicy) ├─ openLong/Short(qty, onSuccess, onFailure) └─ placeTakeProfit → 条件单 (REST) ``` --- ## GateChannelHandler 接口体系 ``` GateChannelHandler (接口) ├── CandlestickChannelHandler (公开频道) └── AbstractPrivateChannelHandler (私有频道基类: HMAC-SHA512) ├── PositionsChannelHandler (解析 mode → Position.ModeEnum) └── PositionClosesChannelHandler ``` - **subscribe**: 发送订阅请求。私有Handler自动附加 auth 字段 - **unsubscribe**: 发送取消订阅请求(私有频道也带签名认证) - **handleMessage**: 解析推送数据并回调GateGridTradeService,返回true表示已处理 - 消息路由: update/all事件 → 遍历channelHandlers → handler内部二次匹配channel名 → 匹配成功回调并停止遍历 - **PositionsChannelHandler 特殊处理**: 推送的 mode 字符串("dual_long")通过 `Position.ModeEnum.fromValue()` 转为枚举 --- ## 策略状态机 ``` WAITING_KLINE ──onKline──→ OPENING ──双基底都成交──→ ACTIVE │ │ │ │ 下单异常 ├─ 每根K线: 更新 unrealizedPnl │ │ ├─ cumPnl ≥ overallTp → STOPPED │ │ ├─ cumPnl ≤ -maxLoss → STOPPED │ │ ├─ 保证金超限 → 跳过开仓,队列照常更新 │ │ └─ 网格触发 → 开仓+设止盈+队列转移 ▼ ▼ STOPPED ←──────────────────┘ ``` | 状态 | 含义 | |------|------| | `WAITING_KLINE` | 等待首次K线价格 | | `OPENING` | 正在异步开基底多空仓位(已提交到GateTradeExecutor) | | `ACTIVE` | 网格队列激活,K线触发网格元素 → 开仓+止盈 | | `STOPPED` | 停止(盈利达标 / 亏损超限) | --- ## 策略核心:网格队列机制 ### 概述 策略采用"基底 + 价格网格队列"模式:先开一对基底多空仓位,然后以基底入场价为基准生成价格队列。每当K线穿破队列元素,就开新仓位并设止盈条件单,同时将穿破的元素转移到反方向队列。 ### 基底开仓 ``` K线到达 → 双开基底(市价开多 + 市价开空) → 成交回调: baseLongOpened=true, longActive=true → 成交回调: baseShortOpened=true, shortActive=true → 两者都成交 → generateShortQueue() + generateLongQueue() → state=ACTIVE ``` ### 网格队列生成 以基底入场价为基准,按 `gridRate`(百分比步长)生成 N 个价格(N = gridQueueSize,默认50): | 队列 | 计算方式 | 排序 | |------|---------|------| | 空仓队列 shortPriceQueue | 基底空入场价 × (1 − gridRate × i) (i=1..N) | 降序(大→小) | | 多仓队列 longPriceQueue | 基底多入场价 × (1 + gridRate × i) (i=1..N) | 升序(小→大) | ### K线触发网格 ``` K线到达(ACTIVE状态): │ ├─ processShortGrid: 当前价 < 空仓队列元素(价格跌破了队列中的高价) │ ├─ 匹配: 收集所有 > 当前价的空仓队列元素 │ ├─ 保证金检查: positionInitialMargin / initialPrincipal < marginRatioLimit(20%) │ │ ├─ 安全 → openShort 开空单(成交后仓位推送会自动设止盈) │ │ └─ 超限 → 跳过开仓,队列照常更新 │ ├─ 空仓队列: 移除匹配元素,尾部补充新价格(尾价 × (1 − gridRate)) │ └─ 多仓队列: 接收匹配元素,升序排列,截断到 gridQueueSize │ └─ processLongGrid: 当前价 > 多仓队列元素(价格涨超了队列中的低价) ├─ 匹配: 收集所有 < 当前价的多仓队列元素 ├─ 保证金检查: 同上 │ ├─ 安全 → openLong 开多单 │ └─ 超限 → 跳过开仓 ├─ 多仓队列: 移除匹配元素,尾部补充新价格(尾价 × (1 + gridRate)) └─ 空仓队列: 接收匹配元素,降序排列,截断到 gridQueueSize ``` ### 队列转移示意 ``` ETH_USDT, gridRate=0.0035, 基底空入场价=2275, 基底多入场价=2275: 初始状态: 空仓队列: [2267.1, 2270.0, 2272.5, 2275.0] (降序,base×0.9965, base×0.993...) 多仓队列: [2275.0, 2277.5, 2280.0, 2282.5] (升序,base×1.0035, base×1.007...) 价格跌到 2271 → processShortGrid 触发: 匹配: [2275.0, 2272.5](都 > 2271) 空仓队列: 移除[2275.0,2272.5] → [2270.0,2267.1] → 补充[2267.1×0.9965≈2259.1, 2259.1×0.9965≈2251.2] → [2270.0,2267.1,2259.1,2251.2] 多仓队列: 接收[2275.0,2272.5] → 合并后截断到4 → [2272.5,2275.0,2277.5,2280.0] ``` --- ## 策略时序 ### 阶段 1:启动与初始化 ``` Spring @PostConstruct → GateConfig.builder()...build() → GateGridTradeService(config) → init(): 1. 查用户ID(用于私有频道订阅) 2. 查账户 → 记录初始本金 initialPrincipal → 如需要切持仓模式 3. 清除旧止盈止损条件单 4. 查当前合约所有仓位 → 逐个市价平仓(reduce_only, IOC) - 单向持仓: size=相反数平仓 - 双向持仓: size=0, close=false, autoSize=LONG/SHORT 5. 设杠杆 → GateKlineWebSocketClient(config.getWsUrl()) → addChannelHandler x3 → init() → connect() → onOpen: handlers依次subscribe → sendPing → gridTradeService.startGrid() → state=WAITING_KLINE ``` ### 阶段 2:首次开仓 → 生成网格队列 ``` K线推送 → onKline(closePrice) → state=OPENING → executor.openLong(qty, onSuccess, onFailure) → 成交 → 仓位推送: DUAL_LONG, size>0, entryPrice=X → baseLongOpened=true, longBaseEntryPrice=X → tryGenerateQueues(): 双基底都成交? → 生成队列 → state=ACTIVE → executor.openShort(-qty, onSuccess, onFailure) → 成交 → 仓位推送: DUAL_SHORT, size<0, entryPrice=Y → baseShortOpened=true, shortBaseEntryPrice=Y → tryGenerateQueues(): 双基底都成交? → 生成队列 → state=ACTIVE ``` ### 阶段 3:ACTIVE 状态 — K线驱动网格 ``` 每根K线 → onKline → updateUnrealizedPnl → processShortGrid + processLongGrid 仓位推送(每次开仓成交后自动触发): → DUAL_LONG, size>0, 非基底 → 设多头止盈单 entryPrice × (1+gridRate) → DUAL_SHORT, size<0, 非基底 → 设空头止盈单 entryPrice × (1-gridRate) ``` > 止盈由 Gate 服务端条件单自动执行。服务端监控价格,达到触发价后自动平仓。 > 平仓后仓位变为0,盈亏通过 position_closes 频道推送到 cumulativePnl。 ### 阶段 4:停止 ``` 平仓推送: pnl=+0.6 → cumulativePnl=0.6 ≥ overallTp(0.5) → state=STOPPED 平仓推送: pnl=-8.0 → cumulativePnl=-8.0 ≤ -maxLoss(7.5) → state=STOPPED ``` --- ## GateConfig **角色**: 统一配置中心。Builder模式管理所有参数,提供 REST/WS URL 环境自动切换。 **核心方法**: - `getRestBasePath()`: isProduction ? 生产网 : 测试网 - `getWsUrl()`: 同上 **配置项**(含默认值): | 参数 | 默认值 | 说明 | |------|--------|------| | contract | BTC_USDT | 合约 | | leverage | 10 | 倍数 | | marginMode | cross | 全仓 | | positionMode | dual | 双向持仓 | | gridRate | 0.0035 | 网格间距 0.35% | | overallTp | 0.5 USDT | 整体止盈 | | maxLoss | 7.5 USDT | 最大亏损 | | quantity | 1 | 下单张数 | | reopenMaxRetries | 3 | 补仓最大重试次数(当前版本未使用) | | gridQueueSize | 50 | 网格价格队列容量 | | marginRatioLimit | 0.2 | 保证金占初始本金比例上限(20%),超限跳过开仓 | | contractMultiplier | 0.001 | 合约乘数(单张合约代表的基础资产数量) | | unrealizedPnlPriceMode | LAST_PRICE | 未实现盈亏计价模式:LAST_PRICE / MARK_PRICE | --- ## GateTradeExecutor **角色**: 独立线程池执行 REST API 下单。采用成功/失败双回调模式。 **线程模型**: - `ThreadPoolExecutor(1, 1, 60s, LinkedBlockingQueue(64), CallerRunsPolicy)` - 单线程保序 + 有界队列防堆积 + CallerRuns背压 - allowCoreThreadTimeOut: 60s 空闲后线程回收 **回调设计**: - 每个下单方法接受 `onSuccess` 和 `onFailure` 两个 `Runnable` - REST 调用成功 → 执行 `onSuccess`(标记基底已开等) - REST 调用失败 → 执行 `onFailure`(当前版本多为 null,依赖 position 推送修正) | 方法 | 说明 | |------|------| | `openLong(qty, onSuccess, onFailure)` | 异步 IOC 市价开多,双回调 | | `openShort(qty, onSuccess, onFailure)` | 异步 IOC 市价开空,双回调 | | `placeTakeProfit(trigger, rule, type, auto)` | 异步条件单。已存在则清除旧单重试 | | `cancelAllPriceTriggeredOrders()` | 清除所有条件单 | | `shutdown()` | 等待10秒,超时强制关闭 | --- ## GateGridTradeService **角色**: 策略核心,管理网格队列状态和执行下单。 **状态**: `StrategyState` enum: `WAITING_KLINE` / `OPENING` / `ACTIVE` / `STOPPED` **关键常量**: ```java private static final String AUTO_SIZE_LONG = "close_long"; private static final String AUTO_SIZE_SHORT = "close_short"; private static final String ORDER_TYPE_CLOSE_LONG = "close-long-position"; private static final String ORDER_TYPE_CLOSE_SHORT = "close-short-position"; ``` **核心数据结构**: | 字段 | 类型 | 说明 | |------|------|------| | shortPriceQueue | List\<BigDecimal\> | 空仓价格队列,降序(大→小),容量 gridQueueSize | | longPriceQueue | List\<BigDecimal\> | 多仓价格队列,升序(小→大),容量 gridQueueSize | | shortBaseEntryPrice | BigDecimal | 基底空头入场价 | | longBaseEntryPrice | BigDecimal | 基底多头入场价 | | shortEntryPrice | BigDecimal | 当前空仓入场价(推送更新) | | longEntryPrice | BigDecimal | 当前多仓入场价(推送更新) | | shortPositionSize | BigDecimal | 当前空仓持仓量(绝对值) | | longPositionSize | BigDecimal | 当前多仓持仓量 | | baseLongOpened | boolean | 基底多头是否已开 | | baseShortOpened | boolean | 基底空头是否已开 | | longActive / shortActive | boolean | 多/空方向是否持有仓位 | | cumulativePnl | BigDecimal | 累计已实现盈亏(平仓推送驱动) | | unrealizedPnl | BigDecimal | 未实现盈亏(每根K线更新,浮动盈亏) | | markPrice | BigDecimal | 标记价格(外部注入,MARK_PRICE 模式使用) | | initialPrincipal | BigDecimal | 初始本金(启动时账户总资产) | **回调方法**: - `onKline(closePrice)`: 更新 lastKlinePrice → 计算 unrealizedPnl → WAITING_KLINE 时触发基底双开,ACTIVE 时驱动 processShortGrid+processLongGrid - `onPositionUpdate(contract, mode, size, entryPrice)`: 记录当前入场价和持仓量 → 有仓位时标记活跃+设止盈,无仓位时清空持仓量并标记不活跃 - `onPositionClose(contract, side, pnl)`: 累加已实现盈亏,检查停止条件 **未实现盈亏计算** (`updateUnrealizedPnl()`): 正向合约公式(含合约乘数): | 方向 | 公式 | |------|------| | 多仓 | 持仓量 × contractMultiplier × (计价价格 − 开仓均价) | | 空仓 | 持仓量(绝对值)× contractMultiplier × (开仓均价 − 计价价格) | 计价价格由 `unrealizedPnlPriceMode` 决定: - `LAST_PRICE`:使用最新成交价(`lastKlinePrice`,每根 K 线更新) - `MARK_PRICE`:使用标记价格(通过 `setMarkPrice()` 外部注入,如未注入则回退到最新成交价) 入场价和持仓量由 `onPositionUpdate` 实时推送更新。 **保证金安全阀** (`isMarginSafe()`): - 实时查询 `positionInitialMargin / initialPrincipal` - 比例 ≥ marginRatioLimit(默认20%)→ 跳过开仓,队列照常更新 - REST 查询失败 → 默认放行(避免因查询异常阻塞策略) **止盈计算**: | 方向 | 公式 | order_type | auto_size | rule | |------|------|------------|-----------|------| | 多头 TP | entry × (1+gridRate) | `close-long-position` | `close_long` | NUMBER_1(≥触发价) | | 空头 TP | entry × (1-gridRate) | `close-short-position` | `close_short` | NUMBER_2(≤触发价) | **REST API 调用**: | 操作 | API | 方法 | 说明 | |------|-----|------|------| | 获取用户ID | `GET /account/detail` | `AccountApi.getAccountDetail()` | | | 切持仓模式 | `POST /futures/usdt/set_position_mode` | `FuturesApi.setPositionMode()` | | | 查仓位 | `GET /futures/usdt/positions` | `FuturesApi.listPositions()` | 遍历所有仓位,按合约过滤 | | 市价平仓 | `POST /futures/usdt/orders` | `FuturesApi.createFuturesOrder()` | reduce_only, IOC。双向: size=0+close=false+autoSize | | 设杠杆 | `POST /futures/usdt/dual_comp/positions/{contract}/leverage` | `FuturesApi.updateDualModePositionLeverageCall()` | 双向模式专用 | | 查账户 | `GET /futures/usdt/accounts` | `FuturesApi.listFuturesAccounts()` | 获取初始本金和保证金 | | 清除条件单 | `DELETE /futures/usdt/price_orders` | `FuturesApi.cancelPriceTriggeredOrderList()` | | | 市价单 | `POST /futures/usdt/orders` | `FuturesApi.createFuturesOrder()` | price=0, tif=IOC | | 条件单 | `POST /futures/usdt/price_orders` | `FuturesApi.createPriceTriggeredOrder()` | strategy=0, price_type=0, expiration=0 | **初始化顺序** (`init()`): ``` 1. 获取用户 ID 2. 查账户 → 记录初始本金 → 如需要切持仓模式 3. 清除旧的止盈止损条件单 4. 查当前合约所有仓位 → 逐个市价平仓 - 单向持仓(single): size=相反数, reduce_only=true - 双向持仓(dual): size=0, close=false, autoSize=LONG/SHORT, reduce_only=true 5. 设杠杆 6. 打印账户余额 ``` --- ## GateWebSocketClientMain 独立 `main()` 方法入口,通过 Spring XML 上下文启动,运行后手动关闭。 --- ## Example.java Gate SDK 使用示例,展示 `FuturesApi` 的基本用法。仅作参考。 src/main/java/com/xcong/excoin/modules/gateApi/wsHandler/AbstractPrivateChannelHandler.java
New file @@ -0,0 +1,146 @@ package com.xcong.excoin.modules.gateApi.wsHandler; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.gateApi.GateGridTradeService; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; import java.nio.charset.StandardCharsets; /** * 私有频道处理器的抽象基类。 * * <h3>封装内容</h3> * <ul> * <li>HMAC-SHA512 签名计算(UTF-8 编码)</li> * <li>认证请求 JSON 构建(id/time/channel/payload/auth)</li> * <li>subscribe / unsubscribe 的默认实现(含签名)</li> * <li>用户 ID 获取(从 {@link GateGridTradeService#getUserId()})</li> * </ul> * * <h3>签名算法</h3> * {@code SIGN = Hex(HmacSHA512(secret_utf8, "channel={channel}&event={event}&time={timeSec}"_utf8))} * * <h3>子类</h3> * {@link com.xcong.excoin.modules.gateApi.wsHandler.handler.PositionsChannelHandler}、 * {@link com.xcong.excoin.modules.gateApi.wsHandler.handler.PositionClosesChannelHandler} * * @author Administrator */ @Slf4j public abstract class AbstractPrivateChannelHandler implements GateChannelHandler { private static final char[] HEX_ARRAY = "0123456789abcdef".toCharArray(); private final String channelName; private final String apiKey; private final String apiSecret; private final String contract; private final GateGridTradeService gridTradeService; public AbstractPrivateChannelHandler(String channelName, String apiKey, String apiSecret, String contract, GateGridTradeService gridTradeService) { this.channelName = channelName; this.apiKey = apiKey; this.apiSecret = apiSecret; this.contract = contract; this.gridTradeService = gridTradeService; } @Override public String getChannelName() { return channelName; } /** * 发送带签名的订阅请求。 * payload: [userId, contract],auth: {method:"api_key", KEY, SIGN} */ @Override public void subscribe(WebSocketClient ws) { long timeSec = System.currentTimeMillis() / 1000; JSONObject msg = buildAuthRequest("subscribe", buildUid(), timeSec); ws.send(msg.toJSONString()); log.info("[{}] 订阅成功, 合约:{}", channelName, contract); } /** * 发送带签名的取消订阅请求,与 subscribe 对称。 */ @Override public void unsubscribe(WebSocketClient ws) { long timeSec = System.currentTimeMillis() / 1000; JSONObject msg = new JSONObject(); msg.put("id", timeSec * 1000000 + (System.currentTimeMillis() % 1000)); msg.put("time", timeSec); msg.put("channel", channelName); msg.put("event", "unsubscribe"); JSONArray payload = new JSONArray(); payload.add(contract); msg.put("payload", payload); JSONObject auth = new JSONObject(); auth.put("method", "api_key"); auth.put("KEY", apiKey); auth.put("SIGN", hs512Sign("unsubscribe", timeSec)); msg.put("auth", auth); ws.send(msg.toJSONString()); log.info("[{}] 取消订阅成功, 合约:{}", channelName, contract); } protected GateGridTradeService getGridTradeService() { return gridTradeService; } protected String getContract() { return contract; } /** * 从策略服务获取用户 ID,用于私有频道订阅的 payload[0]。 */ private String buildUid() { return gridTradeService != null && gridTradeService.getUserId() != null ? String.valueOf(gridTradeService.getUserId()) : ""; } /** * 构建认证请求 JSON。包含 id、time、channel、event、payload[auth_user_id, contract]、auth 字段。 */ private JSONObject buildAuthRequest(String event, String uid, long timeSec) { JSONObject msg = new JSONObject(); msg.put("id", timeSec * 1000000 + (System.currentTimeMillis() % 1000)); msg.put("time", timeSec); msg.put("channel", channelName); msg.put("event", event); JSONArray payload = new JSONArray(); payload.add(uid); payload.add(contract); msg.put("payload", payload); JSONObject auth = new JSONObject(); auth.put("method", "api_key"); auth.put("KEY", apiKey); auth.put("SIGN", hs512Sign(event, timeSec)); msg.put("auth", auth); return msg; } /** * HMAC-SHA512 签名,使用 UTF-8 编码。 */ private String hs512Sign(String event, long timeSec) { try { String message = "channel=" + channelName + "&event=" + event + "&time=" + timeSec; Mac mac = Mac.getInstance("HmacSHA512"); SecretKeySpec spec = new SecretKeySpec(apiSecret.getBytes(StandardCharsets.UTF_8), "HmacSHA512"); mac.init(spec); byte[] hash = mac.doFinal(message.getBytes(StandardCharsets.UTF_8)); StringBuilder hex = new StringBuilder(hash.length * 2); for (byte b : hash) { hex.append(HEX_ARRAY[(b >> 4) & 0xF]); hex.append(HEX_ARRAY[b & 0xF]); } return hex.toString(); } catch (Exception e) { log.error("[{}] 签名计算失败", channelName, e); return ""; } } } src/main/java/com/xcong/excoin/modules/gateApi/wsHandler/GateChannelHandler.java
New file @@ -0,0 +1,44 @@ package com.xcong.excoin.modules.gateApi.wsHandler; import com.alibaba.fastjson.JSONObject; import org.java_websocket.client.WebSocketClient; /** * WebSocket 频道处理器接口。 * * <p>每个 Gate 频道对应一个实现类。新增频道只需实现此接口, * 然后通过 {@code GateKlineWebSocketClient.addChannelHandler()} 注册即可。 * * <h3>实现类</h3> * <ul> * <li>{@code CandlestickChannelHandler} — 公开频道,K 线数据</li> * <li>{@code AbstractPrivateChannelHandler} — 私有频道抽象基类(签名+认证)</li> * <li>{@code PositionsChannelHandler} — 私有频道,仓位更新</li> * <li>{@code PositionClosesChannelHandler} — 私有频道,平仓推送</li> * </ul> * * <h3>路由机制</h3> * {@code handleMessage()} 返回 {@code true} 表示消息已被该 handler 处理, * 路由循环会停止遍历。返回 {@code false} 表示不匹配(channel 名不相等)。 * * @author Administrator */ public interface GateChannelHandler { /** 频道名称,如 {@code "futures.candlesticks"} */ String getChannelName(); /** 发送订阅请求 */ void subscribe(WebSocketClient ws); /** 发送取消订阅请求 */ void unsubscribe(WebSocketClient ws); /** * 处理频道推送消息。 * * @param response WebSocket 推送的完整 JSON * @return true 表示已处理(循环停止),false 表示频道不匹配(继续遍历下一个 handler) */ boolean handleMessage(JSONObject response); } src/main/java/com/xcong/excoin/modules/gateApi/wsHandler/handler/CandlestickChannelHandler.java
New file @@ -0,0 +1,102 @@ package com.xcong.excoin.modules.gateApi.wsHandler.handler; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.blackchain.service.DateUtil; import com.xcong.excoin.modules.gateApi.GateGridTradeService; import com.xcong.excoin.modules.gateApi.wsHandler.GateChannelHandler; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import java.math.BigDecimal; /** * K 线(Candlestick)频道处理器。 * * <h3>特点</h3> * 公开频道,无需 HMAC-SHA512 认证签名。订阅 1m 周期的 K 线数据。 * * <h3>数据流</h3> * <pre> * WebSocket 推送 update event * → handleMessage() → 解析 OHLCV → log 打印 → gridTradeService.onKline(closePx) * → WAITING_KLINE: 首次 K 线触发基底双开 * → ACTIVE: 驱动 processShortGrid + processLongGrid 网格触发 * </pre> * * <h3>订阅格式</h3> * payload: {@code ["1m", contract]} * * @author Administrator */ @Slf4j public class CandlestickChannelHandler implements GateChannelHandler { private static final String CHANNEL_NAME = "futures.candlesticks"; private static final String INTERVAL = "1m"; private final String contract; private final GateGridTradeService gridTradeService; public CandlestickChannelHandler(String contract, GateGridTradeService gridTradeService) { this.contract = contract; this.gridTradeService = gridTradeService; } @Override public String getChannelName() { return CHANNEL_NAME; } @Override public void subscribe(WebSocketClient ws) { JSONObject msg = new JSONObject(); msg.put("time", System.currentTimeMillis() / 1000); msg.put("channel", CHANNEL_NAME); msg.put("event", "subscribe"); JSONArray payload = new JSONArray(); payload.add(INTERVAL); payload.add(contract); msg.put("payload", payload); ws.send(msg.toJSONString()); log.info("[{}] 订阅成功, 合约:{}, 周期:{}", CHANNEL_NAME, contract, INTERVAL); } @Override public void unsubscribe(WebSocketClient ws) { JSONObject msg = new JSONObject(); msg.put("time", System.currentTimeMillis() / 1000); msg.put("channel", CHANNEL_NAME); msg.put("event", "unsubscribe"); JSONArray payload = new JSONArray(); payload.add(INTERVAL); payload.add(contract); msg.put("payload", payload); ws.send(msg.toJSONString()); log.info("[{}] 取消订阅成功", CHANNEL_NAME); } @Override public boolean handleMessage(JSONObject response) { if (!CHANNEL_NAME.equals(response.getString("channel"))) { return false; } try { JSONArray resultArray = response.getJSONArray("result"); if (resultArray == null || resultArray.isEmpty()) { log.warn("[{}] 数据为空", CHANNEL_NAME); return true; } JSONObject data = resultArray.getJSONObject(0); BigDecimal closePx = new BigDecimal(data.getString("c")); log.info("========== Gate K线数据 =========="); log.info("名称: {} 时间: {}", data.getString("n"), DateUtil.TimeStampToDateTime(data.getLong("t"))); log.info("开盘: {} 最高: {} 最低: {} 收盘: {} 成交量: {} 成交额: {} 已完结: {}", data.getString("o"), data.getString("h"), data.getString("l"), data.getString("c"), data.getString("v"), data.getString("a"), data.getBooleanValue("w")); log.info("=================================="); if (gridTradeService != null) { gridTradeService.onKline(closePx); } } catch (Exception e) { log.error("[{}] 处理数据失败", CHANNEL_NAME, e); } return true; } } src/main/java/com/xcong/excoin/modules/gateApi/wsHandler/handler/PositionClosesChannelHandler.java
New file @@ -0,0 +1,63 @@ package com.xcong.excoin.modules.gateApi.wsHandler.handler; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.gateApi.GateGridTradeService; import com.xcong.excoin.modules.gateApi.wsHandler.AbstractPrivateChannelHandler; import lombok.extern.slf4j.Slf4j; import java.math.BigDecimal; /** * 平仓频道处理器。 * * <h3>数据用途</h3> * 每笔平仓发生时推送 pnl(盈亏金额),累加到 {@code cumulativePnl} 用于判断策略停止条件: * cumulativePnl ≥ overallTp(达到止盈目标)或 ≤ -maxLoss(超过亏损上限)。 * 止盈由 Gate 服务端条件单自动触发,平仓后仓位变为 0,盈亏通过本频道推送。 * * <h3>推送字段</h3> * contract, side(long / short), pnl(该次平仓的盈亏,如 "+0.2" 或 "-0.1") * * <h3>注意</h3> * 平仓盈亏来自服务器端,不受本地计算误差影响。这是策略停止的唯一盈亏判断来源。 * * @author Administrator */ @Slf4j public class PositionClosesChannelHandler extends AbstractPrivateChannelHandler { private static final String CHANNEL_NAME = "futures.position_closes"; public PositionClosesChannelHandler(String apiKey, String apiSecret, String contract, GateGridTradeService gridTradeService) { super(CHANNEL_NAME, apiKey, apiSecret, contract, gridTradeService); } @Override public boolean handleMessage(JSONObject response) { if (!CHANNEL_NAME.equals(response.getString("channel"))) { return false; } try { JSONArray resultArray = response.getJSONArray("result"); if (resultArray == null || resultArray.isEmpty()) { return true; } for (int i = 0; i < resultArray.size(); i++) { JSONObject item = resultArray.getJSONObject(i); if (!getContract().equals(item.getString("contract"))) { continue; } BigDecimal pnl = new BigDecimal(item.getString("pnl")); String side = item.getString("side"); log.info("[{}] 平仓更新, 方向:{}, 盈亏:{}", CHANNEL_NAME, side, pnl); if (getGridTradeService() != null) { getGridTradeService().onPositionClose(getContract(), side, pnl); } } } catch (Exception e) { log.error("[{}] 处理数据失败", CHANNEL_NAME, e); } return true; } } src/main/java/com/xcong/excoin/modules/gateApi/wsHandler/handler/PositionsChannelHandler.java
New file @@ -0,0 +1,68 @@ package com.xcong.excoin.modules.gateApi.wsHandler.handler; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.gateApi.GateGridTradeService; import com.xcong.excoin.modules.gateApi.wsHandler.AbstractPrivateChannelHandler; import io.gate.gateapi.models.Position; import lombok.extern.slf4j.Slf4j; import java.math.BigDecimal; /** * 仓位频道处理器。 * * <h3>数据用途</h3> * 监控仓位数量(size)和入场价(entry_price)。 * 有仓位时(size.abs > 0):标记方向活跃,记录入场价 → 基底首次成交记录基底入场价并等待生成网格队列, * 非基底成交立即设止盈条件单。无仓位时(size=0):标记方向不活跃。 * * <h3>推送字段</h3> * contract, mode(dual_long / dual_short), size(正=多头,负=空头),entry_price * * <h3>注意</h3> * 双向持仓模式下空头 size 为负数,使用 {@code size.abs()} 判断是否有仓位。 * 累计盈亏不由本频道计算,而是由 {@link PositionClosesChannelHandler} 独立处理。 * 止盈条件单由服务端自动触发平仓,本频道不负责开仓操作。 * * @author Administrator */ @Slf4j public class PositionsChannelHandler extends AbstractPrivateChannelHandler { private static final String CHANNEL_NAME = "futures.positions"; public PositionsChannelHandler(String apiKey, String apiSecret, String contract, GateGridTradeService gridTradeService) { super(CHANNEL_NAME, apiKey, apiSecret, contract, gridTradeService); } @Override public boolean handleMessage(JSONObject response) { if (!CHANNEL_NAME.equals(response.getString("channel"))) { return false; } try { JSONArray resultArray = response.getJSONArray("result"); if (resultArray == null || resultArray.isEmpty()) { return true; } for (int i = 0; i < resultArray.size(); i++) { JSONObject pos = resultArray.getJSONObject(i); if (!getContract().equals(pos.getString("contract"))) { continue; } String modeStr = pos.getString("mode"); Position.ModeEnum mode = Position.ModeEnum.fromValue(modeStr); BigDecimal size = new BigDecimal(pos.getString("size")); BigDecimal entryPrice = new BigDecimal(pos.getString("entry_price")); log.info("[{}] 持仓更新, 模式:{}, 数量:{}, 入场价:{}", CHANNEL_NAME, modeStr, size, entryPrice); if (getGridTradeService() != null) { getGridTradeService().onPositionUpdate(getContract(), mode, size, entryPrice); } } } catch (Exception e) { log.error("[{}] 处理数据失败", CHANNEL_NAME, e); } return true; } }