package com.xcong.excoin.rabbit.pricequeue; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.rabbit.producer.OrderProducer; import org.apache.commons.collections.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.PriorityBlockingQueue; @Component public class WebsocketPriceService { @Autowired OrderProducer orderProducer; /** * @param symbol * @param price */ public void comparePriceAsc(String symbol, String price) { // 比较价格 正序的 最小元素在头部 开多止盈 开空止损等 PriorityBlockingQueue queue = PricePriorityQueue.getQueueAsc(symbol); // 最小的 AscBigDecimal b = queue.peek(); // 当前价 AscBigDecimal now = new AscBigDecimal(price); List list = new ArrayList(); // 找到所有比当前价格大的 是需要操作的 if (b != null && b.compareTo(now) <= 0) { // 可以操作 System.out.println("当前价格:" + price + "---正序---" + "队列价格:" + b.getValue().toPlainString()+" time:"+new Date()); while (queue.peek() != null && queue.peek().compareTo(now) <= 0) { // 可以发送消息操作 list.add(queue.remove()); } } if(CollectionUtils.isNotEmpty(list)){ dealAscPriceOrderAndSenMq(list,symbol); } } public void comparePriceDesc(String symbol, String price) { // 比较价格 倒叙的 开多止损 开空止盈 PriorityBlockingQueue queue = PricePriorityQueue.getQueueDesc(symbol); // 最大价格 DescBigDecimal b = queue.peek(); // 当前价格 DescBigDecimal now = new DescBigDecimal(price); List list = new ArrayList(); // 找到比当前价格还大的就是需要操作的 开多止损 // 即最大的币当前价大 那么需要开多止损 if (b != null && b.compareTo(now) <= 0) { // 可以操作 System.out.println("当前价格:" + price + "---倒序操作---" + "队列:" + b.getValue().toPlainString()+" time:"+new Date()); while (queue.peek() != null && queue.peek().compareTo(now) <= 0) { // 可以发送消息操作 list.add(queue.remove()); } } if(CollectionUtils.isNotEmpty(list)){ dealDescPriceOrderAndSenMq(list,symbol); } } // 处理消息 正序的 包括 // 1:买入委托2:开多3:开空4:平多5:平空6:爆仓平多7:爆仓平空8:撤单9:止盈平多10:止盈平空11:止损平多12:止损平空 public void dealAscPriceOrderAndSenMq(List list, String symbol) { if (CollectionUtils.isNotEmpty(list)) { // 根据不同类型发送不同消息 1 倒序 2 正序 List orderModelList = new ArrayList(); // 3 正序 Map> orderMap = PricePriorityQueue.getOrderMap(symbol, 3); // 根据价格查询到对应的订单 for (AscBigDecimal asc : list) { String key = asc.getValue().toPlainString(); if(orderMap.containsKey(key)){ orderModelList.addAll(orderMap.get(key)); orderMap.remove(key); } } if(CollectionUtils.isEmpty(orderModelList)){ return; } System.out.println("本次执行的列表ASC"); System.out.println(JSONObject.toJSONString(orderModelList)); // 根据订单的类型发送消息 // 3:开空 7:爆仓平空 // 9:止盈平多 12:止损平空 for (OrderModel model : orderModelList) { List kkzsList = null; List kdzyList = null; List bcList = null; List wtkkList = null; switch (model.getType()) { case 3: if (wtkkList == null) { wtkkList = new ArrayList(); } wtkkList.add(model); break; case 7: if (bcList == null) { bcList = new ArrayList(); } bcList.add(model); break; case 9: if (kdzyList == null) { kdzyList = new ArrayList(); } kdzyList.add(model); break; case 12: if (kkzsList == null) { kkzsList = new ArrayList(); } kkzsList.add(model); break; } // 发送消息 if(CollectionUtils.isNotEmpty(kkzsList)){ String kkzs= JSONObject.toJSONString(kkzsList); orderProducer.sendLessLoss(kkzs); } if(CollectionUtils.isNotEmpty(kdzyList)){ String kdzy = JSONObject.toJSONString(kdzyList); orderProducer.sendMorePro(kdzy); } if(CollectionUtils.isNotEmpty(bcList)){ orderProducer.sendCoinout(JSONObject.toJSONString(bcList)); } if(CollectionUtils.isNotEmpty(wtkkList)){ orderProducer.sendLimit(JSONObject.toJSONString(wtkkList)); } } } } // 处理消息 正序的 包括 // 1:买入委托2:开多3:开空4:平多5:平空6:爆仓平多7:爆仓平空8:撤单9:止盈平多10:止盈平空11:止损平多12:止损平空 public void dealDescPriceOrderAndSenMq(List list, String symbol) { if (CollectionUtils.isNotEmpty(list)) { // 根据不同类型发送不同消息 1 倒序 2 正序 List orderModelList = new ArrayList(); Map> orderMap = PricePriorityQueue.getOrderMap(symbol, 2); // 根据价格查询到对应的订单 for (DescBigDecimal desc : list) { String key = desc.getValue().toPlainString(); if(orderMap.containsKey(key)){ orderModelList.addAll(orderMap.get(key)); orderMap.remove(key); } } if(CollectionUtils.isEmpty(orderModelList)){ return; } System.out.println("本次执行的列表Desc"); System.out.println(JSONObject.toJSONString(orderModelList)); // 根据订单的类型发送消息 // 2:开多6:爆仓平多 // 10:止盈平空11:止损平多 for (OrderModel model : orderModelList) { List kkzyList = null; List kdzsList = null; List bcList = null; List wtkdList = null; switch (model.getType()) { case 2: if (wtkdList == null) { wtkdList = new ArrayList(); } wtkdList.add(model); break; case 6: if (bcList == null) { bcList = new ArrayList(); } bcList.add(model); break; case 10: if (kkzyList == null) { kkzyList = new ArrayList(); } kkzyList.add(model); break; case 11: if (kdzsList == null) { kdzsList = new ArrayList(); } kdzsList.add(model); break; } // 发送消息 if(CollectionUtils.isNotEmpty(kkzyList)){ String kkzy= JSONObject.toJSONString(kkzyList); orderProducer.sendLessPro(kkzy); } if(CollectionUtils.isNotEmpty(kdzsList)){ String kdzs = JSONObject.toJSONString(kdzsList); orderProducer.sendMoreLoss(kdzs); } if(CollectionUtils.isNotEmpty(bcList)){ orderProducer.sendCoinout(JSONObject.toJSONString(bcList)); } if(CollectionUtils.isNotEmpty(wtkdList)){ orderProducer.sendLimit(JSONObject.toJSONString(wtkdList)); } } } } }