zainali5120
2020-09-13 df1716a9abacac95261d686bdf0776bc7d6deca2
撮合交易代码提交
15 files modified
23 files added
2888 ■■■■■ changed files
pom.xml 5 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/common/enumerates/SymbolEnum.java 3 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java 65 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/configurations/WebSocketConfig.java 16 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java 1 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java 5 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java 3 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/entity/OrderCoinsEntity.java 5 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/parameter/dto/SubmitSalesWalletCoinOrderDto.java 8 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java 17 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java 290 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/exchange/service/HandleKlineService.java 14 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java 56 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/member/dao/MemberWalletCoinDao.java 1 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/processor/CoinProcessor.java 54 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/processor/CoinProcessorFactory.java 34 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/processor/CoinThumb.java 26 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java 392 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/processor/ExchangeOrderDirection.java 5 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/processor/MarketHandler.java 21 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/processor/MarketService.java 125 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java 98 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java 159 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java 98 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java 47 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/trade/CoinTrader.java 762 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/trade/CoinTraderFactory.java 40 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/trade/ExchangeOrderDirection.java 5 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/trade/ExchangeTrade.java 27 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/trade/MergeOrder.java 42 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/trade/TradePlate.java 181 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/trade/TradePlateItem.java 11 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/trade/TradePlateModel.java 13 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java 4 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java 220 ●●●●● patch | view | raw | blame | history
src/main/resources/mapper/member/MemberWalletCoinDao.xml 16 ●●●●● patch | view | raw | blame | history
src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml 19 ●●●●● patch | view | raw | blame | history
src/test/java/com/xcong/excoin/GuijiTest.java patch | view | raw | blame | history
pom.xml
@@ -76,6 +76,11 @@
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <!-- websocket -->
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
<!--        <dependency>-->
<!--            <groupId>org.springframework.security</groupId>-->
src/main/java/com/xcong/excoin/common/enumerates/SymbolEnum.java
@@ -14,7 +14,8 @@
    ,BCH("BCH", "BCH/USDT")
    ,EOS("EOS", "EOS/USDT")
    ,XRP("XRP", "XRP/USDT")
    ,ETC("ETC", "ETC/USDT");
    ,ETC("ETC", "ETC/USDT")
    ,NEKK("NEKK", "NEKK/USDT");
    private String name;
src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
@@ -29,6 +29,11 @@
    public static final String EXCHANGE_A = "biue-exchange-A";
    /**
     * 撮合交易
     */
    public static final String EXCHANGE_B = "biue-exchange-B";
    // 开多止盈队列
    public static final String QUEUE_MOREPRO = "QUEUE_MOREPRO_NEW";
@@ -51,6 +56,12 @@
    // 平仓队列
    public static final String QUEUE_CLOSETRADE = "QUEUE_CLOSETRADE_NEW";
    // 盘口队列
    public static final String QUEUE_TRADE_PLATE = "QUEUE_TRADE_PLATE";
    // 处理交易
    public static final String QUEUE_HANDLE_TRADE = "QUEUE_HANDLE_TRADE";
    // 开多止盈路由键
    public static final String ROUTINGKEY_MOREPRO = "ROUTINGKEY_MOREPRO";
@@ -71,6 +82,12 @@
    public static final String ROUTINGKEY_PRICEOPERATE = "ROUTINGKEY_PRICEOPERATE";
    // 平仓路由
    public static final String ROUTINGKEY_CLOSETRADE = "ROUTINGKEY_CLOSETRADE";
    // 盘口理路由
    public static final String ROUTINGKEY_TRADE_PLATE = "ROUTINGKEY_TRADE_PLATE";
    // 交易订单处理
    public static final String ROUTINGKEY_HANDLE_TRADE = "ROUTINGKEY_HANDLE_TRADE";
    @Resource
    private ConnectionFactory connectionFactory;
@@ -94,6 +111,7 @@
    public DirectExchange defaultExchange() {
        return new DirectExchange(EXCHANGE_ONE);
    }
    @Bean
    public Queue testQueue() {
@@ -204,6 +222,27 @@
    /**
     * 盘口推送
     *
     * @return
     */
    @Bean
    public Queue queuePlateTrade() {
        return new Queue(QUEUE_TRADE_PLATE, true);
    }
    /**
     * 交易订单处理
     *
     * @return
     */
    @Bean
    public Queue queueHandleTrade() {
        return new Queue(QUEUE_HANDLE_TRADE, true);
    }
    /**
     * 开多止盈
     *
     * @return
@@ -286,4 +325,30 @@
        return BindingBuilder.bind(queueCloseTrade()).to(orderExchange()).with(RabbitMqConfig.ROUTINGKEY_CLOSETRADE);
    }
    @Bean
    public DirectExchange matchTradeExchange() {
        return new DirectExchange(EXCHANGE_B);
    }
    /**
     * 盘口变化绑定
     *
     * @return
     */
    @Bean
    public Binding bindingPlateTrade() {
        return BindingBuilder.bind(queuePlateTrade()).to(matchTradeExchange()).with(RabbitMqConfig.ROUTINGKEY_TRADE_PLATE);
    }
    /**
     *  交易订单处理
     *
     * @return
     */
    @Bean
    public Binding bindingHandleTrade() {
        return BindingBuilder.bind(queueHandleTrade()).to(matchTradeExchange()).with(RabbitMqConfig.ROUTINGKEY_HANDLE_TRADE);
    }
}
src/main/java/com/xcong/excoin/configurations/WebSocketConfig.java
New file
@@ -0,0 +1,16 @@
package com.xcong.excoin.configurations;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
    /**
     * 注入一个ServerEndpointExporter,该Bean会自动注册使用@ServerEndpoint注解申明的websocket endpoint
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}
src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java
@@ -53,6 +53,7 @@
                .antMatchers("/api/member/getAppVersionInfo").permitAll()
                .antMatchers("/api/orderCoin/searchSymbolResultList").permitAll()
                .antMatchers("/api/orderCoin/findCollect").permitAll()
                .antMatchers("/trade/**").permitAll()
                .anyRequest().authenticated()
                .and().apply(securityConfiguereAdapter());
    }
src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java
@@ -67,8 +67,13 @@
        Integer tradeType = submitSalesWalletCoinOrderDto.getTradeType();
        BigDecimal price = submitSalesWalletCoinOrderDto.getPrice();
        BigDecimal amount = submitSalesWalletCoinOrderDto.getAmount();
        if("NEKK".equals(symbol)){
            return orderCoinService.submitSalesWalletCoinOrderWithMatch(symbol,type,tradeType,price,amount,submitSalesWalletCoinOrderDto.getEntrustAmount());
        }else{
        return orderCoinService.submitSalesWalletCoinOrder(symbol,type,tradeType,price,amount);
    }
    }
    
    /**
     * 获取委托单数据
src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java
@@ -1,7 +1,9 @@
package com.xcong.excoin.modules.coin.dao;
import java.util.Date;
import java.util.List;
import com.xcong.excoin.trade.ExchangeTrade;
import org.apache.ibatis.annotations.Param;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
@@ -21,5 +23,6 @@
    IPage<OrderCoinsDealEntity> findAllWalletCoinOrderInPage(Page<OrderCoinsDealEntity> page,
            @Param("record") OrderCoinsDealEntity orderCoinsDealEntity);
    List<ExchangeTrade> selectOrderCoinDealByTime(@Param("symbol")String symbol, @Param("startTime")Date startTime, @Param("endTime")Date endTime);
    
}
src/main/java/com/xcong/excoin/modules/coin/entity/OrderCoinsEntity.java
@@ -57,6 +57,11 @@
     * 成交价
     */
    private BigDecimal dealPrice;
    /**
     *  市价委托时的委托金额
     */
    private BigDecimal entrustAmount;
    /**
     * 成交金额
     */
src/main/java/com/xcong/excoin/modules/coin/parameter/dto/SubmitSalesWalletCoinOrderDto.java
@@ -26,12 +26,16 @@
    private Integer tradeType;
    
    @Min(0)
    @NotNull(message = "数量不能为空")
    //@NotNull(message = "数量不能为空")
    @ApiModelProperty(value = "数量", example = "100")
    private BigDecimal amount;
    @NotNull(message = "建仓价不能为空")
    //@NotNull(message = "建仓价不能为空")
    @ApiModelProperty(value = "建仓价", example = "20.0000")
    private BigDecimal price;
    @ApiModelProperty(value = "市价时输入的总金额", example = "20.0000")
    private BigDecimal entrustAmount;
}
src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java
@@ -1,11 +1,13 @@
package com.xcong.excoin.modules.coin.service;
import java.math.BigDecimal;
import java.util.List;
import com.baomidou.mybatisplus.extension.service.IService;
import com.xcong.excoin.common.response.Result;
import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
import com.xcong.excoin.modules.coin.parameter.dto.FindAllWalletCoinOrderDto;
import com.xcong.excoin.trade.ExchangeTrade;
public interface OrderCoinService extends IService<OrderCoinsEntity>{
    
@@ -15,6 +17,19 @@
    Result submitSalesWalletCoinOrder(String symbol, Integer type, Integer tradeType, BigDecimal price,
            BigDecimal amount);
    /**
     *  需要撮合交易的币种提交买卖单
     * @param symbol
     * @param type
     * @param tradeType
     * @param price
     * @param amount
     * @param entrustAmount
     * @return
     */
    Result submitSalesWalletCoinOrderWithMatch(String symbol, Integer type, Integer tradeType, BigDecimal price,
            BigDecimal amount,BigDecimal entrustAmount);
    public Result getEntrustWalletCoinOrder(String symbol, Integer status);
@@ -34,4 +49,6 @@
    public void dealEntrustCoinOrder();
    public void handleOrder(List<ExchangeTrade> trades);
}
src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
@@ -1,14 +1,9 @@
package com.xcong.excoin.modules.coin.service.impl;
import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import javax.annotation.Resource;
@@ -16,6 +11,10 @@
import com.xcong.excoin.modules.platform.entity.PlatformCnyUsdtExchangeEntity;
import com.xcong.excoin.modules.platform.entity.PlatformSymbolsCoinEntity;
import com.xcong.excoin.trade.CoinTrader;
import com.xcong.excoin.trade.CoinTraderFactory;
import com.xcong.excoin.trade.ExchangeTrade;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@@ -79,6 +78,10 @@
    RedisUtils redisUtils;
    @Resource
    PlatformSymbolsCoinDao platformSymbolsCoinDao;
    @Resource
    private CoinTraderFactory factory;
    @Override
    public String generateSimpleSerialno(String userId) {
@@ -309,6 +312,174 @@
        record.setBalance(walletCoinUsdt.getAvailableBalance());
        memberAccountFlowEntityDao.insert(record);
        return Result.ok(MessageSourceUtils.getString("order_service_0011"));
    }
    @Override
    public Result submitSalesWalletCoinOrderWithMatch(String symbol, Integer type, Integer tradeType, BigDecimal price, BigDecimal amount, BigDecimal entrustAmount) {
        //获取用户ID
        Long memberId = 13L;
        BigDecimal nowPriceinBigDecimal = price;
        //查询当前价
        //BigDecimal nowPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(symbol + "/USDT")));
        // 获取交易管理的杠杠倍率,手续费率等信息,由平台进行设置
        symbol = symbol.toUpperCase();
        MemberWalletCoinEntity walletCoin = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, symbol);
        if (ObjectUtil.isEmpty(walletCoin)) {
            return Result.fail(MessageSourceUtils.getString("order_service_0003"));
        }
        // 查询交易设置
        PlatformTradeSettingEntity tradeSetting = platformTradeSettingDao.findTradeSetting();
        if (ObjectUtil.isEmpty(tradeSetting)) {
            return Result.fail(MessageSourceUtils.getString("order_service_0009"));
        }
        // 手续费用(手续费=建仓价X数量X手续费率)
        BigDecimal closingPrice ;
        // 总费用分两种 1,限价交易 是价格*数量+手续费 2,市价交易 用户输入的金额+手续费
        //总费用 = 成交价*数量+手续费
        BigDecimal totalPayPrice ;
        if(OrderCoinsEntity.TRADETYPE_FIXEDPRICE.equals(tradeType) ){
            // 限价
            closingPrice = price.multiply(amount).multiply(tradeSetting.getCoinFeeRatio());
            totalPayPrice = price.multiply(amount).add(closingPrice);
        }else{
            // 市价
            closingPrice = entrustAmount.multiply(tradeSetting.getCoinFeeRatio());
            totalPayPrice = entrustAmount.add(closingPrice);
        }
       // BigDecimal totalPayPricCoin = nowPrice.multiply(amount).add(closingPrice);
        String walletCode = MemberWalletCoinEnum.WALLETCOINCODE.getValue();
        MemberWalletCoinEntity walletCoinUsdt = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, walletCode);
        if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) {
            //买入,所需总费用跟用户USDT金额进行比较
            BigDecimal availableBalance = walletCoinUsdt.getAvailableBalance();
            if (totalPayPrice.compareTo(availableBalance) > 0) {
                return Result.fail(MessageSourceUtils.getString("order_service_0010"));
            }
        } else {
            //卖出,用户币是否足够
            BigDecimal availableBalance = walletCoin.getAvailableBalance();
            if (amount.compareTo(availableBalance) > 0) {
                return Result.fail(MessageSourceUtils.getString("order_service_0010"));
            }
        }
        // 首先将单插入到数据库主表(委托表)
        // 创建订单
        OrderCoinsEntity order = new OrderCoinsEntity();
        //根据委托类型生成不同数据
        // 如果是限价交易直接插入主表数据
        order.setMemberId(memberId);
        order.setOrderNo(generateSimpleSerialno(memberId.toString()));
        order.setOrderType(type);
        order.setSymbol(symbol);
        //order.setMarkPrice(nowPrice);
        // 成交量 先设置为0
        order.setDealCnt(BigDecimal.ZERO);
        // 成交价
        //order.setDealPrice(price);
        // 成交金额
        order.setDealAmount(BigDecimal.ZERO);
        order.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DODING);
        order.setTradeType(tradeType);
        // 手续费
        order.setFeeAmount(closingPrice);
        if(OrderCoinsEntity.TRADETYPE_FIXEDPRICE.equals(tradeType)){
                // 限价 是需要价格和数量 可以得到成交金额
                // 下单量
                order.setEntrustCnt(amount);
                // 下单价格
                order.setEntrustPrice(price);
                order.setEntrustAmount(amount.multiply(price));
        }else{
            if(OrderCoinsEntity.ORDERTYPE_BUY.equals(type)){
                // 市价 只有金额
                order.setEntrustAmount(entrustAmount);
            }else{
                // 下单量
                order.setEntrustCnt(amount);
                // 下单价格
                order.setEntrustPrice(price);
                order.setEntrustAmount(amount.multiply(price));
            }
        }
        orderCoinsDao.insert(order);
        //更新用户钱包信息
        //冻结相应的资产
        if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) {
            //如果是买入,所对应的币种增加,USDT账户减少金额
            BigDecimal availableBalance = walletCoinUsdt.getAvailableBalance().subtract(totalPayPrice);
            BigDecimal frozenBalance = walletCoinUsdt.getFrozenBalance().add(totalPayPrice);
            walletCoinUsdt.setAvailableBalance(availableBalance);
            walletCoinUsdt.setFrozenBalance(frozenBalance);
            memberWalletCoinDao.updateById(walletCoinUsdt);
        } else {
            //如果是卖出,币种减少,USDT增加
            BigDecimal availableBalance = walletCoin.getAvailableBalance().subtract(amount);
            BigDecimal frozenBalance = walletCoin.getFrozenBalance().add(amount);
            walletCoin.setAvailableBalance(availableBalance);
            walletCoin.setFrozenBalance(frozenBalance);
            memberWalletCoinDao.updateById(walletCoin);
        }
        // 加入到撮合
        CoinTrader trader = factory.getTrader(symbol);
