zainali5120
2020-09-16 d174d6963d62b3bd176f9e7ba3cf0d7f75a91b69
撮合交易代码提交
16 files modified
4 files added
548 ■■■■ changed files
src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java 3 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java 5 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java 79 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java 30 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/processor/MarketService.java 6 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java 22 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java 79 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java 38 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java 4 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/trade/CoinTrader.java 91 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/trade/ExchangeTrade.java 1 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java 11 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/websocket/CandlestickModel.java 18 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/websocket/CandlestickResult.java 11 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/websocket/NewCandlestick.java 9 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/websocket/SubResultModel.java 10 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java 109 ●●●● patch | view | raw | blame | history
src/main/resources/application-test.yml 13 ●●●● patch | view | raw | blame | history
src/main/resources/application.yml 2 ●●● patch | view | raw | blame | history
src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml 7 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java
@@ -5,6 +5,7 @@
import javax.annotation.Resource;
import javax.validation.Valid;
import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
@@ -71,7 +72,7 @@
            return orderCoinService.submitSalesWalletCoinOrderWithMatch(symbol,type,tradeType,price,amount,submitSalesWalletCoinOrderDto.getEntrustAmount());
        }else{
            return orderCoinService.submitSalesWalletCoinOrder(symbol,type,tradeType,price,amount);
            return orderCoinService.submitSalesWalletCoinOrder(symbol,type,tradeType,price,amount,submitSalesWalletCoinOrderDto.getEntrustAmount());
        }
    }
    
