zainali5120
2020-10-14 edadefcc724fa0ec190ba7b6bf63ae49ac80c545
cpv配置
18 files modified
416 ■■■■■ changed files
src/main/java/com/xcong/excoin/common/aop/SubmitRepeatAspect.java 18 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/common/contants/AppContants.java 5 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/common/system/controller/LoginController.java 2 ●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java 22 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java 3 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java 5 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java 3 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java 5 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java 239 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/quartz/job/KlineDataUpdateJob.java 3 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java 43 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java 20 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java 8 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java 2 ●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java 6 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java 1 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/trade/CoinTrader.java 13 ●●●●● patch | view | raw | blame | history
src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml 18 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/common/aop/SubmitRepeatAspect.java
@@ -1,7 +1,13 @@
package com.xcong.excoin.common.aop;
import cn.hutool.core.util.StrUtil;
import cn.hutool.crypto.asymmetric.KeyType;
import cn.hutool.crypto.asymmetric.RSA;
import com.xcong.excoin.common.LoginUserUtils;
import com.xcong.excoin.common.annotations.SubmitRepeat;
import com.xcong.excoin.common.contants.AppContants;
import com.xcong.excoin.common.response.Result;
import com.xcong.excoin.configurations.properties.SecurityProperties;
import com.xcong.excoin.utils.MessageSourceUtils;
import com.xcong.excoin.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
@@ -27,6 +33,8 @@
    @Resource
    private RedisUtils redisUtil;
    @Resource
    private SecurityProperties securityProperties;
    private String key;
@@ -44,9 +52,15 @@
        ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
        HttpServletRequest request = attributes.getRequest();
        String token = request.getHeader("token");
       //String token = request.getHeader("token");
        String bearerToken = request.getHeader(AppContants.TOKEN_HEADER);
        String rsaToken = bearerToken.replace(AppContants.TOKEN_START_WITH, "");
        RSA rsa = new RSA(securityProperties.getPrivateKey(), null);
        String[] tokens = StrUtil.split(rsa.decryptStr(rsaToken, KeyType.PrivateKey), "_");
        String token = tokens[0];
        String uri = request.getRequestURI();
        String mId = (String) redisUtil.get(token);
        Long mId = LoginUserUtils.getAppLoginUser().getId();
        //String mId = (String) redisUtil.get(token);
        log.info("#token : {}, uri : {}, mId : {}#", token, uri, mId);
        key = mId + "_" + uri;
        boolean flag = redisUtil.setNotExist(key, "1", 5);
src/main/java/com/xcong/excoin/common/contants/AppContants.java
@@ -76,4 +76,9 @@
    public static final String TIME_OUT = "time_out";
    /**
     *  取消订单id
     */
    public static final String ORDER_CANCEL_KEY = "COIN_ORDER_CANCEL_";
}
src/main/java/com/xcong/excoin/common/system/controller/LoginController.java
@@ -111,7 +111,7 @@
        return rsa.encryptBase64(token + "_" + System.currentTimeMillis(), KeyType.PublicKey);
    }
    @SubmitRepeat
    //@SubmitRepeat
    @ApiOperation(value = "app注册接口", notes = "app注册接口,验证码必须输入可默认为123456")
    @PostMapping(value = "/register")
    public Result register(@RequestBody @Validated RegisterDto registerDto) {
src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
@@ -121,6 +121,12 @@
    public static final String ROUTING_KEY_ROC_ORDER_CANCEL  = "roc-order-routingKey-cancel";
    public static final String EXCHANGE_ROC_ORDER_COMPLETE = "roc-exchange-order-complete";
    public static final String QUEUE_ROC_ORDER_COMPLETE = "roc-order-queue-complete";
    public static final String ROUTING_KEY_ROC_ORDER_COMPLETE  = "roc-order-routingKey-complete";
    @Resource
    private ConnectionFactory connectionFactory;
@@ -189,6 +195,22 @@
    public Binding bindingCancelOrder() {
        return BindingBuilder.bind(ordereCancelQueue()).to(orderCancelExchange()).with(ROUTING_KEY_ROC_ORDER_CANCEL);
    }
   // 交易订单
    @Bean
    public DirectExchange orderCompleteExchange() {
        return new DirectExchange(EXCHANGE_ROC_ORDER_COMPLETE);
    }
    @Bean
    public Queue ordereCompleteQueue() {
        return new Queue(QUEUE_ROC_ORDER_COMPLETE, true);
    }
    @Bean
    public Binding bindingCompleteOrder() {
        return BindingBuilder.bind(ordereCompleteQueue()).to(orderCompleteExchange()).with(ROUTING_KEY_ROC_ORDER_COMPLETE);
    }
   @Bean
src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java
@@ -53,8 +53,9 @@
                .antMatchers("/api/member/getAppVersionInfo").permitAll()
                .antMatchers("/api/orderCoin/searchSymbolResultList").permitAll()
                .antMatchers("/api/orderCoin/findCollect").permitAll()
                .antMatchers("/trade/**").permitAll()
                .antMatchers("/api/orderCoin/deal/list").permitAll()
                .antMatchers("/api/helpCenter/**").permitAll()
                .antMatchers("/trade/**").permitAll()
                .anyRequest().authenticated()
                .and().apply(securityConfiguereAdapter());
    }