//        if(trader==null){
//
//             trader = new CoinTrader("NEKK");
//            //newTrader.setKafkaTemplate(kafkaTemplate);
//            //newTrader.setBaseCoinScale(coin.getBaseCoinScale());
//            //newTrader.setCoinScale(coin.getCoinScale());
//            // newTrader.setPublishType(coin.getPublishType());
//            //newTrader.setClearTime(coin.getClearTime());
//
//            // 创建成功以后需要对未处理订单预处理
//            //log.info("======CoinTrader Process: " + symbol + "======");
//            List<OrderCoinsEntity> orders = orderCoinsDao.selectAllEntrustingCoinOrderList();
//            if(CollectionUtils.isNotEmpty(orders)){
//                List<OrderCoinsEntity> tradingOrders = new ArrayList<>();
//                List<OrderCoinsEntity> completedOrders = new ArrayList<>();
//                orders.forEach(order1 -> {
//                    tradingOrders.add(order1);
//                });
//                try {
//                    trader.trade(tradingOrders);
//                } catch (ParseException e) {
//                    e.printStackTrace();
//                    // log.info("异常:trader.trade(tradingOrders);");
//                }
//            }
//
//            trader.setReady(true);
//            factory.addTrader(symbol, trader);
//        }
        trader.trade(order);
//        // 流水记录 TODO
//        MemberAccountFlowEntity record = new MemberAccountFlowEntity();
//        record.setMemberId(memberId);
//        if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) {
//            record.setPrice(totalPayPrice.setScale(4, BigDecimal.ROUND_DOWN));
//            record.setSource(MemberAccountFlowEntity.SOURCE_BUY + symbol);
//            record.setRemark(MemberAccountFlowEntity.REMARK_BUY + symbol + ":" + amount);
//        } else {
//            record.setPrice(totalPayPrice.negate().setScale(4, BigDecimal.ROUND_DOWN));
//            record.setSource(MemberAccountFlowEntity.SOURCE_SALE + symbol);
//            record.setRemark(MemberAccountFlowEntity.REMARK_SALE + symbol + ":" + amount);
//        }
//        record.setSymbol(symbol);
//        record.setBalance(walletCoinUsdt.getAvailableBalance());
//
//        memberAccountFlowEntityDao.insert(record);
        return Result.ok(MessageSourceUtils.getString("order_service_0011"));
    }
@@ -638,4 +809,109 @@
            }
        }
    }
    public void handleOrder(List<ExchangeTrade> trades){
        // 处理撮合交易的订单
        for(ExchangeTrade exchangeTrade : trades){
            BigDecimal amount = exchangeTrade.getAmount();
            Long buyOrderId = exchangeTrade.getBuyOrderId();
            BigDecimal buyTurnover = exchangeTrade.getBuyTurnover();
            int direction = exchangeTrade.getDirection();
            BigDecimal price = exchangeTrade.getPrice();
            Long sellOrderId = exchangeTrade.getSellOrderId();
            // 买卖单都需要处理
            // 买单
            OrderCoinsEntity buyOrderCoinsEntity = orderCoinsDao.selectById(buyOrderId);
            if(buyOrderCoinsEntity!=null){
                // 比较剩余的量
                BigDecimal dealAmount = buyOrderCoinsEntity.getDealAmount();
                // 单的总金额
                BigDecimal entrustAmount = buyOrderCoinsEntity.getEntrustAmount();
                BigDecimal add = dealAmount.add(buyTurnover);
                // 创建一个完成的单
                OrderCoinsDealEntity detail = new OrderCoinsDealEntity();
                detail.setMemberId(buyOrderCoinsEntity.getMemberId());
                //detail.setOrderId(order.getId());
                detail.setOrderNo(buyOrderCoinsEntity.getOrderNo());
                detail.setOrderType(buyOrderCoinsEntity.getOrderType());
                detail.setTradeType(buyOrderCoinsEntity.getTradeType());
                detail.setSymbol(buyOrderCoinsEntity.getSymbol());
                detail.setSymbolCnt(amount);
                detail.setEntrustPrice(buyOrderCoinsEntity.getEntrustPrice());
                detail.setDealPrice(price);
                detail.setDealAmount(buyTurnover);
                //detail.setFeeAmount(closingPrice);
                detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_DONE);
                orderCoinDealDao.insert(detail);
                // 如果这个单成交完  更改状态
                if(add.compareTo(entrustAmount)>=0){
                    OrderCoinsEntity update = new OrderCoinsEntity();
                    update.setId(buyOrderId);
                    update.setDealAmount(entrustAmount);
                    update.setDealCnt(buyOrderCoinsEntity.getDealCnt().add(amount));
                    update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
                    update.setUpdateTime(new Date());
                    orderCoinsDao.updateById(update);
                }else{
                    OrderCoinsEntity update = new OrderCoinsEntity();
                    update.setId(buyOrderId);
                    update.setDealAmount(add);
                    update.setDealCnt(buyOrderCoinsEntity.getDealCnt().add(amount));
                    update.setUpdateTime(new Date());
                    orderCoinsDao.updateById(update);
                }
            }
            // 卖单
            OrderCoinsEntity sellOrderCoinsEntity = orderCoinsDao.selectById(sellOrderId);
            if(sellOrderCoinsEntity!=null){
                // 比较剩余的量
                BigDecimal dealAmount = sellOrderCoinsEntity.getDealCnt();
                // 单的总数量
                BigDecimal entrustCnt = sellOrderCoinsEntity.getEntrustCnt();
                BigDecimal add = dealAmount.add(amount);
                // 创建一个完成的单
                OrderCoinsDealEntity detail = new OrderCoinsDealEntity();
                detail.setMemberId(sellOrderCoinsEntity.getMemberId());
                //detail.setOrderId(order.getId());
                detail.setOrderNo(sellOrderCoinsEntity.getOrderNo());
                detail.setOrderType(sellOrderCoinsEntity.getOrderType());
                detail.setTradeType(sellOrderCoinsEntity.getTradeType());
                detail.setSymbol(sellOrderCoinsEntity.getSymbol());
                detail.setSymbolCnt(amount);
                detail.setEntrustPrice(sellOrderCoinsEntity.getEntrustPrice());
                detail.setDealPrice(price);
                detail.setDealAmount(buyTurnover);
                //detail.setFeeAmount(closingPrice);
                detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_DONE);
                orderCoinDealDao.insert(detail);
                // 如果这个单成交完  更改状态
                if(add.compareTo(entrustCnt)>=0){
                    OrderCoinsEntity update = new OrderCoinsEntity();
                    update.setId(sellOrderId);
                    // 总成交额
                    update.setDealAmount(buyTurnover.add(sellOrderCoinsEntity.getDealAmount()));
                    update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
                    update.setDealCnt(entrustCnt);
                    update.setUpdateTime(new Date());
                    orderCoinsDao.updateById(update);
                }else{
                    // 未完成
                    OrderCoinsEntity update = new OrderCoinsEntity();
                    update.setId(sellOrderId);
                    // 总成交额
                    update.setDealAmount(buyTurnover.add(sellOrderCoinsEntity.getDealAmount()));
                    update.setDealCnt(sellOrderCoinsEntity.getDealCnt().add(amount));
                    update.setUpdateTime(new Date());
                    orderCoinsDao.updateById(update);
                }
                // 卖币得到的usdt
                MemberWalletCoinEntity memberWalletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(sellOrderCoinsEntity.getMemberId(), sellOrderCoinsEntity.getSymbol());
                if(memberWalletCoinEntity!=null){
                    memberWalletCoinDao.updateWalletBalance(memberWalletCoinEntity.getId(),buyTurnover,buyTurnover,null);
                }
            }
        }
    }
}
src/main/java/com/xcong/excoin/modules/exchange/service/HandleKlineService.java
New file
@@ -0,0 +1,14 @@
package com.xcong.excoin.modules.exchange.service;
import com.xcong.excoin.trade.ExchangeTrade;
import java.util.List;
public interface HandleKlineService {
    /**
     *  处理交易后的最新K线
     * @param trades
     */
    void handleExchangeOrderToKline(List<ExchangeTrade> trades);
}
src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java
New file
@@ -0,0 +1,56 @@
package com.xcong.excoin.modules.exchange.service.impl;
import cn.hutool.core.util.StrUtil;
import com.huobi.client.model.Candlestick;
import com.xcong.excoin.modules.exchange.service.HandleKlineService;
import com.xcong.excoin.processor.CoinProcessor;
import com.xcong.excoin.processor.CoinProcessorFactory;
import com.xcong.excoin.trade.ExchangeTrade;
import com.xcong.excoin.utils.CoinTypeConvert;
import com.xcong.excoin.utils.RedisUtils;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@Service
public class HandleKlineServiceImpl implements HandleKlineService {
    @Resource
    private CoinProcessorFactory processorFactory;
    @Resource
    private RedisUtils redisUtils;
    @Override
    public void handleExchangeOrderToKline(List<ExchangeTrade> trades) {
        if (CollectionUtils.isEmpty(trades)) {
            return;
        }
        String symbol = trades.get(0).getSymbol();
        String symbolUsdt = symbol;
        if(!symbol.contains("USDT")){
            symbolUsdt = symbol+"/USDT";
        }
        CoinProcessor processor = processorFactory.getProcessor(symbol);
        Map<String, Candlestick> currentKlineMap = processor.getCurrentKlineMap();
        Collection<Candlestick> values = currentKlineMap.values();
        BigDecimal newPrice = trades.get(trades.size()-1).getPrice();
        for (Candlestick candlestick : values) {
            for (ExchangeTrade exchangeTrade : trades) {
                processor.processTrade(candlestick, exchangeTrade);
            }
        }
        // 存入redis,websocket去取
        String key = "NEW_KINE_{}";
        key = StrUtil.format(key, symbolUsdt);
        redisUtils.set(key,currentKlineMap);
        // 更新最新价
        redisUtils.set(CoinTypeConvert.convertToKey(symbolUsdt), newPrice);
    }
}
src/main/java/com/xcong/excoin/modules/member/dao/MemberWalletCoinDao.java
@@ -22,4 +22,5 @@
    int subFrozenBalance(@Param("memberId") Long memberId, @Param("id") Long id, @Param("amount") BigDecimal amount);
    int updateBlockBalance(@Param("id") Long id, @Param("availableBalance") BigDecimal availableBalance, @Param("earlyBalance") BigDecimal earlyBalance, @Param("blockNumber") Integer blockNumber);
    int updateWalletBalance(@Param("id") Long id, @Param("availableBalance") BigDecimal availableBalance, @Param("totalBalance") BigDecimal totalBalance, @Param("frozenBalance") BigDecimal frozenBalance);
}
src/main/java/com/xcong/excoin/processor/CoinProcessor.java
New file
@@ -0,0 +1,54 @@
package com.xcong.excoin.processor;
import com.huobi.client.model.Candlestick;
import com.xcong.excoin.trade.ExchangeTrade;
import com.xcong.excoin.utils.RedisUtils;
import java.util.List;
import java.util.Map;
public interface CoinProcessor {
    void setIsHalt(boolean status);
    void setIsStopKLine(boolean stop);
    boolean isStopKline();
    /**
     * 处理新生成的交易信息
     * @param trades
     * @return
     */
    void process(List<ExchangeTrade> trades);
    void processTrade(Candlestick kLine, ExchangeTrade exchangeTrade);
    /**
     * 添加存储器
     * @param storage
     */
    void addHandler(MarketHandler storage);
    CoinThumb getThumb();
    void setMarketService(MarketService service);
    void generateKLine(int range, int field, long time);
    Candlestick getKLine();
    void initializeThumb();
    void autoGenerate();
    void resetThumb();
   // void setExchangeRate(CoinExchangeRate coinExchangeRate);
    void update24HVolume(long time);
    //void initializeUsdRate();
     Map<String,Candlestick> getCurrentKlineMap();
     void setRedisUtils(RedisUtils redisUtils);
}
src/main/java/com/xcong/excoin/processor/CoinProcessorFactory.java
New file
@@ -0,0 +1,34 @@
package com.xcong.excoin.processor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
@Component
public class CoinProcessorFactory {
    private ConcurrentHashMap<String, CoinProcessor> processorMap;
    public CoinProcessorFactory() {
        processorMap = new ConcurrentHashMap<>();
    }
    public void addProcessor(String symbol, CoinProcessor processor) {
        log.info("CoinProcessorFactory addProcessor = {}" + symbol);
        processorMap.put(symbol, processor);
    }
    public boolean containsProcessor(String symbol) {
        return processorMap != null && processorMap.containsKey(symbol);
    }
    public CoinProcessor getProcessor(String symbol) {
        return processorMap.get(symbol);
    }
    public ConcurrentHashMap<String, CoinProcessor> getProcessorMap() {
        return processorMap;
    }
}
src/main/java/com/xcong/excoin/processor/CoinThumb.java
New file
@@ -0,0 +1,26 @@
package com.xcong.excoin.processor;
import lombok.Data;
import java.math.BigDecimal;
@Data
public class CoinThumb {
    private String symbol;
    private BigDecimal open = BigDecimal.ZERO;
    private BigDecimal high= BigDecimal.ZERO;
    private BigDecimal low= BigDecimal.ZERO;
    private BigDecimal close=BigDecimal.ZERO;
    private BigDecimal chg = BigDecimal.ZERO.setScale(2);
    private BigDecimal change = BigDecimal.ZERO.setScale(2);
    private BigDecimal volume = BigDecimal.ZERO.setScale(2);
    private BigDecimal turnover= BigDecimal.ZERO;
    //昨日收盘价
    private BigDecimal lastDayClose = BigDecimal.ZERO;
    //交易币对usd汇率
    private BigDecimal usdRate;
    //基币对usd的汇率
    private BigDecimal baseUsdRate;
    // 交易區
    private int zone;
}
src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java
New file
@@ -0,0 +1,392 @@
package com.xcong.excoin.processor;
import com.alibaba.fastjson.JSON;
import com.huobi.client.model.Candlestick;
import com.xcong.excoin.trade.ExchangeTrade;
import com.xcong.excoin.utils.RedisUtils;
import lombok.ToString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
/**
 * 默认交易处理器,产生1mK线信息
 */