src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java
@@ -16,7 +16,7 @@
    Result enterTransactionPageOfWalletCoin(String symbol);
    Result submitSalesWalletCoinOrder(String symbol, Integer type, Integer tradeType, BigDecimal price,
            BigDecimal amount);
            BigDecimal amount,BigDecimal entrustAmount);
    /**
     *  需要撮合交易的币种提交买卖单
@@ -51,4 +51,7 @@
    public void handleOrder(List<ExchangeTrade> trades);
    void initOrders(String symbol, Integer type, Integer tradeType, BigDecimal price,
                    BigDecimal amount,BigDecimal entrustAmount);
}
src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
@@ -161,13 +161,18 @@
    @Override
    @Transactional
    public Result submitSalesWalletCoinOrder(String symbol, Integer type, Integer tradeType, BigDecimal price, BigDecimal amount) {
    public Result submitSalesWalletCoinOrder(String symbol, Integer type, Integer tradeType, BigDecimal price, BigDecimal amount,BigDecimal entrustAmount) {
        //获取用户ID
        Long memberId = LoginUserUtils.getAppLoginUser().getId();
        BigDecimal nowPriceinBigDecimal = price;
        //查询当前价
        BigDecimal nowPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(symbol + "/USDT")));
        if(OrderCoinsEntity.ORDERTYPE_BUY.equals(type) && OrderCoinsEntity.TRADETYPE_MARKETPRICE.equals(tradeType)){
            amount = entrustAmount.divide(nowPrice,BigDecimal.ROUND_DOWN);
        }
        // 处理市价
        // 获取交易管理的杠杠倍率,手续费率等信息,由平台进行设置
        symbol = symbol.toUpperCase();
        MemberWalletCoinEntity walletCoin = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, symbol);
@@ -319,7 +324,7 @@
    @Override
    public Result submitSalesWalletCoinOrderWithMatch(String symbol, Integer type, Integer tradeType, BigDecimal price, BigDecimal amount, BigDecimal entrustAmount) {
        //获取用户ID
        Long memberId = 13L;
        Long memberId = LoginUserUtils.getAppLoginUser().getId();
        BigDecimal nowPriceinBigDecimal = price;
        //查询当前价
        //BigDecimal nowPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(symbol + "/USDT")));
@@ -818,11 +823,16 @@
            if(exchangeTrade==null){
                continue;
            }
            // 量
            BigDecimal amount = exchangeTrade.getAmount();
            // 买单ID
            Long buyOrderId = exchangeTrade.getBuyOrderId();
            // 成交金额(usdt)
            BigDecimal buyTurnover = exchangeTrade.getBuyTurnover();
            int direction = exchangeTrade.getDirection();
            // 成交价
            BigDecimal price = exchangeTrade.getPrice();
            // 卖单
            Long sellOrderId = exchangeTrade.getSellOrderId();
            // 买卖单都需要处理
            // 买单
@@ -918,4 +928,67 @@
            }
        }
    }
    @Override
    public void initOrders(String symbol, Integer type, Integer tradeType, BigDecimal price,
                           BigDecimal amount,BigDecimal entrustAmount) {
        //获取用户ID
        Long memberId = 10L;
        BigDecimal nowPriceinBigDecimal = price;
        //查询当前价
        //BigDecimal nowPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(symbol + "/USDT")));
        // 获取交易管理的杠杠倍率,手续费率等信息,由平台进行设置
        symbol = symbol.toUpperCase();
        // 手续费用(手续费=建仓价X数量X手续费率)
        BigDecimal closingPrice = BigDecimal.ZERO ;
        // BigDecimal totalPayPricCoin = nowPrice.multiply(amount).add(closingPrice);
        // 首先将单插入到数据库主表(委托表)
        // 创建订单
        OrderCoinsEntity order = new OrderCoinsEntity();
        //根据委托类型生成不同数据
        // 如果是限价交易直接插入主表数据
        order.setMemberId(memberId);
        order.setOrderNo(generateSimpleSerialno(memberId.toString()));
        order.setOrderType(type);
        order.setSymbol(symbol);
        //order.setMarkPrice(nowPrice);
        // 成交量 先设置为0
        order.setDealCnt(BigDecimal.ZERO);
        // 成交价
        //order.setDealPrice(price);
        // 成交金额
        order.setDealAmount(BigDecimal.ZERO);
        order.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DODING);
        order.setTradeType(tradeType);
        // 手续费
        order.setFeeAmount(closingPrice);
        if(OrderCoinsEntity.TRADETYPE_FIXEDPRICE.equals(tradeType)){
            // 限价 是需要价格和数量 可以得到成交金额
            // 下单量
            order.setEntrustCnt(amount);
            // 下单价格
            order.setEntrustPrice(price);
            order.setEntrustAmount(amount.multiply(price));
        }else{
            if(OrderCoinsEntity.ORDERTYPE_BUY.equals(type)){
                // 市价 只有金额
                order.setEntrustAmount(entrustAmount);
            }else{
                // 下单量
                order.setEntrustCnt(amount);
                // 下单价格
                order.setEntrustPrice(price);
                order.setEntrustAmount(amount.multiply(price));
            }
        }
        orderCoinsDao.insert(order);
        // 加入到撮合
        CoinTrader trader = factory.getTrader(symbol);
        trader.trade(order);
    }
}
src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java
@@ -62,7 +62,7 @@
        calendar.set(Calendar.HOUR_OF_DAY, 0);
        long firstTimeOfToday = calendar.getTimeInMillis();
        String period = "1min";
        logger.info("initializeThumb from {} to {}", firstTimeOfToday, nowTime);
        //logger.info("initializeThumb from {} to {}", firstTimeOfToday, nowTime);
        List<Candlestick> lines = service.findAllKLine(this.symbol, firstTimeOfToday, nowTime, period);
        coinThumb = new CoinThumb();
        synchronized (coinThumb) {
@@ -202,7 +202,7 @@
                    //处理K线
                    processTrade(currentKLine, exchangeTrade);
                    //处理今日概况信息
                    logger.debug("处理今日概况信息");
                    //logger.debug("处理今日概况信息");
                    handleThumb(exchangeTrade);
                    //存储并推送成交信息
                    handleTradeStorage(exchangeTrade);
@@ -212,7 +212,7 @@
    }
    public void processTrade(Candlestick kLine, ExchangeTrade exchangeTrade) {
        if (kLine.getClose().compareTo(BigDecimal.ZERO) == 0) {
        if (kLine.getClose()==null || kLine.getClose().compareTo(BigDecimal.ZERO)==0) {
            //第一次设置K线值
            kLine.setOpen(exchangeTrade.getPrice());
            kLine.setHigh(exchangeTrade.getPrice());
@@ -224,15 +224,19 @@
            kLine.setClose(exchangeTrade.getPrice());
        }
        kLine.setCount(kLine.getCount() + 1);
        kLine.setVolume(kLine.getVolume().add(exchangeTrade.getAmount()));
        if(kLine.getVolume()==null){
            kLine.setVolume(BigDecimal.ZERO);
        }
        kLine.setAmount(kLine.getVolume().add(exchangeTrade.getAmount()));
        BigDecimal turnover = exchangeTrade.getPrice().multiply(exchangeTrade.getAmount());
       // kLine.setTurnover(kLine.getTurnover().add(turnover));
        kLine.setVolume(kLine.getVolume().add(turnover));
        //kLine.setTimestamp(System.currentTimeMillis());
    }
    public void handleTradeStorage(ExchangeTrade exchangeTrade) {
        for (MarketHandler storage : handlers) {
            storage.handleTrade(symbol, exchangeTrade, coinThumb);
        }
//        for (MarketHandler storage : handlers) {
//            storage.handleTrade(symbol, exchangeTrade, coinThumb);
//        }
    }
    public void handleKLineStorage(Candlestick kLine) {
@@ -243,7 +247,7 @@
    }
    public void handleThumb(ExchangeTrade exchangeTrade) {
        logger.info("handleThumb symbol = {}", this.symbol);
        //logger.info("handleThumb symbol = {}", this.symbol);
        synchronized (coinThumb) {
            if (coinThumb.getOpen().compareTo(BigDecimal.ZERO) == 0) {
                //第一笔交易记为开盘价
@@ -265,7 +269,7 @@
                coinThumb.setChg(change.divide(coinThumb.getLow(), 4, BigDecimal.ROUND_UP));
            }
            if ("USDT".equalsIgnoreCase(baseCoin)) {
                logger.info("setUsdRate", exchangeTrade.getPrice());
               // logger.info("setUsdRate", exchangeTrade.getPrice());
                coinThumb.setUsdRate(exchangeTrade.getPrice());
            } else {
@@ -304,7 +308,7 @@
        calendar.add(field, -range);
        String fromTime = df.format(calendar.getTime());
        long startTick = calendar.getTimeInMillis();
        System.out.println("time range from " + fromTime + " to " + endTime);
        //System.out.println("time range from " + fromTime + " to " + endTime);
        List<ExchangeTrade> exchangeTrades = service.findTradeByTimeRange(this.symbol, startTick, endTick);
        Candlestick kLine = new Candlestick();
@@ -340,7 +344,7 @@
            kLine.setLow(coinThumb.getClose());
            kLine.setHigh(coinThumb.getClose());
        }
        logger.info("generate " + range + rangeUnit + " kline in {},data={}", df.format(new Date(kLine.getTimestamp())), JSON.toJSONString(kLine));
        //logger.info("generate " + range + rangeUnit + " kline in {},data={}", df.format(new Date(kLine.getTimestamp())), JSON.toJSONString(kLine));
        service.saveKLine(symbol,period, kLine);
        // 生成一个对应的新K线 后续的交易会更新这个最新K线数据
        Candlestick newKline = new Candlestick();
@@ -352,6 +356,8 @@
        newKline.setOpen(kLine.getClose());
        newKline.setVolume(BigDecimal.ZERO);
        newKline.setHigh(kLine.getClose());
        calendar.add(field, 2*range);
        newKline.setTimestamp(calendar.getTimeInMillis());
        currentKlineMap.put(period,newKline);
        // 存储昨日K线
src/main/java/com/xcong/excoin/processor/MarketService.java
@@ -75,20 +75,20 @@
//        Query query = new Query(criteria).with(sort);
//
//        return mongoTemplate.find(query,ExchangeTrade.class,"exchange_trade_"+symbol);
        return orderCoinDealDao.selectOrderCoinDealByTime(symbol, new Date(timeStart), new Date(timeStart));
        return orderCoinDealDao.selectOrderCoinDealByTime(symbol, new Date(timeStart), new Date(timeEnd));
        // return null;
    }
    public void saveKLine(String symbol, String period, Candlestick kLine) {
        // 先获取
        String key = "KINE_" + symbol + "_" + period;
        String key = "KINE_" + symbol + "/USDT_" + period;
        Object data = redisUtils.get(key);
        List list = new ArrayList();
        if (data != null) {
            list = (List) data;
        }
        list.add(kLine);
        redisUtils.set("KINE_" + symbol + "_" + period, list);
        redisUtils.set(key, list);
        //  mongoTemplate.insert(kLine,"exchange_kline_"+symbol+"_"+kLine.getPeriod());
    }
src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java
@@ -1,10 +1,12 @@
package com.xcong.excoin.quartz.job;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.huobi.client.SubscriptionClient;
import com.huobi.client.SubscriptionOptions;
import com.huobi.client.model.Candlestick;
import com.huobi.client.model.enums.CandlestickInterval;
import com.xcong.excoin.modules.coin.dao.OrderCoinDealDao;
import com.xcong.excoin.modules.coin.dao.OrderCoinsDao;
import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
import com.xcong.excoin.modules.coin.service.OrderCoinService;
@@ -17,6 +19,7 @@
import com.xcong.excoin.rabbit.producer.ExchangeProducer;
import com.xcong.excoin.trade.CoinTrader;
import com.xcong.excoin.trade.CoinTraderFactory;
import com.xcong.excoin.trade.ExchangeTrade;
import com.xcong.excoin.utils.CoinTypeConvert;
import com.xcong.excoin.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
@@ -30,6 +33,7 @@
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 *  开启撮合交易
@@ -44,12 +48,14 @@
    @Resource
    private OrderCoinsDao orderCoinsDao;
    @Resource
    private OrderCoinDealDao orderCoinDealDao;
    @Resource
    private CoinTraderFactory factory;
    @Resource
    private OrderCoinService coinService;
    private RedisUtils redisUtils;
    @Resource
    private MarketService marketService;
@@ -98,7 +104,21 @@
        processor.initializeThumb();
        //processor.initializeUsdRate();
        processor.setIsHalt(false);
        List<ExchangeTrade> nekk = orderCoinDealDao.selectOrderCoinDealByTime("NEKK", null, null);
        processor.process(nekk);
        String symbolUsdt = symbol;
        if(!symbol.contains("USDT")){
            symbolUsdt = symbol+"/USDT";
        }
        String key = "NEW_KINE_{}";
        key = StrUtil.format(key, symbolUsdt);
        Object o = redisUtils.get(key);
        if(o!=null){
            Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick> )o;
            ((DefaultCoinProcessor) processor).setCurrentKlineMap(currentKlineMap);
        }
        processorFactory.addProcessor(symbol, processor);
    }
}
src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
@@ -1,10 +1,16 @@
package com.xcong.excoin.quartz.job;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.huobi.client.model.Candlestick;
import com.xcong.excoin.modules.coin.dao.OrderCoinDealDao;
import com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity;
import com.xcong.excoin.modules.coin.service.OrderCoinService;
import com.xcong.excoin.processor.CoinProcessorFactory;
import com.xcong.excoin.trade.TradePlateModel;
import com.xcong.excoin.utils.RedisUtils;
import com.xcong.excoin.websocket.CandlestickModel;
import com.xcong.excoin.websocket.NewCandlestick;
import com.xcong.excoin.websocket.TradePlateSendWebSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -13,9 +19,11 @@
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Random;
/**
 * 生成各时间段的K线信息
@@ -28,45 +36,41 @@
    private CoinProcessorFactory processorFactory;
    @Resource
    private TradePlateSendWebSocket plateSendWebSocket;
    private OrderCoinService orderCoinService;
    @Resource
    private RedisUtils redisUtils;
    @Scheduled(cron = "0/10 * * * * *")
    public void tradePlate(){
        redisUtils.set("NEKK_NEW_PRICE",new BigDecimal(Math.random()*20));
        Candlestick candlestick = new Candlestick();
        candlestick.setOpen(new BigDecimal("10.33"));
        candlestick.setHigh(new BigDecimal("15.23"));
        candlestick.setVolume(new BigDecimal("12121.34"));
        candlestick.setLow(new BigDecimal("8.234"));
        candlestick.setAmount(new BigDecimal("1199"));
        candlestick.setTimestamp(1599840000);
        candlestick.setId(1599840000L);
        candlestick.setCount(100002);
        candlestick.setClose(new BigDecimal("12.2323"));
        //redisUtils.set("NEKK/USDT",candlestick);
        // [[10244.21, 0.000855], [10243.7, 0.008777], [10243.59, 0.14], [10243.37, 0.467663]]
        TradePlateModel tradePlateModel = new TradePlateModel();
        List<BigDecimal> buy;
        List<BigDecimal> sell;
        for(int i=0;i<5;i++){
            buy = new ArrayList<>(2);
            buy.add(new BigDecimal(Math.random()*i));
            buy.add(new BigDecimal(Math.random()*i));
            tradePlateModel.getBuy().add(buy);
            sell = new ArrayList<>(2);
            sell.add(new BigDecimal(Math.random()*i*2));
            sell.add(new BigDecimal(Math.random()*i*2));
            tradePlateModel.getSell().add(sell);
    @Scheduled(cron = "0/1 * * * * *")
    public void test(){
//        for(int i=1;i<=2;i++){
//            OrderCoinsDealEntity detail = new OrderCoinsDealEntity();
//            detail.setMemberId(13L);
//            //detail.setOrderId(111);
//            detail.setOrderNo("tete");
//            detail.setOrderType(1);
//            detail.setTradeType(1);
//            detail.setSymbol("NEKK");
//            detail.setSymbolCnt(new BigDecimal(10));
//            detail.setEntrustPrice(new BigDecimal(10));
//            detail.setDealPrice(new BigDecimal(i*10*Math.random()));
//            detail.setDealAmount(new BigDecimal(50));
//            detail.setFeeAmount(new BigDecimal(1));
//            detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_DONE);
//            orderCoinDealDao.insert(detail);
//        }
        Random random = new Random();
        Integer type = OrderCoinsDealEntity.ORDERTYPE_BUY;
        Integer tradeType = OrderCoinsDealEntity.TRADETYPE_FIXEDPRICE;
        int i = random.nextInt(100);
        if(i==0){
            i=10;
        }
        log.info("准备发送消息");
        plateSendWebSocket.sendMessagePlate("NEKK/USDT",JSON.toJSONString(tradePlateModel),null);
        plateSendWebSocket.sendMessageKline("NEKK/USDT","1min","{amount: 114419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 311958.06091543}",null);
        plateSendWebSocket.sendMessageKline("NEKK/USDT","5min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null);
        plateSendWebSocket.sendMessageKline("NEKK/USDT","15min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null);
        BigDecimal price =   new BigDecimal(i);
        orderCoinService.initOrders("NEKK",type,tradeType,price,new BigDecimal(2),null);
        orderCoinService.initOrders("NEKK",OrderCoinsDealEntity.ORDERTYPE_SELL,tradeType,price,new BigDecimal(2),null);
    }
    /**
     * 每分钟定时器,处理分钟K线
@@ -75,7 +79,7 @@
    public void handle5minKLine(){
        Calendar calendar = Calendar.getInstance();
        log.debug("分钟K线:{}",calendar.getTime());
        //log.debug("分钟K线:{}",calendar.getTime());
        //将秒、微秒字段置为0
        calendar.set(Calendar.SECOND,0);
        calendar.set(Calendar.MILLISECOND,0);
@@ -84,9 +88,10 @@
        int hour = calendar.get(Calendar.HOUR_OF_DAY);
        processorFactory.getProcessorMap().forEach((symbol,processor)->{
            if(!processor.isStopKline()) {
                log.debug("生成{}分钟k线:{}",symbol);
                //log.debug("生成{}分钟k线:{}",symbol);
                //生成1分钟K线
                processor.autoGenerate();
                processor.generateKLine(1, Calendar.MINUTE, time);
                //更新24H成交量
                processor.update24HVolume(time);
                if(minute%5 == 0) {
src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
@@ -7,9 +7,13 @@
import com.xcong.excoin.modules.coin.service.OrderCoinService;
import com.xcong.excoin.modules.exchange.service.HandleKlineService;
import com.xcong.excoin.trade.ExchangeTrade;
import com.xcong.excoin.utils.CoinTypeConvert;
import com.xcong.excoin.utils.RedisUtils;
import com.xcong.excoin.websocket.CandlestickModel;
import com.xcong.excoin.websocket.NewCandlestick;
import com.xcong.excoin.websocket.TradePlateSendWebSocket;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@@ -45,7 +49,7 @@
     */
    @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE)
    public void tradePlate(String content) {
        log.info("#盘口信息消费者---->{}#", content);
        //log.info("#盘口信息消费者---->{}#", content);
        tradePlateSendWebSocket.sendMessagePlate("NEKK/USDT",content,null);
    }