src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java
@@ -6,6 +6,7 @@
import javax.validation.Valid;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.common.annotations.SubmitRepeat;
import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
import com.xcong.excoin.modules.symbols.constants.SymbolsConstats;
import org.springframework.validation.annotation.Validated;
@@ -61,8 +62,9 @@
     */
    @ApiOperation(value = "提交买卖订单", notes = "提交买卖订单")
    @PostMapping(value="/submitSalesWalletCoinOrder")
    @SubmitRepeat
    public Result submitSalesWalletCoinOrder(@RequestBody @Valid SubmitSalesWalletCoinOrderDto submitSalesWalletCoinOrderDto) {
        log.info("买卖单参数[{}]", JSONObject.toJSONString(submitSalesWalletCoinOrderDto));
        log.debug("买卖单参数[{}]", JSONObject.toJSONString(submitSalesWalletCoinOrderDto));
        String symbol = submitSalesWalletCoinOrderDto.getSymbol();
        Integer type = submitSalesWalletCoinOrderDto.getType();
        Integer tradeType = submitSalesWalletCoinOrderDto.getTradeType();
@@ -97,6 +99,7 @@
     */
    @ApiOperation(value = "撤销委托订单", notes = "撤销委托订单")
    @PostMapping(value="/cancelEntrustWalletCoinOrder")
    @SubmitRepeat
    public Result cancelEntrustWalletCoinOrder(@RequestBody @Valid CancelEntrustWalletCoinOrderDto cancelEntrustWalletCoinOrderDto) {
        String orderId = cancelEntrustWalletCoinOrderDto.getOrderId();
        // 根据不同币种
src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java
@@ -21,4 +21,7 @@
    List<OrderCoinsEntity> selectCoinOrderOnTrade(List<String> list);
    void updateDeal(@Param("id") Long id, @Param("dealCnt")BigDecimal dealCnt,@Param("dealAmount")BigDecimal dealAmount);
    void batchUpdateStatus(@Param("list")List<Long> list,@Param("status") Integer status);
    void updateStatus(@Param("id")Long id,@Param("status") Integer status);
}
src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java
@@ -52,6 +52,10 @@
    public void handleOrder(List<ExchangeTrade> trades);
    public void completeOrder(List<OrderCoinsEntity> trades);
    void initOrders(String symbol, Integer type, Integer tradeType, BigDecimal price,
                    BigDecimal amount,BigDecimal entrustAmount);
    /**
     *  撮合交易单的撤销方法
@@ -60,4 +64,5 @@
     */
    public Result cancelEntrustWalletCoinOrderForMatch(String orderId);
}
src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
@@ -8,6 +8,7 @@
import javax.annotation.Resource;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.common.contants.AppContants;
import com.xcong.excoin.common.enumerates.CoinTypeEnum;
import com.xcong.excoin.modules.blackchain.service.RocService;
import com.xcong.excoin.modules.coin.mapper.OrderCoinsDealMapper;
@@ -344,7 +345,7 @@
        Long memberId = LoginUserUtils.getAppLoginUser().getId();
        // 需要实名
        MemberEntity memberEntity = memberDao.selectById(memberId);
        if(!MemberEntity.CERTIFY_STATUS_Y.equals(memberEntity.getCertifyStatus())){
        if (!MemberEntity.CERTIFY_STATUS_Y.equals(memberEntity.getCertifyStatus())) {
            return Result.fail(MessageSourceUtils.getString("member_controller_0001"));
        }
        BigDecimal nowPriceinBigDecimal = price;
@@ -382,13 +383,11 @@
            entrustAmount = price.multiply(amount);
        } else {
            // 市价
            if(OrderCoinsEntity.ORDERTYPE_BUY==type){
            if (OrderCoinsEntity.ORDERTYPE_BUY == type) {
                closingPrice = entrustAmount.multiply(tradeSetting.getCoinFeeRatio());
                totalPayPrice = entrustAmount.add(closingPrice);
            }
        }
        // BigDecimal totalPayPricCoin = nowPrice.multiply(amount).add(closingPrice);
        String walletCode = MemberWalletCoinEnum.WALLETCOINCODE.getValue();
        MemberWalletCoinEntity walletCoinUsdt = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, walletCode);
        if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) {
@@ -454,24 +453,12 @@
        //冻结相应的资产
        if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) {
            //如果是买入,所对应的币种增加,USDT账户减少金额
//            BigDecimal availableBalance = walletCoinUsdt.getAvailableBalance().subtract(totalPayPrice);
//            BigDecimal frozenBalance = walletCoinUsdt.getFrozenBalance().add(totalPayPrice);
//            walletCoinUsdt.setAvailableBalance(availableBalance);
//            walletCoinUsdt.setFrozenBalance(frozenBalance);
//            memberWalletCoinDao.updateById(walletCoinUsdt);
            memberWalletCoinDao.updateWalletBalance(walletCoinUsdt.getId(),totalPayPrice.negate(),totalPayPrice.negate(),entrustAmount);
            memberWalletCoinDao.updateWalletBalance(walletCoinUsdt.getId(), totalPayPrice.negate(), totalPayPrice.negate(), entrustAmount);
        } else {
            //如果是卖出,币种减少,USDT增加
//            BigDecimal availableBalance = walletCoin.getAvailableBalance().subtract(amount);
//            BigDecimal frozenBalance = walletCoin.getFrozenBalance().add(amount);
//            walletCoin.setAvailableBalance(availableBalance);
//            walletCoin.setFrozenBalance(frozenBalance);
//            memberWalletCoinDao.updateById(walletCoin);
            memberWalletCoinDao.updateWalletBalance(walletCoin.getId(),amount.negate(),amount.negate(),amount);
            memberWalletCoinDao.updateWalletBalance(walletCoin.getId(), amount.negate(), amount.negate(), amount);
        }
        // 加入到撮合 TODO  通过消息队列发送到交易撮合
        //CoinTrader trader = factory.getTrader(symbol);
        //trader.trade(order);
        // 加入到撮合
        order.setSymbol(symbol);
        orderSubmitProducer.sendMsg(JSONObject.toJSONString(order));
        return Result.ok(MessageSourceUtils.getString("order_service_0011"));