@ToString
public class DefaultCoinProcessor implements CoinProcessor {
    private Logger logger = LoggerFactory.getLogger(DefaultCoinProcessor.class);
    private String symbol;
    private String baseCoin;
    private Candlestick currentKLine;
    private List<MarketHandler> handlers;
    private CoinThumb coinThumb;
    private MarketService service;
    private RedisUtils redisUtils;
    //private CoinExchangeRate coinExchangeRate;
    //是否暂时处理
    private Boolean isHalt = true;
    //是否停止K线生成
    private Boolean stopKLine = false;
    /**
     *  每个时间段的K线在生成后,生成一个最新的K线
     */
    private Map<String,Candlestick> currentKlineMap = new ConcurrentHashMap<>();
    public DefaultCoinProcessor(String symbol, String baseCoin) {
        //handlers = new ArrayList<>();
        createNewKLine();
        this.baseCoin = baseCoin;
        this.symbol = symbol;
    }
    public String getSymbol() {
        return symbol;
    }
    @Override
    public void initializeThumb() {
        Calendar calendar = Calendar.getInstance();
        //将秒、微秒字段置为0
        calendar.set(Calendar.SECOND, 0);
        calendar.set(Calendar.MILLISECOND, 0);
        long nowTime = calendar.getTimeInMillis();
        calendar.set(Calendar.MINUTE, 0);
        calendar.set(Calendar.HOUR_OF_DAY, 0);
        long firstTimeOfToday = calendar.getTimeInMillis();
        String period = "1min";
        logger.info("initializeThumb from {} to {}", firstTimeOfToday, nowTime);
        List<Candlestick> lines = service.findAllKLine(this.symbol, firstTimeOfToday, nowTime, period);
        coinThumb = new CoinThumb();
        synchronized (coinThumb) {
            coinThumb.setSymbol(symbol);
            for (Candlestick kline : lines) {
                if (kline.getOpen().compareTo(BigDecimal.ZERO) == 0) {
                    continue;
                }
                if (coinThumb.getOpen().compareTo(BigDecimal.ZERO) == 0) {
                    coinThumb.setOpen(kline.getOpen());
                }
                if (coinThumb.getHigh().compareTo(kline.getHigh()) < 0) {
                    coinThumb.setHigh(kline.getHigh());
                }
                if (kline.getLow().compareTo(BigDecimal.ZERO) > 0 && coinThumb.getLow().compareTo(kline.getLow()) > 0) {
                    coinThumb.setLow(kline.getLow());
                }
                if (kline.getClose().compareTo(BigDecimal.ZERO) > 0) {
                    coinThumb.setClose(kline.getClose());
                }
                coinThumb.setVolume(coinThumb.getVolume().add(kline.getVolume()));
                // TODO
                coinThumb.setTurnover(coinThumb.getTurnover().add(kline.getAmount()));
            }
            coinThumb.setChange(coinThumb.getClose().subtract(coinThumb.getOpen()));
            // 此处计算涨幅并没有以开盘价为标准,而是以最低价
            if (coinThumb.getLow().compareTo(BigDecimal.ZERO) > 0) {
                coinThumb.setChg(coinThumb.getChange().divide(coinThumb.getLow(), 4, RoundingMode.UP));
            }
        }
    }
    public void createNewKLine() {
        currentKLine = new Candlestick();
        synchronized (currentKLine) {
            Calendar calendar = Calendar.getInstance();
            calendar.set(Calendar.SECOND, 0);
            calendar.set(Calendar.MILLISECOND, 0);
            //1Min时间要是下一整分钟的
            calendar.add(Calendar.MINUTE, 1);
            currentKLine.setTimestamp(calendar.getTimeInMillis());
            // K线类型
            //currentKLine.setPeriod("1min");
            currentKLine.setCount(0);
        }
    }
    /**
     * 00:00:00 时重置CoinThumb
     */
    @Override
    public void resetThumb() {
        logger.info("reset coinThumb");
        synchronized (coinThumb) {
            coinThumb.setOpen(BigDecimal.ZERO);
            coinThumb.setHigh(BigDecimal.ZERO);
            //设置昨收价格
            coinThumb.setLastDayClose(coinThumb.getClose());
            //coinThumb.setClose(BigDecimal.ZERO);
            coinThumb.setLow(BigDecimal.ZERO);
            coinThumb.setChg(BigDecimal.ZERO);
            coinThumb.setChange(BigDecimal.ZERO);
        }
    }
//    @Override
//    public void setExchangeRate(CoinExchangeRate coinExchangeRate) {
//        this.coinExchangeRate = coinExchangeRate;
//    }
    @Override
    public void update24HVolume(long time) {
        if(coinThumb!=null) {
            synchronized (coinThumb) {
                Calendar calendar = Calendar.getInstance();
                calendar.setTimeInMillis(time);
                calendar.add(Calendar.HOUR_OF_DAY, -24);
                long timeStart = calendar.getTimeInMillis();
                // TODO
                BigDecimal volume = service.findTradeVolume(this.symbol, timeStart, time);
                coinThumb.setVolume(volume.setScale(4, RoundingMode.DOWN));
            }
        }
    }
//    @Override
//    public void initializeUsdRate() {
//        //logger.info("symbol = {} ,baseCoin = {}",this.symbol,this.baseCoin);
//        BigDecimal baseUsdRate = coinExchangeRate.getUsdRate(baseCoin);
//        coinThumb.setBaseUsdRate(baseUsdRate);
//        //logger.info("setBaseUsdRate = ",baseUsdRate);
//        BigDecimal multiply = coinThumb.getClose().multiply(coinExchangeRate.getUsdRate(baseCoin));
//        //logger.info("setUsdRate = ",multiply);
//        coinThumb.setUsdRate(multiply);
//    }
    @Override
    public void autoGenerate() {
        DateFormat df = new SimpleDateFormat("HH:mm:ss");
        //logger.info("auto generate 1min kline in {},data={}", df.format(new Date(currentKLine.getTime())), JSON.toJSONString(currentKLine));
        if(coinThumb != null) {
            synchronized (currentKLine) {
                //没有成交价时存储上一笔成交价
                if(currentKLine.getOpen()==null){
                    currentKLine.setOpen(BigDecimal.ZERO);
                }
                if (currentKLine.getOpen().compareTo(BigDecimal.ZERO) == 0) {
                    currentKLine.setOpen(coinThumb.getClose());
                    currentKLine.setLow(coinThumb.getClose());
                    currentKLine.setHigh(coinThumb.getClose());
                    currentKLine.setClose(coinThumb.getClose());
                }
                Calendar calendar = Calendar.getInstance();
                calendar.set(Calendar.SECOND, 0);
                calendar.set(Calendar.MILLISECOND, 0);
                currentKLine.setTimestamp(calendar.getTimeInMillis());
                handleKLineStorage(currentKLine);
                createNewKLine();
            }
        }
    }
    @Override
    public void setIsHalt(boolean status) {
        this.isHalt = status;
    }
    @Override
    public void process(List<ExchangeTrade> trades) {
        if (!isHalt) {
            if (trades == null || trades.size() == 0) {
                return;
            }
            synchronized (currentKLine) {
                for (ExchangeTrade exchangeTrade : trades) {
                    //处理K线
                    processTrade(currentKLine, exchangeTrade);
                    //处理今日概况信息
                    logger.debug("处理今日概况信息");
                    handleThumb(exchangeTrade);
                    //存储并推送成交信息
                    handleTradeStorage(exchangeTrade);
                }
            }
        }
    }
    public void processTrade(Candlestick kLine, ExchangeTrade exchangeTrade) {
        if (kLine.getClose().compareTo(BigDecimal.ZERO) == 0) {
            //第一次设置K线值
            kLine.setOpen(exchangeTrade.getPrice());
            kLine.setHigh(exchangeTrade.getPrice());
            kLine.setLow(exchangeTrade.getPrice());
            kLine.setClose(exchangeTrade.getPrice());
        } else {
            kLine.setHigh(exchangeTrade.getPrice().max(kLine.getHigh()));
            kLine.setLow(exchangeTrade.getPrice().min(kLine.getLow()));
            kLine.setClose(exchangeTrade.getPrice());
        }
        kLine.setCount(kLine.getCount() + 1);
        kLine.setVolume(kLine.getVolume().add(exchangeTrade.getAmount()));
        BigDecimal turnover = exchangeTrade.getPrice().multiply(exchangeTrade.getAmount());
       // kLine.setTurnover(kLine.getTurnover().add(turnover));
    }
    public void handleTradeStorage(ExchangeTrade exchangeTrade) {
        for (MarketHandler storage : handlers) {
            storage.handleTrade(symbol, exchangeTrade, coinThumb);
        }
    }
    public void handleKLineStorage(Candlestick kLine) {
        // 存储交易信息 TODO 发送最新的一根K线
//        for (MarketHandler storage : handlers) {
//            storage.handleKLine(symbol, kLine);
//        }
    }
    public void handleThumb(ExchangeTrade exchangeTrade) {
        logger.info("handleThumb symbol = {}", this.symbol);
        synchronized (coinThumb) {
            if (coinThumb.getOpen().compareTo(BigDecimal.ZERO) == 0) {
                //第一笔交易记为开盘价
                coinThumb.setOpen(exchangeTrade.getPrice());
            }
            coinThumb.setHigh(exchangeTrade.getPrice().max(coinThumb.getHigh()));
            if (coinThumb.getLow().compareTo(BigDecimal.ZERO) == 0) {
                coinThumb.setLow(exchangeTrade.getPrice());
            } else {
                coinThumb.setLow(exchangeTrade.getPrice().min(coinThumb.getLow()));
            }
            coinThumb.setClose(exchangeTrade.getPrice());
            coinThumb.setVolume(coinThumb.getVolume().add(exchangeTrade.getAmount()).setScale(4, RoundingMode.UP));
            BigDecimal turnover = exchangeTrade.getPrice().multiply(exchangeTrade.getAmount()).setScale(4, RoundingMode.UP);
            coinThumb.setTurnover(coinThumb.getTurnover().add(turnover));
            BigDecimal change = coinThumb.getClose().subtract(coinThumb.getOpen());
            coinThumb.setChange(change);
            if (coinThumb.getLow().compareTo(BigDecimal.ZERO) > 0) {
                coinThumb.setChg(change.divide(coinThumb.getLow(), 4, BigDecimal.ROUND_UP));
            }
            if ("USDT".equalsIgnoreCase(baseCoin)) {
                logger.info("setUsdRate", exchangeTrade.getPrice());
                coinThumb.setUsdRate(exchangeTrade.getPrice());
            } else {
            }
            //coinThumb.setBaseUsdRate(coinExchangeRate.getUsdRate(baseCoin));
            //coinThumb.setUsdRate(exchangeTrade.getPrice().multiply(coinExchangeRate.getUsdRate(baseCoin)));
            //logger.info("setUsdRate", exchangeTrade.getPrice().multiply(coinExchangeRate.getUsdRate(baseCoin)));
            //logger.info("thumb = {}", coinThumb);
        }
    }
    @Override
    public void addHandler(MarketHandler storage) {
        handlers.add(storage);
    }
    @Override
    public CoinThumb getThumb() {
        return coinThumb;
    }
    @Override
    public void setMarketService(MarketService service) {
        this.service = service;
    }
    @Override
    public void generateKLine(int range, int field, long time) {
        Calendar calendar = Calendar.getInstance();
        calendar.setTimeInMillis(time);
        DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long endTick = calendar.getTimeInMillis();
        String endTime = df.format(calendar.getTime());
        //往前推range个时间单位
        calendar.add(field, -range);
        String fromTime = df.format(calendar.getTime());
        long startTick = calendar.getTimeInMillis();
        System.out.println("time range from " + fromTime + " to " + endTime);
        List<ExchangeTrade> exchangeTrades = service.findTradeByTimeRange(this.symbol, startTick, endTick);
        Candlestick kLine = new Candlestick();
        kLine.setTimestamp(endTick);
        kLine.setAmount(BigDecimal.ZERO);
        kLine.setClose(BigDecimal.ZERO);
        kLine.setLow(BigDecimal.ZERO);
        kLine.setOpen(BigDecimal.ZERO);
        kLine.setVolume(BigDecimal.ZERO);
        kLine.setHigh(BigDecimal.ZERO);
        String rangeUnit = "";
        if (field == Calendar.MINUTE) {
            rangeUnit = "min";
        } else if (field == Calendar.HOUR_OF_DAY) {
            rangeUnit = "hour";
        } else if (field == Calendar.DAY_OF_WEEK) {
            rangeUnit = "week";
        } else if (field == Calendar.DAY_OF_YEAR) {
            rangeUnit = "day";
        } else if (field == Calendar.MONTH) {
            rangeUnit = "month";
        }
       // kLine.setPeriod(range + rangeUnit);
        String period = range + rangeUnit;
        // 处理K线信息
        for (ExchangeTrade exchangeTrade : exchangeTrades) {
            processTrade(kLine, exchangeTrade);
        }
        // 如果开盘价为0,则设置为前一个价格
        if(kLine.getOpen().compareTo(BigDecimal.ZERO) == 0) {
            kLine.setOpen(coinThumb.getClose());
            kLine.setClose(coinThumb.getClose());
            kLine.setLow(coinThumb.getClose());
            kLine.setHigh(coinThumb.getClose());
        }
        logger.info("generate " + range + rangeUnit + " kline in {},data={}", df.format(new Date(kLine.getTimestamp())), JSON.toJSONString(kLine));
        service.saveKLine(symbol,period, kLine);
        // 生成一个对应的新K线 后续的交易会更新这个最新K线数据
        Candlestick newKline = new Candlestick();
        //kLine.setTimestamp(endTick);
        newKline.setAmount(BigDecimal.ZERO);
        newKline.setClose(kLine.getClose());
        newKline.setLow(kLine.getClose());
        // 开盘价是上个K线的收盘价
        newKline.setOpen(kLine.getClose());
        newKline.setVolume(BigDecimal.ZERO);
        newKline.setHigh(kLine.getClose());
        currentKlineMap.put(period,newKline);
        // 存储昨日K线
        if("day".equals(rangeUnit)){
            redisUtils.set("NEKK/USDT",kLine);
        }
    }
    @Override
    public Candlestick getKLine() {
        return currentKLine;
    }
    @Override
    public void setIsStopKLine(boolean stop) {
        this.stopKLine = stop;
    }
    @Override
    public boolean isStopKline() {
        return this.stopKLine;
    }
    @Override
    public Map<String, Candlestick> getCurrentKlineMap() {
        return currentKlineMap;
    }
    @Override
    public void setRedisUtils(RedisUtils redisUtils) {
        this.redisUtils = redisUtils;
    }
    public void setCurrentKlineMap(Map<String, Candlestick> currentKlineMap) {
        this.currentKlineMap = currentKlineMap;
    }
}
src/main/java/com/xcong/excoin/processor/ExchangeOrderDirection.java
New file
@@ -0,0 +1,5 @@
package com.xcong.excoin.processor;
public enum ExchangeOrderDirection {
    BUY,SELL;
}
src/main/java/com/xcong/excoin/processor/MarketHandler.java
New file
@@ -0,0 +1,21 @@
package com.xcong.excoin.processor;
import com.huobi.client.model.Candlestick;
import com.xcong.excoin.trade.ExchangeTrade;
public interface MarketHandler {
    /**
     * 存储交易信息
     * @param exchangeTrade
     */
    void handleTrade(String symbol, ExchangeTrade exchangeTrade, CoinThumb thumb);
    /**
     * 存储K线信息
     *
     * @param kLine
     */
    void handleKLine(String symbol, Candlestick kLine);
}
src/main/java/com/xcong/excoin/processor/MarketService.java
New file
@@ -0,0 +1,125 @@
package com.xcong.excoin.processor;
import com.huobi.client.model.Candlestick;
import com.xcong.excoin.modules.coin.dao.OrderCoinDealDao;
import com.xcong.excoin.trade.ExchangeTrade;
import com.xcong.excoin.utils.RedisUtils;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@Service
public class MarketService {
//    @Autowired
//    private MongoTemplate mongoTemplate;
    @Resource
    private OrderCoinDealDao orderCoinDealDao;
    @Resource
    private RedisUtils redisUtils;
    public List<Candlestick> findAllKLine(String symbol, String peroid) {
//        Sort sort = new Sort(new Sort.Order(Sort.Direction.DESC,"time"));
//        Query query = new Query().with(sort).limit(1000);
//
//        return mongoTemplate.find(query,KLine.class,"exchange_kline_"+symbol+"_"+peroid);
        return null;
    }
    public List<Candlestick> findAllKLine(String symbol, long fromTime, long toTime, String period) {
//        Criteria criteria = Criteria.where("time").gte(fromTime).andOperator(Criteria.where("time").lte(toTime));
//        Sort sort = new Sort(new Sort.Order(Sort.Direction.ASC,"time"));
//        Query query = new Query(criteria).with(sort);
//        List<KLine> kLines = mongoTemplate.find(query,KLine.class,"exchange_kline_"+symbol+"_"+ period);
//        return kLines;
        String key = "KINE_" + symbol + "_" + period;
        Object data = redisUtils.get(key);
        List list = new ArrayList();
        if (data != null) {
            list = (List) data;
        }
        return list;
    }
//
//    public ExchangeTrade findFirstTrade(String symbol,long fromTime,long toTime){
//        Criteria criteria = Criteria.where("time").gte(fromTime).andOperator(Criteria.where("time").lte(toTime));
//        Sort sort = new Sort(new Sort.Order(Sort.Direction.ASC,"time"));
//        Query query = new Query(criteria).with(sort);
//        return mongoTemplate.findOne(query,ExchangeTrade.class,"exchange_trade_"+symbol);
//    }
//    public ExchangeTrade findLastTrade(String symbol,long fromTime,long toTime){
//        Criteria criteria = Criteria.where("time").gte(fromTime).andOperator(Criteria.where("time").lte(toTime));
//        Sort sort = new Sort(new Sort.Order(Sort.Direction.DESC,"time"));
//        Query query = new Query(criteria).with(sort);
//        return mongoTemplate.findOne(query,ExchangeTrade.class,"exchange_trade_"+symbol);
//    }
//
//    public ExchangeTrade findTrade(String symbol, long fromTime, long toTime, Sort.Order order){
//        Criteria criteria = Criteria.where("time").gte(fromTime).andOperator(Criteria.where("time").lte(toTime));
//        Sort sort = new Sort(order);
//        Query query = new Query(criteria).with(sort);
//        return mongoTemplate.findOne(query,ExchangeTrade.class,"exchange_trade_"+symbol);
//    }
    public List<ExchangeTrade> findTradeByTimeRange(String symbol, long timeStart, long timeEnd) {
//        Criteria criteria = Criteria.where("time").gte(timeStart).andOperator(Criteria.where("time").lt(timeEnd));
//        Sort sort = new Sort(new Sort.Order(Sort.Direction.ASC,"time"));
//        Query query = new Query(criteria).with(sort);
//
//        return mongoTemplate.find(query,ExchangeTrade.class,"exchange_trade_"+symbol);
        return orderCoinDealDao.selectOrderCoinDealByTime(symbol, new Date(timeStart), new Date(timeStart));
        // return null;
    }
    public void saveKLine(String symbol, String period, Candlestick kLine) {
        // 先获取
        String key = "KINE_" + symbol + "_" + period;
        Object data = redisUtils.get(key);
        List list = new ArrayList();
        if (data != null) {
            list = (List) data;
        }
        list.add(kLine);
        redisUtils.set("KINE_" + symbol + "_" + period, list);
        //  mongoTemplate.insert(kLine,"exchange_kline_"+symbol+"_"+kLine.getPeriod());
    }
    /**
     * 查找某时间段内的交易量
     *
     * @param symbol
     * @param timeStart
     * @param timeEnd
     * @return
     */
    public BigDecimal findTradeVolume(String symbol, long timeStart, long timeEnd) {
//        Criteria criteria = Criteria.where("time").gt(timeStart)
//                .andOperator(Criteria.where("time").lte(timeEnd));
//                //.andOperator(Criteria.where("volume").gt(0));
//        Sort sort = new Sort(new Sort.Order(Sort.Direction.ASC,"time"));
//        Query query = new Query(criteria).with(sort);
//        List<KLine> kLines =  mongoTemplate.find(query,KLine.class,"exchange_kline_"+symbol+"_1min");
//        BigDecimal totalVolume = BigDecimal.ZERO;
//        for(KLine kLine:kLines){
//            totalVolume = totalVolume.add(kLine.getVolume());
//        }
//        return totalVolume;
        List<ExchangeTrade> exchangeTrades = orderCoinDealDao.selectOrderCoinDealByTime(symbol, new Date(timeStart), new Date(timeStart));
        BigDecimal totalVolume = BigDecimal.ZERO;
        if (CollectionUtils.isNotEmpty(exchangeTrades)) {
            for (ExchangeTrade kLine : exchangeTrades) {
                totalVolume = totalVolume.add(kLine.getBuyTurnover());
            }
        }
        return totalVolume;
    }
}
src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java
New file
@@ -0,0 +1,98 @@
package com.xcong.excoin.quartz.job;
import com.alibaba.fastjson.JSON;
import com.huobi.client.SubscriptionClient;
import com.huobi.client.SubscriptionOptions;
import com.huobi.client.model.Candlestick;
import com.huobi.client.model.enums.CandlestickInterval;
import com.xcong.excoin.modules.coin.dao.OrderCoinsDao;
import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
import com.xcong.excoin.modules.coin.service.OrderCoinService;
import com.xcong.excoin.modules.symbols.service.SymbolsService;
import com.xcong.excoin.processor.CoinProcessor;
import com.xcong.excoin.processor.CoinProcessorFactory;
import com.xcong.excoin.processor.DefaultCoinProcessor;
import com.xcong.excoin.processor.MarketService;
import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService;
import com.xcong.excoin.trade.CoinTrader;
import com.xcong.excoin.trade.CoinTraderFactory;
import com.xcong.excoin.utils.CoinTypeConvert;
import com.xcong.excoin.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
/**
 *  开启撮合交易
 *
 * @author wzy
 * @date 2020-05-28
 **/