@@ -55,7 +59,7 @@
     */
    @RabbitListener(queues = RabbitMqConfig.QUEUE_HANDLE_TRADE)
    public void handleTradeExchange(String content) {
        log.info("#---->{}#", content);
       // log.info("#处理订单---->{}#", content);
        List<ExchangeTrade> exchangeTrades = JSONObject.parseArray(content, ExchangeTrade.class);
        // 去掉空的  暂时这样
        Iterator<ExchangeTrade> iterator = exchangeTrades.iterator();
@@ -63,6 +67,9 @@
            if(iterator.next()==null){
                iterator.remove();
            }
        }
        if(CollectionUtils.isEmpty(exchangeTrades)){
            return;
        }
        // 处理K线 并更新最新价
        handleKlineService.handleExchangeOrderToKline(exchangeTrades);
@@ -77,8 +84,33 @@
        Object o = redisUtils.get(key);
        Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick> )o;
        Set<Map.Entry<String, Candlestick>> entries = currentKlineMap.entrySet();
        for(Map.Entry<String, Candlestick> map : entries){
            tradePlateSendWebSocket.sendMessageKline(symbolUsdt,map.getKey(),JSONObject.toJSONString(map.getValue()),null);
            String ch = "market.{}.kline.{}";
            Candlestick value = map.getValue();
            String key1 = map.getKey();
            String chKey = key1;
            if(key1.equals("1hour")){
                chKey = "60min";
            }
            // 转换
            NewCandlestick newCandlestick= new NewCandlestick();
            String nekkusdt = CoinTypeConvert.convertReverse(symbolUsdt);
            ch = StrUtil.format(ch, nekkusdt,chKey);
            newCandlestick.setCh(ch);
            CandlestickModel model = new CandlestickModel();
            model.setVol(value.getVolume());
            model.setLow(value.getLow());
            model.setOpen(value.getOpen());
            model.setHigh(value.getHigh());
            model.setCount(value.getCount());
            model.setAmount(value.getAmount());
            model.setId(value.getTimestamp()/1000);
            model.setTimestamp(value.getTimestamp()/1000);
            model.setClose(value.getClose());
            newCandlestick.setTick(model);
            tradePlateSendWebSocket.sendMessageKline(symbolUsdt,key1,JSONObject.toJSONString(newCandlestick),null);
        }
        // 处理用户订单
        orderCoinService.handleOrder(exchangeTrades);
