xiaoyong931011
2021-12-08 f5e6133809c553cfd9fb28ee61019927c547c374
src/main/java/com/xcong/excoin/trade/CoinTrader.java
@@ -3,9 +3,7 @@
import com.alibaba.fastjson.JSON;
import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
import com.xcong.excoin.modules.coin.service.OrderCoinService;
import com.xcong.excoin.rabbit.producer.ExchangeProducer;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,8 +35,7 @@
    //是否暂停交易
    private boolean tradingHalt = false;
    private boolean ready = false;
    //交易对信息
    //private ExchangeCoinPublishType publishType;
    private String clearTime;
    private SimpleDateFormat dateTimeFormat;
@@ -53,7 +50,7 @@
     * 初始化交易线程
     */
    public void initialize() {
        logger.info("init CoinTrader for symbol {}", symbol);
        //logger.info("init CoinTrader for symbol {}", symbol);
        //买单队列价格降序排列
        buyLimitPriceQueue = new TreeMap<>(Comparator.reverseOrder());
        //卖单队列价格升序排列
@@ -107,7 +104,7 @@
        if (exchangeOrder.getTradeType() != OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
            return;
        }
        logger.info("addMarketPriceOrder,orderId = {}", exchangeOrder.getId());
        //logger.info("addMarketPriceOrder,orderId = {}", exchangeOrder.getId());
        LinkedList<OrderCoinsEntity> list = exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY ? buyMarketQueue : sellMarketQueue;
        synchronized (list) {
            list.addLast(exchangeOrder);
@@ -136,13 +133,20 @@
        }
        //logger.info("trade order={}",exchangeOrder);
        if (!symbol.equalsIgnoreCase(exchangeOrder.getSymbol())) {
            logger.info("unsupported symbol,coin={},base={}", exchangeOrder.getSymbol(), "USDT");
            //logger.info("unsupported symbol,coin={},base={}", exchangeOrder.getSymbol(), "USDT");
            return;
        }
        // 如果
        if (exchangeOrder.getEntrustAmount().compareTo(BigDecimal.ZERO) <= 0 || exchangeOrder.getEntrustAmount().subtract(exchangeOrder.getDealAmount()).compareTo(BigDecimal.ZERO) <= 0) {
            return;
        if(OrderCoinsEntity.ORDERTYPE_BUY==exchangeOrder.getOrderType()){
            if (exchangeOrder.getEntrustAmount().compareTo(BigDecimal.ZERO) <= 0 || exchangeOrder.getEntrustAmount().subtract(exchangeOrder.getDealAmount()).compareTo(BigDecimal.ZERO) <= 0) {
                return;
            }
        }else{
            if (exchangeOrder.getEntrustCnt().compareTo(BigDecimal.ZERO) <= 0 || exchangeOrder.getEntrustCnt().subtract(exchangeOrder.getDealCnt()).compareTo(BigDecimal.ZERO) <= 0) {
                return;
            }
        }
        TreeMap<BigDecimal, MergeOrder> limitPriceOrderList;
        LinkedList<OrderCoinsEntity> marketPriceOrderList;
@@ -203,7 +207,10 @@
                    OrderCoinsEntity matchOrder = orderIterator.next();
                    //处理匹配
                    ExchangeTrade trade = processMatch(focusedOrder, matchOrder);
                    exchangeTrades.add(trade);
                    if(trade!=null){
                        exchangeTrades.add(trade);
                    }
                    //判断匹配单是否完成
                    if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
                        //当前匹配的订单完成交易,删除该订单
@@ -251,7 +258,6 @@
            while (iterator.hasNext()) {
                OrderCoinsEntity matchOrder = iterator.next();
                ExchangeTrade trade = processMatch(focusedOrder, matchOrder);
                logger.info(">>>>>" + trade);
                if (trade != null) {
                    exchangeTrades.add(trade);
                }
@@ -324,8 +330,11 @@
            }
        }
        //如果还没有交易完,订单压入列表中,市价买单按成交量算
        if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_SELL && focusedOrder.getDealCnt().compareTo(focusedOrder.getEntrustCnt()) < 0
                || focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && focusedOrder.getDealAmount().compareTo(focusedOrder.getEntrustAmount()) < 0) {
        if ((focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_SELL && focusedOrder.getDealCnt().compareTo(focusedOrder.getEntrustCnt()) < 0)
                || (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && focusedOrder.getDealAmount().compareTo(focusedOrder.getEntrustAmount()) < 0)) {
            logger.info("市价单未交易完成:#{}"+JSON.toJSONString(focusedOrder));
            // 打印此时的限价买单
            logger.info("此时的买单:#{}"+JSON.toJSONString(lpList));
            addMarketPriceOrder(focusedOrder);
        }
        //每个订单的匹配批量推送
@@ -346,7 +355,7 @@
     */
    private BigDecimal calculateTradedAmount(OrderCoinsEntity order, BigDecimal dealPrice) {
        if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && order.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
            //剩余成交量 TODO ?
            //剩余成交量
            // 委托量-成交量=剩余量
            BigDecimal leftTurnover = order.getEntrustAmount().subtract(order.getDealAmount());
            return leftTurnover.divide(dealPrice, coinScale, BigDecimal.ROUND_DOWN);
@@ -364,12 +373,12 @@
     */
    private BigDecimal adjustMarketOrderTurnover(OrderCoinsEntity order, BigDecimal dealPrice) {
        if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && order.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
//            BigDecimal leftTurnover = order.getAmount().subtract(order.getTurnover());
//            if(leftTurnover.divide(dealPrice,coinScale,BigDecimal.ROUND_DOWN)
//                    .compareTo(BigDecimal.ZERO)==0){
//                order.setTurnover(order.getAmount());
//                return leftTurnover;
//            }
            BigDecimal leftTurnover = order.getEntrustAmount().subtract(order.getDealAmount());
            if(leftTurnover.divide(dealPrice,coinScale,BigDecimal.ROUND_DOWN)
                    .compareTo(BigDecimal.ZERO)==0){
                order.setDealAmount(order.getEntrustAmount());
                return leftTurnover;
            }
        }
        return BigDecimal.ZERO;
    }
@@ -400,7 +409,7 @@
        availAmount = calculateTradedAmount(matchOrder, dealPrice);
        //计算成交量 取少的
        BigDecimal tradedAmount = (availAmount.compareTo(needAmount) >= 0 ? needAmount : availAmount);
        logger.info("dealPrice={},amount={}", dealPrice, tradedAmount);
        //logger.info("dealPrice={},amount={}", dealPrice, tradedAmount);
        //如果成交额为0说明剩余额度无法成交,退出
        if (tradedAmount.compareTo(BigDecimal.ZERO) == 0) {
            return null;
@@ -416,6 +425,7 @@
        // 用户单成交金额
        focusedOrder.setDealAmount(focusedOrder.getDealAmount().add(turnover));
        //创建成交记录
        ExchangeTrade exchangeTrade = new ExchangeTrade();
        exchangeTrade.setSymbol(symbol);
@@ -430,10 +440,25 @@
        //校正市价单剩余成交额
        if (OrderCoinsEntity.TRADETYPE_MARKETPRICE == focusedOrder.getTradeType() && focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
            BigDecimal adjustTurnover = adjustMarketOrderTurnover(focusedOrder, dealPrice);
            exchangeTrade.setBuyTurnover(turnover.add(adjustTurnover));
        } else if (OrderCoinsEntity.TRADETYPE_MARKETPRICE == matchOrder.getTradeType() && matchOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
            //exchangeTrade.setBuyTurnover(turnover.add(adjustTurnover));
        }
        if (OrderCoinsEntity.TRADETYPE_MARKETPRICE == matchOrder.getTradeType() && matchOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
            BigDecimal adjustTurnover = adjustMarketOrderTurnover(matchOrder, dealPrice);
            exchangeTrade.setBuyTurnover(turnover.add(adjustTurnover));
            //exchangeTrade.setBuyTurnover(turnover.add(adjustTurnover));
        }
        // 判断两个单是否完成
        if(matchOrder.getEntrustAmount()!=null &&matchOrder.getEntrustAmount().compareTo(BigDecimal.ZERO)>0 && matchOrder.getEntrustAmount().compareTo(matchOrder.getDealAmount())<=0){
            matchOrder.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
        }
        if(matchOrder.getEntrustCnt()!=null &&matchOrder.getEntrustCnt().compareTo(BigDecimal.ZERO)>0 && matchOrder.getEntrustCnt().compareTo(matchOrder.getDealCnt())<=0){
            matchOrder.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
        }
        if(focusedOrder.getEntrustAmount()!=null &&  focusedOrder.getEntrustAmount().compareTo(BigDecimal.ZERO)>0 && focusedOrder.getEntrustAmount().compareTo(focusedOrder.getDealAmount())<=0){
            focusedOrder.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
        }
        if(focusedOrder.getEntrustCnt()!=null &&focusedOrder.getEntrustCnt().compareTo(BigDecimal.ZERO)>0 && focusedOrder.getEntrustCnt().compareTo(focusedOrder.getDealCnt())<=0){
            focusedOrder.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
        }
        if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
@@ -471,14 +496,10 @@
                    //orderCoinService.handleOrder(subTrades);
                }
            } else {
                trades.forEach(e -> {
                    System.out.println(e);
                });
                exchangeProducer.sendHandleTrade(JSON.toJSONString(trades));
                //orderCoinService.handleOrder(trades);
                // kafkaTemplate.send("exchange-trade", JSON.toJSONString(trades));
            }
            // 更新最新K线 TODO
        }
    }
