package com.xcong.excoin.trade;
|
|
import com.alibaba.fastjson.JSON;
|
|
import com.alibaba.fastjson.JSONObject;
|
import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
|
import com.xcong.excoin.modules.coin.service.OrderCoinService;
|
import com.xcong.excoin.rabbit.producer.ExchangeProducer;
|
import org.apache.commons.collections.CollectionUtils;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
|
import java.math.BigDecimal;
|
import java.text.ParseException;
|
import java.text.SimpleDateFormat;
|
import java.util.*;
|
|
public class CoinTrader {
|
private String symbol;
|
private ExchangeProducer exchangeProducer;
|
//交易币种的精度
|
private int coinScale = 4;
|
//基币的精度
|
private int baseCoinScale = 4;
|
private Logger logger = LoggerFactory.getLogger(CoinTrader.class);
|
//买入限价订单链表,价格从高到低排列
|
private TreeMap<BigDecimal, MergeOrder> buyLimitPriceQueue;
|
//卖出限价订单链表,价格从低到高排列
|
private TreeMap<BigDecimal, MergeOrder> sellLimitPriceQueue;
|
//买入市价订单链表,按时间从小到大排序
|
private LinkedList<OrderCoinsEntity> buyMarketQueue;
|
//卖出市价订单链表,按时间从小到大排序
|
private LinkedList<OrderCoinsEntity> sellMarketQueue;
|
//卖盘盘口信息
|
private TradePlate sellTradePlate;
|
//买盘盘口信息
|
private TradePlate buyTradePlate;
|
//是否暂停交易
|
private boolean tradingHalt = false;
|
private boolean ready = false;
|
|
private String clearTime;
|
|
private SimpleDateFormat dateTimeFormat;
|
|
|
public CoinTrader(String symbol) {
|
this.symbol = symbol;
|
initialize();
|
}
|
|
/**
|
* 初始化交易线程
|
*/
|
public void initialize() {
|
//logger.info("init CoinTrader for symbol {}", symbol);
|
//买单队列价格降序排列
|
buyLimitPriceQueue = new TreeMap<>(Comparator.reverseOrder());
|
//卖单队列价格升序排列
|
this.sellLimitPriceQueue = new TreeMap<>(Comparator.naturalOrder());
|
this.buyMarketQueue = new LinkedList<>();
|
this.sellMarketQueue = new LinkedList<>();
|
this.sellTradePlate = new TradePlate(symbol, OrderCoinsEntity.ORDERTYPE_SELL);
|
this.buyTradePlate = new TradePlate(symbol, OrderCoinsEntity.ORDERTYPE_BUY);
|
this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
}
|
|
/**
|
* 增加限价订单到队列,买入单按从价格高到低排,卖出单按价格从低到高排
|
*
|
* @param exchangeOrder
|
*/
|
public void addLimitPriceOrder(OrderCoinsEntity exchangeOrder) {
|
if (exchangeOrder.getTradeType() != OrderCoinsEntity.TRADETYPE_FIXEDPRICE) {
|
return;
|
}
|
//logger.info("addLimitPriceOrder,orderId = {}", exchangeOrder.getOrderId());
|
TreeMap<BigDecimal, MergeOrder> list;
|
if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
|
list = buyLimitPriceQueue;
|
// 添加盘口信息 TODO
|
buyTradePlate.add(exchangeOrder);
|
if (ready) {
|
// 发送盘口信息
|
sendTradePlateMessage(buyTradePlate, sellTradePlate);
|
}
|
} else {
|
list = sellLimitPriceQueue;
|
sellTradePlate.add(exchangeOrder);
|
if (ready) {
|
sendTradePlateMessage(buyTradePlate, sellTradePlate);
|
}
|
}
|
synchronized (list) {
|
MergeOrder mergeOrder = list.get(exchangeOrder.getEntrustPrice());
|
if (mergeOrder == null) {
|
mergeOrder = new MergeOrder();
|
mergeOrder.add(exchangeOrder);
|
list.put(exchangeOrder.getEntrustPrice(), mergeOrder);
|
} else {
|
mergeOrder.add(exchangeOrder);
|
}
|
}
|
}
|
|
public void addMarketPriceOrder(OrderCoinsEntity exchangeOrder) {
|
if (exchangeOrder.getTradeType() != OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
|
return;
|
}
|
//logger.info("addMarketPriceOrder,orderId = {}", exchangeOrder.getId());
|
LinkedList<OrderCoinsEntity> list = exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY ? buyMarketQueue : sellMarketQueue;
|
synchronized (list) {
|
list.addLast(exchangeOrder);
|
}
|
}
|
|
public void trade(List<OrderCoinsEntity> orders) throws ParseException {
|
if (tradingHalt) {
|
return;
|
}
|
for (OrderCoinsEntity order : orders) {
|
trade(order);
|
}
|
}
|
|
|
/**
|
* 主动交易输入的订单,交易不完成的会输入到队列
|
*
|
* @param exchangeOrder
|
* @throws ParseException
|
*/
|
public void trade(OrderCoinsEntity exchangeOrder) {
|
if (tradingHalt) {
|
return;
|
}
|
//logger.info("trade order={}",exchangeOrder);
|
if (!symbol.equalsIgnoreCase(exchangeOrder.getSymbol())) {
|
//logger.info("unsupported symbol,coin={},base={}", exchangeOrder.getSymbol(), "USDT");
|
return;
|
}
|
// 如果
|
if(OrderCoinsEntity.ORDERTYPE_BUY==exchangeOrder.getOrderType()){
|
if (exchangeOrder.getEntrustAmount().compareTo(BigDecimal.ZERO) <= 0 || exchangeOrder.getEntrustAmount().subtract(exchangeOrder.getDealAmount()).compareTo(BigDecimal.ZERO) <= 0) {
|
return;
|
}
|
}else{
|
if (exchangeOrder.getEntrustCnt().compareTo(BigDecimal.ZERO) <= 0 || exchangeOrder.getEntrustCnt().subtract(exchangeOrder.getDealCnt()).compareTo(BigDecimal.ZERO) <= 0) {
|
return;
|
}
|
}
|
|
|
TreeMap<BigDecimal, MergeOrder> limitPriceOrderList;
|
LinkedList<OrderCoinsEntity> marketPriceOrderList;
|
if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
|
// 买单 和限价卖单以及市价市场卖单队列对比 完成撮合
|
limitPriceOrderList = sellLimitPriceQueue;
|
marketPriceOrderList = sellMarketQueue;
|
} else {
|
limitPriceOrderList = buyLimitPriceQueue;
|
marketPriceOrderList = buyMarketQueue;
|
}
|
if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
|
//logger.info(">>>>>市价单>>>交易与限价单交易");
|
//与限价单交易
|
matchMarketPriceWithLPList(limitPriceOrderList, exchangeOrder);
|
} else if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) {
|
//限价单价格必须大于0
|
if (exchangeOrder.getEntrustPrice().compareTo(BigDecimal.ZERO) <= 0) {
|
return;
|
}
|
|
//logger.info(">>>>>限价单>>>交易与限价单交易");
|
//先与限价单交易
|
matchLimitPriceWithLPList(limitPriceOrderList, exchangeOrder, false);
|
if (exchangeOrder.getEntrustCnt().compareTo(exchangeOrder.getDealCnt()) > 0) {
|
//logger.info(">>>>限价单未交易完>>>>与市价单交易>>>>");
|
//后与市价单交易
|
matchLimitPriceWithMPList(marketPriceOrderList, exchangeOrder);
|
}
|
}
|
}
|
|
/**
|
* 限价委托单与限价队列匹配
|
*
|
* @param lpList 限价对手单队列
|
* @param focusedOrder 交易订单
|
*/
|
public void matchLimitPriceWithLPList(TreeMap<BigDecimal, MergeOrder> lpList, OrderCoinsEntity focusedOrder, boolean canEnterList) {
|
List<ExchangeTrade> exchangeTrades = new ArrayList<>();
|
List<OrderCoinsEntity> completedOrders = new ArrayList<>();
|
synchronized (lpList) {
|
Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = lpList.entrySet().iterator();
|
boolean exitLoop = false;
|
while (!exitLoop && mergeOrderIterator.hasNext()) {
|
Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next();
|
MergeOrder mergeOrder = entry.getValue();
|
Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator();
|
//买入单需要匹配的价格不大于委托价,否则退出
|
if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && mergeOrder.getPrice().compareTo(focusedOrder.getEntrustPrice()) > 0) {
|
break;
|
}
|
//卖出单需要匹配的价格不小于委托价,否则退出
|
if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_SELL && mergeOrder.getPrice().compareTo(focusedOrder.getEntrustPrice()) < 0) {
|
break;
|
}
|
while (orderIterator.hasNext()) {
|
OrderCoinsEntity matchOrder = orderIterator.next();
|
//处理匹配
|
ExchangeTrade trade = processMatch(focusedOrder, matchOrder);
|
if(trade!=null){
|
exchangeTrades.add(trade);
|
}
|
|
//判断匹配单是否完成
|
if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
|
//当前匹配的订单完成交易,删除该订单
|
orderIterator.remove();
|
completedOrders.add(matchOrder);
|
}
|
//判断交易单是否完成
|
if (focusedOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
|
//交易完成
|
completedOrders.add(focusedOrder);
|
//退出循环
|
exitLoop = true;
|
break;
|
}
|
}
|
if (mergeOrder.size() == 0) {
|
mergeOrderIterator.remove();
|
}
|
}
|
}
|
//如果还没有交易完,订单压入列表中
|
if (focusedOrder.getDealCnt().compareTo(focusedOrder.getEntrustCnt()) < 0 && canEnterList) {
|
addLimitPriceOrder(focusedOrder);
|
}
|
//每个订单的匹配批量推送
|
handleExchangeTrade(exchangeTrades);
|
if (completedOrders.size() > 0) {
|
orderCompleted(completedOrders);
|
TradePlate plate = focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY ? sellTradePlate : buyTradePlate;
|
sendTradePlateMessage(buyTradePlate, sellTradePlate);
|
}
|
}
|
|
/**
|
* 限价委托单与市价队列匹配
|
*
|
* @param mpList 市价对手单队列
|
* @param focusedOrder 交易订单
|
*/
|
public void matchLimitPriceWithMPList(LinkedList<OrderCoinsEntity> mpList, OrderCoinsEntity focusedOrder) {
|
List<ExchangeTrade> exchangeTrades = new ArrayList<>();
|
List<OrderCoinsEntity> completedOrders = new ArrayList<>();
|
synchronized (mpList) {
|
Iterator<OrderCoinsEntity> iterator = mpList.iterator();
|
while (iterator.hasNext()) {
|
OrderCoinsEntity matchOrder = iterator.next();
|
ExchangeTrade trade = processMatch(focusedOrder, matchOrder);
|
if (trade != null) {
|
exchangeTrades.add(trade);
|
}
|
//判断匹配单是否完成,市价单amount为成交量
|
if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
|
iterator.remove();
|
completedOrders.add(matchOrder);
|
}
|
//判断吃单是否完成,判断成交量是否完成
|
if (focusedOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
|
//交易完成
|
completedOrders.add(focusedOrder);
|
//退出循环
|
break;
|
}
|
}
|
}
|
//如果还没有交易完,订单压入列表中
|
if (focusedOrder.getDealCnt().compareTo(focusedOrder.getEntrustCnt()) < 0) {
|
addLimitPriceOrder(focusedOrder);
|
}
|
//每个订单的匹配批量推送
|
handleExchangeTrade(exchangeTrades);
|
orderCompleted(completedOrders);
|
}
|
|
|
/**
|
* 市价委托单与限价对手单列表交易
|
*
|
* @param lpList 限价对手单列表
|
* @param focusedOrder 待交易订单
|
*/
|
public void matchMarketPriceWithLPList(TreeMap<BigDecimal, MergeOrder> lpList, OrderCoinsEntity focusedOrder) {
|
List<ExchangeTrade> exchangeTrades = new ArrayList<>();
|
List<OrderCoinsEntity> completedOrders = new ArrayList<>();
|
// 加锁 同步
|
synchronized (lpList) {
|
Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = lpList.entrySet().iterator();
|
boolean exitLoop = false;
|
while (!exitLoop && mergeOrderIterator.hasNext()) {
|
Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next();
|
MergeOrder mergeOrder = entry.getValue();
|
Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator();
|
while (orderIterator.hasNext()) {
|
OrderCoinsEntity matchOrder = orderIterator.next();
|
//处理匹配 用户单和买卖盘内的委托单进行比较,得到可以交易的量
|
// 例如 市价买单 1000个 卖1 500 卖2 300 卖3 500 此时从卖1开始吃到卖3只剩300个
|
ExchangeTrade trade = processMatch(focusedOrder, matchOrder);
|
if (trade != null) {
|
exchangeTrades.add(trade);
|
}
|
//判断匹配单是否完成 TODO
|
if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
|
//当前匹配的订单完成交易,删除该订单
|
orderIterator.remove();
|
completedOrders.add(matchOrder);
|
}
|
//判断焦点订单是否完成
|
if (focusedOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
|
completedOrders.add(focusedOrder);
|
//退出循环
|
exitLoop = true;
|
break;
|
}
|
}
|
if (mergeOrder.size() == 0) {
|
mergeOrderIterator.remove();
|
}
|
}
|
}
|
//如果还没有交易完,订单压入列表中,市价买单按成交量算
|
if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_SELL && focusedOrder.getDealCnt().compareTo(focusedOrder.getEntrustCnt()) < 0
|
|| focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && focusedOrder.getDealAmount().compareTo(focusedOrder.getEntrustAmount()) < 0) {
|
addMarketPriceOrder(focusedOrder);
|
}
|
//每个订单的匹配批量推送
|
handleExchangeTrade(exchangeTrades);
|
if (completedOrders.size() > 0) {
|
orderCompleted(completedOrders);
|
TradePlate plate = focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY ? sellTradePlate : buyTradePlate;
|
sendTradePlateMessage(buyTradePlate, sellTradePlate);
|
}
|
}
|
|
/**
|
* 计算委托单剩余可成交的数量
|
*
|
* @param order 委托单
|
* @param dealPrice 成交价
|
* @return
|
*/
|
private BigDecimal calculateTradedAmount(OrderCoinsEntity order, BigDecimal dealPrice) {
|
if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && order.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
|
//剩余成交量 TODO ?
|
// 委托量-成交量=剩余量
|
BigDecimal leftTurnover = order.getEntrustAmount().subtract(order.getDealAmount());
|
return leftTurnover.divide(dealPrice, coinScale, BigDecimal.ROUND_DOWN);
|
} else {
|
return order.getEntrustCnt().subtract(order.getDealCnt());
|
}
|
}
|
|
/**
|
* 调整市价单剩余成交额,当剩余成交额不足时设置订单完成
|
*
|
* @param order
|
* @param dealPrice
|
* @return
|
*/
|
private BigDecimal adjustMarketOrderTurnover(OrderCoinsEntity order, BigDecimal dealPrice) {
|
if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && order.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
|
BigDecimal leftTurnover = order.getEntrustAmount().subtract(order.getDealAmount());
|
if(leftTurnover.divide(dealPrice,coinScale,BigDecimal.ROUND_DOWN)
|
.compareTo(BigDecimal.ZERO)==0){
|
//order.setDealAmount(order.getEntrustAmount());
|
return leftTurnover;
|
}
|
}
|
return BigDecimal.ZERO;
|
}
|
|
/**
|
* 处理两个匹配的委托订单
|
*
|
* @param focusedOrder 焦点单
|
* @param matchOrder 匹配单
|
* @return
|
*/
|
private ExchangeTrade processMatch(OrderCoinsEntity focusedOrder, OrderCoinsEntity matchOrder) {
|
//需要交易的数量,成交量,成交价,可用数量
|
BigDecimal needAmount, dealPrice, availAmount;
|
//如果匹配单是限价单,则以其价格为成交价
|
if (matchOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) {
|
dealPrice = matchOrder.getEntrustPrice();
|
} else {
|
dealPrice = focusedOrder.getEntrustPrice();
|
}
|
//成交价必须大于0
|
if (dealPrice.compareTo(BigDecimal.ZERO) <= 0) {
|
return null;
|
}
|
// 需要的成交量
|
needAmount = calculateTradedAmount(focusedOrder, dealPrice);
|
// 队列单可提供的成交量
|
availAmount = calculateTradedAmount(matchOrder, dealPrice);
|
//计算成交量 取少的
|
BigDecimal tradedAmount = (availAmount.compareTo(needAmount) >= 0 ? needAmount : availAmount);
|
//logger.info("dealPrice={},amount={}", dealPrice, tradedAmount);
|
//如果成交额为0说明剩余额度无法成交,退出
|
if (tradedAmount.compareTo(BigDecimal.ZERO) == 0) {
|
return null;
|
}
|
|
//计算成交额,成交额要保留足够精度
|
BigDecimal turnover = tradedAmount.multiply(dealPrice);
|
matchOrder.setDealCnt(matchOrder.getDealCnt().add(tradedAmount));
|
// 成交金额
|
matchOrder.setDealAmount(matchOrder.getDealAmount().add(turnover));
|
// 用户单成交量
|
focusedOrder.setDealCnt(focusedOrder.getDealCnt().add(tradedAmount));
|
// 用户单成交金额
|
focusedOrder.setDealAmount(focusedOrder.getDealAmount().add(turnover));
|
|
// 判断两个单是否完成
|
if(matchOrder.getEntrustAmount()!=null && matchOrder.getEntrustAmount().compareTo(matchOrder.getDealAmount())<=0){
|
matchOrder.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
|
}
|
if(matchOrder.getEntrustCnt()!=null && matchOrder.getEntrustCnt().compareTo(matchOrder.getDealCnt())<=0){
|
matchOrder.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
|
}
|
|
if(focusedOrder.getEntrustAmount()!=null && focusedOrder.getEntrustAmount().compareTo(focusedOrder.getDealAmount())<=0){
|
focusedOrder.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
|
}
|
if(focusedOrder.getEntrustCnt()!=null && focusedOrder.getEntrustCnt().compareTo(focusedOrder.getDealCnt())<=0){
|
focusedOrder.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
|
}
|
|
//创建成交记录
|
ExchangeTrade exchangeTrade = new ExchangeTrade();
|
exchangeTrade.setSymbol(symbol);
|
// 成交量
|
exchangeTrade.setAmount(tradedAmount);
|
exchangeTrade.setDirection(focusedOrder.getOrderType());
|
// 成交价格
|
exchangeTrade.setPrice(dealPrice);
|
// 成交金额
|
exchangeTrade.setBuyTurnover(turnover);
|
exchangeTrade.setSellTurnover(turnover);
|
//校正市价单剩余成交额
|
if (OrderCoinsEntity.TRADETYPE_MARKETPRICE == focusedOrder.getTradeType() && focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
|
BigDecimal adjustTurnover = adjustMarketOrderTurnover(focusedOrder, dealPrice);
|
exchangeTrade.setBuyTurnover(turnover.add(adjustTurnover));
|
} else if (OrderCoinsEntity.TRADETYPE_MARKETPRICE == matchOrder.getTradeType() && matchOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
|
BigDecimal adjustTurnover = adjustMarketOrderTurnover(matchOrder, dealPrice);
|
exchangeTrade.setBuyTurnover(turnover.add(adjustTurnover));
|
}
|
|
if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
|
exchangeTrade.setBuyOrderId(focusedOrder.getId());
|
exchangeTrade.setSellOrderId(matchOrder.getId());
|
} else {
|
exchangeTrade.setBuyOrderId(matchOrder.getId());
|
exchangeTrade.setSellOrderId(focusedOrder.getId());
|
}
|
|
exchangeTrade.setTime(Calendar.getInstance().getTimeInMillis());
|
if (matchOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) {
|
if (matchOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
|
buyTradePlate.remove(matchOrder, tradedAmount);
|
} else {
|
sellTradePlate.remove(matchOrder, tradedAmount);
|
}
|
}
|
return exchangeTrade;
|
}
|
|
|
// 这里是交易完的单
|
public void handleExchangeTrade(List<ExchangeTrade> trades) {
|
//logger.info("handleExchangeTrade:{}", trades);
|
if (trades.size() > 0) {
|
int maxSize = 1000;
|
//发送消息,key为交易对符号
|
if (trades.size() > maxSize) {
|
int size = trades.size();
|
for (int index = 0; index < size; index += maxSize) {
|
int length = (size - index) > maxSize ? maxSize : size - index;
|
List<ExchangeTrade> subTrades = trades.subList(index, index + length);
|
exchangeProducer.sendHandleTrade(JSON.toJSONString(subTrades));
|
//orderCoinService.handleOrder(subTrades);
|
}
|
} else {
|
exchangeProducer.sendHandleTrade(JSON.toJSONString(trades));
|
//orderCoinService.handleOrder(trades);
|
// kafkaTemplate.send("exchange-trade", JSON.toJSONString(trades));
|
}
|
|
}
|
}
|
|
/**
|
* 订单完成,执行消息通知,订单数超1000个要拆分发送
|
*
|
* @param orders
|
*/
|
public void orderCompleted(List<OrderCoinsEntity> orders) {
|
//logger.info("orderCompleted ,order={}",orders);
|
if (orders.size() > 0) {
|
int maxSize = 1000;
|
if (orders.size() > maxSize) {
|
int size = orders.size();
|
for (int index = 0; index < size; index += maxSize) {
|
int length = (size - index) > maxSize ? maxSize : size - index;
|
List<OrderCoinsEntity> subOrders = orders.subList(index, index + length);
|
// TODO 通知订单完成
|
//kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(subOrders));
|
}
|
} else {
|
// kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(orders));
|
}
|
}
|
}
|
|
/**
|
* 发送盘口变化消息
|
*
|
* @param buyTradePlate sellTradePlate
|
*/
|
public void sendTradePlateMessage(TradePlate buyTradePlate, TradePlate sellTradePlate) {
|
//防止并发引起数组越界,造成盘口倒挂
|
List<List<BigDecimal>> plate;
|
List<BigDecimal> plateItem;
|
TradePlateModel tradePlateModel = new TradePlateModel();
|
// 转换格式
|
if (buyTradePlate != null && buyTradePlate.getItems() != null) {
|
plate = new ArrayList<>();
|
LinkedList<TradePlateItem> items = buyTradePlate.getItems();
|
for (int i = items.size() - 1; i >= 0; i--) {
|
plateItem = new ArrayList<>(2);
|
BigDecimal price = items.get(i).getPrice();
|
BigDecimal amount = items.get(i).getAmount();
|
plateItem.add(price);
|
plateItem.add(amount);
|
plate.add(plateItem);
|
}
|
tradePlateModel.setBuy(plate);
|
}
|
|
if (sellTradePlate != null && sellTradePlate.getItems() != null) {
|
plate = new ArrayList<>();
|
LinkedList<TradePlateItem> items = sellTradePlate.getItems();
|
for (int i = items.size() - 1; i >= 0; i--) {
|
plateItem = new ArrayList<>(2);
|
BigDecimal price = items.get(i).getPrice();
|
BigDecimal amount = items.get(i).getAmount();
|
plateItem.add(price);
|
plateItem.add(amount);
|
plate.add(plateItem);
|
}
|
tradePlateModel.setSell(plate);
|
}
|
|
// 盘口发生变化通知
|
exchangeProducer.sendPlateMsg(JSON.toJSONString(tradePlateModel));
|
}
|
|
/**
|
* 发送盘口变化消息
|
*
|
* @param
|
*/
|
public String sendTradePlateMessage() {
|
//防止并发引起数组越界,造成盘口倒挂
|
List<List<BigDecimal>> plate;
|
List<BigDecimal> plateItem;
|
TradePlateModel tradePlateModel = new TradePlateModel();
|
// 转换格式
|
if (buyTradePlate != null && buyTradePlate.getItems() != null) {
|
plate = new ArrayList<>();
|
LinkedList<TradePlateItem> items = buyTradePlate.getItems();
|
for (int i = items.size() - 1; i >= 0; i--) {
|
plateItem = new ArrayList<>(2);
|
BigDecimal price = items.get(i).getPrice();
|
BigDecimal amount = items.get(i).getAmount();
|
plateItem.add(price);
|
plateItem.add(amount);
|
plate.add(plateItem);
|
}
|
tradePlateModel.setBuy(plate);
|
}
|
|
if (sellTradePlate != null && sellTradePlate.getItems() != null) {
|
plate = new ArrayList<>();
|
LinkedList<TradePlateItem> items = sellTradePlate.getItems();
|
for (int i = items.size() - 1; i >= 0; i--) {
|
plateItem = new ArrayList<>(2);
|
BigDecimal price = items.get(i).getPrice();
|
BigDecimal amount = items.get(i).getAmount();
|
plateItem.add(price);
|
plateItem.add(amount);
|
plate.add(plateItem);
|
}
|
tradePlateModel.setSell(plate);
|
}
|
|
return JSON.toJSONString(tradePlateModel);
|
}
|
|
/**
|
* 取消委托订单
|
*
|
* @param exchangeOrder
|
* @return
|
*/
|
public OrderCoinsEntity cancelOrder(OrderCoinsEntity exchangeOrder) {
|
//logger.info("cancelOrder,orderId={}", exchangeOrder.getId());
|
if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
|
//处理市价单
|
Iterator<OrderCoinsEntity> orderIterator;
|
List<OrderCoinsEntity> list = null;
|
if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
|
list = this.buyMarketQueue;
|
} else {
|
list = this.sellMarketQueue;
|
}
|
synchronized (list) {
|
orderIterator = list.iterator();
|
while ((orderIterator.hasNext())) {
|
OrderCoinsEntity order = orderIterator.next();
|
if (order.getId().equals(exchangeOrder.getId())) {
|
orderIterator.remove();
|
onRemoveOrder(order);
|
return order;
|
}
|
}
|
}
|
} else {
|
//处理限价单
|
TreeMap<BigDecimal, MergeOrder> list = null;
|
Iterator<MergeOrder> mergeOrderIterator;
|
if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
|
list = this.buyLimitPriceQueue;
|
} else {
|
list = this.sellLimitPriceQueue;
|
}
|
synchronized (list) {
|
MergeOrder mergeOrder = list.get(exchangeOrder.getEntrustPrice());
|
if (mergeOrder != null) {
|
Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator();
|
while (orderIterator.hasNext()) {
|
OrderCoinsEntity order = orderIterator.next();
|
if (order.getId().equals(exchangeOrder.getId())) {
|
orderIterator.remove();
|
if (mergeOrder.size() == 0) {
|
list.remove(exchangeOrder.getEntrustPrice());
|
}
|
onRemoveOrder(order);
|
return order;
|
}
|
}
|
}
|
}
|
}
|
return null;
|
}
|
|
public void onRemoveOrder(OrderCoinsEntity order) {
|
if (order.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) {
|
if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
|
buyTradePlate.remove(order);
|
sendTradePlateMessage(buyTradePlate, sellTradePlate);
|
} else {
|
sellTradePlate.remove(order);
|
sendTradePlateMessage(buyTradePlate, sellTradePlate);
|
}
|
}
|
}
|
|
|
public TradePlate getTradePlate(ExchangeOrderDirection direction) {
|
if (direction == ExchangeOrderDirection.BUY) {
|
return buyTradePlate;
|
} else {
|
return sellTradePlate;
|
}
|
}
|
|
|
/**
|
* 查询交易器里的订单
|
*
|
* @param orderId
|
* @param type
|
* @param direction
|
* @return
|
*/
|
public OrderCoinsEntity findOrder(String orderId, int type, int direction) {
|
if (type == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
|
LinkedList<OrderCoinsEntity> list;
|
if (direction == OrderCoinsEntity.ORDERTYPE_BUY) {
|
list = this.buyMarketQueue;
|
} else {
|
list = this.sellMarketQueue;
|
}
|
synchronized (list) {
|
Iterator<OrderCoinsEntity> orderIterator = list.iterator();
|
while ((orderIterator.hasNext())) {
|
OrderCoinsEntity order = orderIterator.next();
|
if (order.getId().equals(orderId)) {
|
return order;
|
}
|
}
|
}
|
} else {
|
TreeMap<BigDecimal, MergeOrder> list;
|
if (direction == OrderCoinsEntity.ORDERTYPE_BUY) {
|
list = this.buyLimitPriceQueue;
|
} else {
|
list = this.sellLimitPriceQueue;
|
}
|
synchronized (list) {
|
Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = list.entrySet().iterator();
|
while (mergeOrderIterator.hasNext()) {
|
Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next();
|
MergeOrder mergeOrder = entry.getValue();
|
Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator();
|
while ((orderIterator.hasNext())) {
|
OrderCoinsEntity order = orderIterator.next();
|
if (order.getId().equals(orderId)) {
|
return order;
|
}
|
}
|
}
|
}
|
}
|
return null;
|
}
|
|
public TreeMap<BigDecimal, MergeOrder> getBuyLimitPriceQueue() {
|
return buyLimitPriceQueue;
|
}
|
|
public LinkedList<OrderCoinsEntity> getBuyMarketQueue() {
|
return buyMarketQueue;
|
}
|
|
public TreeMap<BigDecimal, MergeOrder> getSellLimitPriceQueue() {
|
return sellLimitPriceQueue;
|
}
|
|
public LinkedList<OrderCoinsEntity> getSellMarketQueue() {
|
return sellMarketQueue;
|
}
|
|
|
public void setCoinScale(int scale) {
|
this.coinScale = scale;
|
}
|
|
public void setBaseCoinScale(int scale) {
|
this.baseCoinScale = scale;
|
}
|
|
public boolean isTradingHalt() {
|
return this.tradingHalt;
|
}
|
|
/**
|
* 暂停交易,不接收新的订单
|
*/
|
public void haltTrading() {
|
this.tradingHalt = true;
|
}
|
|
/**
|
* 恢复交易
|
*/
|
public void resumeTrading() {
|
this.tradingHalt = false;
|
}
|
|
public void stopTrading() {
|
//TODO:停止交易,取消当前所有订单
|
}
|
|
public boolean getReady() {
|
return this.ready;
|
}
|
|
public void setReady(boolean ready) {
|
this.ready = ready;
|
}
|
|
public void setClearTime(String clearTime) {
|
this.clearTime = clearTime;
|
}
|
|
public int getLimitPriceOrderCount(ExchangeOrderDirection direction) {
|
int count = 0;
|
TreeMap<BigDecimal, MergeOrder> queue = direction == ExchangeOrderDirection.BUY ? buyLimitPriceQueue : sellLimitPriceQueue;
|
Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = queue.entrySet().iterator();
|
while (mergeOrderIterator.hasNext()) {
|
Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next();
|
MergeOrder mergeOrder = entry.getValue();
|
count += mergeOrder.size();
|
}
|
return count;
|
}
|
|
|
public ExchangeProducer getExchangeProducer() {
|
return exchangeProducer;
|
}
|
|
public void setExchangeProducer(ExchangeProducer exchangeProducer) {
|
this.exchangeProducer = exchangeProducer;
|
}
|
}
|