src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java
@@ -37,9 +37,9 @@
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("#----->{}#", correlationData);
        //log.info("#----->{}#", correlationData);
        if (ack) {
            log.info("success");
            //log.info("success");
        } else {
            log.info("--->{}", cause);
        }
src/main/java/com/xcong/excoin/trade/CoinTrader.java
@@ -53,7 +53,7 @@
     * 初始化交易线程
     */
    public void initialize() {
        logger.info("init CoinTrader for symbol {}", symbol);
        //logger.info("init CoinTrader for symbol {}", symbol);
        //买单队列价格降序排列
        buyLimitPriceQueue = new TreeMap<>(Comparator.reverseOrder());
        //卖单队列价格升序排列
@@ -107,7 +107,7 @@
        if (exchangeOrder.getTradeType() != OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
            return;
        }
        logger.info("addMarketPriceOrder,orderId = {}", exchangeOrder.getId());
        //logger.info("addMarketPriceOrder,orderId = {}", exchangeOrder.getId());
        LinkedList<OrderCoinsEntity> list = exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY ? buyMarketQueue : sellMarketQueue;
        synchronized (list) {
            list.addLast(exchangeOrder);
@@ -136,7 +136,7 @@
        }
        //logger.info("trade order={}",exchangeOrder);
        if (!symbol.equalsIgnoreCase(exchangeOrder.getSymbol())) {
            logger.info("unsupported symbol,coin={},base={}", exchangeOrder.getSymbol(), "USDT");
            //logger.info("unsupported symbol,coin={},base={}", exchangeOrder.getSymbol(), "USDT");
            return;
        }
        // 如果
@@ -203,7 +203,10 @@
                    OrderCoinsEntity matchOrder = orderIterator.next();
                    //处理匹配
                    ExchangeTrade trade = processMatch(focusedOrder, matchOrder);
                    exchangeTrades.add(trade);
                    if(trade!=null){
                        exchangeTrades.add(trade);
                    }
                    //判断匹配单是否完成
                    if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
                        //当前匹配的订单完成交易,删除该订单
@@ -304,7 +307,7 @@
                    if (trade != null) {
                        exchangeTrades.add(trade);
                    }
                    //判断匹配单是否完成
                    //判断匹配单是否完成 TODO
                    if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
                        //当前匹配的订单完成交易,删除该订单
                        orderIterator.remove();
@@ -364,12 +367,12 @@
     */
    private BigDecimal adjustMarketOrderTurnover(OrderCoinsEntity order, BigDecimal dealPrice) {
        if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && order.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
//            BigDecimal leftTurnover = order.getAmount().subtract(order.getTurnover());
//            if(leftTurnover.divide(dealPrice,coinScale,BigDecimal.ROUND_DOWN)
//                    .compareTo(BigDecimal.ZERO)==0){
//                order.setTurnover(order.getAmount());
//                return leftTurnover;
//            }
            BigDecimal leftTurnover = order.getEntrustAmount().subtract(order.getDealAmount());
            if(leftTurnover.divide(dealPrice,coinScale,BigDecimal.ROUND_DOWN)
                    .compareTo(BigDecimal.ZERO)==0){
                //order.setDealAmount(order.getEntrustAmount());
                return leftTurnover;
            }
        }
        return BigDecimal.ZERO;
    }