@@ -512,17 +499,28 @@
    @Override
    @Transactional
    public Result cancelEntrustWalletCoinOrder(String orderId) {
        // 将这个取消放入redis
        boolean b = redisUtils.setNotExist(AppContants.ORDER_CANCEL_KEY + orderId, orderId, 10);
        if (!b) {
            return Result.ok(MessageSourceUtils.getString("order_service_0012"));
        }
        //获取用户ID
        Long memberId = LoginUserUtils.getAppLoginUser().getId();
        OrderCoinsEntity orderCoinsEntity = orderCoinsDao.selectById(orderId);
        if (ObjectUtil.isNotEmpty(orderCoinsEntity) && orderCoinsEntity.getMemberId().equals(memberId) ) {
        if (ObjectUtil.isNotEmpty(orderCoinsEntity) && orderCoinsEntity.getMemberId().equals(memberId)) {
            // 如果是撮合交易单
            if (SymbolsConstats.EXCHANGE_SYMBOLS.contains(orderCoinsEntity.getSymbol())) {
                // 这里先更新状态 判断状态 防止消息发送过程中的二次提交
                if (!orderCoinsEntity.getOrderStatus().equals(OrderCoinsEntity.ORDERSTATUS_DODING)) {
                    // 不是持仓中 返回
                    return Result.ok(MessageSourceUtils.getString("order_service_0013"));
                }
                // 更新为已取消(可能在这个过程中  这个单已经成交)
                orderSubmitProducer.sendCancelMsg(orderId);
               // return this.cancelEntrustWalletCoinOrderForMatch(orderId);
                return Result.ok(MessageSourceUtils.getString("order_service_0013"));
            }
            if (orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_CANCEL || orderCoinsEntity.getOrderStatus()==OrderCoinsEntity.ORDERSTATUS_DONE) {
            if (orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_CANCEL || orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
                return Result.fail(MessageSourceUtils.getString("order_service_0012"));
            }
            orderCoinsEntity.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_CANCEL);
@@ -597,21 +595,31 @@
    @Override
    @Transactional
    public Result cancelEntrustWalletCoinOrderForMatch(String orderId) {
        //获取用户ID
        //如果redis中没有这个单 则不再往下走
        OrderCoinsEntity orderCoinsEntity = orderCoinsDao.selectById(orderId);
        if(orderCoinsEntity==null){
        if (orderCoinsEntity == null) {
            return Result.ok("");
        }
        Long memberId = orderCoinsEntity.getMemberId();
        // 取消撮合订单的单
        CoinTrader trader = factory.getTrader(orderCoinsEntity.getSymbol());
        trader.cancelOrder(orderCoinsEntity);
        if (ObjectUtil.isNotEmpty(orderCoinsEntity) ) {
            if (orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_CANCEL || orderCoinsEntity.getOrderStatus()==OrderCoinsEntity.ORDERSTATUS_DONE) {
        // 从撮合交易系统得到的已成交的数据
        OrderCoinsEntity coinsEntityCancel = trader.cancelOrder(orderCoinsEntity);
        if (coinsEntityCancel == null) {
            // 此时说明撮合系统已经没这个单了 不需要继续处理
            return null;
        }
        if (ObjectUtil.isNotEmpty(orderCoinsEntity)) {
            if (orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
                // 已完成的直接返回
                return Result.fail(MessageSourceUtils.getString("order_service_0012"));
            }
            orderCoinsEntity.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_CANCEL);
            orderCoinsDao.updateById(orderCoinsEntity);
            OrderCoinsEntity update = new OrderCoinsEntity();
            update.setId(Long.valueOf(orderId));
            update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_CANCEL);
            //orderCoinsEntity.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_CANCEL);
            orderCoinsDao.updateById(update);
            String symbol = orderCoinsEntity.getSymbol();
@@ -623,10 +631,10 @@
            detail.setTradeType(orderCoinsEntity.getTradeType());
            detail.setSymbol(symbol);
            detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_CANCEL);
            detail.setSymbolCnt(orderCoinsEntity.getEntrustCnt());
            detail.setSymbolCnt(BigDecimal.ZERO);
            detail.setEntrustPrice(orderCoinsEntity.getEntrustPrice());
            detail.setDealPrice(orderCoinsEntity.getDealPrice());
            detail.setDealAmount(orderCoinsEntity.getDealAmount());
            detail.setDealPrice(BigDecimal.ZERO);
            detail.setDealAmount(BigDecimal.ZERO);
            detail.setFeeAmount(orderCoinsEntity.getFeeAmount());
            if (OrderCoinsEntity.ORDERTYPE_BUY.equals(orderCoinsEntity.getOrderType())) {
                //如果是限价买入,撤单将USDT账户冻结金额返回
@@ -637,13 +645,7 @@
                    //手续费 = 开仓价*数量*手续费率
                    //返还金额=开仓价*未成交数量+手续费
                    // 这里根据成交的单计算
                    List<OrderCoinsDealEntity> orderCoinsDealEntities = orderCoinDealDao.selectCoinOrderDealByOrderId(Long.valueOf(orderId));
                    BigDecimal dealAmount = BigDecimal.ZERO;
                    if(CollectionUtils.isNotEmpty(orderCoinsDealEntities)){
                        for (OrderCoinsDealEntity orderCoinsDealEntity : orderCoinsDealEntities) {
                            dealAmount = dealAmount.add(orderCoinsDealEntity.getDealAmount());
                        }
                    }
                    BigDecimal dealAmount = coinsEntityCancel.getDealAmount();
                    // 市价的按成交额退款
                    BigDecimal returnBalance = orderCoinsEntity.getEntrustAmount().subtract(dealAmount);
@@ -657,7 +659,7 @@
                        returnFee = orderCoinsEntity.getFeeAmount().subtract(needFee);
                    }
                    BigDecimal avi = returnBalance.add(returnFee);
                    memberWalletCoinDao.updateWalletBalance(walletCoin.getId(),avi,null,returnBalance.negate());
                    memberWalletCoinDao.updateWalletBalance(walletCoin.getId(), avi, null, returnBalance.negate());
                    walletCoin.setAvailableBalance(walletCoin.getAvailableBalance().add(returnBalance).add(returnFee));
                    walletCoin.setFrozenBalance(walletCoin.getFrozenBalance().subtract(returnBalance));
                    //memberWalletCoinDao.updateById(walletCoin);
@@ -677,10 +679,11 @@
                MemberWalletCoinEntity walletCoin = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, symbol);
                if (ObjectUtil.isNotEmpty(walletCoin)) {
                    // 卖出按卖出的数量计算手续费
                    BigDecimal returnBalance = orderCoinsEntity.getEntrustCnt().subtract(orderCoinsEntity.getDealCnt());
                    BigDecimal returnBalance = orderCoinsEntity.getEntrustCnt().subtract(coinsEntityCancel.getDealCnt());
                    walletCoin.setAvailableBalance(walletCoin.getAvailableBalance().add(returnBalance));
                    walletCoin.setFrozenBalance(walletCoin.getFrozenBalance().subtract(returnBalance));
                    memberWalletCoinDao.updateById(walletCoin);
                    //memberWalletCoinDao.updateById(walletCoin);
                    memberWalletCoinDao.updateWalletBalance(walletCoin.getId(), returnBalance, null, returnBalance.negate());
                    // 流水记录
                    MemberAccountFlowEntity record = new MemberAccountFlowEntity();
                    record.setSource(MemberAccountFlowEntity.SOURCE_CANCEL);
@@ -940,36 +943,14 @@
            BigDecimal price = exchangeTrade.getPrice();
            // 卖单
            Long sellOrderId = exchangeTrade.getSellOrderId();
            // 买卖单都需要处理
            // 买单
            OrderCoinsEntity buyOrderCoinsEntity = orderCoinsDao.selectById(buyOrderId);
            if(buyOrderCoinsEntity==null){
                return;
            }
            BigDecimal buyEntrustCnt = buyOrderCoinsEntity.getEntrustCnt();
            if(buyEntrustCnt==null){
                buyEntrustCnt = BigDecimal.ZERO;
            }
            Long memberId = buyOrderCoinsEntity.getMemberId();
            if (buyOrderCoinsEntity != null) {
                List<OrderCoinsDealEntity> orderCoinsDealEntities = orderCoinDealDao.selectCoinOrderDealByOrderId(buyOrderId);
                // 比较剩余的量
                BigDecimal dealAmount = BigDecimal.ZERO;
                BigDecimal dealCnt = BigDecimal.ZERO;
                if(CollectionUtils.isNotEmpty(orderCoinsDealEntities)){
                    for (OrderCoinsDealEntity orderCoinsDealEntity : orderCoinsDealEntities) {
                        dealAmount=dealAmount.add(orderCoinsDealEntity.getDealAmount());
                        dealCnt = dealCnt.add(orderCoinsDealEntity.getSymbolCnt());
                    }
                }
                // 单的总金额
                BigDecimal entrustAmount = buyOrderCoinsEntity.getEntrustAmount();
                BigDecimal add = dealAmount.add(buyTurnover);
                BigDecimal closingPrice = buyTurnover.multiply(new BigDecimal("0.002"));
                //成交总量
                 dealCnt = dealCnt.add(amount);
                // 创建一个完成的单
                OrderCoinsDealEntity detail = new OrderCoinsDealEntity();
                detail.setMemberId(buyOrderCoinsEntity.getMemberId());
@@ -984,36 +965,37 @@
                detail.setDealAmount(buyTurnover);
                detail.setFeeAmount(closingPrice);
                detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_DONE);
                // 如果这个单在取消状态 则不执行
                orderCoinDealDao.insert(detail);
                // 如果这个单成交完  更改状态
                if (add.compareTo(entrustAmount) >= 0 ||(buyEntrustCnt.compareTo(BigDecimal.ZERO)>0 &&dealCnt.compareTo(buyEntrustCnt)>=0) ) {
                    OrderCoinsEntity update = new OrderCoinsEntity();
                    update.setId(buyOrderId);
                    update.setDealAmount(entrustAmount);
                    update.setDealCnt(buyOrderCoinsEntity.getDealCnt().add(amount));
                    update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
                    update.setUpdateTime(new Date());
                    orderCoinsDao.updateById(update);
                    // 限价买入时,如果成交价比设置的价格低,需要退还多余的冻结
                    OrderCoinsEntity coinsEntity = orderCoinsDao.selectById(buyOrderId);
                    BigDecimal subtract = coinsEntity.getEntrustAmount().subtract(coinsEntity.getDealAmount());
                    if(subtract.compareTo(BigDecimal.ZERO)>=0){
                        // 下单扣的比较多
                        memberWalletCoinDao.updateWalletBalance(coinsEntity.getId(),subtract,subtract,subtract.negate());
                    }
                } else {
                    // 更新买单
                    orderCoinsDao.updateDeal(buyOrderId,amount,buyTurnover);
                // 更新买单
                //orderCoinsDao.updateDeal(buyOrderId, amount, buyTurnover);
                // 买币扣除冻结usdt 增加币种的可用
                MemberWalletCoinEntity usdtWallet = memberWalletCoinDao.selectWalletCoinBymIdAndCode(buyOrderCoinsEntity.getMemberId(), MemberWalletCoinEnum.WALLETCOINCODE.getValue());
                if (usdtWallet != null) {
                    // 减少usdt冻结
                    memberWalletCoinDao.updateWalletBalance(usdtWallet.getId(), null, null, buyTurnover.negate());
                }
                // 增加买的币
                MemberWalletCoinEntity buySymbolWallet = memberWalletCoinDao.selectWalletCoinBymIdAndCode(buyOrderCoinsEntity.getMemberId(), buyOrderCoinsEntity.getSymbol());
                if (buySymbolWallet != null) {
                    memberWalletCoinDao.updateWalletBalance(buySymbolWallet.getId(), amount, amount, null);
                }
                // 流水记录
                MemberAccountFlowEntity record = new MemberAccountFlowEntity();
                record.setMemberId(buyOrderCoinsEntity.getMemberId());
                record.setPrice(buyTurnover.setScale(4, BigDecimal.ROUND_DOWN).negate());
                record.setSource(MemberAccountFlowEntity.SOURCE_BUY + buyOrderCoinsEntity.getSymbol());
                record.setRemark(MemberAccountFlowEntity.REMARK_BUY + buyOrderCoinsEntity.getSymbol() + ":" + amount);
                record.setSymbol(buyOrderCoinsEntity.getSymbol());
                record.setBalance(usdtWallet.getAvailableBalance().subtract(buyTurnover));
                memberAccountFlowEntityDao.insert(record);
            }
            // 卖单
            OrderCoinsEntity sellOrderCoinsEntity = orderCoinsDao.selectById(sellOrderId);
            if (sellOrderCoinsEntity != null) {
                // 比较剩余的量
                BigDecimal dealAmount = sellOrderCoinsEntity.getDealCnt();
                // 单的总数量
                BigDecimal entrustCnt = sellOrderCoinsEntity.getEntrustCnt();
                BigDecimal add = dealAmount.add(amount);
                // 创建一个完成的单
                OrderCoinsDealEntity detail = new OrderCoinsDealEntity();
                detail.setMemberId(sellOrderCoinsEntity.getMemberId());
@@ -1030,46 +1012,7 @@
                detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_DONE);
                orderCoinDealDao.insert(detail);
                // 如果这个单成交完  更改状态
                if (add.compareTo(entrustCnt) >= 0) {
                    OrderCoinsEntity update = new OrderCoinsEntity();
                    update.setId(sellOrderId);
                    // 总成交额
                    update.setDealAmount(buyTurnover.add(sellOrderCoinsEntity.getDealAmount()));
                    update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
                    update.setDealCnt(entrustCnt);
                    update.setUpdateTime(new Date());
                    orderCoinsDao.updateById(update);
                } else {
                    // 未完成
                    OrderCoinsEntity update = new OrderCoinsEntity();
                    update.setId(sellOrderId);
                    // 总成交额
                    update.setDealAmount(buyTurnover.add(sellOrderCoinsEntity.getDealAmount()));
                    update.setDealCnt(sellOrderCoinsEntity.getDealCnt().add(amount));
                    update.setUpdateTime(new Date());
                    orderCoinsDao.updateById(update);
                }
                // 买币扣除冻结usdt 增加币种的可用
                MemberWalletCoinEntity usdtWallet = memberWalletCoinDao.selectWalletCoinBymIdAndCode(buyOrderCoinsEntity.getMemberId(), MemberWalletCoinEnum.WALLETCOINCODE.getValue());
                if (usdtWallet != null) {
                    // 减少usdt冻结
                    memberWalletCoinDao.updateWalletBalance(usdtWallet.getId(), null, null, buyTurnover.negate());
                }
                // 增加买的币
                MemberWalletCoinEntity buySymbolWallet = memberWalletCoinDao.selectWalletCoinBymIdAndCode(buyOrderCoinsEntity.getMemberId(), buyOrderCoinsEntity.getSymbol());
                if (buySymbolWallet != null) {
                    memberWalletCoinDao.updateWalletBalance(buySymbolWallet.getId(), amount, amount, null);
                }
                // 流水记录
                MemberAccountFlowEntity record = new MemberAccountFlowEntity();
                record.setMemberId(buyOrderCoinsEntity.getMemberId());
                record.setPrice(buyTurnover.setScale(4, BigDecimal.ROUND_DOWN).negate());
                record.setSource(MemberAccountFlowEntity.SOURCE_BUY + buyOrderCoinsEntity.getSymbol());
                record.setRemark(MemberAccountFlowEntity.REMARK_BUY + buyOrderCoinsEntity.getSymbol() + ":" + amount);
                record.setSymbol(buyOrderCoinsEntity.getSymbol());
                record.setBalance(usdtWallet.getAvailableBalance().subtract(buyTurnover));
                memberAccountFlowEntityDao.insert(record);
                //orderCoinsDao.updateDeal(sellOrderId, amount, buyTurnover);
                // 卖家需要减少冻结的币种  增加usdt
                MemberWalletCoinEntity memberWalletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(sellOrderCoinsEntity.getMemberId(), sellOrderCoinsEntity.getSymbol());
                if (memberWalletCoinEntity != null) {
@@ -1085,13 +1028,49 @@
                MemberAccountFlowEntity recordSell = new MemberAccountFlowEntity();
                recordSell.setMemberId(sellOrderCoinsEntity.getMemberId());
                recordSell.setPrice(buyTurnover.setScale(4, BigDecimal.ROUND_DOWN));
                recordSell.setSource(MemberAccountFlowEntity.SOURCE_SALE + buyOrderCoinsEntity.getSymbol());
                recordSell.setRemark(MemberAccountFlowEntity.REMARK_SALE + buyOrderCoinsEntity.getSymbol() + ":" + amount.toPlainString());
                recordSell.setSymbol(buyOrderCoinsEntity.getSymbol());
                recordSell.setSource(MemberAccountFlowEntity.SOURCE_SALE + sellOrderCoinsEntity.getSymbol());
                recordSell.setRemark(MemberAccountFlowEntity.REMARK_SALE + sellOrderCoinsEntity.getSymbol() + ":" + amount.toPlainString());
                recordSell.setSymbol(sellOrderCoinsEntity.getSymbol());
                recordSell.setBalance(sellWalletCoinEntity.getAvailableBalance().add(buyTurnover));
                memberAccountFlowEntityDao.insert(recordSell);
            }
        }
    }
    @Override
    @Transactional
    public void completeOrder(List<OrderCoinsEntity> trades) {
        // 订单完成 更新他们的状态
        List<Long> ids = new ArrayList<>();
        if (CollectionUtils.isNotEmpty(trades)) {
            for (OrderCoinsEntity trade : trades) {
                if (trade != null) {
                    //orderCoinsDao.updateStatus(trade.getId(),OrderCoinsEntity.ORDERSTATUS_DONE);
                    ids.add(trade.getId());
                    // 买单 实际成交金额小于委托的 这一部分从冻结扣除
                    if(OrderCoinsEntity.ORDERTYPE_BUY==trade.getOrderType()){
                        if(trade.getEntrustAmount().compareTo(trade.getDealAmount())>0){
                            // 此时退回这部分的差额
                            BigDecimal subtract = trade.getEntrustAmount().subtract(trade.getDealAmount());
                            MemberWalletCoinEntity memberWalletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(trade.getMemberId(), CoinTypeEnum.USDT.name());
                            if(memberWalletCoinEntity!=null){
                                memberWalletCoinDao.updateWalletBalance(memberWalletCoinEntity.getId(),subtract,null,subtract.negate());
                            }
                        }
                    }
                }
            }
        }
        if (CollectionUtils.isNotEmpty(ids)) {
            orderCoinsDao.batchUpdateStatus(ids, OrderCoinsEntity.ORDERSTATUS_DONE);
        }
    }
    @Override
    public void initOrders(String symbol, Integer type, Integer tradeType, BigDecimal price,
                           BigDecimal amount, BigDecimal entrustAmount) {
    }
}
src/main/java/com/xcong/excoin/quartz/job/KlineDataUpdateJob.java
@@ -150,7 +150,4 @@
        }
    }
    public void updateTodayLine(){
        symbolsService.updateSymbolsKine("1day");
    }
}
src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java
@@ -38,27 +38,32 @@
    @PostConstruct
    public void initNewestPrice() {
        log.info("#=======价格更新开启=======#");
        SubscriptionOptions subscriptionOptions = new SubscriptionOptions();
        subscriptionOptions.setConnectionDelayOnFailure(5);
        subscriptionOptions.setUri("wss://api.hadax.com/ws");
        SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions);
        subscriptionClient.subscribeTradeEvent("btcusdt,ethusdt,xrpusdt,ltcusdt,bchusdt,eosusdt,etcusdt", tradeEvent -> {
            String symbol = tradeEvent.getSymbol();
            // 根据symbol判断做什么操作
            symbol = CoinTypeConvert.convert(symbol);
            if (null != symbol) {
                String price = tradeEvent.getTradeList().get(0).getPrice().toPlainString();
                // TODO 测试环境关闭这个插入redis
                redisUtils.set(CoinTypeConvert.convertToKey(symbol), price);
                // 比较
                //websocketPriceService.comparePriceAsc(symbol, price);
                //websocketPriceService.comparePriceDesc(symbol, price);
                //System.out.println("比较完毕:"+symbol+"-"+price);
        try{
            log.info("#=======价格更新开启=======#");
            SubscriptionOptions subscriptionOptions = new SubscriptionOptions();
            subscriptionOptions.setConnectionDelayOnFailure(5);
            subscriptionOptions.setUri("wss://api.hadax.com/ws");
            SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions);
            subscriptionClient.subscribeTradeEvent("btcusdt,ethusdt,xrpusdt,ltcusdt,bchusdt,eosusdt,etcusdt", tradeEvent -> {
                String symbol = tradeEvent.getSymbol();
                // 根据symbol判断做什么操作
                symbol = CoinTypeConvert.convert(symbol);
                if (null != symbol) {
                    String price = tradeEvent.getTradeList().get(0).getPrice().toPlainString();
                    // TODO 测试环境关闭这个插入redis
                    redisUtils.set(CoinTypeConvert.convertToKey(symbol), price);
                    // 比较
                    //websocketPriceService.comparePriceAsc(symbol, price);
                    //websocketPriceService.comparePriceDesc(symbol, price);
                    //System.out.println("比较完毕:"+symbol+"-"+price);
            }
                }
        });
            });
        }catch (Exception e){
        }
