zainali5120
2020-10-14 32b5de4af771edfaa67197808882512ca7e30120
ROC交易所交易问题修复
16 files modified
444 ■■■■■ changed files
src/main/java/com/xcong/excoin/common/aop/SubmitRepeatAspect.java 4 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/common/contants/AppContants.java 5 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java 22 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java 3 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java 3 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java 3 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java 255 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java 20 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java 6 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java 6 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java 1 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/trade/CoinTrader.java 13 ●●●●● patch | view | raw | blame | history
src/main/resources/application-test.yml 27 ●●●● patch | view | raw | blame | history
src/main/resources/application.yml 21 ●●●●● patch | view | raw | blame | history
src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml 18 ●●●● patch | view | raw | blame | history
src/test/java/com/xcong/excoin/TradeTest.java 37 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/common/aop/SubmitRepeatAspect.java
@@ -47,10 +47,10 @@
        String token = request.getHeader("token");
        String uri = request.getRequestURI();
        String mId = (String) redisUtil.get(token);
        log.info("#token : {}, uri : {}, mId : {}#", token, uri, mId);
        log.debug("#token : {}, uri : {}, mId : {}#", token, uri, mId);
        key = mId + "_" + uri;
        boolean flag = redisUtil.setNotExist(key, "1", 5);
        log.info("#mid : {}, flag : {}#", mId, flag);
        log.debug("#mid : {}, flag : {}#", mId, flag);
        if (flag) {
            Object result = joinPoint.proceed();
            redisUtil.del(key);
src/main/java/com/xcong/excoin/common/contants/AppContants.java
@@ -76,4 +76,9 @@
    public static final String TIME_OUT = "time_out";
    /**
     * Redis key prefix for order-cancel requests; the order id is appended to form the full key.
     */
    public static final String ORDER_CANCEL_KEY = "COIN_ORDER_CANCEL_";
}
src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
@@ -121,6 +121,12 @@
    public static final String ROUTING_KEY_ROC_ORDER_CANCEL  = "roc-order-routingKey-cancel";
    public static final String EXCHANGE_ROC_ORDER_COMPLETE = "roc-exchange-order-complete";
    public static final String QUEUE_ROC_ORDER_COMPLETE = "roc-order-queue-complete";
    public static final String ROUTING_KEY_ROC_ORDER_COMPLETE  = "roc-order-routingKey-complete";
    @Resource
    private ConnectionFactory connectionFactory;
@@ -189,6 +195,22 @@
    public Binding bindingCancelOrder() {
        return BindingBuilder.bind(ordereCancelQueue()).to(orderCancelExchange()).with(ROUTING_KEY_ROC_ORDER_CANCEL);
    }
    // Order-complete messaging: exchange, queue and binding
    @Bean
    public DirectExchange orderCompleteExchange() {
        return new DirectExchange(EXCHANGE_ROC_ORDER_COMPLETE);
    }
    @Bean
    public Queue ordereCompleteQueue() {
        return new Queue(QUEUE_ROC_ORDER_COMPLETE, true);
    }
    @Bean
    public Binding bindingCompleteOrder() {
        return BindingBuilder.bind(ordereCompleteQueue()).to(orderCompleteExchange()).with(ROUTING_KEY_ROC_ORDER_COMPLETE);
    }
   @Bean
src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java
@@ -6,6 +6,7 @@
import javax.validation.Valid;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.common.annotations.SubmitRepeat;
import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
import com.xcong.excoin.modules.symbols.constants.SymbolsConstats;
import org.springframework.validation.annotation.Validated;
@@ -61,6 +62,7 @@
     */
    @ApiOperation(value = "提交买卖订单", notes = "提交买卖订单")
    @PostMapping(value="/submitSalesWalletCoinOrder")
    @SubmitRepeat
    public Result submitSalesWalletCoinOrder(@RequestBody @Valid SubmitSalesWalletCoinOrderDto submitSalesWalletCoinOrderDto) {
        log.info("买卖单参数[{}]", JSONObject.toJSONString(submitSalesWalletCoinOrderDto));
        String symbol = submitSalesWalletCoinOrderDto.getSymbol();
@@ -97,6 +99,7 @@
     */
    @ApiOperation(value = "撤销委托订单", notes = "撤销委托订单")
    @PostMapping(value="/cancelEntrustWalletCoinOrder")
    @SubmitRepeat
    public Result cancelEntrustWalletCoinOrder(@RequestBody @Valid CancelEntrustWalletCoinOrderDto cancelEntrustWalletCoinOrderDto) {
        String orderId = cancelEntrustWalletCoinOrderDto.getOrderId();
        // 根据不同币种
src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java
@@ -21,4 +21,7 @@
    List<OrderCoinsEntity> selectCoinOrderOnTrade(List<String> list);
    /**
     * Updates the deal figures of the order identified by {@code id}.
     * NOTE(review): the SQL is in the mapper XML, which is not visible here; presumably
     * {@code dealCnt} and {@code dealAmount} are added to the stored totals - confirm.
     *
     * @param id         order id
     * @param dealCnt    dealt quantity supplied by the caller
     * @param dealAmount dealt turnover supplied by the caller
     */
    void updateDeal(@Param("id") Long id, @Param("dealCnt")BigDecimal dealCnt,@Param("dealAmount")BigDecimal dealAmount);
    /**
     * Batch variant of the status update.
     * NOTE(review): presumably sets {@code status} on every order whose id is in {@code list};
     * the SQL is in the mapper XML and is not visible here - confirm.
     *
     * @param list   order ids
     * @param status status value to apply
     */
    void batchUpdateStatus(@Param("list")List<Long> list,@Param("status") Integer status);
    /**
     * Status update for a single order; the service layer calls it with
     * {@code OrderCoinsEntity.ORDERSTATUS_DONE} when an order is completed.
     * NOTE(review): the SQL is in the mapper XML and is not visible here - confirm that only
     * the status column is touched.
     *
     * @param id     order id
     * @param status status value to apply
     */
    void updateStatus(@Param("id")Long id,@Param("status") Integer status);
}
src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java
@@ -52,6 +52,8 @@
    public void handleOrder(List<ExchangeTrade> trades);
    public void completeOrder(List<OrderCoinsEntity> trades);
    /**
     * NOTE(review): the implementation is not visible in this view; presumably creates an
     * order from the given values - confirm against {@code OrderCoinServiceImpl}.
     *
     * @param symbol        trading symbol
     * @param type          order type (presumably buy/sell - confirm)
     * @param tradeType     trade type (presumably limit/market - confirm)
     * @param price         order price
     * @param amount        order quantity
     * @param entrustAmount entrusted amount
     */
    void initOrders(String symbol, Integer type, Integer tradeType, BigDecimal price,
                    BigDecimal amount,BigDecimal entrustAmount);
@@ -62,4 +64,5 @@
     */
    public Result cancelEntrustWalletCoinOrderForMatch(String orderId);
}
src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
@@ -8,6 +8,7 @@
import javax.annotation.Resource;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.common.contants.AppContants;
import com.xcong.excoin.common.enumerates.CoinTypeEnum;
import com.xcong.excoin.modules.blackchain.service.RocService;
import com.xcong.excoin.modules.coin.mapper.OrderCoinsDealMapper;
@@ -353,27 +354,27 @@
            if(OrderCoinsEntity.ORDERTYPE_BUY.equals(type)){
                // 不能超过800个
                if(amount!=null && amount.compareTo(new BigDecimal("800"))>0){
                    return Result.fail("买入额度受限");
                }
                BigDecimal bigDecimal = orderCoinDealDao.sumTodayBuyAmount(memberId, symbol);
                if(bigDecimal==null){
                    bigDecimal= BigDecimal.ZERO;
                }
                amount= amount==null?BigDecimal.ZERO:amount;
                bigDecimal = bigDecimal.add(amount);
                if(bigDecimal!=null && bigDecimal.compareTo(new BigDecimal("800"))>0){
                    return Result.fail("买入额度受限");
                }
                // 挂单不能超过800
                BigDecimal bigDecimal1 = orderCoinDealDao.sumTodayEntrustCntBuyAmount(memberId, symbol);
                if(bigDecimal1==null){
                    bigDecimal1=BigDecimal.ZERO;
                }
                bigDecimal1 = bigDecimal1.add(amount);
                if(bigDecimal1!=null && bigDecimal1.compareTo(new BigDecimal("800"))>0){
                    return Result.fail("买入额度受限");
                }
//                if (amount != null && amount.compareTo(new BigDecimal("800")) > 0) {
//                    return Result.fail("买入额度受限");
//                }
//                BigDecimal bigDecimal = orderCoinDealDao.sumTodayBuyAmount(memberId, symbol);
//                if (bigDecimal == null) {
//                    bigDecimal = BigDecimal.ZERO;
//                }
//                amount = amount == null ? BigDecimal.ZERO : amount;
//                bigDecimal = bigDecimal.add(amount);
//                if (bigDecimal != null && bigDecimal.compareTo(new BigDecimal("800")) > 0) {
//                    return Result.fail("买入额度受限");
//                }
//                // 挂单不能超过800
//                BigDecimal bigDecimal1 = orderCoinDealDao.sumTodayEntrustCntBuyAmount(memberId, symbol);
//                if (bigDecimal1 == null) {
//                    bigDecimal1 = BigDecimal.ZERO;
//                }
//                bigDecimal1 = bigDecimal1.add(amount);
//                if (bigDecimal1 != null && bigDecimal1.compareTo(new BigDecimal("800")) > 0) {
//                    return Result.fail("买入额度受限");
//                }
            }else{
                return Result.fail("卖出受限");
            }
@@ -419,8 +420,6 @@
                totalPayPrice = entrustAmount.add(closingPrice);
            }
        }
        // BigDecimal totalPayPricCoin = nowPrice.multiply(amount).add(closingPrice);
        String walletCode = MemberWalletCoinEnum.WALLETCOINCODE.getValue();
        MemberWalletCoinEntity walletCoinUsdt = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, walletCode);
        if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) {
@@ -486,24 +485,12 @@
        //冻结相应的资产
        if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) {
            //如果是买入,所对应的币种增加,USDT账户减少金额
//            BigDecimal availableBalance = walletCoinUsdt.getAvailableBalance().subtract(totalPayPrice);
//            BigDecimal frozenBalance = walletCoinUsdt.getFrozenBalance().add(totalPayPrice);
//            walletCoinUsdt.setAvailableBalance(availableBalance);
//            walletCoinUsdt.setFrozenBalance(frozenBalance);
//            memberWalletCoinDao.updateById(walletCoinUsdt);
            memberWalletCoinDao.updateWalletBalance(walletCoinUsdt.getId(),totalPayPrice.negate(),totalPayPrice.negate(),entrustAmount);
        } else {
            //如果是卖出,币种减少,USDT增加
//            BigDecimal availableBalance = walletCoin.getAvailableBalance().subtract(amount);
//            BigDecimal frozenBalance = walletCoin.getFrozenBalance().add(amount);
//            walletCoin.setAvailableBalance(availableBalance);
//            walletCoin.setFrozenBalance(frozenBalance);
//            memberWalletCoinDao.updateById(walletCoin);
            memberWalletCoinDao.updateWalletBalance(walletCoin.getId(),amount.negate(),amount.negate(),amount);
        }
        // 加入到撮合 TODO  通过消息队列发送到交易撮合
        //CoinTrader trader = factory.getTrader(symbol);
        //trader.trade(order);
        // 加入到撮合
        order.setSymbol(symbol);
        orderSubmitProducer.sendMsg(JSONObject.toJSONString(order));
        return Result.ok(MessageSourceUtils.getString("order_service_0011"));