@@ -400,7 +403,7 @@
        availAmount = calculateTradedAmount(matchOrder, dealPrice);
        //计算成交量 取少的
        BigDecimal tradedAmount = (availAmount.compareTo(needAmount) >= 0 ? needAmount : availAmount);
        logger.info("dealPrice={},amount={}", dealPrice, tradedAmount);
        //logger.info("dealPrice={},amount={}", dealPrice, tradedAmount);
        //如果成交额为0说明剩余额度无法成交,退出
        if (tradedAmount.compareTo(BigDecimal.ZERO) == 0) {
            return null;
@@ -415,6 +418,21 @@
        focusedOrder.setDealCnt(focusedOrder.getDealCnt().add(tradedAmount));
        // 用户单成交金额
        focusedOrder.setDealAmount(focusedOrder.getDealAmount().add(turnover));
        // 判断两个单是否完成
        if(matchOrder.getEntrustAmount()!=null && matchOrder.getEntrustAmount().compareTo(matchOrder.getDealAmount())<=0){
            matchOrder.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
        }
        if(matchOrder.getEntrustCnt()!=null && matchOrder.getEntrustCnt().compareTo(matchOrder.getDealCnt())<=0){
            matchOrder.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
        }
        if(focusedOrder.getEntrustAmount()!=null && focusedOrder.getEntrustAmount().compareTo(focusedOrder.getDealAmount())<=0){
            focusedOrder.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
        }
        if(focusedOrder.getEntrustCnt()!=null && focusedOrder.getEntrustCnt().compareTo(focusedOrder.getDealCnt())<=0){
            focusedOrder.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
        }
        //创建成交记录
        ExchangeTrade exchangeTrade = new ExchangeTrade();
@@ -471,9 +489,6 @@
                    //orderCoinService.handleOrder(subTrades);
                }
            } else {
                trades.forEach(e -> {
                    System.out.println(e);
                });
                exchangeProducer.sendHandleTrade(JSON.toJSONString(trades));
                //orderCoinService.handleOrder(trades);
                // kafkaTemplate.send("exchange-trade", JSON.toJSONString(trades));
@@ -551,13 +566,57 @@
    }
    /**
     * 发送盘口变化消息
     *
     * @param
     */
    public String sendTradePlateMessage() {
        //防止并发引起数组越界,造成盘口倒挂 TODO
        List<List<BigDecimal>> plate;
        List<BigDecimal> plateItem;
        TradePlateModel tradePlateModel = new TradePlateModel();
        // 转换格式
        if (buyTradePlate != null && buyTradePlate.getItems() != null) {
            plate = new ArrayList<>();
            LinkedList<TradePlateItem> items = buyTradePlate.getItems();
            for (TradePlateItem item : items) {
                plateItem = new ArrayList<>(2);
                BigDecimal price = item.getPrice();
                BigDecimal amount = item.getAmount();
                plateItem.add(price);
                plateItem.add(amount);
                plate.add(plateItem);
            }
            tradePlateModel.setBuy(plate);
        }
        if (sellTradePlate != null && sellTradePlate.getItems() != null) {
            plate = new ArrayList<>();
            LinkedList<TradePlateItem> items = sellTradePlate.getItems();
            for (TradePlateItem item : items) {
                plateItem = new ArrayList<>(2);
                BigDecimal price = item.getPrice();
                BigDecimal amount = item.getAmount();
                plateItem.add(price);
                plateItem.add(amount);
                plate.add(plateItem);
            }
            tradePlateModel.setSell(plate);
        }
        // 盘口发生变化通知TODO
        return JSON.toJSONString(tradePlateModel);
        //exchangeProducer.sendPlateMsg(JSON.toJSONString(tradePlateModel));
    }
    /**
     * 取消委托订单
     *
     * @param exchangeOrder
     * @return
     */
    public OrderCoinsEntity cancelOrder(OrderCoinsEntity exchangeOrder) {
        logger.info("cancelOrder,orderId={}", exchangeOrder.getId());
        //logger.info("cancelOrder,orderId={}", exchangeOrder.getId());
        if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
            //处理市价单
            Iterator<OrderCoinsEntity> orderIterator;
src/main/java/com/xcong/excoin/trade/ExchangeTrade.java
@@ -5,6 +5,7 @@
import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Date;
/**
 * 撮合交易信息
src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java
@@ -29,6 +29,17 @@
        }
    }
    public static String convertReverse(String symbol) {
        switch (symbol) {
            case "BTC/USDT":
                return "btcusdt";
            case "NEKK/USDT":
                return "nekkusdt";
            default:
                return null;
        }
    }
    public static String convertToKey(String symbol) {
        switch (symbol) {
            case "BTC/USDT":
src/main/java/com/xcong/excoin/websocket/CandlestickModel.java
New file
@@ -0,0 +1,18 @@
package com.xcong.excoin.websocket;
import lombok.Data;
import java.math.BigDecimal;
/**
 * One candlestick (K-line) bar in the shape this package serialises for
 * WebSocket clients.
 *
 * <p>It is the element type of {@code CandlestickResult.data} and the type of
 * {@code NewCandlestick.tick}. Accessors, {@code equals}, {@code hashCode} and
 * {@code toString} are generated by Lombok's {@code @Data}.
 */
@Data
public class CandlestickModel {
    /**
     * Bar identifier. The history handler in TradePlateSendWebSocket fills it with
     * the source candlestick's timestamp divided by 1000
     * (presumably epoch seconds - TODO confirm the source unit).
     */
    private Long id;
    /** Amount copied from the source candlestick's {@code getAmount()}. */
    private BigDecimal amount;
    /** Count copied from the source candlestick's {@code getCount()} (presumably the number of trades - confirm). */
    private long count;
    /** Opening price of the bar. */
    private BigDecimal open;
    /** Closing price of the bar. */
    private BigDecimal close;
    /** Lowest price of the bar. */
    private BigDecimal low;
    /** Highest price of the bar. */
    private BigDecimal high;
    /** Volume copied from the source candlestick's {@code getVolume()}. */
    private BigDecimal vol;
    /**
     * Bar timestamp. NOTE(review): the history conversion in TradePlateSendWebSocket
     * does not set this field, so it stays 0 there - confirm whether that is intended.
     */
    private long timestamp;
}
src/main/java/com/xcong/excoin/websocket/CandlestickResult.java
New file
@@ -0,0 +1,11 @@
package com.xcong.excoin.websocket;
import lombok.Data;
import java.util.List;
/**
 * Response envelope for a historical K-line request sent over the WebSocket.
 *
 * <p>TradePlateSendWebSocket builds one instance per request, serialises it
 * with fastjson and sends the JSON text to the requesting session.
 */
@Data
public class CandlestickResult {
    /** Topic string of the request being answered; the handler copies it from the client's message. */
    private String rep;
    /** Candlestick bars for the request; left {@code null} by the handler when no cached data is found. */
    private List<CandlestickModel> data;
}
src/main/java/com/xcong/excoin/websocket/NewCandlestick.java
New file
@@ -0,0 +1,9 @@
package com.xcong.excoin.websocket;
import lombok.Data;
/**
 * Wrapper pairing a single candlestick with a channel string.
 *
 * <p>NOTE(review): presumably the push message for the latest K-line bar of a
 * subscribed topic - no producer is visible in this part of the change, so
 * confirm against the caller.
 */
@Data
public class NewCandlestick {
    /** Channel string (presumably the subscription topic, e.g. {@code market.<symbol>.kline.<period>} - confirm). */
    private String ch;
    /** The candlestick carried by this message. */
    private CandlestickModel tick;
}
src/main/java/com/xcong/excoin/websocket/SubResultModel.java
New file
@@ -0,0 +1,10 @@
package com.xcong.excoin.websocket;
import lombok.Data;
/**
 * Acknowledgement sent to a client after it subscribes to a topic.
 *
 * <p>The depth-subscription branch of TradePlateSendWebSocket fills {@code id}
 * and {@code subbed}, serialises the object to JSON and sends it to the
 * subscribing session.
 */
@Data
public class SubResultModel {
    /** Identifier echoed to the client; the depth handler currently sets the fixed value "nekkusdt". */
    private String id;
    /** Result status; defaults to "ok" and is not changed by the visible handler. */
    private String status = "ok";
    /** The topic string the client subscribed to. */
    private String subbed;
}
src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
@@ -2,8 +2,11 @@
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.huobi.client.model.Candlestick;
import com.xcong.excoin.common.contants.AppContants;
import com.xcong.excoin.trade.CoinTraderFactory;
import com.xcong.excoin.utils.CoinTypeConvert;
import com.xcong.excoin.utils.RedisUtils;
import com.xcong.excoin.utils.SpringContextHolder;
@@ -15,9 +18,8 @@
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -53,12 +55,19 @@
    @OnClose
    public void onClose(Session session) {
        // Connection-closed callback: lowers the online counter and drops the closed
        // session from every subscription map so later broadcasts no longer see it.
        onlineCount.decrementAndGet(); // decrement the online count
//        Collection<Map<String, Session>> values = tradeplateClients.values();
//        if(CollectionUtils.isNotEmpty(values)){
//            for(Map<String,Session> map : values){
//                map.remove(session.getId());
//            }
//        }
        // Remove the session from every depth (trade plate) subscriber map, keyed by session id.
        Collection<Map<String, Session>> values = tradeplateClients.values();
        if(CollectionUtils.isNotEmpty(values)){
            for(Map<String,Session> map : values){
                map.remove(session.getId());
            }
        }
        // Do the same for every K-line subscriber map.
        Collection<Map<String, Session>> klineClientsValues = klineClients.values();
        if(CollectionUtils.isNotEmpty(klineClientsValues)){
            for(Map<String,Session> map : klineClientsValues){
                map.remove(session.getId());
            }
        }
        log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get());
    }