@Slf4j
@Component
//@ConditionalOnProperty(prefix = "app", name = "trade", havingValue = "true")
public class CoinTradeInitJob {
    @Resource
    private OrderCoinsDao orderCoinsDao;
    @Resource
    private CoinTraderFactory factory;
    @Resource
    private OrderCoinService coinService;
    @Resource
    private MarketService marketService;
    @Resource
    private CoinProcessorFactory processorFactory;
    @PostConstruct
    public void initCoinTrade() {
        log.info("#=======撮合交易器开启=======#");
        String symbol = "NEKK";
        CoinTrader newTrader = new CoinTrader(symbol);
        newTrader.setOrderCoinService(coinService);
        //newTrader.setKafkaTemplate(kafkaTemplate);
        //newTrader.setBaseCoinScale(coin.getBaseCoinScale());
        //newTrader.setCoinScale(coin.getCoinScale());
        // newTrader.setPublishType(coin.getPublishType());
        //newTrader.setClearTime(coin.getClearTime());
        // 创建成功以后需要对未处理订单预处理
        log.info("======CoinTrader Process: " + symbol + "======");
        List<OrderCoinsEntity> orders = orderCoinsDao.selectAllEntrustingCoinOrderList();
        List<OrderCoinsEntity> tradingOrders = new ArrayList<>();
        List<OrderCoinsEntity> completedOrders = new ArrayList<>();
        orders.forEach(order -> {
            tradingOrders.add(order);
        });
        try {
            newTrader.trade(tradingOrders);
        } catch (ParseException e) {
            e.printStackTrace();
            log.info("异常:trader.trade(tradingOrders);");
        }
        newTrader.setReady(true);
        factory.addTrader(symbol, newTrader);
        // 创建K线生成器
        CoinProcessor processor = new DefaultCoinProcessor(symbol, "USDT");
        processor.setMarketService(marketService);
        //processor.setExchangeRate(exchangeRate);
        processor.initializeThumb();
        //processor.initializeUsdRate();
        processor.setIsHalt(false);
        processorFactory.addProcessor(symbol, processor);
    }
}
src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
New file
@@ -0,0 +1,159 @@
package com.xcong.excoin.quartz.job;
import com.alibaba.fastjson.JSON;
import com.huobi.client.model.Candlestick;
import com.xcong.excoin.processor.CoinProcessorFactory;
import com.xcong.excoin.trade.TradePlateModel;
import com.xcong.excoin.utils.RedisUtils;
import com.xcong.excoin.websocket.TradePlateSendWebSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
/**
 * 生成各时间段的K线信息
 *
 */