@@ -497,11 +518,11 @@
                for (int index = 0; index < size; index += maxSize) {
                    int length = (size - index) > maxSize ? maxSize : size - index;
                    List<OrderCoinsEntity> subOrders = orders.subList(index, index + length);
                    // TODO 通知订单完成
                    //kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(subOrders));
                    //  通知订单完成
                    exchangeProducer.sendCompleteMsg(JSON.toJSONString(subOrders));
                }
            } else {
                // kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(orders));
                exchangeProducer.sendCompleteMsg(JSON.toJSONString(orders));
            }
        }
    }
@@ -512,7 +533,7 @@
     * @param buyTradePlate sellTradePlate
     */
    public void sendTradePlateMessage(TradePlate buyTradePlate, TradePlate sellTradePlate) {
        //防止并发引起数组越界,造成盘口倒挂 TODO
        //防止并发引起数组越界,造成盘口倒挂
        List<List<BigDecimal>> plate;
        List<BigDecimal> plateItem;
        TradePlateModel tradePlateModel = new TradePlateModel();
@@ -520,10 +541,10 @@
        if (buyTradePlate != null && buyTradePlate.getItems() != null) {
            plate = new ArrayList<>();
            LinkedList<TradePlateItem> items = buyTradePlate.getItems();
            for (TradePlateItem item : items) {
            for (int i = items.size() - 1; i >= 0; i--) {
                plateItem = new ArrayList<>(2);
                BigDecimal price = item.getPrice();
                BigDecimal amount = item.getAmount();
                BigDecimal price = items.get(i).getPrice();
                BigDecimal amount = items.get(i).getAmount();
                plateItem.add(price);
                plateItem.add(amount);
                plate.add(plateItem);
@@ -534,10 +555,10 @@
        if (sellTradePlate != null && sellTradePlate.getItems() != null) {
            plate = new ArrayList<>();
            LinkedList<TradePlateItem> items = sellTradePlate.getItems();
            for (TradePlateItem item : items) {
            for (int i = items.size() - 1; i >= 0; i--) {
                plateItem = new ArrayList<>(2);
                BigDecimal price = item.getPrice();
                BigDecimal amount = item.getAmount();
                BigDecimal price = items.get(i).getPrice();
                BigDecimal amount = items.get(i).getAmount();
                plateItem.add(price);
                plateItem.add(amount);
                plate.add(plateItem);
@@ -545,9 +566,50 @@
            tradePlateModel.setSell(plate);
        }
        // 盘口发生变化通知TODO
        // 盘口发生变化通知
        exchangeProducer.sendPlateMsg(JSON.toJSONString(tradePlateModel));
    }
    /**
     * 发送盘口变化消息
     *
     * @param
     */
    public String sendTradePlateMessage() {
        //防止并发引起数组越界,造成盘口倒挂
        List<List<BigDecimal>> plate;
        List<BigDecimal> plateItem;
        TradePlateModel tradePlateModel = new TradePlateModel();
        // 转换格式
        if (buyTradePlate != null && buyTradePlate.getItems() != null) {
            plate = new ArrayList<>();
            LinkedList<TradePlateItem> items = buyTradePlate.getItems();
            for (int i = items.size() - 1; i >= 0; i--) {
                plateItem = new ArrayList<>(2);
                BigDecimal price = items.get(i).getPrice();
                BigDecimal amount = items.get(i).getAmount();
                plateItem.add(price);
                plateItem.add(amount);
                plate.add(plateItem);
            }
            tradePlateModel.setBuy(plate);
        }
        if (sellTradePlate != null && sellTradePlate.getItems() != null) {
            plate = new ArrayList<>();
            LinkedList<TradePlateItem> items = sellTradePlate.getItems();
            for (int i = items.size() - 1; i >= 0; i--) {
                plateItem = new ArrayList<>(2);
                BigDecimal price = items.get(i).getPrice();
                BigDecimal amount = items.get(i).getAmount();
                plateItem.add(price);
                plateItem.add(amount);
                plate.add(plateItem);
            }
            tradePlateModel.setSell(plate);
        }
        return JSON.toJSONString(tradePlateModel);
    }
    /**
@@ -557,7 +619,7 @@
     * @return
     */
    public OrderCoinsEntity cancelOrder(OrderCoinsEntity exchangeOrder) {
        logger.info("cancelOrder,orderId={}", exchangeOrder.getId());
        //logger.info("cancelOrder,orderId={}", exchangeOrder.getId());
        if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
            //处理市价单
            Iterator<OrderCoinsEntity> orderIterator;