@@ -69,13 +78,13 @@
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
        //log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
        // 订阅方法 {sub: 'market.' + symbol + '.depth.' + this._caculateType(),id: symbol
        //}
        JSONObject jsonObject = JSON.parseObject(message);
        // 盘口的判断
        if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) {
            log.info("订阅盘口消息:{}", session.getId());
            //log.info("订阅盘口消息:{}", session.getId());
            String sub = jsonObject.get("sub").toString();
            String symbol = sub.split("\\.")[1];
            symbol = CoinTypeConvert.convert(symbol);
@@ -86,11 +95,34 @@
                map.put(session.getId(), session);
                tradeplateClients.put(symbol, map);
            }
            // 发送一次盘口
            CoinTraderFactory factory = SpringContextHolder.getBean(CoinTraderFactory.class);
            // 发送订阅消息
            String nekk = factory.getTrader("NEKK").sendTradePlateMessage();
            SubResultModel subResultModel = new SubResultModel();
            subResultModel.setId("nekkusdt");
            subResultModel.setSubbed(sub);
            synchronized (session){
                try {
                    session.getBasicRemote().sendText(JSONObject.toJSONString(subResultModel));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
           synchronized (session){
               try {
                   session.getBasicRemote().sendText(nekk);
               } catch (IOException e) {
                   e.printStackTrace();
               }
           }
        }
        // 取消盘口订阅
        if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) {
            // `market.${symbol}.kline.${strPeriod}
            log.info("取消订阅盘口消息:{}", session.getId());
            //log.info("取消订阅盘口消息:{}", session.getId());
            String unsub = jsonObject.get("unsub").toString();
            String[] split = unsub.split("\\.");
            String symbol = split[1];
@@ -112,38 +144,48 @@
        // 取消订阅 {unsub: xxx(标识)}
        if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) {
            // 订阅
            log.info("订阅最新K线消息:{}", session.getId());
            String sub = jsonObject.get("sub").toString();
            log.info("订阅最新K线消息:{}", sub);
            String[] split = sub.split("\\.");
            String symbol = split[1];
            symbol = CoinTypeConvert.convert(symbol);
            String period = split[3];
            if("60min".equals(period)){
                period = "1hour";
            }
            String key = symbol + "-" + period;
            log.info("最新K线key:{}", key);
            if (klineClients.containsKey(key)) {
                // 有这个币种K线
                Map<String, Session> stringSessionMap = klineClients.get(key);
                if (!stringSessionMap.containsKey(session.getId())) {
                    stringSessionMap.put(session.getId(), session);
                    log.info("放入最新K线Map:{}", key);
                }
            } else {
                Map<String, Session> stringSessionMap = new HashMap<>();
                stringSessionMap.put(session.getId(), session);
                klineClients.put(key, stringSessionMap);
                log.info("放入最新K线Map:{}", key);
            }
        }
        // 取消订阅
        if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) {
            // `market.${symbol}.kline.${strPeriod}
            log.info("取消订阅最新K消息:{}", session.getId());
            //log.info("取消订阅最新K消息:{}", session.getId());
            String unsub = jsonObject.get("unsub").toString();
            String[] split = unsub.split("\\.");
            String strPeriod = split[3];
            if("60min".equals(strPeriod)){
                strPeriod = "1hour";
            }
            String symbol = split[1];
            symbol = CoinTypeConvert.convert(symbol);
            String key = symbol + "-" + strPeriod;
            if (klineClients.containsKey(key)) {
                klineClients.get(key).remove(session.getId());
                //session.getAsyncRemote().sendText(message);
            }
        }
