src/main/java/com/xcong/excoin/modules/okxNewPrice/OkxQuantWebSocketClient.java
@@ -268,41 +268,8 @@ AccountWs.handleEvent(response, redisUtils); } else if (PositionsWs.POSITIONSWS_CHANNEL.equals(channel)) { PositionsWs.handleEvent(response, redisUtils); String posKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + CoinEnums.HE_YUE.getCode() + ":pos"; String pos = (String) redisUtils.get(posKey); if (StrUtil.isBlank(pos)) { log.error("未获取到持仓数量"); TradeOrderWs.orderEvent(webSocketClient, redisUtils, OrderParamEnums.INIT.getValue()); return; } String state = (String) redisUtils.get(InstrumentsWs.INSTRUMENTSWS_CHANNEL + ":" + CoinEnums.HE_YUE.getCode() + ":state"); if (OrderParamEnums.STATE_3.getValue().equals(state)){ log.error("持仓盈亏超过下单总保证金,止损冷静一天......"); TradeOrderWs.orderEvent(webSocketClient, redisUtils, OrderParamEnums.OUT.getValue()); return; } String uplKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + CoinEnums.HE_YUE.getCode() + ":upl"; String upl = (String) redisUtils.get(uplKey); if (StrUtil.isBlank(upl)){ upl = "0"; } String totalOrderUsdtKey = AccountWs.ACCOUNTWS_CHANNEL + ":" + CoinEnums.USDT.getCode() + ":totalOrderUsdt"; String totalOrderUsdt = (String) redisUtils.get(totalOrderUsdtKey); BigDecimal multiply = new BigDecimal(upl).multiply(new BigDecimal("-1")); if (new BigDecimal(totalOrderUsdt).compareTo(multiply) < 0) { log.error("持仓盈亏超过下单总保证金,止损冷静一天......"); TradeOrderWs.orderEvent(webSocketClient, redisUtils, OrderParamEnums.OUT.getValue()); return; } String side = caoZuoService.caoZuo(); if (StrUtil.isNotBlank(pos)) { TradeOrderWs.orderEvent(webSocketClient, redisUtils, side); } TradeOrderWs.orderEvent(webSocketClient, redisUtils, side); } else if (BalanceAndPositionWs.CHANNEL_NAME.equals(channel)) { BalanceAndPositionWs.handleEvent(response); } src/main/java/com/xcong/excoin/modules/okxNewPrice/celue/CaoZuoServiceImpl.java
@@ -1,8 +1,10 @@ package com.xcong.excoin.modules.okxNewPrice.celue; import cn.hutool.core.util.StrUtil; import com.xcong.excoin.modules.okxNewPrice.okxWs.AccountWs; import com.xcong.excoin.modules.okxNewPrice.okxWs.InstrumentsWs; import com.xcong.excoin.modules.okxNewPrice.okxWs.PositionsWs; import com.xcong.excoin.modules.okxNewPrice.okxWs.TradeOrderWs; import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.CoinEnums; import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.OrderParamEnums; import com.xcong.excoin.modules.okxNewPrice.wangge.WangGeQueue; @@ -31,6 +33,18 @@ private final RedisUtils redisUtils; private final WangGeService wangGeService; // 构造Redis键名 final String coinCode = CoinEnums.HE_YUE.getCode(); final String instrumentsStateKey = InstrumentsWs.INSTRUMENTSWS_CHANNEL + ":" + coinCode + ":state"; final String positionsMarkPxKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":markPx"; final String positionsAvgPxKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":avgPx"; final String positionsOrderPriceKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":orderPrice"; final String positionsUplKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":upl"; final String positionsRealizedPnlKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":realizedPnl"; final String positionsImrKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":imr"; final String positionsPosKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":pos"; /** * 执行主要的操作逻辑,包括读取合约状态、获取市场价格信息, * 并根据当前持仓均价和标记价格决定是否执行买卖操作。 @@ -39,23 +53,47 @@ */ @Override public String caoZuo() { // 构造Redis键名 final String coinCode = CoinEnums.HE_YUE.getCode(); final String instrumentsKey = InstrumentsWs.INSTRUMENTSWS_CHANNEL + ":" + coinCode + ":state"; final String positionsMarkPxKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":markPx"; final String positionsAvgPxKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":avgPx"; final String positionsOrderPriceKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":orderPrice"; final String uplKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":upl"; final String realizedPnlKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":realizedPnl"; final String imrKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":imr"; log.info("开始执行操作......"); String pos = (String) redisUtils.get(positionsPosKey); if (StrUtil.isBlank(pos) || BigDecimal.ZERO.compareTo( new BigDecimal(pos)) <= 0) { log.error("未获取到持仓数量"); return OrderParamEnums.INIT.getValue(); } // 获取合约状态 String state = (String) redisUtils.get(instrumentsKey); if (state == null || !OrderParamEnums.STATE_1.getValue().equals(state)) { // 获取合约执行操作状态 String state = (String) redisUtils.get(instrumentsStateKey); if (OrderParamEnums.STATE_4.getValue().equals(state)) { log.error("操作下单中,等待......"); return OrderParamEnums.ORDERING.getValue(); } if (OrderParamEnums.STATE_3.getValue().equals(state)){ log.error("持仓盈亏超过下单总保证金,止损冷静一天......"); return OrderParamEnums.OUT.getValue(); } if (OrderParamEnums.STATE_2.getValue().equals(state)){ log.error("持仓盈亏抗压......"); return OrderParamEnums.HOLDING.getValue(); } if (OrderParamEnums.STATE_4.getValue().equals(state)) { return OrderParamEnums.ORDERING.getValue(); if (OrderParamEnums.STATE_0.getValue().equals(state)){ log.error("请检查系统参数,不允许开仓......"); return OrderParamEnums.HOLDING.getValue(); } String uplStr = (String) redisUtils.get(positionsUplKey); if (StrUtil.isBlank(uplStr)){ return OrderParamEnums.INIT.getValue(); } //可使用的总保证金 String totalOrderUsdtKey = AccountWs.ACCOUNTWS_CHANNEL + ":" + CoinEnums.USDT.getCode() + ":totalOrderUsdt"; String totalOrderUsdt = (String) redisUtils.get(totalOrderUsdtKey); BigDecimal upl = new BigDecimal(uplStr); if (BigDecimal.ZERO.compareTo(upl) >= 0){ upl = upl.multiply(new BigDecimal("-1")); if (upl.compareTo(new BigDecimal(totalOrderUsdt)) >= 0) { log.error("持仓盈亏超过下单总保证金,止损冷静一天......"); return OrderParamEnums.OUT.getValue(); } } log.info(OrderParamEnums.getNameByValue(state)); @@ -87,6 +125,12 @@ // 判断是加仓还是减仓 if (avgPx.compareTo(markPx) > 0) { log.info("开始加仓..."); if (queueKaiCang.isEmpty()) { // 队列为空 log.info("开始加仓,但是超出了网格设置..."); return side; } DescBigDecimal kaiCang = queueKaiCang.peek(); if (kaiCang != null && markPx.compareTo(kaiCang.getValue()) <= 0 && avgPx.compareTo(kaiCang.getValue()) >= 0) { log.info("开始加仓...开仓队列价格大于当前价格{}>{}", kaiCang.getValue(), markPx); @@ -96,23 +140,26 @@ log.info("未触发加仓......,等待"); } } else if (avgPx.compareTo(markPx) < 0) { log.info("开始减仓..."); if (queuePingCang.isEmpty()) { // 队列为空 log.info("开始减仓,但是超出了网格设置..."); return side; } AscBigDecimal pingCang = queuePingCang.peek(); if (pingCang != null && markPx.compareTo(pingCang.getValue()) >= 0 && avgPx.compareTo(pingCang.getValue()) < 0) { log.info("开始减仓...平仓队列价格小于当前价格{}<={}", pingCang.getValue(), markPx); //判断当前是否盈利 String upl = (String) redisUtils.get(uplKey); String realizedPnl = (String) redisUtils.get(realizedPnlKey); String imr = (String) redisUtils.get(imrKey); if (upl != null && realizedPnl != null && imr != null) { BigDecimal uplValue = new BigDecimal(upl); String uplstr = (String) redisUtils.get(positionsUplKey); String realizedPnl = (String) redisUtils.get(positionsRealizedPnlKey); String imr = (String) redisUtils.get(positionsImrKey); if (uplstr != null && realizedPnl != null && imr != null) { BigDecimal uplValue = new BigDecimal(uplstr); BigDecimal realizedPnlValue = new BigDecimal(realizedPnl); BigDecimal imrValue = new BigDecimal(imr).multiply(new BigDecimal(OrderParamEnums.PING_CANG_SHOUYI.getValue())); if (realizedPnlValue.compareTo(BigDecimal.ZERO) <= 0) { if (uplValue.compareTo(realizedPnlValue) < 0) { log.info("当前未实现盈亏:{}没有大于已实现收益>{},等待中", uplValue, realizedPnlValue); redisUtils.set(positionsOrderPriceKey, String.valueOf(pingCang.getValue()), 0); return OrderParamEnums.HOLDING.getValue(); }else if (uplValue.compareTo(realizedPnlValue) > 0 && uplValue.compareTo(imrValue) >= 0) { BigDecimal realizedPnlValueZheng = realizedPnlValue.multiply(new BigDecimal("-1")); if (uplValue.compareTo(realizedPnlValue) > 0 && uplValue.compareTo(imrValue.add(realizedPnlValueZheng)) >= 0) { log.info("当前未实现盈亏:{}大于预计收益>{},赚钱咯", uplValue, imrValue); redisUtils.set(positionsOrderPriceKey, String.valueOf(pingCang.getValue()), 0); return OrderParamEnums.SELL.getValue(); @@ -121,6 +168,7 @@ redisUtils.set(positionsOrderPriceKey, String.valueOf(pingCang.getValue()), 0); return OrderParamEnums.HOLDING.getValue(); } }else { if (uplValue.compareTo(imrValue) >= 0) { log.info("当前未实现盈亏:{}大于预计收益>{},赚钱咯", uplValue, imrValue); @@ -172,13 +220,8 @@ log.warn("无效的价格格式: {}", orderPrice); return; } boolean kaiCangExists = queueKaiCang.stream().anyMatch(item -> item.getValue().equals(priceDecimal)); if (!kaiCangExists) { queueKaiCang.add(new DescBigDecimal(orderPrice)); } else { queueKaiCang.removeIf(item -> item.getValue().equals(priceDecimal)); } // 删除比该价格大的数据(由于是降序队列,所以是删除value.compareTo(priceDecimal) < 0的元素) queueKaiCang.removeIf(item -> item.getValue().compareTo(priceDecimal) <= 0); // 打印开仓队列 StringBuilder kaiCangStr = new StringBuilder(); kaiCangStr.append("开仓队列: ["); @@ -193,14 +236,10 @@ kaiCangStr.append("]"); log.info(kaiCangStr.toString()); boolean pingCangExists = queuePingCang.stream().anyMatch(item -> item.getValue().equals(priceDecimal)); if (!pingCangExists) { queuePingCang.add(new AscBigDecimal(orderPrice)); } else { queuePingCang.removeIf(item -> item.getValue().equals(priceDecimal)); } // 删除比该价格小的数据(由于是升序队列,所以是删除value.compareTo(priceDecimal) > 0的元素) queuePingCang.removeIf(item -> item.getValue().compareTo(priceDecimal) >= 0); // 打印平仓队列 // 打印平仓队列 StringBuilder pingCangStr = new StringBuilder(); pingCangStr.append("平仓队列: ["); first = true; src/main/java/com/xcong/excoin/modules/okxNewPrice/okxWs/AccountWs.java
@@ -83,6 +83,7 @@ JSONObject accountData = dataArray.getJSONObject(i); JSONArray detailsArray = accountData.getJSONArray(DETAILS_KEY); if (detailsArray == null || detailsArray.isEmpty()) { log.warn("账户频道{}数据为空",CoinEnums.USDT.getCode()); continue; } @@ -95,7 +96,7 @@ String eq = detail.getString(EQ_KEY); if (StrUtil.isBlank(ccy) || StrUtil.isBlank(availBalStr) || StrUtil.isBlank(cashBalStr)) { log.warn("账户详情缺失必要字段,跳过处理"); log.warn("账户频道缺失必要字段,跳过处理"); continue; } @@ -103,7 +104,7 @@ BigDecimal cashBal = parseBigDecimalSafe(cashBalStr); if (availBal == null || cashBal == null || cashBal.compareTo(BigDecimal.ZERO) == 0) { log.warn("无效的账户余额数据,跳过处理"); log.warn("账户频道无效的账户余额数据,跳过处理"); continue; } @@ -111,16 +112,21 @@ BigDecimal divide = availBal.divide(cashBal, 4, RoundingMode.DOWN); String state = (String) redisUtils.get(InstrumentsWs.INSTRUMENTSWS_CHANNEL + ":" + CoinEnums.HE_YUE.getCode() + ":state"); if (!OrderParamEnums.STATE_3.getValue().equals(state)){ if (divide.compareTo(KANG_CANG_THRESHOLD) > 0) { log.info(OrderParamEnums.STATE_1.getName()); state = OrderParamEnums.STATE_1.getValue(); } else if (divide.compareTo(ZHI_SUN_THRESHOLD) > 0) { log.warn(OrderParamEnums.STATE_2.getName()); state = OrderParamEnums.STATE_2.getValue(); } else { log.error(OrderParamEnums.STATE_3.getName()); state = OrderParamEnums.STATE_3.getValue(); if (OrderParamEnums.STATE_4.getValue().equals(state)){ log.info(OrderParamEnums.STATE_4.getName()); state = OrderParamEnums.STATE_4.getValue(); }else { if (!OrderParamEnums.STATE_3.getValue().equals(state)){ if (divide.compareTo(KANG_CANG_THRESHOLD) > 0) { log.info(OrderParamEnums.STATE_1.getName()); state = OrderParamEnums.STATE_1.getValue(); } else if (divide.compareTo(ZHI_SUN_THRESHOLD) > 0) { log.warn(OrderParamEnums.STATE_2.getName()); state = OrderParamEnums.STATE_2.getValue(); } else { log.error(OrderParamEnums.STATE_3.getName()); state = OrderParamEnums.STATE_3.getValue(); } } } src/main/java/com/xcong/excoin/modules/okxNewPrice/okxWs/OrderInfoWs.java
@@ -59,6 +59,10 @@ JSONObject detail = dataArray.getJSONObject(i); String instId = detail.getString(INSTID_KEY); if (!CoinEnums.HE_YUE.getCode().equals(instId)){ log.info( "订单详情-币种: {} 没有成交订单", CoinEnums.HE_YUE.getCode() ); continue; } String ordId = detail.getString(ORDID_KEY); String clOrdId = detail.getString(CLORDID_KEY); String side = detail.getString(SIDE_KEY); @@ -77,9 +81,9 @@ String clOrdIdStr = (String) redisUtils.get(TradeOrderWs.ORDERWS_CHANNEL + ":" + CoinEnums.HE_YUE.getCode() + ":clOrdId"); String stateStr = (String) redisUtils.get(TradeOrderWs.ORDERWS_CHANNEL + ":" + CoinEnums.HE_YUE.getCode() + ":state"); if ( clOrdIdStr != null StrUtil.isNotBlank(clOrdIdStr) && clOrdId.equals(clOrdIdStr) && stateStr != null && StrUtil.isNotBlank(stateStr) && state.equals(stateStr) ){ redisUtils.set(InstrumentsWs.INSTRUMENTSWS_CHANNEL + ":" + CoinEnums.HE_YUE.getCode() + ":state", OrderParamEnums.STATE_1.getValue(), 0); src/main/java/com/xcong/excoin/modules/okxNewPrice/okxWs/PositionsWs.java
@@ -48,49 +48,49 @@ JSONArray dataArray = response.getJSONArray("data"); if (dataArray == null || dataArray.isEmpty()) { log.info("账户持仓频道数据为空,已当前价买入,并且初始化网格"); JSONObject posData = new JSONObject(); processPositionData(posData, redisUtils); return; } for (int i = 0; i < dataArray.size(); i++) { JSONObject posData = dataArray.getJSONObject(i); String instId = posData.getString("instId"); if (!CoinEnums.HE_YUE.getCode().equals(instId)) { continue; if (CoinEnums.HE_YUE.getCode().equals(instId)) { log.info("查询到账户{}持仓数据",CoinEnums.HE_YUE.getCode()); String mgnMode = posData.getString("mgnMode"); String posSide = posData.getString("posSide"); String pos = posData.getString("pos"); String avgPx = posData.getString("avgPx"); String upl = posData.getString("upl"); String uplRatio = posData.getString("uplRatio"); String lever = posData.getString("lever"); String liqPx = posData.getString("liqPx"); String markPx = posData.getString("markPx"); String imr = posData.getString("imr"); String mgnRatio = posData.getString("mgnRatio"); String mmr = posData.getString("mmr"); String notionalUsd = posData.getString("notionalUsd"); String ccy = posData.getString("ccy"); String last = posData.getString("last"); String idxPx = posData.getString("idxPx"); String bePx = posData.getString("bePx"); String realizedPnl = posData.getString("realizedPnl"); String settledPnl = posData.getString("settledPnl"); log.info( "账户持仓频道-产品类型: {}, 保证金模式: {}, 持仓方向: {}, 持仓数量: {}, 开仓平均价: {}, " + "未实现收益: {}, 未实现收益率: {}, 杠杆倍数: {}, 预估强平价: {}, 初始保证金: {}, " + "维持保证金率: {}, 维持保证金: {}, 以美金价值为单位的持仓数量: {}, 占用保证金的币种: {}, " + "最新成交价: {}, 最新指数价格: {}, 盈亏平衡价: {}, 已实现收益: {}, 累计已结算收益: {}" + "最新标记价格: {}", instId, mgnMode, posSide, pos, avgPx, upl, uplRatio, lever, liqPx, imr, mgnRatio, mmr, notionalUsd, ccy, last, idxPx, bePx, realizedPnl, settledPnl, markPx ); processPositionData(posData, redisUtils); } String mgnMode = posData.getString("mgnMode"); String posSide = posData.getString("posSide"); String pos = posData.getString("pos"); String avgPx = posData.getString("avgPx"); String upl = posData.getString("upl"); String uplRatio = posData.getString("uplRatio"); String lever = posData.getString("lever"); String liqPx = posData.getString("liqPx"); String markPx = posData.getString("markPx"); String imr = posData.getString("imr"); String mgnRatio = posData.getString("mgnRatio"); String mmr = posData.getString("mmr"); String notionalUsd = posData.getString("notionalUsd"); String ccy = posData.getString("ccy"); String last = posData.getString("last"); String idxPx = posData.getString("idxPx"); String bePx = posData.getString("bePx"); String realizedPnl = posData.getString("realizedPnl"); String settledPnl = posData.getString("settledPnl"); log.info( "账户持仓频道-产品类型: {}, 保证金模式: {}, 持仓方向: {}, 持仓数量: {}, 开仓平均价: {}, " + "未实现收益: {}, 未实现收益率: {}, 杠杆倍数: {}, 预估强平价: {}, 初始保证金: {}, " + "维持保证金率: {}, 维持保证金: {}, 以美金价值为单位的持仓数量: {}, 占用保证金的币种: {}, " + "最新成交价: {}, 最新指数价格: {}, 盈亏平衡价: {}, 已实现收益: {}, 累计已结算收益: {}" + "最新标记价格: {}", instId, mgnMode, posSide, pos, avgPx, upl, uplRatio, lever, liqPx, imr, mgnRatio, mmr, notionalUsd, ccy, last, idxPx, bePx, realizedPnl, settledPnl, markPx ); processPositionData(posData, redisUtils); } } catch (Exception e) { log.error("处理持仓频道推送数据失败", e); src/main/java/com/xcong/excoin/modules/okxNewPrice/okxWs/TradeOrderWs.java
@@ -24,32 +24,36 @@ public static void orderEvent(WebSocketClient webSocketClient, RedisUtils redisUtils, String side) { String buyCnt = null; String ctval = getRedisValue(redisUtils, InstrumentsWs.INSTRUMENTSWS_CHANNEL, ":ctVal"); String buyCntNormal = getRedisValue(redisUtils, PositionsWs.POSITIONSWS_CHANNEL, ":buyCnt"); String pos = getRedisValue(redisUtils, PositionsWs.POSITIONSWS_CHANNEL, ":pos"); if (OrderParamEnums.ORDERING.getValue().equals(side)) { return; } else if (OrderParamEnums.HOLDING.getValue().equals(side)) { return; } else if (OrderParamEnums.INIT.getValue().equals(side)) { side = OrderParamEnums.BUY.getValue(); String buyCntNormal = getRedisValue(redisUtils, PositionsWs.POSITIONSWS_CHANNEL, ":buyCnt"); if (StrUtil.isNotBlank(buyCntNormal) && new BigDecimal(buyCntNormal).compareTo(BigDecimal.ZERO) > 0) { if (StrUtil.isNotBlank(buyCntNormal) && BigDecimal.ZERO.compareTo(new BigDecimal(buyCntNormal)) > 0) { buyCnt = buyCntNormal; }else{ buyCnt = getRedisValue(redisUtils, InstrumentsWs.INSTRUMENTSWS_CHANNEL, ":ctVal"); buyCnt = ctval; } } else if (OrderParamEnums.OUT.getValue().equals(side)) { side = OrderParamEnums.SELL.getValue(); buyCnt = getRedisValue(redisUtils, PositionsWs.POSITIONSWS_CHANNEL, ":pos"); } else { if (OrderParamEnums.BUY.getValue().equals(side)){ String buyCntNormal = getRedisValue(redisUtils, PositionsWs.POSITIONSWS_CHANNEL, ":buyCnt"); if (StrUtil.isNotBlank(buyCntNormal)) { buyCnt = buyCntNormal; } buyCnt = pos; } else if (OrderParamEnums.BUY.getValue().equals(side)){ side = OrderParamEnums.BUY.getValue(); if (StrUtil.isNotBlank(buyCntNormal) && BigDecimal.ZERO.compareTo(new BigDecimal(buyCntNormal)) > 0) { buyCnt = buyCntNormal; }else{ side = OrderParamEnums.SELL.getValue(); buyCnt = getRedisValue(redisUtils, PositionsWs.POSITIONSWS_CHANNEL, ":pos"); buyCnt = ctval; } }else if (OrderParamEnums.SELL.getValue().equals(side)){ side = OrderParamEnums.SELL.getValue(); buyCnt = getRedisValue(redisUtils, PositionsWs.POSITIONSWS_CHANNEL, ":pos"); }else{ log.warn("操作信号异常,请检查下单操作..."); return; } // 校验必要参数