@Component
@Slf4j
public class KLineGeneratorJob {
    @Resource
    private CoinProcessorFactory processorFactory;
    @Resource
    private TradePlateSendWebSocket plateSendWebSocket;
    @Resource
    private RedisUtils redisUtils;
    @Scheduled(cron = "0/2 * * * * *")
    public void tradePlate(){
        redisUtils.set("NEKK_NEW_PRICE",new BigDecimal(Math.random()));
        Candlestick candlestick = new Candlestick();
        candlestick.setOpen(new BigDecimal("10.33"));
        candlestick.setHigh(new BigDecimal("15.23"));
        candlestick.setVolume(new BigDecimal("12121.34"));
        candlestick.setLow(new BigDecimal("8.234"));
        candlestick.setAmount(new BigDecimal("1199"));
        candlestick.setTimestamp(1599840000);
        candlestick.setId(1599840000L);
        candlestick.setCount(100002);
        candlestick.setClose(new BigDecimal("12.2323"));
        redisUtils.set("NEKK/USDT",candlestick);
        // [[10244.21, 0.000855], [10243.7, 0.008777], [10243.59, 0.14], [10243.37, 0.467663]]
        TradePlateModel tradePlateModel = new TradePlateModel();
        List<BigDecimal> buy;
        List<BigDecimal> sell;
        for(int i=0;i<5;i++){
            buy = new ArrayList<>(2);
            buy.add(new BigDecimal(Math.random()*i));
            buy.add(new BigDecimal(Math.random()*i));
            tradePlateModel.getBuy().add(buy);
            sell = new ArrayList<>(2);
            sell.add(new BigDecimal(Math.random()*i*2));
            sell.add(new BigDecimal(Math.random()*i*2));
            tradePlateModel.getSell().add(sell);
        }
        plateSendWebSocket.sendMessagePlate(JSON.toJSONString(tradePlateModel),null);
        plateSendWebSocket.sendMessageKline("nekkusdt","1min","{amount: 114419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 311958.06091543}",null);
        plateSendWebSocket.sendMessageKline("nekkusdt","5min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null);
        plateSendWebSocket.sendMessageKline("nekkusdt","15min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null);
    }
    /**
     * 每分钟定时器,处理分钟K线
     */
    @Scheduled(cron = "0 * * * * *")
    public void handle5minKLine(){
        Calendar calendar = Calendar.getInstance();
        log.debug("分钟K线:{}",calendar.getTime());
        //将秒、微秒字段置为0
        calendar.set(Calendar.SECOND,0);
        calendar.set(Calendar.MILLISECOND,0);
        long time = calendar.getTimeInMillis();
        int minute = calendar.get(Calendar.MINUTE);
        int hour = calendar.get(Calendar.HOUR_OF_DAY);
        processorFactory.getProcessorMap().forEach((symbol,processor)->{
            if(!processor.isStopKline()) {
                log.debug("生成{}分钟k线:{}",symbol);
                //生成1分钟K线
                processor.autoGenerate();
                //更新24H成交量
                processor.update24HVolume(time);
                if(minute%5 == 0) {
                    // 五分钟的当前K线
                    processor.generateKLine(5, Calendar.MINUTE, time);
                }
                if(minute%10 == 0){
                    processor.generateKLine(10, Calendar.MINUTE,time);
                }
                if(minute%15 == 0){
                    processor.generateKLine(15, Calendar.MINUTE,time);
                }
                if(minute%30 == 0){
                    processor.generateKLine(30, Calendar.MINUTE,time);
                }
                if(hour == 0 && minute == 0){
                    processor.resetThumb();
                }
            }
        });
    }
    /**
     * 每小时运行
     */
    @Scheduled(cron = "0 0 * * * *")
    public void handleHourKLine(){
        processorFactory.getProcessorMap().forEach((symbol,processor)-> {
            if(!processor.isStopKline()) {
                Calendar calendar = Calendar.getInstance();
                log.info("小时K线:{}",calendar.getTime());
                //将秒、微秒字段置为0
                calendar.set(Calendar.MINUTE, 0);
                calendar.set(Calendar.SECOND, 0);
                calendar.set(Calendar.MILLISECOND, 0);
                long time = calendar.getTimeInMillis();
                processor.generateKLine(1, Calendar.HOUR_OF_DAY, time);
            }
        });
    }
    /**
     * 每日0点处理器,处理日K线
     */
    @Scheduled(cron = "0 0 0 * * *")
    public void handleDayKLine(){
        processorFactory.getProcessorMap().forEach((symbol,processor)->{
            if(!processor.isStopKline()) {
                Calendar calendar = Calendar.getInstance();
                log.info("日K线:{}",calendar.getTime());
                //将秒、微秒字段置为0
                calendar.set(Calendar.HOUR_OF_DAY,0);
                calendar.set(Calendar.MINUTE,0);
                calendar.set(Calendar.SECOND,0);
                calendar.set(Calendar.MILLISECOND,0);
                long time = calendar.getTimeInMillis();
                int week = calendar.get(Calendar.DAY_OF_WEEK);
                int dayOfMonth = calendar.get(Calendar.DAY_OF_MONTH);
                if(week == 1){
                    processor.generateKLine(1, Calendar.DAY_OF_WEEK, time);
                }
                if(dayOfMonth == 1){
                    processor.generateKLine(1, Calendar.DAY_OF_MONTH, time);
                }
                processor.generateKLine(1, Calendar.DAY_OF_YEAR,time);
            }
        });
    }
}
src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
New file
@@ -0,0 +1,98 @@
package com.xcong.excoin.rabbit.consumer;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.huobi.client.model.Candlestick;
import com.xcong.excoin.configurations.RabbitMqConfig;
import com.xcong.excoin.modules.coin.service.OrderCoinService;
import com.xcong.excoin.modules.exchange.service.HandleKlineService;
import com.xcong.excoin.trade.ExchangeTrade;
import com.xcong.excoin.utils.RedisUtils;
import com.xcong.excoin.websocket.TradePlateSendWebSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * @author wzy
 * @date 2020-05-25
 **/
@Slf4j
@Component
public class ExchangeConsumer {
    @Resource
    private TradePlateSendWebSocket tradePlateSendWebSocket;
    @Resource
    private RedisUtils redisUtils;
    @Resource
    private HandleKlineService handleKlineService;
    @Resource
    private OrderCoinService orderCoinService;
    /**
     *  发送盘口信息
     * @param content
     */
    @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE)
    public void tradePlate(String content) {
        log.info("#---->{}#", content);
        tradePlateSendWebSocket.sendMessagePlate(content,null);
    }
    /**
     *  处理订单
     * @param content
     */
    @RabbitListener(queues = RabbitMqConfig.QUEUE_HANDLE_TRADE)
    public void handleTradeExchange(String content) {
        log.info("#---->{}#", content);
        List<ExchangeTrade> exchangeTrades = JSONObject.parseArray(content, ExchangeTrade.class);
        // 处理K线 并更新最新价
        handleKlineService.handleExchangeOrderToKline(exchangeTrades);
        // 推送最新K线
        String symbol = exchangeTrades.get(0).getSymbol();
        String symbolUsdt = symbol;
        if(!symbol.contains("USDT")){
            symbolUsdt = symbol+"/USDT";
        }
        String key = "NEW_KINE_{}";
        key = StrUtil.format(key, symbolUsdt);
        Object o = redisUtils.get(key);
        Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick> )o;
        Set<Map.Entry<String, Candlestick>> entries = currentKlineMap.entrySet();
        for(Map.Entry<String, Candlestick> map : entries){
            tradePlateSendWebSocket.sendMessageKline(symbolUsdt,map.getKey(),JSONObject.toJSONString(map.getValue()),null);
        }
        // 处理用户订单
        orderCoinService.handleOrder(exchangeTrades);
    }
    /**
     *  更新最新K线
     * @param content
     */
//    @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE)
//    public void newKling(String content) {
//        log.info("#---->{}#", content);
//        // 最新K线的币种
//        String key = "NEW_KINE_{}";
//        key = StrUtil.format(key, content);
//        Object o = redisUtils.get(key);
//        Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick>)o;
//        // 推送最新K线
//        Set<Map.Entry<String, Candlestick>> entries = currentKlineMap.entrySet();
//        for(Map.Entry<String, Candlestick> map : entries){
//            tradePlateSendWebSocket.sendMessageKline(content,map.getKey(),JSONObject.toJSONString(map.getValue()),null);
//        }
//
//    }
}
src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java
New file
@@ -0,0 +1,47 @@
package com.xcong.excoin.rabbit.producer;
import cn.hutool.core.util.IdUtil;
import com.xcong.excoin.configurations.RabbitMqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * @author wzy
 * @date 2020-05-25
 **/