@@ -155,14 +197,38 @@
            String symbol = split[1];
            symbol = CoinTypeConvert.convert(symbol);
            String period = split[3];
            if("60min".equals(period)){
                period = "1hour";
            }
            //String key = symbol+"-"+period;
            // String key = "KINE_BCH/USDT_1week";
            String key = "KINE_{}_{}";
            // 币币k线数据
            key = StrUtil.format(key, symbol, period);
            //key = StrUtil.format(key, symbol, period);
            key = StrUtil.format(key, "NEKK/USDT", period);
            RedisUtils bean = SpringContextHolder.getBean(RedisUtils.class);
            Object o = bean.get(key);
            sendMessageHistory(JSON.toJSONString(o), session);
            List<CandlestickModel> candlestickModels = new ArrayList<>();
            CandlestickResult result = new CandlestickResult();
            result.setRep(sub);
            if(o!=null){
                List<Candlestick> list = (List<Candlestick>)o;
                for(Candlestick candlestick : list){
                    CandlestickModel model = new CandlestickModel();
                    model.setAmount(candlestick.getAmount());
                    model.setClose(candlestick.getClose());
                    model.setCount(candlestick.getCount());
                    model.setHigh(candlestick.getHigh());
                    model.setId(candlestick.getTimestamp()/1000);
                    model.setOpen(candlestick.getOpen());
                    model.setLow(candlestick.getLow());
                    model.setVol(candlestick.getVolume());
                    candlestickModels.add(model);
                }
                result.setData(candlestickModels);
            }
            sendMessageHistory(JSON.toJSONString(result), session);
        }
    }
