package com.xcong.excoin.modules.okxNewPrice.celue; import com.xcong.excoin.modules.okxNewPrice.okxWs.InstrumentsWs; import com.xcong.excoin.modules.okxNewPrice.okxWs.PositionsWs; import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.CoinEnums; import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.OrderParamEnums; import com.xcong.excoin.modules.okxNewPrice.wangge.WangGeQueue; import com.xcong.excoin.modules.okxNewPrice.wangge.WangGeService; import com.xcong.excoin.rabbit.pricequeue.AscBigDecimal; import com.xcong.excoin.rabbit.pricequeue.DescBigDecimal; import com.xcong.excoin.utils.RedisUtils; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import java.math.BigDecimal; import java.util.concurrent.PriorityBlockingQueue; /** * 操作服务实现类,用于处理与交易相关的逻辑操作。 * 包括根据市场行情判断是否进行加仓或减仓,并维护相关价格队列。 * * @author Administrator */ @Slf4j @Service @RequiredArgsConstructor public class CaoZuoServiceImpl implements CaoZuoService { private final RedisUtils redisUtils; private final WangGeService wangGeService; /** * 执行主要的操作逻辑,包括读取合约状态、获取市场价格信息, * 并根据当前持仓均价和标记价格决定是否执行买卖操作。 * * @return 返回操作类型字符串(如买入BUY、卖出SELL等) */ @Override public String caoZuo() { // 构造Redis键名 final String coinCode = CoinEnums.HE_YUE.getCode(); final String instrumentsKey = InstrumentsWs.INSTRUMENTSWS_CHANNEL + ":" + coinCode + ":state"; final String positionsMarkPxKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":markPx"; final String positionsAvgPxKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":avgPx"; final String positionsOrderPriceKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":orderPrice"; final String uplKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":upl"; final String realizedPnlKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":realizedPnl"; final String imrKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":imr"; // 获取合约状态 String state = (String) redisUtils.get(instrumentsKey); if (state == null || !OrderParamEnums.STATE_1.getValue().equals(state)) { return OrderParamEnums.HOLDING.getValue(); } if (OrderParamEnums.STATE_4.getValue().equals(state)) { return OrderParamEnums.ORDERING.getValue(); } log.info(OrderParamEnums.getNameByValue(state)); // 获取标记价格和平均持仓价格 Object markPxObj = redisUtils.get(positionsMarkPxKey); Object avgPxObj = redisUtils.get(positionsAvgPxKey); if (markPxObj == null || avgPxObj == null) { return OrderParamEnums.INIT.getValue(); } try { BigDecimal markPx = new BigDecimal((String) markPxObj); BigDecimal avgPx = new BigDecimal((String) avgPxObj); log.info("开仓价格: {}, 当前价格:{},匹配队列中......", avgPx, markPx); // 初始化网格队列 PriorityBlockingQueue queueAsc = WangGeQueue.getQueueAsc(); PriorityBlockingQueue queueKaiCang = wangGeService.initKaiCang(avgPx, queueAsc); PriorityBlockingQueue queuePingCang = wangGeService.initPingCang(avgPx, queueAsc); // 处理订单价格在队列中的情况 String orderPrice = (String) redisUtils.get(positionsOrderPriceKey); handleOrderPriceInQueues(orderPrice, queueKaiCang, queuePingCang); String side = OrderParamEnums.HOLDING.getValue(); // 判断是加仓还是减仓 if (avgPx.compareTo(markPx) > 0) { DescBigDecimal kaiCang = queueKaiCang.peek(); if (kaiCang != null && markPx.compareTo(kaiCang.getValue()) <= 0 && avgPx.compareTo(kaiCang.getValue()) >= 0) { log.info("开始加仓...开仓队列价格大于当前价格{}>{}", kaiCang.getValue(), markPx); side = OrderParamEnums.BUY.getValue(); redisUtils.set(positionsOrderPriceKey, String.valueOf(kaiCang.getValue()), 0); } else { log.info("未触发加仓......,等待"); } } else if (avgPx.compareTo(markPx) < 0) { AscBigDecimal pingCang = queuePingCang.peek(); if (pingCang != null && markPx.compareTo(pingCang.getValue()) >= 0 && avgPx.compareTo(pingCang.getValue()) < 0) { log.info("开始减仓...平仓队列价格小于当前价格{}<={}", pingCang.getValue(), markPx); //判断当前是否盈利 String upl = (String) redisUtils.get(uplKey); String realizedPnl = (String) redisUtils.get(realizedPnlKey); String imr = (String) redisUtils.get(imrKey); if (upl != null && realizedPnl != null && imr != null) { BigDecimal uplValue = new BigDecimal(upl); BigDecimal realizedPnlValue = new BigDecimal(realizedPnl); BigDecimal imrValue = new BigDecimal(imr).multiply(new BigDecimal(OrderParamEnums.PING_CANG_SHOUYI.getValue())); if (realizedPnlValue.compareTo(BigDecimal.ZERO) <= 0) { if (uplValue.compareTo(realizedPnlValue) < 0) { log.info("当前未实现盈亏:{}没有大于已实现收益>{},等待中", uplValue, realizedPnlValue); redisUtils.set(positionsOrderPriceKey, String.valueOf(pingCang.getValue()), 0); return OrderParamEnums.HOLDING.getValue(); }else if (uplValue.compareTo(realizedPnlValue) > 0 && uplValue.compareTo(imrValue) >= 0) { log.info("当前未实现盈亏:{}大于预计收益>{},赚钱咯", uplValue, imrValue); redisUtils.set(positionsOrderPriceKey, String.valueOf(pingCang.getValue()), 0); return OrderParamEnums.SELL.getValue(); }else{ log.info("当前未实现盈亏:{}没有大于预计收益>{},钱在路上了", uplValue, imrValue); redisUtils.set(positionsOrderPriceKey, String.valueOf(pingCang.getValue()), 0); return OrderParamEnums.HOLDING.getValue(); } }else { if (uplValue.compareTo(imrValue) >= 0) { log.info("当前未实现盈亏:{}大于预计收益>{},赚钱咯", uplValue, imrValue); redisUtils.set(positionsOrderPriceKey, String.valueOf(pingCang.getValue()), 0); return OrderParamEnums.SELL.getValue(); }else{ log.info("当前未实现盈亏:{}没有大于预计收益>{},钱在路上了", uplValue, imrValue); redisUtils.set(positionsOrderPriceKey, String.valueOf(pingCang.getValue()), 0); return OrderParamEnums.HOLDING.getValue(); } } }else { return OrderParamEnums.HOLDING.getValue(); } } else { log.info("未触发减仓......,等待"); } } else { log.info("价格波动较小......,等待"); } return side; } catch (NumberFormatException e) { log.error("解析价格失败,请检查Redis中的值是否合法", e); return OrderParamEnums.HOLDING.getValue(); } } /** * 根据订单价格更新开仓和平仓队列的内容。 * 若该价格不在对应队列中则加入;若已存在,则从队列中移除。 * * @param orderPrice 订单价格 * @param queueKaiCang 开仓价格优先队列(降序) * @param queuePingCang 平仓价格优先队列(升序) */ private void handleOrderPriceInQueues(String orderPrice, PriorityBlockingQueue queueKaiCang, PriorityBlockingQueue queuePingCang) { if (orderPrice == null) { return; } log.info("需要移除的价格: {}", orderPrice); BigDecimal priceDecimal; try { priceDecimal = new BigDecimal(orderPrice); } catch (NumberFormatException ex) { log.warn("无效的价格格式: {}", orderPrice); return; } boolean kaiCangExists = queueKaiCang.stream().anyMatch(item -> item.getValue().equals(priceDecimal)); if (!kaiCangExists) { queueKaiCang.add(new DescBigDecimal(orderPrice)); } else { queueKaiCang.removeIf(item -> item.getValue().equals(priceDecimal)); } // 打印开仓队列 StringBuilder kaiCangStr = new StringBuilder(); kaiCangStr.append("开仓队列: ["); boolean first = true; for (DescBigDecimal item : queueKaiCang) { if (!first) { kaiCangStr.append(", "); } kaiCangStr.append(item.getValue()); first = false; } kaiCangStr.append("]"); log.info(kaiCangStr.toString()); boolean pingCangExists = queuePingCang.stream().anyMatch(item -> item.getValue().equals(priceDecimal)); if (!pingCangExists) { queuePingCang.add(new AscBigDecimal(orderPrice)); } else { queuePingCang.removeIf(item -> item.getValue().equals(priceDecimal)); } // 打印平仓队列 StringBuilder pingCangStr = new StringBuilder(); pingCangStr.append("平仓队列: ["); first = true; for (AscBigDecimal item : queuePingCang) { if (!first) { pingCangStr.append(", "); } pingCangStr.append(item.getValue()); first = false; } pingCangStr.append("]"); log.info(pingCangStr.toString()); } }