//        subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> {
//            Candlestick data = candlestickEvent.getData();
//            redisUtils.set(CoinTypeConvert.convert(candlestickEvent.getSymbol()), data);
src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
@@ -4,6 +4,7 @@
import com.alibaba.fastjson.JSONObject;
import com.huobi.client.model.Candlestick;
import com.xcong.excoin.configurations.RabbitMqConfig;
import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
import com.xcong.excoin.modules.coin.service.OrderCoinService;
import com.xcong.excoin.modules.exchange.service.HandleKlineService;
import com.xcong.excoin.trade.ExchangeTrade;
@@ -71,6 +72,8 @@
        if(CollectionUtils.isEmpty(exchangeTrades)){
            return;
        }
        // 先处理处理用户订单
        orderCoinService.handleOrder(exchangeTrades);
        // 处理K线 并更新最新价
        handleKlineService.handleExchangeOrderToKline(exchangeTrades);
        // 推送最新K线
@@ -112,8 +115,21 @@
            newCandlestick.setTick(model);
            tradePlateSendWebSocket.sendMessageKline(symbolUsdt,key1,JSONObject.toJSONString(newCandlestick),null);
        }
        // 处理用户订单
        orderCoinService.handleOrder(exchangeTrades);
    }
    /**
     *  撮合交易订单全部完成
     * @param content
     */
    @RabbitListener(queues = RabbitMqConfig.QUEUE_ROC_ORDER_COMPLETE)
    public void doComplete(String content) {
        log.debug("#完成的订单---->{}#", content);
        List<OrderCoinsEntity> exchangeTrades = JSONObject.parseArray(content, OrderCoinsEntity.class);
        if(CollectionUtils.isEmpty(exchangeTrades)){
            return;
        }
        orderCoinService.completeOrder(exchangeTrades);
    }
}
src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java
@@ -6,12 +6,16 @@
import com.xcong.excoin.modules.coin.service.OrderCoinService;
import com.xcong.excoin.trade.CoinTrader;
import com.xcong.excoin.trade.CoinTraderFactory;
import com.xcong.excoin.trade.ExchangeTrade;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Iterator;
import java.util.List;
/**
 *  提交买卖单进入撮合系统
@@ -42,7 +46,9 @@
     */
    @RabbitListener(queues = RabbitMqConfig.QUEUE_ROC_ORDER_CANCEL)
    public void doCancel(String content) {
        log.info("#取消的订单---->{}#", content);
        log.debug("#取消的订单---->{}#", content);
        orderCoinService.cancelEntrustWalletCoinOrderForMatch(content);
    }
}
src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java
@@ -37,7 +37,7 @@
    @RabbitListener(queues = RabbitMqConfig.QUEUE_USDT_ADDRESS)
    public void addUsdtAddress(String content) {
        if(!UsdtErc20UpdateService.ALL_ADDRESS_LIST.contains(content)){
            log.info("#添加新地址---->{}#", content);
            log.debug("#添加新地址---->{}#", content);
            UsdtErc20UpdateService.ALL_ADDRESS_LIST.add(content);
        }
    }