@@ -184,7 +250,7 @@
                Session toSession = sessionEntry.getValue();
                // 排除掉自己
                //if (!fromSession.getId().equals(toSession.getId())) {
                log.info("服务端给客户端[{}]发送盘口消息{}", toSession.getId(), message);
               // log.info("服务端给客户端[{}]发送盘口消息{}", toSession.getId(), message);
                boolean open = toSession.isOpen();
                if (open) {
                    toSession.getAsyncRemote().sendText(message);
@@ -197,7 +263,7 @@
    }
    public void sendMessageHistory(String message, Session toSession) {
        log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
      //  log.info("服务端给客户端[{}]发送历史K线", toSession.getId());
        boolean open = toSession.isOpen();
        if (open) {
            toSession.getAsyncRemote().sendText(message);
@@ -205,19 +271,18 @@
    }
    /**
     * Pushes a K-line message to every session subscribed to the given symbol and period.
     *
     * <p>Subscribers are looked up in {@code klineClients} under the key
     * {@code symbol + "-" + period}; nothing happens when that key is absent.
     * Sessions that are no longer open are skipped.
     *
     * @param symbol      symbol part of the subscription key
     * @param period      period part of the subscription key
     * @param message     text passed unchanged to each subscriber
     * @param fromSession not read by the current body; the self-exclusion check
     *                    that would use it is commented out
     */
    public void sendMessageKline(String symbol, String period, String message, Session fromSession) {
        String key = symbol + "-" + period;
        //log.info("发送最新K线[{}],数据[{}]",key,message);
        if (klineClients.containsKey(key)) {
            Map<String, Session> stringSessionMap = klineClients.get(key);
            for (Map.Entry<String, Session> sessionEntry : stringSessionMap.entrySet()) {
                Session toSession = sessionEntry.getValue();
                // exclude the sender itself (check currently disabled)
                //if (!fromSession.getId().equals(toSession.getId())) {
                log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
                boolean open = toSession.isOpen();
                if (open) {
                    log.info("服务端给客户端[{}]发送最新K线消息{}", toSession.getId(), message);
                    // asynchronous send: does not wait for delivery to complete
                    toSession.getAsyncRemote().sendText(message);
                }
                //  }
            }
        }
    }
src/main/resources/application-test.yml
@@ -91,16 +91,17 @@
app:
  debug: false
  debug: true
  redis_expire: 3000
  kline-update-job: false
  newest-price-update-job: true
  newest-price-update-job: false
  #日线 该任务不能与最新价处于同一个服务器
  day-line: true
  other-job: true
  loop-job: true
  trade: false
  day-line: false
  other-job: false
  loop-job: false
  rabbit-consumer: true
  block-job: true
  block-job: false
aliyun:
  oss:
src/main/resources/application.yml
@@ -5,7 +5,7 @@
spring:
  profiles:
    active: dev
    active: test
  datasource:
    url: jdbc:mysql://rm-bp151tw8er79ig9kb5o.mysql.rds.aliyuncs.com:3306/db_biue?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8
    username: ctcoin_data
src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml
@@ -20,12 +20,15 @@
            deal_amount buyTurnover,
            deal_amount sellTurnover,
            order_type direction,
            create_time time
            UNIX_TIMESTAMP(create_time) time
          from coins_order_deal
          where symbol = #{symbol}
          and order_type  = 1
          and order_status = 3
          and create_time between #{startTime} and #{endTime}
          <if test="startTime != null and endTime != null">
              and create_time between #{startTime} and #{endTime}
          </if>
    </select>
    <select id="selectAllWalletCoinOrderBySymbol"
            resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity">