package com.xcong.excoin.rabbit.pricequeue; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.rabbit.producer.OrderProducer; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.PriorityBlockingQueue; @Slf4j @Component public class WebsocketPriceService { @Autowired OrderProducer orderProducer; /** * @param symbol * @param price */ public void comparePriceAsc(String symbol, String price) { // 比较价格 正序的 最小元素在头部 开多止盈 开空止损等 PriorityBlockingQueue queue = PricePriorityQueue.getQueueAsc(symbol); // 最小的 AscBigDecimal b = queue.peek(); // 当前价 AscBigDecimal now = new AscBigDecimal(price); List list = new ArrayList(); // 找到所有比当前价格大的 是需要操作的 if (b != null && b.compareTo(now) <= 0) { // 可以操作 System.out.println("当前价格:" + price + "---正序---" + "队列价格:" + b.getValue().toPlainString() + " time:" + new Date()); while (queue.peek() != null && queue.peek().compareTo(now) <= 0) { // 可以发送消息操作 list.add(queue.remove()); } } if (CollectionUtils.isNotEmpty(list)) { dealAscPriceOrderAndSenMq(list, symbol); } } public void comparePriceDesc(String symbol, String price) { // 比较价格 倒叙的 开多止损 开空止盈 PriorityBlockingQueue queue = PricePriorityQueue.getQueueDesc(symbol); // 最大价格 DescBigDecimal b = queue.peek(); // 当前价格 DescBigDecimal now = new DescBigDecimal(price); List list = new ArrayList(); // 找到比当前价格还大的就是需要操作的 开多止损 // 即最大的币当前价大 那么需要开多止损 if (b != null && b.compareTo(now) <= 0) { // 可以操作 System.out.println("当前价格:" + price + "---倒序操作---" + "队列:" + b.getValue().toPlainString() + " time:" + new Date()); while (queue.peek() != null && queue.peek().compareTo(now) <= 0) { // 可以发送消息操作 list.add(queue.remove()); log.info("#{}#", JSONObject.toJSONString(list)); } } if (CollectionUtils.isNotEmpty(list)) { dealDescPriceOrderAndSenMq(list, symbol); } } // 处理消息 正序的 包括 // 1:买入委托2:开多3:开空4:平多5:平空6:爆仓平多7:爆仓平空8:撤单9:止盈平多10:止盈平空11:止损平多12:止损平空 public void dealAscPriceOrderAndSenMq(List list, String symbol) { if (CollectionUtils.isNotEmpty(list)) { // 根据不同类型发送不同消息 1 倒序 2 正序 List orderModelList = new ArrayList(); // 3 正序 Map> orderMap = PricePriorityQueue.getOrderMap(symbol, 3); // 根据价格查询到对应的订单 for (AscBigDecimal asc : list) { String key = asc.getValue().toPlainString(); assert orderMap != null; if (orderMap.containsKey(key)) { orderModelList.addAll(orderMap.get(key)); orderMap.remove(key); } } if (CollectionUtils.isEmpty(orderModelList)) { return; } System.out.println("本次执行的列表ASC"); System.out.println(JSONObject.toJSONString(orderModelList)); // 根据订单的类型发送消息 // 3:开空 7:爆仓平空 // 9:止盈平多 12:止损平空 for (OrderModel model : orderModelList) { // 止损平空 List kkzsList = new ArrayList(); // 止盈平多 List kdzyList = new ArrayList(); // 爆仓平空 List bcList = new ArrayList(); // 开空 List wtkkList = new ArrayList(); switch (model.getType()) { case 3: wtkkList.add(model); break; case 7: bcList.add(model); break; case 9: kdzyList.add(model); break; case 12: kkzsList.add(model); break; default: log.info("#price-service unknown type#"); break; } // 发送消息 if (CollectionUtils.isNotEmpty(kkzsList)) { String kkzs = JSONObject.toJSONString(kkzsList); orderProducer.sendLessLoss(kkzs); } if (CollectionUtils.isNotEmpty(kdzyList)) { String kdzy = JSONObject.toJSONString(kdzyList); orderProducer.sendMorePro(kdzy); } if (CollectionUtils.isNotEmpty(bcList)) { orderProducer.sendCoinout(JSONObject.toJSONString(bcList)); } if (CollectionUtils.isNotEmpty(wtkkList)) { orderProducer.sendLimit(JSONObject.toJSONString(wtkkList)); } } } } // 处理消息 正序的 包括 // 1:买入委托2:开多3:开空4:平多5:平空6:爆仓平多7:爆仓平空8:撤单9:止盈平多10:止盈平空11:止损平多12:止损平空 public void dealDescPriceOrderAndSenMq(List list, String symbol) { if (CollectionUtils.isNotEmpty(list)) { // 根据不同类型发送不同消息 1 倒序 2 正序 List orderModelList = new ArrayList(); Map> orderMap = PricePriorityQueue.getOrderMap(symbol, 2); // 根据价格查询到对应的订单 for (DescBigDecimal desc : list) { String key = desc.getValue().toPlainString(); assert orderMap != null; if (orderMap.containsKey(key)) { orderModelList.addAll(orderMap.get(key)); orderMap.remove(key); } } if (CollectionUtils.isEmpty(orderModelList)) { return; } System.out.println("本次执行的列表Desc"); System.out.println(JSONObject.toJSONString(orderModelList)); // 根据订单的类型发送消息 // 2:开多6:爆仓平多 // 10:止盈平空11:止损平多 for (OrderModel model : orderModelList) { // 开空止盈 List kkzyList = new ArrayList(); // 开多止损 List kdzsList = new ArrayList(); // 爆仓 List bcList = new ArrayList(); // 开多委托 List wtkdList = new ArrayList(); switch (model.getType()) { case 2: wtkdList.add(model); break; case 6: bcList.add(model); break; case 10: kkzyList.add(model); break; case 11: kdzsList.add(model); break; default: break; } // 发送消息 if (CollectionUtils.isNotEmpty(kkzyList)) { String kkzy = JSONObject.toJSONString(kkzyList); orderProducer.sendLessPro(kkzy); } if (CollectionUtils.isNotEmpty(kdzsList)) { String kdzs = JSONObject.toJSONString(kdzsList); orderProducer.sendMoreLoss(kdzs); } if (CollectionUtils.isNotEmpty(bcList)) { orderProducer.sendCoinout(JSONObject.toJSONString(bcList)); } if (CollectionUtils.isNotEmpty(wtkdList)) { orderProducer.sendLimit(JSONObject.toJSONString(wtkdList)); } } } } }