@Slf4j
@Component
public class ExchangeProducer implements RabbitTemplate.ConfirmCallback {
    private RabbitTemplate rabbitTemplate;
    @Autowired
    public ExchangeProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }
    public void sendPlateMsg(String content) {
        CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID());
        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_B, RabbitMqConfig.ROUTINGKEY_TRADE_PLATE, content, correlationData);
    }
    public void sendHandleTrade(String content) {
        CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID());
        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_B, RabbitMqConfig.ROUTINGKEY_HANDLE_TRADE, content, correlationData);
    }
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("#----->{}#", correlationData);
        if (ack) {
            log.info("success");
        } else {
            log.info("--->{}", cause);
        }
    }
}
src/main/java/com/xcong/excoin/trade/CoinTrader.java
New file
@@ -0,0 +1,762 @@
package com.xcong.excoin.trade;
import com.alibaba.fastjson.JSON;
import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
import com.xcong.excoin.modules.coin.service.OrderCoinService;
import com.xcong.excoin.rabbit.producer.ExchangeProducer;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
public class CoinTrader {
    private String symbol;
    private ExchangeProducer exchangeProducer;
    private OrderCoinService orderCoinService;
    //交易币种的精度
    private int coinScale = 4;
    //基币的精度
    private int baseCoinScale = 4;
    private Logger logger = LoggerFactory.getLogger(CoinTrader.class);
    //买入限价订单链表,价格从高到低排列
    private TreeMap<BigDecimal, MergeOrder> buyLimitPriceQueue;
    //卖出限价订单链表,价格从低到高排列
    private TreeMap<BigDecimal, MergeOrder> sellLimitPriceQueue;
    //买入市价订单链表,按时间从小到大排序
    private LinkedList<OrderCoinsEntity> buyMarketQueue;
    //卖出市价订单链表,按时间从小到大排序
    private LinkedList<OrderCoinsEntity> sellMarketQueue;
    //卖盘盘口信息
    private TradePlate sellTradePlate;
    //买盘盘口信息
    private TradePlate buyTradePlate;
    //是否暂停交易
    private boolean tradingHalt = false;
    private boolean ready = false;
    //交易对信息
    //private ExchangeCoinPublishType publishType;
    private String clearTime;
    private SimpleDateFormat dateTimeFormat;
    public CoinTrader(String symbol) {
        this.symbol = symbol;
        initialize();
    }
    /**
     * 初始化交易线程
     */
    public void initialize() {
        logger.info("init CoinTrader for symbol {}", symbol);
        //买单队列价格降序排列
        buyLimitPriceQueue = new TreeMap<>(Comparator.reverseOrder());
        //卖单队列价格升序排列
        this.sellLimitPriceQueue = new TreeMap<>(Comparator.naturalOrder());
        this.buyMarketQueue = new LinkedList<>();
        this.sellMarketQueue = new LinkedList<>();
        this.sellTradePlate = new TradePlate(symbol, OrderCoinsEntity.ORDERTYPE_SELL);
        this.buyTradePlate = new TradePlate(symbol, OrderCoinsEntity.ORDERTYPE_BUY);
        this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    }
    /**
     * 增加限价订单到队列,买入单按从价格高到低排,卖出单按价格从低到高排
     *
     * @param exchangeOrder
     */
    public void addLimitPriceOrder(OrderCoinsEntity exchangeOrder) {
        if (exchangeOrder.getTradeType() != OrderCoinsEntity.TRADETYPE_FIXEDPRICE) {
            return;
        }
        //logger.info("addLimitPriceOrder,orderId = {}", exchangeOrder.getOrderId());
        TreeMap<BigDecimal, MergeOrder> list;
        if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
            list = buyLimitPriceQueue;
            // 添加盘口信息 TODO
            buyTradePlate.add(exchangeOrder);
            if (ready) {
                // 发送盘口信息
                sendTradePlateMessage(buyTradePlate, sellTradePlate);
            }
        } else {
            list = sellLimitPriceQueue;
            sellTradePlate.add(exchangeOrder);
            if (ready) {
                sendTradePlateMessage(buyTradePlate, sellTradePlate);
            }
        }
        synchronized (list) {
            MergeOrder mergeOrder = list.get(exchangeOrder.getEntrustPrice());
            if (mergeOrder == null) {
                mergeOrder = new MergeOrder();
                mergeOrder.add(exchangeOrder);
                list.put(exchangeOrder.getEntrustPrice(), mergeOrder);
            } else {
                mergeOrder.add(exchangeOrder);
            }
        }
    }
    public void addMarketPriceOrder(OrderCoinsEntity exchangeOrder) {
        if (exchangeOrder.getTradeType() != OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
            return;
        }
        logger.info("addMarketPriceOrder,orderId = {}", exchangeOrder.getId());
        LinkedList<OrderCoinsEntity> list = exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY ? buyMarketQueue : sellMarketQueue;
        synchronized (list) {
            list.addLast(exchangeOrder);
        }
    }
    public void trade(List<OrderCoinsEntity> orders) throws ParseException {
        if (tradingHalt) {
            return;
        }
        for (OrderCoinsEntity order : orders) {
            trade(order);
        }
    }
    /**
     * 主动交易输入的订单,交易不完成的会输入到队列
     *
     * @param exchangeOrder
     * @throws ParseException
     */
    public void trade(OrderCoinsEntity exchangeOrder) {
        if (tradingHalt) {
            return;
        }
        //logger.info("trade order={}",exchangeOrder);
        if (!symbol.equalsIgnoreCase(exchangeOrder.getSymbol())) {
            logger.info("unsupported symbol,coin={},base={}", exchangeOrder.getSymbol(), "USDT");
            return;
        }
        // 如果
        if (exchangeOrder.getEntrustAmount().compareTo(BigDecimal.ZERO) <= 0 || exchangeOrder.getEntrustAmount().subtract(exchangeOrder.getDealAmount()).compareTo(BigDecimal.ZERO) <= 0) {
            return;
        }
        TreeMap<BigDecimal, MergeOrder> limitPriceOrderList;
        LinkedList<OrderCoinsEntity> marketPriceOrderList;
        if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
            // 买单 和限价卖单以及市价市场卖单队列对比 完成撮合
            limitPriceOrderList = sellLimitPriceQueue;
            marketPriceOrderList = sellMarketQueue;
        } else {
            limitPriceOrderList = buyLimitPriceQueue;
            marketPriceOrderList = buyMarketQueue;
        }
        if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
            //logger.info(">>>>>市价单>>>交易与限价单交易");
            //与限价单交易
            matchMarketPriceWithLPList(limitPriceOrderList, exchangeOrder);
        } else if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) {
            //限价单价格必须大于0
            if (exchangeOrder.getEntrustPrice().compareTo(BigDecimal.ZERO) <= 0) {
                return;
            }
            //logger.info(">>>>>限价单>>>交易与限价单交易");
            //先与限价单交易
            matchLimitPriceWithLPList(limitPriceOrderList, exchangeOrder, false);
            if (exchangeOrder.getEntrustCnt().compareTo(exchangeOrder.getDealCnt()) > 0) {
                //logger.info(">>>>限价单未交易完>>>>与市价单交易>>>>");
                //后与市价单交易
                matchLimitPriceWithMPList(marketPriceOrderList, exchangeOrder);
            }
        }
    }
    /**
     * 限价委托单与限价队列匹配
     *
     * @param lpList       限价对手单队列
     * @param focusedOrder 交易订单
     */
    public void matchLimitPriceWithLPList(TreeMap<BigDecimal, MergeOrder> lpList, OrderCoinsEntity focusedOrder, boolean canEnterList) {
        List<ExchangeTrade> exchangeTrades = new ArrayList<>();
        List<OrderCoinsEntity> completedOrders = new ArrayList<>();
        synchronized (lpList) {
            Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = lpList.entrySet().iterator();
            boolean exitLoop = false;
            while (!exitLoop && mergeOrderIterator.hasNext()) {
                Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next();
                MergeOrder mergeOrder = entry.getValue();
                Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator();
                //买入单需要匹配的价格不大于委托价,否则退出
                if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && mergeOrder.getPrice().compareTo(focusedOrder.getEntrustPrice()) > 0) {
                    break;
                }
                //卖出单需要匹配的价格不小于委托价,否则退出
                if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_SELL && mergeOrder.getPrice().compareTo(focusedOrder.getEntrustPrice()) < 0) {
                    break;
                }
                while (orderIterator.hasNext()) {
                    OrderCoinsEntity matchOrder = orderIterator.next();
                    //处理匹配
                    ExchangeTrade trade = processMatch(focusedOrder, matchOrder);
                    exchangeTrades.add(trade);
                    //判断匹配单是否完成
                    if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
                        //当前匹配的订单完成交易,删除该订单
                        orderIterator.remove();
                        completedOrders.add(matchOrder);
                    }
                    //判断交易单是否完成
                    if (focusedOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
                        //交易完成
                        completedOrders.add(focusedOrder);
                        //退出循环
                        exitLoop = true;
                        break;
                    }
                }
                if (mergeOrder.size() == 0) {
                    mergeOrderIterator.remove();
                }
            }
        }
        //如果还没有交易完,订单压入列表中
        if (focusedOrder.getDealCnt().compareTo(focusedOrder.getEntrustCnt()) < 0 && canEnterList) {
            addLimitPriceOrder(focusedOrder);
        }
        //每个订单的匹配批量推送
        handleExchangeTrade(exchangeTrades);
        if (completedOrders.size() > 0) {
            orderCompleted(completedOrders);
            TradePlate plate = focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY ? sellTradePlate : buyTradePlate;
            sendTradePlateMessage(buyTradePlate, sellTradePlate);
        }
    }
    /**
     * 限价委托单与市价队列匹配
     *
     * @param mpList       市价对手单队列
     * @param focusedOrder 交易订单
     */
    public void matchLimitPriceWithMPList(LinkedList<OrderCoinsEntity> mpList, OrderCoinsEntity focusedOrder) {
        List<ExchangeTrade> exchangeTrades = new ArrayList<>();
        List<OrderCoinsEntity> completedOrders = new ArrayList<>();
        synchronized (mpList) {
            Iterator<OrderCoinsEntity> iterator = mpList.iterator();
            while (iterator.hasNext()) {
                OrderCoinsEntity matchOrder = iterator.next();
                ExchangeTrade trade = processMatch(focusedOrder, matchOrder);
                logger.info(">>>>>" + trade);
                if (trade != null) {
                    exchangeTrades.add(trade);
                }
                //判断匹配单是否完成,市价单amount为成交量
                if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
                    iterator.remove();
                    completedOrders.add(matchOrder);
                }
                //判断吃单是否完成,判断成交量是否完成
                if (focusedOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
                    //交易完成
                    completedOrders.add(focusedOrder);
                    //退出循环
                    break;
                }
            }
        }
        //如果还没有交易完,订单压入列表中
        if (focusedOrder.getDealCnt().compareTo(focusedOrder.getEntrustCnt()) < 0) {
            addLimitPriceOrder(focusedOrder);
        }
        //每个订单的匹配批量推送
        handleExchangeTrade(exchangeTrades);
        orderCompleted(completedOrders);
    }
    /**
     * 市价委托单与限价对手单列表交易
     *
     * @param lpList       限价对手单列表
     * @param focusedOrder 待交易订单
     */
    public void matchMarketPriceWithLPList(TreeMap<BigDecimal, MergeOrder> lpList, OrderCoinsEntity focusedOrder) {
        List<ExchangeTrade> exchangeTrades = new ArrayList<>();
        List<OrderCoinsEntity> completedOrders = new ArrayList<>();
        // 加锁 同步
        synchronized (lpList) {
            Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = lpList.entrySet().iterator();
            boolean exitLoop = false;
            while (!exitLoop && mergeOrderIterator.hasNext()) {
                Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next();
                MergeOrder mergeOrder = entry.getValue();
                Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator();
                while (orderIterator.hasNext()) {
                    OrderCoinsEntity matchOrder = orderIterator.next();
                    //处理匹配 用户单和买卖盘内的委托单进行比较,得到可以交易的量
                    // 例如 市价买单 1000个 卖1 500 卖2 300 卖3 500 此时从卖1开始吃到卖3只剩300个
                    ExchangeTrade trade = processMatch(focusedOrder, matchOrder);
                    if (trade != null) {
                        exchangeTrades.add(trade);
                    }
                    //判断匹配单是否完成
                    if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
                        //当前匹配的订单完成交易,删除该订单
                        orderIterator.remove();
                        completedOrders.add(matchOrder);
                    }
                    //判断焦点订单是否完成
                    if (focusedOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
                        completedOrders.add(focusedOrder);
                        //退出循环
                        exitLoop = true;
                        break;
                    }
                }
                if (mergeOrder.size() == 0) {
                    mergeOrderIterator.remove();
                }
            }
        }
        //如果还没有交易完,订单压入列表中,市价买单按成交量算
        if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_SELL && focusedOrder.getDealCnt().compareTo(focusedOrder.getEntrustCnt()) < 0
                || focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && focusedOrder.getDealAmount().compareTo(focusedOrder.getEntrustAmount()) < 0) {
            addMarketPriceOrder(focusedOrder);
        }
        //每个订单的匹配批量推送
        handleExchangeTrade(exchangeTrades);
        if (completedOrders.size() > 0) {
            orderCompleted(completedOrders);
            TradePlate plate = focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY ? sellTradePlate : buyTradePlate;
            sendTradePlateMessage(buyTradePlate, sellTradePlate);
        }
    }
    /**
     * 计算委托单剩余可成交的数量
     *
     * @param order     委托单
     * @param dealPrice 成交价
     * @return
     */
    private BigDecimal calculateTradedAmount(OrderCoinsEntity order, BigDecimal dealPrice) {
        if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && order.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
            //剩余成交量 TODO ?
            // 委托量-成交量=剩余量
            BigDecimal leftTurnover = order.getEntrustAmount().subtract(order.getDealAmount());
            return leftTurnover.divide(dealPrice, coinScale, BigDecimal.ROUND_DOWN);
        } else {
            return order.getEntrustCnt().subtract(order.getDealCnt());
        }
    }
    /**
     * 调整市价单剩余成交额,当剩余成交额不足时设置订单完成
     *
     * @param order
     * @param dealPrice
     * @return
     */
    private BigDecimal adjustMarketOrderTurnover(OrderCoinsEntity order, BigDecimal dealPrice) {
        if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && order.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
//            BigDecimal leftTurnover = order.getAmount().subtract(order.getTurnover());
//            if(leftTurnover.divide(dealPrice,coinScale,BigDecimal.ROUND_DOWN)
//                    .compareTo(BigDecimal.ZERO)==0){
//                order.setTurnover(order.getAmount());
//                return leftTurnover;
//            }
        }
        return BigDecimal.ZERO;
    }
    /**
     * 处理两个匹配的委托订单
     *
     * @param focusedOrder 焦点单
     * @param matchOrder   匹配单
     * @return
     */
    private ExchangeTrade processMatch(OrderCoinsEntity focusedOrder, OrderCoinsEntity matchOrder) {
        //需要交易的数量,成交量,成交价,可用数量
        BigDecimal needAmount, dealPrice, availAmount;
        //如果匹配单是限价单,则以其价格为成交价
        if (matchOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) {
            dealPrice = matchOrder.getEntrustPrice();
        } else {
            dealPrice = focusedOrder.getEntrustPrice();
        }
        //成交价必须大于0
        if (dealPrice.compareTo(BigDecimal.ZERO) <= 0) {
            return null;
        }
        // 需要的成交量
        needAmount = calculateTradedAmount(focusedOrder, dealPrice);
        // 队列单可提供的成交量
        availAmount = calculateTradedAmount(matchOrder, dealPrice);
        //计算成交量 取少的
        BigDecimal tradedAmount = (availAmount.compareTo(needAmount) >= 0 ? needAmount : availAmount);
        logger.info("dealPrice={},amount={}", dealPrice, tradedAmount);
        //如果成交额为0说明剩余额度无法成交,退出
        if (tradedAmount.compareTo(BigDecimal.ZERO) == 0) {
            return null;
        }
        //计算成交额,成交额要保留足够精度
        BigDecimal turnover = tradedAmount.multiply(dealPrice);
        matchOrder.setDealCnt(matchOrder.getDealCnt().add(tradedAmount));
        // 成交金额
        matchOrder.setDealAmount(matchOrder.getDealAmount().add(turnover));
        // 用户单成交量
        focusedOrder.setDealCnt(focusedOrder.getDealCnt().add(tradedAmount));
        // 用户单成交金额
        focusedOrder.setDealAmount(focusedOrder.getDealAmount().add(turnover));
        //创建成交记录
        ExchangeTrade exchangeTrade = new ExchangeTrade();
        exchangeTrade.setSymbol(symbol);
        // 成交量
        exchangeTrade.setAmount(tradedAmount);
        exchangeTrade.setDirection(focusedOrder.getOrderType());
        // 成交价格
        exchangeTrade.setPrice(dealPrice);
        // 成交金额
        exchangeTrade.setBuyTurnover(turnover);
        exchangeTrade.setSellTurnover(turnover);
        //校正市价单剩余成交额
        if (OrderCoinsEntity.TRADETYPE_MARKETPRICE == focusedOrder.getTradeType() && focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
            BigDecimal adjustTurnover = adjustMarketOrderTurnover(focusedOrder, dealPrice);
            exchangeTrade.setBuyTurnover(turnover.add(adjustTurnover));
        } else if (OrderCoinsEntity.TRADETYPE_MARKETPRICE == matchOrder.getTradeType() && matchOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
            BigDecimal adjustTurnover = adjustMarketOrderTurnover(matchOrder, dealPrice);
            exchangeTrade.setBuyTurnover(turnover.add(adjustTurnover));
        }
        if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
            exchangeTrade.setBuyOrderId(focusedOrder.getId());
            exchangeTrade.setSellOrderId(matchOrder.getId());
        } else {
            exchangeTrade.setBuyOrderId(matchOrder.getId());
            exchangeTrade.setSellOrderId(focusedOrder.getId());
        }
        exchangeTrade.setTime(Calendar.getInstance().getTimeInMillis());
        if (matchOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) {
            if (matchOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
                buyTradePlate.remove(matchOrder, tradedAmount);
            } else {
                sellTradePlate.remove(matchOrder, tradedAmount);
            }
        }
        return exchangeTrade;
    }
    // 这里是交易完的单
    public void handleExchangeTrade(List<ExchangeTrade> trades) {
        //logger.info("handleExchangeTrade:{}", trades);
        if (trades.size() > 0) {
            int maxSize = 1000;
            //发送消息,key为交易对符号
            if (trades.size() > maxSize) {
                int size = trades.size();
                for (int index = 0; index < size; index += maxSize) {
                    int length = (size - index) > maxSize ? maxSize : size - index;
                    List<ExchangeTrade> subTrades = trades.subList(index, index + length);
                    exchangeProducer.sendHandleTrade(JSON.toJSONString(subTrades));
                    //orderCoinService.handleOrder(subTrades);
                }
            } else {
                trades.forEach(e -> {
                    System.out.println(e);
                });
                exchangeProducer.sendHandleTrade(JSON.toJSONString(trades));
                //orderCoinService.handleOrder(trades);
                // kafkaTemplate.send("exchange-trade", JSON.toJSONString(trades));
            }
            // 更新最新K线 TODO
        }
    }
    /**
     * 订单完成,执行消息通知,订单数超1000个要拆分发送
     *
     * @param orders
     */
    public void orderCompleted(List<OrderCoinsEntity> orders) {
        //logger.info("orderCompleted ,order={}",orders);
        if (orders.size() > 0) {
            int maxSize = 1000;
            if (orders.size() > maxSize) {
                int size = orders.size();
                for (int index = 0; index < size; index += maxSize) {
                    int length = (size - index) > maxSize ? maxSize : size - index;
                    List<OrderCoinsEntity> subOrders = orders.subList(index, index + length);
                    // TODO 通知订单完成
                    //kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(subOrders));
                }
            } else {
                // kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(orders));
            }
        }
    }
    /**
     * 发送盘口变化消息
     *
     * @param buyTradePlate sellTradePlate
     */
    public void sendTradePlateMessage(TradePlate buyTradePlate, TradePlate sellTradePlate) {
        //防止并发引起数组越界,造成盘口倒挂 TODO
        List<List<BigDecimal>> plate;
        List<BigDecimal> plateItem;
        TradePlateModel tradePlateModel = new TradePlateModel();
        // 转换格式
        if (buyTradePlate != null && buyTradePlate.getItems() != null) {
            plate = new ArrayList<>();
            LinkedList<TradePlateItem> items = buyTradePlate.getItems();
            for (TradePlateItem item : items) {
                plateItem = new ArrayList<>(2);
                BigDecimal price = item.getPrice();
                BigDecimal amount = item.getAmount();
                plateItem.add(price);
                plateItem.add(amount);
                plate.add(plateItem);
            }
            tradePlateModel.setBuy(plate);
        }
        if (sellTradePlate != null && sellTradePlate.getItems() != null) {
            plate = new ArrayList<>();
            LinkedList<TradePlateItem> items = sellTradePlate.getItems();
            for (TradePlateItem item : items) {
                plateItem = new ArrayList<>(2);
                BigDecimal price = item.getPrice();
                BigDecimal amount = item.getAmount();
                plateItem.add(price);
                plateItem.add(amount);
                plate.add(plateItem);
            }
            tradePlateModel.setSell(plate);
        }
        // 盘口发生变化通知TODO
        exchangeProducer.sendPlateMsg(JSON.toJSONString(tradePlateModel));
    }
    /**
     * 取消委托订单
     *
     * @param exchangeOrder
     * @return
     */
    public OrderCoinsEntity cancelOrder(OrderCoinsEntity exchangeOrder) {
        logger.info("cancelOrder,orderId={}", exchangeOrder.getId());
        if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
            //处理市价单
            Iterator<OrderCoinsEntity> orderIterator;
            List<OrderCoinsEntity> list = null;
            if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
                list = this.buyMarketQueue;
            } else {
                list = this.sellMarketQueue;
            }
            synchronized (list) {
                orderIterator = list.iterator();
                while ((orderIterator.hasNext())) {
                    OrderCoinsEntity order = orderIterator.next();
                    if (order.getId().equals(exchangeOrder.getId())) {
                        orderIterator.remove();
                        onRemoveOrder(order);
                        return order;
                    }
                }
            }
        } else {
            //处理限价单
            TreeMap<BigDecimal, MergeOrder> list = null;
            Iterator<MergeOrder> mergeOrderIterator;
            if (exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
                list = this.buyLimitPriceQueue;
            } else {
                list = this.sellLimitPriceQueue;
            }
            synchronized (list) {
                MergeOrder mergeOrder = list.get(exchangeOrder.getEntrustPrice());
                if (mergeOrder != null) {
                    Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator();
                    while (orderIterator.hasNext()) {
                        OrderCoinsEntity order = orderIterator.next();
                        if (order.getId().equals(exchangeOrder.getId())) {
                            orderIterator.remove();
                            if (mergeOrder.size() == 0) {
                                list.remove(exchangeOrder.getEntrustPrice());
                            }
                            onRemoveOrder(order);
                            return order;
                        }
                    }
                }
            }
        }
        return null;
    }
    public void onRemoveOrder(OrderCoinsEntity order) {
        if (order.getTradeType() == OrderCoinsEntity.TRADETYPE_FIXEDPRICE) {
            if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY) {
                buyTradePlate.remove(order);
                sendTradePlateMessage(buyTradePlate, sellTradePlate);
            } else {
                sellTradePlate.remove(order);
                sendTradePlateMessage(buyTradePlate, sellTradePlate);
            }
        }
    }
    public TradePlate getTradePlate(ExchangeOrderDirection direction) {
        if (direction == ExchangeOrderDirection.BUY) {
            return buyTradePlate;
        } else {
            return sellTradePlate;
        }
    }
    /**
     * 查询交易器里的订单
     *
     * @param orderId
     * @param type
     * @param direction
     * @return
     */
    public OrderCoinsEntity findOrder(String orderId, int type, int direction) {
        if (type == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
            LinkedList<OrderCoinsEntity> list;
            if (direction == OrderCoinsEntity.ORDERTYPE_BUY) {
                list = this.buyMarketQueue;
            } else {
                list = this.sellMarketQueue;
            }
            synchronized (list) {
                Iterator<OrderCoinsEntity> orderIterator = list.iterator();
                while ((orderIterator.hasNext())) {
                    OrderCoinsEntity order = orderIterator.next();
                    if (order.getId().equals(orderId)) {
                        return order;
                    }
                }
            }
        } else {
            TreeMap<BigDecimal, MergeOrder> list;
            if (direction == OrderCoinsEntity.ORDERTYPE_BUY) {
                list = this.buyLimitPriceQueue;
            } else {
                list = this.sellLimitPriceQueue;
            }
            synchronized (list) {
                Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = list.entrySet().iterator();
                while (mergeOrderIterator.hasNext()) {
                    Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next();
                    MergeOrder mergeOrder = entry.getValue();
                    Iterator<OrderCoinsEntity> orderIterator = mergeOrder.iterator();
                    while ((orderIterator.hasNext())) {
                        OrderCoinsEntity order = orderIterator.next();
                        if (order.getId().equals(orderId)) {
                            return order;
                        }
                    }
                }
            }
        }
        return null;
    }
    public TreeMap<BigDecimal, MergeOrder> getBuyLimitPriceQueue() {
        return buyLimitPriceQueue;
    }
    public LinkedList<OrderCoinsEntity> getBuyMarketQueue() {
        return buyMarketQueue;
    }
    public TreeMap<BigDecimal, MergeOrder> getSellLimitPriceQueue() {
        return sellLimitPriceQueue;
    }
    public LinkedList<OrderCoinsEntity> getSellMarketQueue() {
        return sellMarketQueue;
    }
    public void setCoinScale(int scale) {
        this.coinScale = scale;
    }
    public void setBaseCoinScale(int scale) {
        this.baseCoinScale = scale;
    }
    public boolean isTradingHalt() {
        return this.tradingHalt;
    }
    /**
     * 暂停交易,不接收新的订单
     */
    public void haltTrading() {
        this.tradingHalt = true;
    }
    /**
     * 恢复交易
     */
    public void resumeTrading() {
        this.tradingHalt = false;
    }
    public void stopTrading() {
        //TODO:停止交易,取消当前所有订单
    }
    public boolean getReady() {
        return this.ready;
    }
    public void setReady(boolean ready) {
        this.ready = ready;
    }
    public void setClearTime(String clearTime) {
        this.clearTime = clearTime;
    }
    public int getLimitPriceOrderCount(ExchangeOrderDirection direction) {
        int count = 0;
        TreeMap<BigDecimal, MergeOrder> queue = direction == ExchangeOrderDirection.BUY ? buyLimitPriceQueue : sellLimitPriceQueue;
        Iterator<Map.Entry<BigDecimal, MergeOrder>> mergeOrderIterator = queue.entrySet().iterator();
        while (mergeOrderIterator.hasNext()) {
            Map.Entry<BigDecimal, MergeOrder> entry = mergeOrderIterator.next();
            MergeOrder mergeOrder = entry.getValue();
            count += mergeOrder.size();
        }
        return count;
    }
    public OrderCoinService getOrderCoinService() {
        return orderCoinService;
    }
    public void setOrderCoinService(OrderCoinService orderCoinService) {
        this.orderCoinService = orderCoinService;
    }
}
src/main/java/com/xcong/excoin/trade/CoinTraderFactory.java
New file
@@ -0,0 +1,40 @@
package com.xcong.excoin.trade;
import org.springframework.stereotype.Service;
import java.util.concurrent.ConcurrentHashMap;
@Service
public class CoinTraderFactory {
    private ConcurrentHashMap<String, CoinTrader> traderMap;
    public CoinTraderFactory() {
        traderMap = new ConcurrentHashMap<>();
    }
    //添加,已存在的无法添加
    public void addTrader(String symbol, CoinTrader trader) {
        if(!traderMap.containsKey(symbol)) {
            traderMap.put(symbol, trader);
        }
    }
    //重置,即使已经存在也会覆盖
    public void resetTrader(String symbol, CoinTrader trader) {
        traderMap.put(symbol, trader);
    }
    public boolean containsTrader(String symbol) {
        return traderMap.containsKey(symbol);
    }
    public CoinTrader getTrader(String symbol) {
        return traderMap.get(symbol);
    }
    public ConcurrentHashMap<String, CoinTrader> getTraderMap() {
        return traderMap;
    }
}
src/main/java/com/xcong/excoin/trade/ExchangeOrderDirection.java
New file
@@ -0,0 +1,5 @@
package com.xcong.excoin.trade;
public enum ExchangeOrderDirection {
    BUY,SELL;
}
src/main/java/com/xcong/excoin/trade/ExchangeTrade.java
New file
@@ -0,0 +1,27 @@
package com.xcong.excoin.trade;
import com.alibaba.fastjson.JSON;
import lombok.Data;
import java.io.Serializable;
import java.math.BigDecimal;
/**
 * 撮合交易信息
 */
@Data
public class ExchangeTrade implements Serializable{
    private String symbol;
    private BigDecimal price;
    private BigDecimal amount;
    private BigDecimal buyTurnover;
    private BigDecimal sellTurnover;
    private int direction;
    private Long buyOrderId;
    private Long sellOrderId;
    private Long time;
    @Override
    public String toString() {
        return  JSON.toJSONString(this);
    }
}
src/main/java/com/xcong/excoin/trade/MergeOrder.java
New file
@@ -0,0 +1,42 @@
package com.xcong.excoin.trade;
import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class MergeOrder {
    private List<OrderCoinsEntity> orders = new ArrayList<>();
    //最后位置添加一个
    public void add(OrderCoinsEntity order){
        orders.add(order);
    }
    public OrderCoinsEntity get(){
        return orders.get(0);
    }
    public int size(){
        return orders.size();
    }
    public BigDecimal getPrice(){
        return orders.get(0).getEntrustPrice();
    }
    public Iterator<OrderCoinsEntity> iterator(){
        return orders.iterator();
    }
    public BigDecimal getTotalAmount() {
        BigDecimal total = new BigDecimal(0);
        for(OrderCoinsEntity item : orders) {
            total = total.add(item.getEntrustCnt());
        }
        return total;
    }
}
src/main/java/com/xcong/excoin/trade/TradePlate.java
New file
@@ -0,0 +1,181 @@
package com.xcong.excoin.trade;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.math.BigDecimal;
import java.util.LinkedList;
/**
 * 盘口信息
 */