src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java
@@ -40,6 +40,12 @@
    }
    public void sendCompleteMsg(String content) {
        CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID());
        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_ROC_ORDER_COMPLETE, RabbitMqConfig.ROUTING_KEY_ROC_ORDER_COMPLETE, content, correlationData);
    }
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        //log.info("#----->{}#", correlationData);
src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java
@@ -35,6 +35,7 @@
    }
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("#----->{}#", correlationData);
src/main/java/com/xcong/excoin/trade/CoinTrader.java
@@ -2,11 +2,8 @@
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
import com.xcong.excoin.modules.coin.service.OrderCoinService;
import com.xcong.excoin.rabbit.producer.ExchangeProducer;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -313,7 +310,7 @@
                    if (trade != null) {
                        exchangeTrades.add(trade);
                    }
                    //判断匹配单是否完成 TODO
                    //判断匹配单是否完成
                    if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
                        //当前匹配的订单完成交易,删除该订单
                        orderIterator.remove();
@@ -355,7 +352,7 @@
     */
    private BigDecimal calculateTradedAmount(OrderCoinsEntity order, BigDecimal dealPrice) {
        if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && order.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
            //剩余成交量 TODO ?
            //剩余成交量
            // 委托量-成交量=剩余量
            BigDecimal leftTurnover = order.getEntrustAmount().subtract(order.getDealAmount());
            return leftTurnover.divide(dealPrice, coinScale, BigDecimal.ROUND_DOWN);
@@ -517,11 +514,11 @@
                for (int index = 0; index < size; index += maxSize) {
                    int length = (size - index) > maxSize ? maxSize : size - index;
                    List<OrderCoinsEntity> subOrders = orders.subList(index, index + length);
                    // TODO 通知订单完成
                    //kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(subOrders));
                    //  通知订单完成
                    exchangeProducer.sendCompleteMsg(JSON.toJSONString(subOrders));
                }
            } else {
                // kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(orders));
                exchangeProducer.sendCompleteMsg(JSON.toJSONString(orders));
            }
        }
    }