@@ -544,14 +531,25 @@
    @Override
    @Transactional
    public Result cancelEntrustWalletCoinOrder(String orderId) {
        // 将这个取消放入redis
        boolean b = redisUtils.setNotExist(AppContants.ORDER_CANCEL_KEY + orderId, orderId, 10);
        if (!b) {
            return Result.ok(MessageSourceUtils.getString("order_service_0012"));
        }
        //获取用户ID
        Long memberId = LoginUserUtils.getAppLoginUser().getId();
        OrderCoinsEntity orderCoinsEntity = orderCoinsDao.selectById(orderId);
        if (ObjectUtil.isNotEmpty(orderCoinsEntity) && orderCoinsEntity.getMemberId().equals(memberId) ) {
            // 如果是撮合交易单
            if (SymbolsConstats.EXCHANGE_SYMBOLS.contains(orderCoinsEntity.getSymbol())) {
                // 这里先更新状态 判断状态 防止消息发送过程中的二次提交
                if (!orderCoinsEntity.getOrderStatus().equals(OrderCoinsEntity.ORDERSTATUS_DODING)) {
                    // 不是持仓中 返回
                    return Result.ok(MessageSourceUtils.getString("order_service_0013"));
                }
                // 更新为已取消(可能在这个过程中  这个单已经成交)
                orderSubmitProducer.sendCancelMsg(orderId);
               // return this.cancelEntrustWalletCoinOrderForMatch(orderId);
                return Result.ok(MessageSourceUtils.getString("order_service_0013"));
            }
            if (orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_CANCEL || orderCoinsEntity.getOrderStatus()==OrderCoinsEntity.ORDERSTATUS_DONE) {
@@ -629,7 +627,7 @@
    @Override
    @Transactional
    public Result cancelEntrustWalletCoinOrderForMatch(String orderId) {
        //获取用户ID
        //如果redis中没有这个单 则不再往下走
        OrderCoinsEntity orderCoinsEntity = orderCoinsDao.selectById(orderId);
        if(orderCoinsEntity==null){
            return Result.ok("");
@@ -637,13 +635,23 @@
        Long memberId = orderCoinsEntity.getMemberId();
        // 取消撮合订单的单
        CoinTrader trader = factory.getTrader(orderCoinsEntity.getSymbol());
        trader.cancelOrder(orderCoinsEntity);
        // 从撮合交易系统得到的已成交的数据
        OrderCoinsEntity coinsEntityCancel = trader.cancelOrder(orderCoinsEntity);
        if (coinsEntityCancel == null) {
            // 此时说明撮合系统已经没这个单了 不需要继续处理
            return null;
        }
        if (ObjectUtil.isNotEmpty(orderCoinsEntity) ) {
            if (orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_CANCEL || orderCoinsEntity.getOrderStatus()==OrderCoinsEntity.ORDERSTATUS_DONE) {
            if (orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
                // 已完成的直接返回
                return Result.fail(MessageSourceUtils.getString("order_service_0012"));
            }
            orderCoinsEntity.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_CANCEL);
            orderCoinsDao.updateById(orderCoinsEntity);
            OrderCoinsEntity update = new OrderCoinsEntity();
            update.setId(Long.valueOf(orderId));
            update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_CANCEL);
            //orderCoinsEntity.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_CANCEL);
            orderCoinsDao.updateById(update);
            String symbol = orderCoinsEntity.getSymbol();
@@ -655,10 +663,10 @@
            detail.setTradeType(orderCoinsEntity.getTradeType());
            detail.setSymbol(symbol);
            detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_CANCEL);
            detail.setSymbolCnt(orderCoinsEntity.getEntrustCnt());
            detail.setSymbolCnt(BigDecimal.ZERO);
            detail.setEntrustPrice(orderCoinsEntity.getEntrustPrice());
            detail.setDealPrice(orderCoinsEntity.getDealPrice());
            detail.setDealAmount(orderCoinsEntity.getDealAmount());
            detail.setDealPrice(BigDecimal.ZERO);
            detail.setDealAmount(BigDecimal.ZERO);
            detail.setFeeAmount(orderCoinsEntity.getFeeAmount());
            if (OrderCoinsEntity.ORDERTYPE_BUY.equals(orderCoinsEntity.getOrderType())) {
                //如果是限价买入,撤单将USDT账户冻结金额返回
@@ -669,13 +677,7 @@
                    //手续费 = 开仓价*数量*手续费率
                    //返还金额=开仓价*未成交数量+手续费
                    // 这里根据成交的单计算
                    List<OrderCoinsDealEntity> orderCoinsDealEntities = orderCoinDealDao.selectCoinOrderDealByOrderId(Long.valueOf(orderId));
                    BigDecimal dealAmount = BigDecimal.ZERO;
                    if(CollectionUtils.isNotEmpty(orderCoinsDealEntities)){
                        for (OrderCoinsDealEntity orderCoinsDealEntity : orderCoinsDealEntities) {
                            dealAmount = dealAmount.add(orderCoinsDealEntity.getDealAmount());
                        }
                    }
                    BigDecimal dealAmount = coinsEntityCancel.getDealAmount();
                    // 市价的按成交额退款
                    BigDecimal returnBalance = orderCoinsEntity.getEntrustAmount().subtract(dealAmount);
@@ -709,10 +711,11 @@
                MemberWalletCoinEntity walletCoin = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, symbol);
                if (ObjectUtil.isNotEmpty(walletCoin)) {
                    // 卖出按卖出的数量计算手续费
                    BigDecimal returnBalance = orderCoinsEntity.getEntrustCnt().subtract(orderCoinsEntity.getDealCnt());
                    BigDecimal returnBalance = orderCoinsEntity.getEntrustCnt().subtract(coinsEntityCancel.getDealCnt());
                    walletCoin.setAvailableBalance(walletCoin.getAvailableBalance().add(returnBalance));
                    walletCoin.setFrozenBalance(walletCoin.getFrozenBalance().subtract(returnBalance));
                    memberWalletCoinDao.updateById(walletCoin);
                    //memberWalletCoinDao.updateById(walletCoin);
                    memberWalletCoinDao.updateWalletBalance(walletCoin.getId(), returnBalance, null, returnBalance.negate());
                    // 流水记录
                    MemberAccountFlowEntity record = new MemberAccountFlowEntity();
                    record.setSource(MemberAccountFlowEntity.SOURCE_CANCEL);
@@ -972,36 +975,14 @@
            BigDecimal price = exchangeTrade.getPrice();
            // 卖单
            Long sellOrderId = exchangeTrade.getSellOrderId();
            // 买卖单都需要处理
            // 买单
            OrderCoinsEntity buyOrderCoinsEntity = orderCoinsDao.selectById(buyOrderId);
            if(buyOrderCoinsEntity==null){
                return;
            }
            BigDecimal buyEntrustCnt = buyOrderCoinsEntity.getEntrustCnt();
            if(buyEntrustCnt==null){
                buyEntrustCnt = BigDecimal.ZERO;
            }
            Long memberId = buyOrderCoinsEntity.getMemberId();
            if (buyOrderCoinsEntity != null) {
                List<OrderCoinsDealEntity> orderCoinsDealEntities = orderCoinDealDao.selectCoinOrderDealByOrderId(buyOrderId);
                // 比较剩余的量
                BigDecimal dealAmount = BigDecimal.ZERO;
                BigDecimal dealCnt = BigDecimal.ZERO;
                if(CollectionUtils.isNotEmpty(orderCoinsDealEntities)){
                    for (OrderCoinsDealEntity orderCoinsDealEntity : orderCoinsDealEntities) {
                        dealAmount=dealAmount.add(orderCoinsDealEntity.getDealAmount());
                        dealCnt = dealCnt.add(orderCoinsDealEntity.getSymbolCnt());
                    }
                }
                // 单的总金额
                BigDecimal entrustAmount = buyOrderCoinsEntity.getEntrustAmount();
                BigDecimal add = dealAmount.add(buyTurnover);
                BigDecimal closingPrice = buyTurnover.multiply(new BigDecimal("0.002"));
                //成交总量
                 dealCnt = dealCnt.add(amount);
                // 创建一个完成的单
                OrderCoinsDealEntity detail = new OrderCoinsDealEntity();
                detail.setMemberId(buyOrderCoinsEntity.getMemberId());
@@ -1016,36 +997,37 @@
                detail.setDealAmount(buyTurnover);
                detail.setFeeAmount(closingPrice);
                detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_DONE);
                // 如果这个单在取消状态 则不执行
                orderCoinDealDao.insert(detail);
                // 如果这个单成交完  更改状态
                if (add.compareTo(entrustAmount) >= 0 ||(buyEntrustCnt.compareTo(BigDecimal.ZERO)>0 &&dealCnt.compareTo(buyEntrustCnt)>=0) ) {
                    OrderCoinsEntity update = new OrderCoinsEntity();
                    update.setId(buyOrderId);
                    update.setDealAmount(entrustAmount);
                    update.setDealCnt(buyOrderCoinsEntity.getDealCnt().add(amount));
                    update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
                    update.setUpdateTime(new Date());
                    orderCoinsDao.updateById(update);
                    // 限价买入时,如果成交价比设置的价格低,需要退还多余的冻结
                    OrderCoinsEntity coinsEntity = orderCoinsDao.selectById(buyOrderId);
                    BigDecimal subtract = coinsEntity.getEntrustAmount().subtract(coinsEntity.getDealAmount());
                    if(subtract.compareTo(BigDecimal.ZERO)>=0){
                        // 下单扣的比较多
                        memberWalletCoinDao.updateWalletBalance(coinsEntity.getId(),subtract,subtract,subtract.negate());
                    }
                } else {
                    // 更新买单
                    orderCoinsDao.updateDeal(buyOrderId,amount,buyTurnover);
                //orderCoinsDao.updateDeal(buyOrderId, amount, buyTurnover);
                // 买币扣除冻结usdt 增加币种的可用
                MemberWalletCoinEntity usdtWallet = memberWalletCoinDao.selectWalletCoinBymIdAndCode(buyOrderCoinsEntity.getMemberId(), MemberWalletCoinEnum.WALLETCOINCODE.getValue());
                if (usdtWallet != null) {
                    // 减少usdt冻结
                    memberWalletCoinDao.updateWalletBalance(usdtWallet.getId(), null, null, buyTurnover.negate());
                }
                // 增加买的币
                MemberWalletCoinEntity buySymbolWallet = memberWalletCoinDao.selectWalletCoinBymIdAndCode(buyOrderCoinsEntity.getMemberId(), buyOrderCoinsEntity.getSymbol());
                if (buySymbolWallet != null) {
                    memberWalletCoinDao.updateWalletBalance(buySymbolWallet.getId(), amount, amount, null);
            }
                // 流水记录
                MemberAccountFlowEntity record = new MemberAccountFlowEntity();
                record.setMemberId(buyOrderCoinsEntity.getMemberId());
                record.setPrice(buyTurnover.setScale(4, BigDecimal.ROUND_DOWN).negate());
                record.setSource(MemberAccountFlowEntity.SOURCE_BUY + buyOrderCoinsEntity.getSymbol());
                record.setRemark(MemberAccountFlowEntity.REMARK_BUY + buyOrderCoinsEntity.getSymbol() + ":" + amount);
                record.setSymbol(buyOrderCoinsEntity.getSymbol());
                record.setBalance(usdtWallet.getAvailableBalance().subtract(buyTurnover));
                memberAccountFlowEntityDao.insert(record);
            }
            // 卖单
            OrderCoinsEntity sellOrderCoinsEntity = orderCoinsDao.selectById(sellOrderId);
            if (sellOrderCoinsEntity != null) {
                // 比较剩余的量
                BigDecimal dealAmount = sellOrderCoinsEntity.getDealCnt();
                // 单的总数量
                BigDecimal entrustCnt = sellOrderCoinsEntity.getEntrustCnt();
                BigDecimal add = dealAmount.add(amount);
                // 创建一个完成的单
                OrderCoinsDealEntity detail = new OrderCoinsDealEntity();
                detail.setMemberId(sellOrderCoinsEntity.getMemberId());
@@ -1062,46 +1044,7 @@
                detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_DONE);
                orderCoinDealDao.insert(detail);
                // 如果这个单成交完  更改状态
                if (add.compareTo(entrustCnt) >= 0) {
                    OrderCoinsEntity update = new OrderCoinsEntity();
                    update.setId(sellOrderId);
                    // 总成交额
                    update.setDealAmount(buyTurnover.add(sellOrderCoinsEntity.getDealAmount()));
                    update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
                    update.setDealCnt(entrustCnt);
                    update.setUpdateTime(new Date());
                    orderCoinsDao.updateById(update);
                } else {
                    // 未完成
                    OrderCoinsEntity update = new OrderCoinsEntity();
                    update.setId(sellOrderId);
                    // 总成交额
                    update.setDealAmount(buyTurnover.add(sellOrderCoinsEntity.getDealAmount()));
                    update.setDealCnt(sellOrderCoinsEntity.getDealCnt().add(amount));
                    update.setUpdateTime(new Date());
                    orderCoinsDao.updateById(update);
                }
                // 买币扣除冻结usdt 增加币种的可用
                MemberWalletCoinEntity usdtWallet = memberWalletCoinDao.selectWalletCoinBymIdAndCode(buyOrderCoinsEntity.getMemberId(), MemberWalletCoinEnum.WALLETCOINCODE.getValue());
                if (usdtWallet != null) {
                    // 减少usdt冻结
                    memberWalletCoinDao.updateWalletBalance(usdtWallet.getId(), null, null, buyTurnover.negate());
                }
                // 增加买的币
                MemberWalletCoinEntity buySymbolWallet = memberWalletCoinDao.selectWalletCoinBymIdAndCode(buyOrderCoinsEntity.getMemberId(), buyOrderCoinsEntity.getSymbol());
                if (buySymbolWallet != null) {
                    memberWalletCoinDao.updateWalletBalance(buySymbolWallet.getId(), amount, amount, null);
                }
                // 流水记录
                MemberAccountFlowEntity record = new MemberAccountFlowEntity();
                record.setMemberId(buyOrderCoinsEntity.getMemberId());
                record.setPrice(buyTurnover.setScale(4, BigDecimal.ROUND_DOWN).negate());
                record.setSource(MemberAccountFlowEntity.SOURCE_BUY + buyOrderCoinsEntity.getSymbol());
                record.setRemark(MemberAccountFlowEntity.REMARK_BUY + buyOrderCoinsEntity.getSymbol() + ":" + amount);
                record.setSymbol(buyOrderCoinsEntity.getSymbol());
                record.setBalance(usdtWallet.getAvailableBalance().subtract(buyTurnover));
                memberAccountFlowEntityDao.insert(record);
                //orderCoinsDao.updateDeal(sellOrderId, amount, buyTurnover);
                // 卖家需要减少冻结的币种  增加usdt
                MemberWalletCoinEntity memberWalletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(sellOrderCoinsEntity.getMemberId(), sellOrderCoinsEntity.getSymbol());
                if (memberWalletCoinEntity != null) {
@@ -1117,9 +1060,9 @@
                MemberAccountFlowEntity recordSell = new MemberAccountFlowEntity();
                recordSell.setMemberId(sellOrderCoinsEntity.getMemberId());
                recordSell.setPrice(buyTurnover.setScale(4, BigDecimal.ROUND_DOWN));
                recordSell.setSource(MemberAccountFlowEntity.SOURCE_SALE + buyOrderCoinsEntity.getSymbol());
                recordSell.setRemark(MemberAccountFlowEntity.REMARK_SALE + buyOrderCoinsEntity.getSymbol() + ":" + amount.toPlainString());
                recordSell.setSymbol(buyOrderCoinsEntity.getSymbol());
                recordSell.setSource(MemberAccountFlowEntity.SOURCE_SALE + sellOrderCoinsEntity.getSymbol());
                recordSell.setRemark(MemberAccountFlowEntity.REMARK_SALE + sellOrderCoinsEntity.getSymbol() + ":" + amount.toPlainString());
                recordSell.setSymbol(sellOrderCoinsEntity.getSymbol());
                recordSell.setBalance(sellWalletCoinEntity.getAvailableBalance().add(buyTurnover));
                memberAccountFlowEntityDao.insert(recordSell);
            }
@@ -1127,6 +1070,36 @@
    }
    @Override
    @Transactional
    public void completeOrder(List<OrderCoinsEntity> trades) {
        // 订单完成 更新他们的状态
        List<Long> ids = new ArrayList<>();
        if (CollectionUtils.isNotEmpty(trades)) {
            for (OrderCoinsEntity trade : trades) {
                if (trade != null) {
                    orderCoinsDao.updateStatus(trade.getId(),OrderCoinsEntity.ORDERSTATUS_DONE);
                    ids.add(trade.getId());
                    // 买单 实际成交金额小于委托的 这一部分从冻结扣除
                    if(OrderCoinsEntity.ORDERTYPE_BUY==trade.getOrderType()){
                        if(trade.getEntrustAmount().compareTo(trade.getDealAmount())>0){
                            // 此时退回这部分的差额
                            BigDecimal subtract = trade.getEntrustAmount().subtract(trade.getDealAmount());
                            System.out.println(subtract);
                            MemberWalletCoinEntity memberWalletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(trade.getMemberId(), CoinTypeEnum.USDT.name());
                            if(memberWalletCoinEntity!=null){
                                memberWalletCoinDao.updateWalletBalance(memberWalletCoinEntity.getId(),subtract,null,subtract.negate());
                            }
                        }
                    }
                }
            }
        }
        if (CollectionUtils.isNotEmpty(ids)) {
           // orderCoinsDao.batchUpdateStatus(ids, OrderCoinsEntity.ORDERSTATUS_DONE);
        }
    }
    @Override
    public void initOrders(String symbol, Integer type, Integer tradeType, BigDecimal price,
                           BigDecimal amount, BigDecimal entrustAmount) {
src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
@@ -4,6 +4,7 @@
import com.alibaba.fastjson.JSONObject;
import com.huobi.client.model.Candlestick;
import com.xcong.excoin.configurations.RabbitMqConfig;
import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
import com.xcong.excoin.modules.coin.service.OrderCoinService;
import com.xcong.excoin.modules.exchange.service.HandleKlineService;
import com.xcong.excoin.trade.ExchangeTrade;
@@ -71,6 +72,8 @@
        if(CollectionUtils.isEmpty(exchangeTrades)){
            return;
        }
        // 先处理处理用户订单
        orderCoinService.handleOrder(exchangeTrades);
        // 处理K线 并更新最新价
        handleKlineService.handleExchangeOrderToKline(exchangeTrades);
        // 推送最新K线
@@ -112,8 +115,21 @@
            newCandlestick.setTick(model);
            tradePlateSendWebSocket.sendMessageKline(symbolUsdt,key1,JSONObject.toJSONString(newCandlestick),null);
        }
        // 处理用户订单
        orderCoinService.handleOrder(exchangeTrades);
    }
    /**
     *  撮合交易订单全部完成
     * @param content
     */
    @RabbitListener(queues = RabbitMqConfig.QUEUE_ROC_ORDER_COMPLETE)
    public void doComplete(String content) {
        log.info("#完成的订单---->{}#", content);
        List<OrderCoinsEntity> exchangeTrades = JSONObject.parseArray(content, OrderCoinsEntity.class);
        if(CollectionUtils.isEmpty(exchangeTrades)){
            return;
        }
        orderCoinService.completeOrder(exchangeTrades);
    }
}
src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java
@@ -6,12 +6,16 @@
import com.xcong.excoin.modules.coin.service.OrderCoinService;
import com.xcong.excoin.trade.CoinTrader;
import com.xcong.excoin.trade.CoinTraderFactory;
import com.xcong.excoin.trade.ExchangeTrade;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.Iterator;
import java.util.List;
/**
 *  提交买卖单进入撮合系统
@@ -45,4 +49,6 @@
        log.info("#取消的订单---->{}#", content);
        orderCoinService.cancelEntrustWalletCoinOrderForMatch(content);
    }
}
src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java
@@ -40,6 +40,12 @@
    }
    public void sendCompleteMsg(String content) {
        CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID());
        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_ROC_ORDER_COMPLETE, RabbitMqConfig.ROUTING_KEY_ROC_ORDER_COMPLETE, content, correlationData);
    }
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        //log.info("#----->{}#", correlationData);
src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java
@@ -35,6 +35,7 @@
    }
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("#----->{}#", correlationData);
src/main/java/com/xcong/excoin/trade/CoinTrader.java
@@ -2,11 +2,8 @@
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
import com.xcong.excoin.modules.coin.service.OrderCoinService;
import com.xcong.excoin.rabbit.producer.ExchangeProducer;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -313,7 +310,7 @@
                    if (trade != null) {
                        exchangeTrades.add(trade);
                    }
                    //判断匹配单是否完成 TODO
                    //判断匹配单是否完成
                    if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
                        //当前匹配的订单完成交易,删除该订单
                        orderIterator.remove();
@@ -355,7 +352,7 @@
     */
    private BigDecimal calculateTradedAmount(OrderCoinsEntity order, BigDecimal dealPrice) {
        if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && order.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
            //剩余成交量 TODO ?
            //剩余成交量
            // 委托量-成交量=剩余量
            BigDecimal leftTurnover = order.getEntrustAmount().subtract(order.getDealAmount());
            return leftTurnover.divide(dealPrice, coinScale, BigDecimal.ROUND_DOWN);
@@ -517,11 +514,11 @@
                for (int index = 0; index < size; index += maxSize) {
                    int length = (size - index) > maxSize ? maxSize : size - index;
                    List<OrderCoinsEntity> subOrders = orders.subList(index, index + length);
                    // TODO 通知订单完成
                    //kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(subOrders));
                    //  通知订单完成
                    exchangeProducer.sendCompleteMsg(JSON.toJSONString(subOrders));
                }
            } else {
                // kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(orders));
                exchangeProducer.sendCompleteMsg(JSON.toJSONString(orders));
            }
        }
    }
src/main/resources/application-test.yml
@@ -7,9 +7,9 @@
  profiles:
    active: dev
  datasource:
    url: jdbc:mysql://47.114.114.219:3306/db_roc?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8
    username: roc_user
    password: roc123pasd!@
    url: jdbc:mysql://47.96.73.250:3306/db_roc?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8
    username: shop_user
    password: 123456
    driver-class-name: com.mysql.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
@@ -50,11 +50,11 @@
    ## Redis数据库索引(默认为0)
    database: 1
    ## Redis服务器地址
    host: 47.114.114.219
    host: 47.96.73.250
    ## Redis服务器连接端口
    port: 6379
    ## Redis服务器连接密码(默认为空)
    password: biyi123
    password: qwer12345678
    jedis:
      pool:
        ## 连接池最大连接数(使用负值表示没有限制)
@@ -72,9 +72,9 @@
    ## 连接超时时间(毫秒)
    timeout: 30000
  rabbitmq:
    host: 120.27.238.55
    host: 47.96.73.250
    port: 5672
    username: ct_rabbit
    username: rabbit
    password: 123456
    publisher-confirm-type: correlated
@@ -91,17 +91,16 @@
app:
  debug: false
  debug: true
  redis_expire: 3000
  kline-update-job: false
  newest-price-update-job: true
  #日线 该任务不能与最新价处于同一个服务器
  trade: true
  newest-price-update-job: false
  exchange-trade: true
  day-line: false
  other-job: true
  loop-job: true
  other-job: false
  loop-job: false
  rabbit-consumer: false
  block-job: true
  block-job: false
aliyun:
  oss:
src/main/resources/application.yml
@@ -5,11 +5,11 @@
spring:
  profiles:
    active: prod
    active: test
  datasource:
    url: jdbc:mysql://rm-bp151tw8er79ig9kb5o.mysql.rds.aliyuncs.com:3306/db_biue?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8
    username: ctcoin_data
    password: ctcoin_123
    url: jdbc:mysql://rm-bp1i2g5rg5dubo9s40o.mysql.rds.aliyuncs.com:3306/db_roc?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8
    username: roc_user
    password: roc123pasd!@
    driver-class-name: com.mysql.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
@@ -48,7 +48,7 @@
  ## redis配置
  redis:
    ## Redis数据库索引(默认为0)
    database: 2
    database: 1
    ## Redis服务器地址
    host: 47.114.114.219
    ## Redis服务器连接端口
@@ -72,10 +72,10 @@
    ## 连接超时时间(毫秒)
    timeout: 30000
  rabbitmq:
    host: 120.27.238.55
    host: 47.114.114.219
    port: 5672
    username: ct_rabbit
    password: 123456
    username: roc_user
    password: roc123456
    publisher-confirm-type: correlated
@@ -93,13 +93,10 @@
app:
  debug: false
  redis_expire: 3000
  # k线更新任务控制
  kline-update-job: false
  #最新价任务控制
  newest-price-update-job: false
  #日线 该任务不能与最新价处于同一个服务器
  exchange-trade: false
  day-line: false
  #其他任务控制
  other-job: false
  loop-job: false
  rabbit-consumer: false
src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml
@@ -8,11 +8,12 @@
    </select>
    
    <select id="findCoinOrderListByMemberIdAndSysmbol" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsEntity">
        SELECT * FROM coins_order a where a.member_id= #{memberId} and a.order_status = #{status}
        SELECT (select sum(symbol_cnt) from coins_order_deal where order_id = a.id) as deal_cnt, a.create_by,a.create_time,    a.update_by,    a.update_time,    a.version    id,    a.member_id,    a.order_no,    a.order_type,    a.symbol,    a.mark_price,    a.entrust_cnt,    a.entrust_price,    a.deal_price,    a.deal_amount,    a.order_status,    a.trade_type,    a.fee_amount,    a.entrust_amount
        FROM coins_order a where a.member_id= #{memberId} and a.order_status = #{status}
            <if test="symbol != null and symbol !=''">
                 and a.symbol = #{symbol}
            </if>
            order by create_time desc
            order by a.create_time desc
    </select>
    
    <select id="findWalletCoinOrderByOrderNo" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsEntity">
@@ -57,4 +58,17 @@
        </set>
        where id = #{id}
    </update>
    <!--
        Sets order_status to #{status} on every coins_order row whose id appears
        in the "list" collection; each element is bound as one IN-list value.
        NOTE(review): an empty "list" would leave the IN clause without values and
        produce invalid SQL; presumably callers never pass an empty list. Confirm.
    -->
    <update id="batchUpdateStatus" parameterType="map">
        update coins_order set order_status = #{status}
        where id in
        <foreach collection="list" item="item" separator="," open="(" close=")">
            #{item}
        </foreach>
    </update>
    <!-- Sets order_status to #{status} on the coins_order row whose id equals #{id}. -->
    <update id="updateStatus" parameterType="map">
        update coins_order set order_status = #{status}
        where id =#{id}
    </update>
</mapper>
src/test/java/com/xcong/excoin/TradeTest.java
@@ -5,6 +5,7 @@
import com.xcong.excoin.modules.coin.service.OrderCoinService;
import com.xcong.excoin.trade.CoinTrader;
import com.xcong.excoin.utils.RedisUtils;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
@@ -13,6 +14,7 @@
import java.math.BigDecimal;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@Slf4j
@@ -31,4 +33,39 @@
    public void buy(){
        redisUtils.set("ROC_NEW_PRICE",new BigDecimal("12.33"));
    }
    /**
     * Scratch experiment: two threads iterate over the same list, each while
     * holding the list's monitor.
     *
     * <p>The main thread prints every element, sleeping one second before each.
     * The worker thread sleeps one second first, then prints and removes every
     * element it visits. Because both loops run inside {@code synchronized (list)},
     * they cannot execute at the same time.
     *
     * <p>NOTE(review): presumably this checks that locking on the list keeps the
     * reading loop safe from the removing loop; confirm the intent.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        // Shared list that both threads iterate over.
        List<String> list  = new ArrayList<>();
        list.add("1");
        list.add("2");
        list.add("3");
        list.add("4");
        // Worker thread: waits one second, then drains the list under the list's lock.
        Thread thread = new Thread(new Runnable() {
            @SneakyThrows
            @Override
            public void run() {
                Thread.sleep(1000);
                synchronized (list){
                    Iterator<String> iterator = list.iterator();
                    while (iterator.hasNext()){
                        System.out.println("线程里"+iterator.next());
                        // Remove through the iterator so this loop itself stays valid.
                        iterator.remove();
                    }
                }
            }
        });
        thread.start();
        // Main thread: slow read-only pass; the lock is held for the whole loop,
        // including the sleeps.
        synchronized (list){
            Iterator<String> iterator = list.iterator();
            while (iterator.hasNext()){
                Thread.sleep(1000);
                System.out.println(iterator.next());
            }
        }
    }
}