@Data
@Slf4j
public class TradePlate {
    private LinkedList<TradePlateItem> items;
    //最大深度
    private int maxDepth = 100;
    //方向 订单类型 1、买入2、卖出
    private int  direction;
    private String symbol;
    public TradePlate(){
    }
    public TradePlate(String symbol,int direction) {
        this.direction = direction;
        this.symbol = symbol;
        items = new LinkedList<>();
    }
    public boolean add(OrderCoinsEntity exchangeOrder) {
        //log.info("add TradePlate order={}",exchangeOrder);
        synchronized (items) {
            int index = 0;
            if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
                return false;
            }
            if (exchangeOrder.getOrderType() != direction) {
                return false;
            }
            if (items.size() > 0) {
                for (index = 0; index < items.size(); index++) {
                    TradePlateItem item = items.get(index);
                    if ((exchangeOrder.getTradeType() == OrderCoinsEntity.ORDERTYPE_BUY && item.getPrice().compareTo(exchangeOrder.getEntrustPrice()) > 0)
                            || (exchangeOrder.getTradeType() == OrderCoinsEntity.ORDERTYPE_SELL && item.getPrice().compareTo(exchangeOrder.getEntrustPrice()) < 0)) {
                        continue;
                    } else if (item.getPrice().compareTo(exchangeOrder.getEntrustPrice()) == 0) {
                        BigDecimal deltaAmount = exchangeOrder.getEntrustCnt().subtract(exchangeOrder.getDealCnt());
                        item.setAmount(item.getAmount().add(deltaAmount));
                        return true;
                    } else {
                        break;
                    }
                }
            }
            if(index < maxDepth) {
                TradePlateItem newItem = new TradePlateItem();
                newItem.setAmount(exchangeOrder.getEntrustCnt().subtract(exchangeOrder.getDealCnt()));
                newItem.setPrice(exchangeOrder.getEntrustPrice());
                items.add(index, newItem);
            }
        }
        return true;
    }
    public void remove(OrderCoinsEntity order,BigDecimal amount) {
        synchronized (items) {
            //log.info("items>>init_size={},orderPrice={}",items.size(),order.getPrice());
            for (int index = 0; index < items.size(); index++) {
                TradePlateItem item = items.get(index);
                if (item.getPrice().compareTo(order.getEntrustPrice()) == 0) {
                    item.setAmount(item.getAmount().subtract(amount));
                    if (item.getAmount().compareTo(BigDecimal.ZERO) <= 0) {
                        items.remove(index);
                    }
                    //log.info("items>>final_size={},itemAmount={},itemPrice={}",items.size(),item.getAmount(),item.getPrice());
                    return;
                }
            }
            log.info("items>>return_size={}",items.size());
        }
    }
    public void remove(OrderCoinsEntity order){
        remove(order,order.getEntrustCnt().subtract(order.getDealCnt()));
    }
    public void setItems(LinkedList<TradePlateItem> items){
        this.items = items;
    }
    public BigDecimal getHighestPrice(){
        if(items.size() == 0) {
            return BigDecimal.ZERO;
        }
        if(direction == OrderCoinsEntity.ORDERTYPE_BUY){
            return items.getFirst().getPrice();
        }
        else{
            return items.getLast().getPrice();
        }
    }
    public int getDepth(){
        return items.size();
    }
    public BigDecimal getLowestPrice(){
        if(items.size() == 0) {
            return BigDecimal.ZERO;
        }
        if(direction == OrderCoinsEntity.ORDERTYPE_BUY){
            return items.getLast().getPrice();
        }
        else{
            return items.getFirst().getPrice();
        }
    }
    /**
     * 获取委托量最大的档位
     * @return
     */
    public BigDecimal getMaxAmount(){
        if(items.size() == 0) {
            return BigDecimal.ZERO;
        }
        BigDecimal amount = BigDecimal.ZERO;
        for(TradePlateItem item:items){
            if(item.getAmount().compareTo(amount)>0){
                amount = item.getAmount();
            }
        }
        return amount;
    }
    /**
     * 获取委托量最小的档位
     * @return
     */
    public BigDecimal getMinAmount(){
        if(items.size() == 0) {
            return BigDecimal.ZERO;
        }
        BigDecimal amount = items.getFirst().getAmount();
        for(TradePlateItem item:items){
            if(item.getAmount().compareTo(amount) < 0){
                amount = item.getAmount();
            }
        }
        return amount;
    }
    public JSONObject toJSON(){
        JSONObject json = new JSONObject();
        json.put("direction",direction);
        json.put("maxAmount",getMaxAmount());
        json.put("minAmount",getMinAmount());
        json.put("highestPrice",getHighestPrice());
        json.put("lowestPrice",getLowestPrice());
        json.put("symbol",getSymbol());
        json.put("items",items);
        return json;
    }
    public JSONObject toJSON(int limit){
        JSONObject json = new JSONObject();
        json.put("direction",direction);
        json.put("maxAmount",getMaxAmount());
        json.put("minAmount",getMinAmount());
        json.put("highestPrice",getHighestPrice());
        json.put("lowestPrice",getLowestPrice());
        json.put("symbol",getSymbol());
        json.put("items",items.size() > limit ? items.subList(0,limit) : items);
        return json;
    }
}
src/main/java/com/xcong/excoin/trade/TradePlateItem.java
New file
@@ -0,0 +1,11 @@
package com.xcong.excoin.trade;
import lombok.Data;
import java.math.BigDecimal;
@Data
public class TradePlateItem {
    private BigDecimal price;
    private BigDecimal amount;
}
src/main/java/com/xcong/excoin/trade/TradePlateModel.java
New file
@@ -0,0 +1,13 @@
package com.xcong.excoin.trade;
import lombok.Data;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
@Data
public class TradePlateModel {
    private List<List<BigDecimal>> buy  = new ArrayList<>();
    private List<List<BigDecimal>> sell = new ArrayList<>();
}
src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java
@@ -22,6 +22,8 @@
                return "EOS/USDT";
            case "etcusdt":
                return "ETC/USDT";
            case "nekkusdt":
                return "NEKK/USDT";
            default:
                return null;
        }
@@ -43,6 +45,8 @@
                return "EOS_NEW_PRICE";
            case "ETC/USDT":
                return "ETC_NEW_PRICE";
            case "NEKK/USDT":
                return "NEKK_NEW_PRICE";
            default:
                return null;
        }
src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
New file
@@ -0,0 +1,220 @@
package com.xcong.excoin.websocket;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.common.contants.AppContants;
import com.xcong.excoin.utils.CoinTypeConvert;
import com.xcong.excoin.utils.RedisUtils;
import com.xcong.excoin.utils.SpringContextHolder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@Slf4j
@ServerEndpoint(value = "/trade/market")
@Component
public class TradePlateSendWebSocket {
    @Resource
    RedisUtils redisUtils;
    /**
     * 记录当前在线连接数
     */
    private static AtomicInteger onlineCount = new AtomicInteger(0);
    private static Map<String, Map<String, Session>> tradeplateClients = new ConcurrentHashMap<>();
    private static Map<String, Map<String, Session>> klineClients = new ConcurrentHashMap<>();
    /**
     * 连接建立成功调用的方法
     */
    @OnOpen
    public void onOpen(Session session) {
        onlineCount.incrementAndGet(); // 在线数加1
        log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get());
    }
    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose(Session session) {
        onlineCount.decrementAndGet(); // 在线数减1
//        Collection<Map<String, Session>> values = tradeplateClients.values();
//        if(CollectionUtils.isNotEmpty(values)){
//            for(Map<String,Session> map : values){
//                map.remove(session.getId());
//            }
//        }
        log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get());
    }
    /**
     * 收到客户端消息后调用的方法
     *
     * @param message 客户端发送过来的消息
     */
    @OnMessage
    public void onMessage(String message, Session session) {
        log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
        // 订阅方法 {sub: 'market.' + symbol + '.depth.' + this._caculateType(),id: symbol
        //}
        JSONObject jsonObject = JSON.parseObject(message);
        // 盘口的判断
        if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) {
            String sub = jsonObject.get("sub").toString();
            String symbol = sub.split("\\.")[1];
            symbol = CoinTypeConvert.convert(symbol);
            if (tradeplateClients.containsKey(symbol)) {
                tradeplateClients.get(symbol).put(session.getId(), session);
            } else {
                Map<String, Session> map = new HashMap<>();
                map.put(session.getId(), session);
                tradeplateClients.put(symbol, map);
            }
        }
        // 取消盘口订阅
        if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) {
            // `market.${symbol}.kline.${strPeriod}
            String unsub = jsonObject.get("unsub").toString();
            String[] split = unsub.split("\\.");
            String symbol = split[1];
            symbol = CoinTypeConvert.convert(symbol);
            String key = symbol;
            if (tradeplateClients.containsKey(key)) {
                tradeplateClients.get(key).remove(session.getId());
            }
        }
        // 最新K线订阅
        // 根据消息判断这个用户需要订阅哪种数据
        // {sub: `market.${symbol}.kline.${strPeriod}`,
        //            symbol: symbol,
        //            period: strPeriod
        //}
        // 取消订阅 {unsub: xxx(标识)}
        if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) {
            // 订阅
            String sub = jsonObject.get("sub").toString();
            String[] split = sub.split("\\.");
            String symbol = split[1];
            symbol = CoinTypeConvert.convert(symbol);
            String period = split[3];
            String key = symbol + "-" + period;
            if (klineClients.containsKey(key)) {
                // 有这个币种K线
                Map<String, Session> stringSessionMap = klineClients.get(key);
                if (!stringSessionMap.containsKey(session.getId())) {
                    stringSessionMap.put(session.getId(), session);
                }
            } else {
                Map<String, Session> stringSessionMap = new HashMap<>();
                stringSessionMap.put(session.getId(), session);
                klineClients.put(key, stringSessionMap);
            }
        }
        // 取消订阅
        if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) {
            // `market.${symbol}.kline.${strPeriod}
            String unsub = jsonObject.get("unsub").toString();
            String[] split = unsub.split("\\.");
            String strPeriod = split[3];
            String symbol = split[1];
            symbol = CoinTypeConvert.convert(symbol);
            String key = symbol + "-" + strPeriod;
            if (klineClients.containsKey(key)) {
                klineClients.get(key).remove(session.getId());
            }
        }
        // 历史K线订阅
        // {req: "market.nekkusdt.kline.1min", symbol: "nekkusdt", period: "1min"}
        if (jsonObject.containsKey("req") && jsonObject.get("req").toString().contains("kline")) {
            String sub = jsonObject.get("req").toString();
            String[] split = sub.split("\\.");
            String symbol = split[1];
            symbol = CoinTypeConvert.convert(symbol);
            String period = split[3];
            //String key = symbol+"-"+period;
            // String key = "KINE_BCH/USDT_1week";
            String key = "KINE_{}_{}";
            // 币币k线数据
            key = StrUtil.format(key, symbol, period);
            RedisUtils bean = SpringContextHolder.getBean(RedisUtils.class);
            Object o = bean.get(key);
            sendMessageHistory(JSON.toJSONString(o), session);
        }
    }
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
        error.printStackTrace();
    }
    /**
     * 群发消息
     *
     * @param message 消息内容
     */
    public void sendMessagePlate(String message, Session fromSession) {
        if (tradeplateClients.containsKey("nekkusdt")) {
            Map<String, Session> nekk = tradeplateClients.get("nekkusdt");
            for (Map.Entry<String, Session> sessionEntry : nekk.entrySet()) {
                Session toSession = sessionEntry.getValue();
                // 排除掉自己
                //if (!fromSession.getId().equals(toSession.getId())) {
                log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
                boolean open = toSession.isOpen();
                if (open) {
                    toSession.getAsyncRemote().sendText(message);
                }
                //  }
            }
        }
    }
    public void sendMessageHistory(String message, Session toSession) {
        log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
        boolean open = toSession.isOpen();
        if (open) {
            toSession.getAsyncRemote().sendText(message);
        }
    }
    public void sendMessageKline(String symbol, String period, String message, Session fromSession) {
        String key = symbol + "-" + period;
        if (klineClients.containsKey(key)) {
            Map<String, Session> stringSessionMap = klineClients.get(key);
            for (Map.Entry<String, Session> sessionEntry : stringSessionMap.entrySet()) {
                Session toSession = sessionEntry.getValue();
                // 排除掉自己
                //if (!fromSession.getId().equals(toSession.getId())) {
                log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
                boolean open = toSession.isOpen();
                if (open) {
                    toSession.getAsyncRemote().sendText(message);
                }
                //  }
            }
        }
    }
}
src/main/resources/mapper/member/MemberWalletCoinDao.xml
@@ -39,5 +39,21 @@
        where id=#{id}
    </update>
    <update id="updateWalletBalance" parameterType="map">
        update member_wallet_coin
        <set>
            <if test="availableBalance != null">
                available_balance = IFNULL(available_balance, 0) + #{availableBalance},
            </if>
            <if test="totalBalance != null">
                total_balance = IFNULL(total_balance, 0) + #{totalBalance},
            </if>
            <if test="frozenBalance != null">
                frozen_balance = IFNULL(frozen_balance, 0) + #{frozenBalance},
            </if>
        </set>
        where id=#{id}
    </update>
</mapper>
src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml
@@ -11,7 +11,24 @@
         </where>
         order by create_time desc
    </select>
    <select id="selectAllWalletCoinOrderBySymbol"  resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity">
    <select id="selectOrderCoinDealByTime" resultType="com.xcong.excoin.trade.ExchangeTrade">
        SELECT
            symbol symbol,
            deal_price price,
            symbol_cnt amount,
            deal_amount buyTurnover,
            deal_amount sellTurnover,
            order_type direction,
            create_time time
          from coins_order_deal
          where symbol = #{symbol}
          and order_type  = 1
          and order_status = 3
          and create_time between #{startTime} and #{endTime}
    </select>
    <select id="selectAllWalletCoinOrderBySymbol"
            resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity">
         select * from coins_order_deal 
         <where>
             <if test="memberId != null  and  memberId  != ''">
src/test/java/com/xcong/excoin/GuijiTest.java