src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml
@@ -8,11 +8,12 @@
    </select>
    
    <select id="findCoinOrderListByMemberIdAndSysmbol" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsEntity">
        SELECT * FROM coins_order a where a.member_id= #{memberId} and a.order_status = #{status}
        SELECT (select sum(symbol_cnt) from coins_order_deal where order_id = a.id) as deal_cnt, a.create_by,a.create_time,    a.update_by,    a.update_time,    a.version,a.id,    a.member_id,    a.order_no,    a.order_type,    a.symbol,    a.mark_price,    a.entrust_cnt,    a.entrust_price,    a.deal_price,    a.deal_amount,    a.order_status,    a.trade_type,    a.fee_amount,    a.entrust_amount
        FROM coins_order a where a.member_id= #{memberId} and a.order_status = #{status}
            <if test="symbol != null and symbol !=''">
                 and a.symbol = #{symbol}
            </if>
            order by create_time desc
            order by a.create_time desc
    </select>
    
    <select id="findWalletCoinOrderByOrderNo" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsEntity">
@@ -57,4 +58,17 @@
        </set>
        where id = #{id}
    </update>
    <update id="batchUpdateStatus" parameterType="map">
        update coins_order set order_status = #{status}
        where id in
        <foreach collection="list" item="item" separator="," open="(" close=")">
            #{item}
        </foreach>
    </update>
    <update id="updateStatus" parameterType="map">
        update coins_order set order_status = #{status}
        where id =#{id}
    </update>
</mapper>