package com.xcong.excoin.modules.okxNewPrice.celue; import cn.hutool.core.util.StrUtil; import com.xcong.excoin.modules.okxNewPrice.okxWs.AccountWs; import com.xcong.excoin.modules.okxNewPrice.okxWs.InstrumentsWs; import com.xcong.excoin.modules.okxNewPrice.okxWs.PositionsWs; import com.xcong.excoin.modules.okxNewPrice.okxWs.TradeOrderWs; import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.CoinEnums; import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.OrderParamEnums; import com.xcong.excoin.modules.okxNewPrice.wangge.WangGeQueue; import com.xcong.excoin.modules.okxNewPrice.wangge.WangGeService; import com.xcong.excoin.rabbit.pricequeue.AscBigDecimal; import com.xcong.excoin.rabbit.pricequeue.DescBigDecimal; import com.xcong.excoin.utils.RedisUtils; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import java.math.BigDecimal; import java.util.concurrent.PriorityBlockingQueue; /** * 操作服务实现类,用于处理与交易相关的逻辑操作。 * 包括根据市场行情判断是否进行加仓或减仓,并维护相关价格队列。 * * @author Administrator */ @Slf4j @Service @RequiredArgsConstructor public class CaoZuoServiceImpl implements CaoZuoService { private final RedisUtils redisUtils; private final WangGeService wangGeService; // 构造Redis键名 final String coinCode = CoinEnums.HE_YUE.getCode(); final String instrumentsStateKey = InstrumentsWs.INSTRUMENTSWS_CHANNEL + ":" + coinCode + ":state"; final String positionsMarkPxKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":markPx"; final String positionsAvgPxKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":avgPx"; final String positionsOrderPriceKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":orderPrice"; final String positionsUplKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":upl"; final String positionsRealizedPnlKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":realizedPnl"; final String positionsImrKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":imr"; final String positionsPosKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":pos"; /** * 执行主要的操作逻辑,包括读取合约状态、获取市场价格信息, * 并根据当前持仓均价和标记价格决定是否执行买卖操作。 * * @return 返回操作类型字符串(如买入BUY、卖出SELL等) */ @Override public String caoZuo() { log.info("开始执行操作......"); String pos = (String) redisUtils.get(positionsPosKey); if (StrUtil.isBlank(pos) || BigDecimal.ZERO.compareTo( new BigDecimal(pos)) <= 0) { log.error("未获取到持仓数量"); return OrderParamEnums.INIT.getValue(); } // 获取合约执行操作状态 String state = (String) redisUtils.get(instrumentsStateKey); if (OrderParamEnums.STATE_4.getValue().equals(state)) { log.error("操作下单中,等待......"); return OrderParamEnums.ORDERING.getValue(); } if (OrderParamEnums.STATE_3.getValue().equals(state)){ log.error("持仓盈亏超过下单总保证金,止损冷静一天......"); return OrderParamEnums.OUT.getValue(); } if (OrderParamEnums.STATE_2.getValue().equals(state)){ log.error("持仓盈亏抗压......"); return OrderParamEnums.HOLDING.getValue(); } if (OrderParamEnums.STATE_0.getValue().equals(state)){ log.error("请检查系统参数,不允许开仓......"); return OrderParamEnums.HOLDING.getValue(); } String uplStr = (String) redisUtils.get(positionsUplKey); if (StrUtil.isBlank(uplStr)){ return OrderParamEnums.INIT.getValue(); } //可使用的总保证金 String totalOrderUsdtKey = AccountWs.ACCOUNTWS_CHANNEL + ":" + CoinEnums.USDT.getCode() + ":totalOrderUsdt"; String totalOrderUsdt = (String) redisUtils.get(totalOrderUsdtKey); BigDecimal upl = new BigDecimal(uplStr); if (BigDecimal.ZERO.compareTo(upl) >= 0){ upl = upl.multiply(new BigDecimal("-1")); if (upl.compareTo(new BigDecimal(totalOrderUsdt)) >= 0) { log.error("持仓盈亏超过下单总保证金,止损冷静一天......"); return OrderParamEnums.OUT.getValue(); } } log.info(OrderParamEnums.getNameByValue(state)); // 获取标记价格和平均持仓价格 String markPxObj = (String) redisUtils.get(positionsMarkPxKey); String avgPxObj = (String) redisUtils.get(positionsAvgPxKey); if (StrUtil.isBlank(markPxObj) || StrUtil.isBlank(avgPxObj)) { return OrderParamEnums.INIT.getValue(); } try { BigDecimal markPx = new BigDecimal( markPxObj); BigDecimal avgPx = new BigDecimal( avgPxObj); log.info("开仓价格: {}, 当前价格:{},匹配队列中......", avgPx, markPx); // 初始化网格队列 PriorityBlockingQueue queueAsc = WangGeQueue.getQueueAsc(); PriorityBlockingQueue queueKaiCang = wangGeService.initKaiCang(avgPx, queueAsc); PriorityBlockingQueue queuePingCang = wangGeService.initPingCang(avgPx, queueAsc); // 处理订单价格在队列中的情况 String orderPrice = (String) redisUtils.get(positionsOrderPriceKey); handleOrderPriceInQueues(orderPrice, queueKaiCang, queuePingCang); String side = OrderParamEnums.HOLDING.getValue(); // 判断是加仓还是减仓 if (avgPx.compareTo(markPx) > 0) { log.info("开始加仓..."); if (queueKaiCang.isEmpty()) { // 队列为空 log.info("开始加仓,但是超出了网格设置..."); return side; } DescBigDecimal kaiCang = queueKaiCang.peek(); if (kaiCang != null && markPx.compareTo(kaiCang.getValue()) <= 0 && avgPx.compareTo(kaiCang.getValue()) >= 0) { log.info("开始加仓...开仓队列价格大于当前价格{}>{}", kaiCang.getValue(), markPx); side = OrderParamEnums.BUY.getValue(); redisUtils.set(positionsOrderPriceKey, String.valueOf(kaiCang.getValue()), 0); } else { log.info("未触发加仓......,等待"); } } else if (avgPx.compareTo(markPx) < 0) { log.info("开始减仓..."); if (queuePingCang.isEmpty()) { // 队列为空 log.info("开始减仓,但是超出了网格设置..."); return side; } AscBigDecimal pingCang = queuePingCang.peek(); if (pingCang != null && markPx.compareTo(pingCang.getValue()) >= 0 && avgPx.compareTo(pingCang.getValue()) < 0) { log.info("开始减仓...平仓队列价格小于当前价格{}<={}", pingCang.getValue(), markPx); //判断当前是否盈利 String uplstr = (String) redisUtils.get(positionsUplKey); String realizedPnl = (String) redisUtils.get(positionsRealizedPnlKey); String imr = (String) redisUtils.get(positionsImrKey); if (uplstr != null && realizedPnl != null && imr != null) { BigDecimal uplValue = new BigDecimal(uplstr); BigDecimal realizedPnlValue = new BigDecimal(realizedPnl); BigDecimal imrValue = new BigDecimal(imr).multiply(new BigDecimal(OrderParamEnums.PING_CANG_SHOUYI.getValue())); if (realizedPnlValue.compareTo(BigDecimal.ZERO) <= 0) { BigDecimal realizedPnlValueZheng = realizedPnlValue.multiply(new BigDecimal("-1")); if (uplValue.compareTo(realizedPnlValue) > 0 && uplValue.compareTo(imrValue.add(realizedPnlValueZheng)) >= 0) { log.info("当前未实现盈亏:{}大于预计收益>{},赚钱咯", uplValue, imrValue); redisUtils.set(positionsOrderPriceKey, String.valueOf(pingCang.getValue()), 0); return OrderParamEnums.SELL.getValue(); }else{ log.info("当前未实现盈亏:{}没有大于预计收益>{},钱在路上了", uplValue, imrValue); redisUtils.set(positionsOrderPriceKey, String.valueOf(pingCang.getValue()), 0); return OrderParamEnums.HOLDING.getValue(); } }else { if (uplValue.compareTo(imrValue) >= 0) { log.info("当前未实现盈亏:{}大于预计收益>{},赚钱咯", uplValue, imrValue); redisUtils.set(positionsOrderPriceKey, String.valueOf(pingCang.getValue()), 0); return OrderParamEnums.SELL.getValue(); }else{ log.info("当前未实现盈亏:{}没有大于预计收益>{},钱在路上了", uplValue, imrValue); redisUtils.set(positionsOrderPriceKey, String.valueOf(pingCang.getValue()), 0); return OrderParamEnums.HOLDING.getValue(); } } }else { return OrderParamEnums.HOLDING.getValue(); } } else { log.info("未触发减仓......,等待"); } } else { log.info("价格波动较小......,等待"); } return side; } catch (NumberFormatException e) { log.error("解析价格失败,请检查Redis中的值是否合法", e); return OrderParamEnums.HOLDING.getValue(); } } /** * 根据订单价格更新开仓和平仓队列的内容。 * 若该价格不在对应队列中则加入;若已存在,则从队列中移除。 * * @param orderPrice 订单价格 * @param queueKaiCang 开仓价格优先队列(降序) * @param queuePingCang 平仓价格优先队列(升序) */ private void handleOrderPriceInQueues(String orderPrice, PriorityBlockingQueue queueKaiCang, PriorityBlockingQueue queuePingCang) { if (orderPrice == null) { return; } log.info("需要移除的价格: {}", orderPrice); BigDecimal priceDecimal; try { priceDecimal = new BigDecimal(orderPrice); } catch (NumberFormatException ex) { log.warn("无效的价格格式: {}", orderPrice); return; } // 删除比该价格大的数据(由于是降序队列,所以是删除value.compareTo(priceDecimal) < 0的元素) queueKaiCang.removeIf(item -> item.getValue().compareTo(priceDecimal) <= 0); // 打印开仓队列 StringBuilder kaiCangStr = new StringBuilder(); kaiCangStr.append("开仓队列: ["); boolean first = true; for (DescBigDecimal item : queueKaiCang) { if (!first) { kaiCangStr.append(", "); } kaiCangStr.append(item.getValue()); first = false; } kaiCangStr.append("]"); log.info(kaiCangStr.toString()); // 删除比该价格小的数据(由于是升序队列,所以是删除value.compareTo(priceDecimal) > 0的元素) queuePingCang.removeIf(item -> item.getValue().compareTo(priceDecimal) >= 0); // 打印平仓队列 StringBuilder pingCangStr = new StringBuilder(); pingCangStr.append("平仓队列: ["); first = true; for (AscBigDecimal item : queuePingCang) { if (!first) { pingCangStr.append(", "); } pingCangStr.append(item.getValue()); first = false; } pingCangStr.append("]"); log.info(pingCangStr.toString()); } }