package com.xcong.excoin.modules.okxNewPrice.celue; import com.xcong.excoin.modules.okxNewPrice.okxWs.InstrumentsWs; import com.xcong.excoin.modules.okxNewPrice.okxWs.PositionsWs; import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.CoinEnums; import com.xcong.excoin.modules.okxNewPrice.okxWs.enums.OrderParamEnums; import com.xcong.excoin.modules.okxNewPrice.wangge.WangGeQueue; import com.xcong.excoin.modules.okxNewPrice.wangge.WangGeService; import com.xcong.excoin.rabbit.pricequeue.AscBigDecimal; import com.xcong.excoin.rabbit.pricequeue.DescBigDecimal; import com.xcong.excoin.utils.RedisUtils; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import java.math.BigDecimal; import java.util.concurrent.PriorityBlockingQueue; /** * 操作服务实现类,用于处理与交易相关的逻辑操作。 * 包括根据市场行情判断是否进行加仓或减仓,并维护相关价格队列。 * * @author Administrator */ @Slf4j @Service @RequiredArgsConstructor public class CaoZuoServiceImpl implements CaoZuoService { private final RedisUtils redisUtils; private final WangGeService wangGeService; /** * 执行主要的操作逻辑,包括读取合约状态、获取市场价格信息, * 并根据当前持仓均价和标记价格决定是否执行买卖操作。 * * @return 返回操作类型字符串(如买入BUY、卖出SELL等) */ @Override public String caoZuo() { // 构造Redis键名 final String coinCode = CoinEnums.HE_YUE.getCode(); final String instrumentsKey = InstrumentsWs.INSTRUMENTSWS_CHANNEL + ":" + coinCode + ":state"; final String positionsMarkPxKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":markPx"; final String positionsAvgPxKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":avgPx"; final String positionsOrderPriceKey = PositionsWs.POSITIONSWS_CHANNEL + ":" + coinCode + ":orderPrice"; // 获取合约状态 String state = (String) redisUtils.get(instrumentsKey); if (state == null || !OrderParamEnums.STATE_1.getValue().equals(state)) { return OrderParamEnums.HOLDING.getValue(); } if (OrderParamEnums.STATE_4.getValue().equals(state)) { return OrderParamEnums.ORDERING.getValue(); } log.info(OrderParamEnums.getNameByValue(state)); // 获取标记价格和平均持仓价格 Object markPxObj = redisUtils.get(positionsMarkPxKey); Object avgPxObj = redisUtils.get(positionsAvgPxKey); if (markPxObj == null || avgPxObj == null) { return OrderParamEnums.INIT.getValue(); } try { BigDecimal markPx = new BigDecimal((String) markPxObj); BigDecimal avgPx = new BigDecimal((String) avgPxObj); log.info("开仓价格: {}, 当前价格:{},匹配队列中......", avgPx, markPx); // 初始化网格队列 PriorityBlockingQueue queueAsc = WangGeQueue.getQueueAsc(); PriorityBlockingQueue queueKaiCang = wangGeService.initKaiCang(avgPx, queueAsc); PriorityBlockingQueue queuePingCang = wangGeService.initPingCang(avgPx, queueAsc); // 处理订单价格在队列中的情况 String orderPrice = (String) redisUtils.get(positionsOrderPriceKey); handleOrderPriceInQueues(orderPrice, queueKaiCang, queuePingCang); String side = OrderParamEnums.HOLDING.getValue(); // 判断是加仓还是减仓 if (avgPx.compareTo(markPx) > 0) { DescBigDecimal kaiCang = queueKaiCang.peek(); if (kaiCang != null && kaiCang.getValue().compareTo(markPx) >= 0) { log.info("开始加仓...开仓队列价格大于当前价格{}>{}", kaiCang.getValue(), markPx); side = OrderParamEnums.BUY.getValue(); redisUtils.set(positionsOrderPriceKey, String.valueOf(kaiCang.getValue()), 0); } else { log.info("未触发加仓......,等待"); } } else if (avgPx.compareTo(markPx) < 0) { AscBigDecimal pingCang = queuePingCang.peek(); if (pingCang != null && pingCang.getValue().compareTo(markPx) <= 0) { log.info("开始减仓...平仓队列价格小于当前价格{}<={}", pingCang.getValue(), markPx); side = OrderParamEnums.SELL.getValue(); redisUtils.set(positionsOrderPriceKey, String.valueOf(pingCang.getValue()), 0); } else { log.info("未触发减仓......,等待"); } } else { log.info("价格波动较小......,等待"); } return side; } catch (NumberFormatException e) { log.error("解析价格失败,请检查Redis中的值是否合法", e); return OrderParamEnums.HOLDING.getValue(); } } /** * 根据订单价格更新开仓和平仓队列的内容。 * 若该价格不在对应队列中则加入;若已存在,则从队列中移除。 * * @param orderPrice 订单价格 * @param queueKaiCang 开仓价格优先队列(降序) * @param queuePingCang 平仓价格优先队列(升序) */ private void handleOrderPriceInQueues(String orderPrice, PriorityBlockingQueue queueKaiCang, PriorityBlockingQueue queuePingCang) { if (orderPrice == null) { return; } BigDecimal priceDecimal; try { priceDecimal = new BigDecimal(orderPrice); } catch (NumberFormatException ex) { log.warn("无效的价格格式: {}", orderPrice); return; } boolean kaiCangExists = queueKaiCang.stream().anyMatch(item -> item.getValue().equals(priceDecimal)); if (!kaiCangExists) { queueKaiCang.add(new DescBigDecimal(orderPrice)); } else { queueKaiCang.removeIf(item -> item.getValue().equals(priceDecimal)); } boolean pingCangExists = queuePingCang.stream().anyMatch(item -> item.getValue().equals(priceDecimal)); if (!pingCangExists) { queuePingCang.add(new AscBigDecimal(orderPrice)); } else { queuePingCang.removeIf(item -> item.getValue().equals(priceDecimal)); } } /** * 计算盈亏金额。 * * @param faceValue 面值 * @param position 持仓数量 * @param contractMultiplier 合约乘数 * @param markPrice 标记价格 * @param openPrice 开仓价格 * @param isLong 是否为多头仓位 * @param minTickSz 最小变动单位精度 * @return 盈亏金额,保留指定精度的小数位 */ public BigDecimal profit(BigDecimal faceValue, BigDecimal position, BigDecimal contractMultiplier, BigDecimal markPrice, BigDecimal openPrice, boolean isLong, int minTickSz) { BigDecimal profit = BigDecimal.ZERO; if (isLong) { profit = markPrice.subtract(openPrice) .multiply(faceValue) .multiply(contractMultiplier) .multiply(position); } else { profit = openPrice.subtract(markPrice) .multiply(faceValue) .multiply(contractMultiplier) .multiply(position); } return profit.setScale(minTickSz, BigDecimal.ROUND_DOWN); } }