┌──────────────────────────────────────────────────────────────┐
│ OkxWebSocketClientManager (Spring @Component) │
│ · 读取配置 · 组装所有组件 · 启动 WS 连接 · 生命周期管理 │
└──────────────────────┬───────────────────────────────────────┘
│
┌─────────────┼─────────────┐
▼ ▼ ▼
OkxConfig OkxGridTradeService OkxKlineWebSocketClient
(Builder配置) (策略核心) (WS连接客户端)
│ │
│ ┌────────┴────────┐
│ │ 4 个频道处理器 │
│ ├─────────────────┤
│ │ K线 | 持仓 │
│ │ 账户 | 订单成交 │
│ └────────┬────────┘
│ │
▼ │
OkxTradeExecutor │
(异步下单线程池) ◄────────────────┘
│
▼
通过 WS 发送下单 JSON
核心数据流:
K线推送 → OkxGridTradeService.onKline() → 匹配网格队列 → OkxTradeExecutor 异步下单
持仓推送 → OkxGridTradeService.onPositionUpdate() → 识别基底成交 → 设置止盈单 → 队列就绪 → 激活策略
订单推送 → OkxGridTradeService.onOrderFilled() → 累计盈亏跟踪 → 达标/超限自动停止
设计原则:
- 包自包含:okxApi 包不依赖任何其他业务包(okxNewPrice、gateApi、newPrice、blackchain 等)
- WS 回调不阻塞:所有下单操作通过 OkxTradeExecutor 单线程池异步执行
- 状态机驱动:策略状态(WAITING_KLINE → OPENING → ACTIVE → STOPPED)严格控制执行流程
文件:OkxConfig.java
使用 Builder 模式构造配置对象,不可变设计,所有字段 private final。
| 分组 | 字段 | 说明 |
|---|---|---|
| API 密钥 | apiKey |
OKX API Key |
secretKey |
OKX Secret Key | |
passphrase |
OKX Passphrase | |
| 合约参数 | contract |
合约品种(如 BTC-USDT-SWAP) |
marginMode |
保证金模式(cross 全仓 / isolated 逐仓) |
|
tickSz |
价格精度 | |
contractMultiplier |
合约乘数(用于盈亏计算) | |
leverage |
杠杆倍数 | |
| 策略参数 | quantity |
每次开仓张数 |
gridRate |
网格间距比例(如 0.01 = 1%) | |
gridQueueSize |
网格队列长度 | |
marginRatioLimit |
保证金占用比例上限 | |
overallTp |
全局止盈目标(累计盈亏 ≥ 此值停止) | |
maxLoss |
最大亏损限制(累计盈亏 ≤ 此值停止) | |
| 环境 | isProduction |
是否生产环境(决定 WS URL 域名) |
OkxConfig config = OkxConfig.builder()
.apiKey("xxx")
.secretKey("xxx")
.passphrase("xxx")
.contract("BTC-USDT-SWAP")
.marginMode("cross")
.leverage(1)
.quantity("1")
.gridRate(new BigDecimal("0.01"))
.gridQueueSize(10)
.overallTp(new BigDecimal("100"))
.isProduction(false)
.build();
文件:enums/OkxEnums.java
集中管理所有 OKX API 相关字符串常量,替代外部 CoinEnums 依赖。
| 常量 | 值 | 用途 |
|---|---|---|
INSTTYPE_SPOT |
SPOT |
现货 |
INSTTYPE_SWAP |
SWAP |
永续合约 |
POSSIDE_LONG |
long |
多仓方向 |
POSSIDE_SHORT |
short |
空仓方向 |
SIDE_BUY |
buy |
买入 |
SIDE_SELL |
sell |
卖出 |
ORDTYPE_MARKET |
market |
市价单 |
ORDTYPE_LIMIT |
limit |
限价单 |
CHANNEL_POSITIONS |
positions |
持仓频道 |
CHANNEL_CANDLE |
candle + 周期 |
K线频道 |
CHANNEL_ACCOUNT |
account |
账户频道 |
CHANNEL_ORDERS |
orders |
订单频道 |
CHANNEL_ORDERS_ALGO |
orders-algo |
策略委托频道 |
文件:OkxWsUtil.java
替代外部 SSLConfig、SignUtils、WsParamBuild、DateUtil 等依赖,提供以下静态方法:
| 方法 | 用途 |
|---|---|
configureSSL(wsClient) |
为 WebSocketClient 配置 SSL(跳过证书验证,仅测试环境) |
generateSignature(timestamp, method, requestPath, body, secretKey) |
OKX 签名算法:HMAC-SHA256 + Base64 |
getOrderNum(side) |
生成唯一订单 ID(时间戳 + 随机数 + side) |
timestampToDateTime(timestamp) |
毫秒时间戳 → yyyy/MM/dd HH:mm:ss 格式 |
timestampToDateToString(timestamp) |
毫秒时间戳 → yyyy/MM/dd 格式 |
buildJsonObject(connId, channel, args) |
构建 WS 请求 JSON 对象 |
buildLoginParam(okxConfig) |
构建登录认证参数(sign + timestamp) |
签名算法: sign = Base64(HMAC-SHA256(timestamp + "GET" + requestPath + body, secretKey))
文件:param/TradeRequestParam.java
纯 POJO,替代外部 TradeRequestParam 依赖。
| 字段 | 说明 |
|---|---|
accountName |
账户标识 |
instId |
合约 ID |
tdMode |
保证金模式(cross/isolated) |
posSide |
持仓方向(long/short) |
ordType |
订单类型(market/limit) |
side |
买卖方向(buy/sell) |
clOrdId |
客户端订单 ID(唯一) |
sz |
下单数量 |
markPx |
标记价格(限价单用) |
tradeType |
交易类型(1=开仓,3=平仓) |
文件:OkxWebSocketClientManager.java
Spring @Component,管理完整的 WS 生命周期。
OkxConfig → OkxGridTradeService → OkxTradeExecutor(注入 WS Client)→ 4 个频道处理器 → OkxKlineWebSocketClient@PostConstruct init():初始化和连接(生产环境)或注册 MBean(测试环境)@PreDestroy close():优雅关闭(停止策略 → 取消条件单 → 关闭 WS)init()
├── configMap — 从本地缓存读取账户配置
├── OkxGridTradeService.startGrid()
├── OkxTradeExecutor.setWebSocketClient(wsClient)
├── 创建 4 个频道处理器
├── OkxKlineWebSocketClient.connect()
│ ├── 连接 OSKL 公开 WS(K线频道不需要登录)
│ ├── 登录私有 WS → 订阅 持仓/账户/订单/策略委托频道
│ └── 启动心跳定时器(30s ping/pong)
└── isProduction ? 直接启动 : MBean 注册(JMX 手动控制)
文件:OkxKlineWebSocketClient.java
封装 java-websocket 客户端,管理物理连接。
wss://ws.okx.com:8443/ws/v5/public 或模拟盘域名):K线推送不需要登录wss://ws.okx.com:8443/ws/v5/private 或模拟盘域名):需要登录认证,订阅持仓/账户/订单/策略委托频道connect()
├── 1. 创建公开 WS Client → connect()
│ └── onOpen → subscribePublicChannels() → 订阅 K线频道
├── 2. 创建私有 WS Client → connect()
│ └── onOpen → login() → onLoginSuccess → subscribePrivateChannels()
│ ├── 订阅 positions 频道
│ ├── 订阅 account 频道
│ ├── 订阅 orders 频道
│ └── 订阅 orders-algo 频道
└── 3. 启动心跳定时器(30s 间隔 ping/pong,60s 超时检测)
MAX_RECONNECT_ATTEMPTS 次(默认 30)RECONNECT_DELAY_MS(默认 5000ms)onMessage → 遍历所有 OkxChannelHandler → 调用 handleMessage(response) 分发到具体处理器
文件:wsHandler/OkxChannelHandler.java
统一接口,所有频道处理器实现此接口:
public interface OkxChannelHandler {
String getChannelName(); // 频道名称
void subscribe(WebSocketClient ws); // 订阅
void unsubscribe(WebSocketClient ws); // 取消订阅
boolean handleMessage(JSONObject response); // 处理推送消息
}
文件:wsHandler/handler/OkxCandlestickChannelHandler.java
| 参数 | 值 |
|---|---|
channel |
candle{period}(如 candle1H) |
instId |
合约 ID(如 BTC-USDT-SWAP) |
data[0] → [ts, o, h, l, c, vol, volCcy, ...]c → 调用 gridTradeService.onKline(closePrice)candle1H 时打印整点日志onKline(closePrice)
├── WAITING_KLINE → 进入 OPENING 状态,开基底多+空
├── ACTIVE → processShortGrid(closePrice) + processLongGrid(closePrice)
└── STOPPED → 仅更新未实现盈亏
文件:wsHandler/handler/OkxPositionsChannelHandler.java
| 参数 | 值 |
|---|---|
channel |
positions |
instType |
SWAP |
instId |
合约 ID |
解析 data[] 数组 → 提取 posSide、pos(数量)、avgPx(均价)→ 调用 gridTradeService.onPositionUpdate(posSide, size, avgPx)
onPositionUpdate() → 区分 3 种场景:
1. 仓位从无到有(基底开仓成交)→ 标记 baseOpened → 双基底都成后生成网格队列
2. 仓位量增加(网格触发开仓成交)→ 取队列首元素做止盈价 → 设止盈条件单
3. 仓位归零(止盈平仓完成)→ 标记 active=false
文件:wsHandler/handler/OkxAccountChannelHandler.java
| 参数 | 值 |
|---|---|
channel |
account |
解析 data[] → 提取 availBal(可用余额)、cashBal(现金余额)、eq(权益)、upl(未实现盈亏)、imr(保证金占用)
当前版本:仅做日志输出,不做业务判断。后续可扩展保证金安全阀功能。
文件:wsHandler/handler/OkxOrderInfoChannelHandler.java
| 参数 | 值 |
|---|---|
channel |
orders |
instType |
SWAP |
instId |
合约 ID |
state=filled 且 accFillSz>0 的订单posSide、accFillSz(成交数量)、fillPnl(已实现盈亏)gridTradeService.onOrderFilled(posSide, accFillSz, fillPnl)onOrderFilled()
├── 累计盈亏 += fillPnl
├── 累计盈亏 ≥ overallTp → 策略停止(止盈达标)
└── 累计盈亏 ≤ -maxLoss → 策略停止(亏损超限)
文件:OkxTradeExecutor.java
WS 消息在回调线程处理,下单操作提交到**独立线程池异步执行**,避免阻塞 WS 回调线程。
| 参数 | 值 |
|---|---|
| 核心线程 | 1 |
| 最大线程 | 1 |
| 空闲超时 | 60s(allowCoreThreadTimeOut) |
| 队列类型 | LinkedBlockingQueue |
| 队列容量 | 64 |
| 拒绝策略 | CallerRunsPolicy(队列满时由提交线程直接同步执行,形成自然背压) |
| 守护线程 | 是 |
单线程的作用:保证下单顺序(开多 → 开空 → 止盈单),避免并发竞争。
| 方法 | 说明 |
|---|---|
openLong(quantity, onSuccess, onFailure) |
异步市价开多 |
openShort(quantity, onSuccess, onFailure) |
异步市价开空 |
placeTakeProfit(triggerPrice, orderType, size) |
异步创建止盈条件单(通过 batch-orders 发送 algo 委托) |
cancelAllPriceTriggeredOrders() |
撤销所有条件单(cancel-algos) |
setWebSocketClient(wsClient) |
注入 WS 客户端引用 |
shutdown() |
优雅关闭(等待 10s,超时强制中断) |
placeTakeProfit()
├── 成功 → 发送 batch-orders(algo 条件单)
└── 失败 → marketClose() 立即市价平仓兜底
所有下单通过 WebSocket 发送 JSON(非 REST API),sendOrder 构建如下消息:
{
"id": "order_1712345678901_a1b2c3",
"op": "order",
"args": [{
"instId": "BTC-USDT-SWAP",
"tdMode": "cross",
"clOrdId": "buy_1712345678901_d4e5f6",
"side": "buy",
"posSide": "long",
"ordType": "market",
"sz": "1"
}]
}
OkxGridTradeService.onKline
└── executor.openLong() / openShort() ← 基底双开 + 网格触发
OkxGridTradeService.onPositionUpdate
└── executor.placeTakeProfit() ← 仓位成交后设止盈
OkxGridTradeService.stopGrid
└── executor.cancelAllPriceTriggeredOrders() + shutdown()
文件:OkxGridTradeService.java
这是整个包的核心,实现了**多空双开网格交易策略**的所有状态机和网格队列逻辑。
WAITING_KLINE ──(首根K线到达)──▶ OPENING ──(双基底成交)──▶ ACTIVE ──(止盈/止损达标)──▶ STOPPED
▲ │
└──────────────────(startGrid() 重新启动)──────────────────┘
| 状态 | 含义 |
|---|---|
WAITING_KLINE |
等待首根 K 线到达 |
OPENING |
已收到 K 线,正在开基底仓位(多+空各一单) |
ACTIVE |
双基底已成交,网格队列已生成,正常交易中 |
STOPPED |
已停止(止盈达标 / 亏损超限 / 手动停止) |
| 字段 | 类型 | 说明 |
|---|---|---|
shortPriceQueue |
List<BigDecimal> |
空仓价格队列(**降序**) |
longPriceQueue |
List<BigDecimal> |
多仓价格队列(**升序**) |
shortBaseEntryPrice |
BigDecimal |
空仓基底成交价 |
longBaseEntryPrice |
BigDecimal |
多仓基底成交价 |
baseLongOpened |
boolean |
多仓基底是否已开 |
baseShortOpened |
boolean |
空仓基底是否已开 |
cumulativePnl |
BigDecimal |
累计已实现盈亏 |
1. startGrid()
└── 重置所有状态 → WAITING_KLINE
2. onKline(closePrice) → WAITING_KLINE
└── 转为 OPENING → 发送基底多单 + 基底空单
3. onPositionUpdate(posSide, size, entryPrice)
├── 仓位从无到有
│ ├── 标记 baseLongOpened / baseShortOpened
│ ├── 记录 entryPrice
│ ├── 双基底都成交 → tryGenerateQueues() → ACTIVE
│ └── 生成网格队列:
│ · 空仓队列:entryPrice × (1 - gridRate×1), (1 - gridRate×2), ... 降序排列
│ · 多仓队列:entryPrice × (1 + gridRate×1), (1 + gridRate×2), ... 升序排列
├── 仓位量增加(网格触发开仓成交)
│ └── 检查队列非空 → 取队列首元素做止盈价 → executor.placeTakeProfit()
└── 仓位归零
└── 标记 active=false
4. onKline(closePrice) → ACTIVE
├── processShortGrid(closePrice) ← 空仓网格处理
│ ├── 匹配队列中 > closePrice 的元素
│ ├── 移除已匹配 → 尾部补充新元素(递减)
│ ├── 多仓队列转移(以对方队列首元素为种子生成递减元素)
│ ├── 贴近持仓均价过滤(skip)
│ ├── 保证金安全检查
│ ├── 开空一次
│ └── 额外反向开多(价格夹在多/空均价之间且多>空倒挂时)
└── processLongGrid(closePrice) ← 多仓网格处理
└── (对称逻辑,方向反转)
5. onOrderFilled(posSide, fillSz, pnl)
├── cumulativePnl += pnl
├── cumulativePnl ≥ overallTp → STOPPED
└── cumulativePnl ≤ -maxLoss → STOPPED
6. stopGrid()
├── 状态 → STOPPED
├── cancelAllPriceTriggeredOrders()
└── executor.shutdown()
processShortGrid(currentPrice) 详解1. 匹配队列元素(空仓队列降序遍历,收集 > currentPrice 的元素)
└── 为空 → 直接返回
2. 空仓队列更新
├── 移除 matched 元素
└── 尾部补充新元素(尾价 × (1 - gridRate) 循环递减)→ 降序排序
3. 多仓队列转移
├── 以多仓队列首元素(最小价)为种子
├── 生成 matched.size() 个递减元素加入多仓队列
├── 贴近持仓均价过滤:元素与多仓均价差距 < gridRate → skip
└── 升序排序,超长截断
4. 保证金安全检查
└── 超限 → warn 跳过开仓
5. 开空一次 → executor.openShort()
6. 额外反向开多条件(同时满足):
├── longEntryPrice > shortEntryPrice(多>空倒挂)
├── currentPrice > shortEntryPrice(当前价在空仓均价上方)
└── currentPrice < longEntryPrice × (1 - gridRate)(远离多仓均价)
└── 满足 → executor.openLong() 额外开多一次
processLongGrid(currentPrice) 详解对称逻辑,方向反转。
文件:OkxWebSocketClientMain.java
纯 main 方法启动,不依赖 Spring 容器。
用于**本地测试和调试**,无需启动整个 Spring 应用。
public static void main(String[] args) {
OkxConfig config = OkxConfig.builder()
.apiKey("xxx")
.secretKey("xxx")
.passphrase("xxx")
.contract("BTC-USDT-SWAP")
.marginMode("cross")
.leverage(1)
.quantity("1")
.gridRate(new BigDecimal("0.01"))
.gridQueueSize(10)
.overallTp(new BigDecimal("100"))
.isProduction(false)
.build();
OkxGridTradeService service = new OkxGridTradeService(config, "test-account");
service.startGrid();
OkxKlineWebSocketClient wsClient = new OkxKlineWebSocketClient(config, service, ...);
wsClient.connect();
// 注册 JVM 关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
service.stopGrid();
wsClient.close();
}));
}
┌─────────────────────────────────────────────────────────────┐
│ 1. 启动阶段 │
├─────────────────────────────────────────────────────────────┤
│ OkxWebSocketClientManager.init() │
│ ├── new OkxConfig.Builder()...build() │
│ ├── new OkxGridTradeService(config, accountName) │
│ │ └── new OkxTradeExecutor(contract, marginMode, name) │
│ ├── gridTradeService.startGrid() → WAITING_KLINE │
│ ├── new OkxCandlestickChannelHandler(instId, candlePeriod, │
│ │ gridTradeService, config) │
│ ├── new OkxPositionsChannelHandler(instId, gridTradeService)│
│ ├── new OkxAccountChannelHandler() │
│ ├── new OkxOrderInfoChannelHandler(instId, gridTradeService,│
│ │ config) │
│ ├── new OkxKlineWebSocketClient(config, handlers, ...) │
│ ├── wsClient.connect() │
│ │ ├── 连接公开 WS → 订阅 K线频道 │
│ │ ├── 连接私有 WS → 登录 → 订阅 持仓/账户/订单/策略委托 │
│ │ └── 启动心跳定时器 │
│ └── executor.setWebSocketClient(privateWsClient) │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 2. 运行时数据流 │
├─────────────────────────────────────────────────────────────┤
│ [K线推送] │
│ OkxCandlestickChannelHandler.handleMessage() │
│ └── gridTradeService.onKline(closePrice) │
│ ├── WAITING_KLINE → OPENING │
│ │ ├── executor.openLong(quantity, onSuccess, │
│ │ │ onFailure) │
│ │ └── executor.openShort(quantity, onSuccess, │
│ │ onFailure) │
│ └── ACTIVE │
│ ├── processShortGrid(closePrice) │
│ │ ├── 匹配队列 → 更新队列 → 转移对方队列 │
│ │ └── executor.openShort() │
│ └── processLongGrid(closePrice) │
│ └──(对称逻辑) │
│ │
│ [持仓推送] │
│ OkxPositionsChannelHandler.handleMessage() │
│ └── gridTradeService.onPositionUpdate(posSide, size, avgPx)│
│ ├── 基底成交 → 标记 baseOpened → tryGenerateQueues() │
│ └── 增量成交 → executor.placeTakeProfit(tp, type, sz)│
│ │
│ [订单成交推送] │
│ OkxOrderInfoChannelHandler.handleMessage() │
│ └── gridTradeService.onOrderFilled(posSide, fillSz, pnl) │
│ └── cumulativePnl 累加 → 达标则停止 │
└─────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────┐
│ 3. 停止阶段 │
├─────────────────────────────────────────────────────────────┤
│ OkxWebSocketClientManager.close() │
│ ├── gridTradeService.stopGrid() │
│ │ ├── state = STOPPED │
│ │ ├── executor.cancelAllPriceTriggeredOrders() │
│ │ │ └── wsClient.send(cancel-algos) │
│ │ └── executor.shutdown() │
│ └── wsClient.close() │
│ └── 关闭公开WS + 私有WS │
└─────────────────────────────────────────────────────────────┘
| 方面 | Gate API (gateApi) | OKX API (okxApi) |
|---|---|---|
| 下单方式 | REST API (FuturesApi.createFuturesOrder) |
WebSocket JSON 消息 (op: "order") |
| 止盈单 | REST API (createPriceTriggeredOrder),plan-close-*-position |
WS 消息 (op: "batch-orders"),ordType: limit + px 触发价 |
| 仓位方向 | 正数=开多、负数=开空(size 带符号) | posSide: long/short 显式区分,sz 始终正数 |
| 保证金模式 | 无(Gate API 隐含) | tdMode: cross/isolated 显式指定 |
| 客户端订单 ID | 自动生成(Gate API 隐式处理) | clOrdId 显式生成和传入 |
| 取消条件单 | REST API (cancelPriceTriggeredOrderList) |
WS 消息 (op: "cancel-algos") |
| 止损失败兜底 | REST createFuturesOrder IOC 市价平仓 |
WS 消息 marketClose()(tradeType: "3") |
| 成交识别 | 通过 WS FuturesOrderBookTicker 或 REST 查询 |
WS orders 频道推送 state=filled |
| API 认证 | HMAC-SHA256 请求头签名(Gate SDK 封装) | HMAC-SHA256 + Base64 WS 登录消息 |
| WS 连接 | 单一连接,频道订阅混合 | 双连接:公开 WS(K线)+ 私有 WS(持仓/账户/订单) |
placeTakeProfit 签名 |
(triggerPrice, rule, orderType, size) 多一个 rule 参数 |
(triggerPrice, orderType, size) (OKX 无 rule 概念) |
| 止盈单下单 | 单条 FuturesPriceTriggeredOrder |
batch-orders 包装(List 格式,但实际传 1 条) |
| 心跳机制 | 应用层 ping/pong JSON 消息 | java-websocket 自带 ping/pong + 60s 超时重连 |
| 包依赖 | 依赖 Gate SDK (io.gate.gateapi) |
完全自包含,仅依赖 java-websocket + fastjson + lombok |