From 32b5de4af771edfaa67197808882512ca7e30120 Mon Sep 17 00:00:00 2001 From: zainali5120 <512061637@qq.com> Date: Wed, 14 Oct 2020 11:41:01 +0800 Subject: [PATCH] ROC交易所交易问题修复 --- src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java | 3 src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java | 283 ++++++++++++++++------------------- src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java | 1 src/test/java/com/xcong/excoin/TradeTest.java | 37 ++++ src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java | 22 ++ src/main/java/com/xcong/excoin/common/aop/SubmitRepeatAspect.java | 4 src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml | 18 ++ src/main/resources/application-test.yml | 27 +- src/main/java/com/xcong/excoin/common/contants/AppContants.java | 5 src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java | 6 src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java | 3 src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java | 3 src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java | 20 ++ src/main/java/com/xcong/excoin/trade/CoinTrader.java | 13 - src/main/resources/application.yml | 21 +- src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java | 6 16 files changed, 277 insertions(+), 195 deletions(-) diff --git a/src/main/java/com/xcong/excoin/common/aop/SubmitRepeatAspect.java b/src/main/java/com/xcong/excoin/common/aop/SubmitRepeatAspect.java index 9d00c82..e18d3e7 100644 --- a/src/main/java/com/xcong/excoin/common/aop/SubmitRepeatAspect.java +++ b/src/main/java/com/xcong/excoin/common/aop/SubmitRepeatAspect.java @@ -47,10 +47,10 @@ String token = request.getHeader("token"); String uri = request.getRequestURI(); String mId = (String) redisUtil.get(token); - log.info("#token : {}, uri : {}, mId : {}#", token, uri, mId); + log.debug("#token : {}, uri : {}, mId : {}#", token, uri, mId); key = mId + "_" + uri; boolean flag = redisUtil.setNotExist(key, "1", 5); - log.info("#mid : {}, flag : {}#", mId, flag); + log.debug("#mid : {}, flag : {}#", mId, flag); if (flag) { Object result = joinPoint.proceed(); redisUtil.del(key); diff --git a/src/main/java/com/xcong/excoin/common/contants/AppContants.java b/src/main/java/com/xcong/excoin/common/contants/AppContants.java index 40a7065..89959cc 100644 --- a/src/main/java/com/xcong/excoin/common/contants/AppContants.java +++ b/src/main/java/com/xcong/excoin/common/contants/AppContants.java @@ -76,4 +76,9 @@ public static final String TIME_OUT = "time_out"; + /** + * 取消订单id + */ + public static final String ORDER_CANCEL_KEY = "COIN_ORDER_CANCEL_"; + } diff --git a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java index b9d1449..0a590b2 100644 --- a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java +++ b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java @@ -121,6 +121,12 @@ public static final String ROUTING_KEY_ROC_ORDER_CANCEL = "roc-order-routingKey-cancel"; + public static final String EXCHANGE_ROC_ORDER_COMPLETE = "roc-exchange-order-complete"; + + public static final String QUEUE_ROC_ORDER_COMPLETE = "roc-order-queue-complete"; + + public static final String ROUTING_KEY_ROC_ORDER_COMPLETE = "roc-order-routingKey-complete"; + @Resource private ConnectionFactory connectionFactory; @@ -189,6 +195,22 @@ public Binding bindingCancelOrder() { return BindingBuilder.bind(ordereCancelQueue()).to(orderCancelExchange()).with(ROUTING_KEY_ROC_ORDER_CANCEL); } + // 交易订单 + @Bean + public DirectExchange orderCompleteExchange() { + return new DirectExchange(EXCHANGE_ROC_ORDER_COMPLETE); + } + + + @Bean + public Queue ordereCompleteQueue() { + return new Queue(QUEUE_ROC_ORDER_COMPLETE, true); + } + + @Bean + public Binding bindingCompleteOrder() { + return BindingBuilder.bind(ordereCompleteQueue()).to(orderCompleteExchange()).with(ROUTING_KEY_ROC_ORDER_COMPLETE); + } @Bean diff --git a/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java b/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java index 2d42a21..d359f5a 100644 --- a/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java +++ b/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java @@ -6,6 +6,7 @@ import javax.validation.Valid; import com.alibaba.fastjson.JSONObject; +import com.xcong.excoin.common.annotations.SubmitRepeat; import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity; import com.xcong.excoin.modules.symbols.constants.SymbolsConstats; import org.springframework.validation.annotation.Validated; @@ -61,6 +62,7 @@ */ @ApiOperation(value = "提交买卖订单", notes = "提交买卖订单") @PostMapping(value="/submitSalesWalletCoinOrder") + @SubmitRepeat public Result submitSalesWalletCoinOrder(@RequestBody @Valid SubmitSalesWalletCoinOrderDto submitSalesWalletCoinOrderDto) { log.info("买卖单参数[{}]", JSONObject.toJSONString(submitSalesWalletCoinOrderDto)); String symbol = submitSalesWalletCoinOrderDto.getSymbol(); @@ -97,6 +99,7 @@ */ @ApiOperation(value = "撤销委托订单", notes = "撤销委托订单") @PostMapping(value="/cancelEntrustWalletCoinOrder") + @SubmitRepeat public Result cancelEntrustWalletCoinOrder(@RequestBody @Valid CancelEntrustWalletCoinOrderDto cancelEntrustWalletCoinOrderDto) { String orderId = cancelEntrustWalletCoinOrderDto.getOrderId(); // 根据不同币种 diff --git a/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java b/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java index dd71072..80bae11 100644 --- a/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java +++ b/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java @@ -21,4 +21,7 @@ List<OrderCoinsEntity> selectCoinOrderOnTrade(List<String> list); void updateDeal(@Param("id") Long id, @Param("dealCnt")BigDecimal dealCnt,@Param("dealAmount")BigDecimal dealAmount); + + void batchUpdateStatus(@Param("list")List<Long> list,@Param("status") Integer status); + void updateStatus(@Param("id")Long id,@Param("status") Integer status); } diff --git a/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java b/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java index 5be578b..a1e3301 100644 --- a/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java +++ b/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java @@ -52,6 +52,8 @@ public void handleOrder(List<ExchangeTrade> trades); + public void completeOrder(List<OrderCoinsEntity> trades); + void initOrders(String symbol, Integer type, Integer tradeType, BigDecimal price, BigDecimal amount,BigDecimal entrustAmount); @@ -62,4 +64,5 @@ */ public Result cancelEntrustWalletCoinOrderForMatch(String orderId); + } diff --git a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java index eb034ca..df9e756 100644 --- a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java +++ b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java @@ -8,6 +8,7 @@ import javax.annotation.Resource; import com.alibaba.fastjson.JSONObject; +import com.xcong.excoin.common.contants.AppContants; import com.xcong.excoin.common.enumerates.CoinTypeEnum; import com.xcong.excoin.modules.blackchain.service.RocService; import com.xcong.excoin.modules.coin.mapper.OrderCoinsDealMapper; @@ -344,38 +345,38 @@ Long memberId = LoginUserUtils.getAppLoginUser().getId(); // 需要实名 MemberEntity memberEntity = memberDao.selectById(memberId); - if(!MemberEntity.CERTIFY_STATUS_Y.equals(memberEntity.getCertifyStatus())){ + if (!MemberEntity.CERTIFY_STATUS_Y.equals(memberEntity.getCertifyStatus())) { return Result.fail(MessageSourceUtils.getString("member_controller_0001")); } // 需要先 String phone = memberEntity.getPhone(); - if(!"13632989240".equals(phone) && !"15158130575".equals(phone)){ - if(OrderCoinsEntity.ORDERTYPE_BUY.equals(type)){ + if (!"13632989240".equals(phone) && !"15158130575".equals(phone)) { + if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) { // 不能超过800个 - if(amount!=null && amount.compareTo(new BigDecimal("800"))>0){ - return Result.fail("买入额度受限"); - } - BigDecimal bigDecimal = orderCoinDealDao.sumTodayBuyAmount(memberId, symbol); - if(bigDecimal==null){ - bigDecimal= BigDecimal.ZERO; - } - amount= amount==null?BigDecimal.ZERO:amount; - bigDecimal = bigDecimal.add(amount); - if(bigDecimal!=null && bigDecimal.compareTo(new BigDecimal("800"))>0){ - return Result.fail("买入额度受限"); - } - // 挂单不能超过800 - BigDecimal bigDecimal1 = orderCoinDealDao.sumTodayEntrustCntBuyAmount(memberId, symbol); - if(bigDecimal1==null){ - bigDecimal1=BigDecimal.ZERO; - } - bigDecimal1 = bigDecimal1.add(amount); - if(bigDecimal1!=null && bigDecimal1.compareTo(new BigDecimal("800"))>0){ - return Result.fail("买入额度受限"); - } - }else{ - return Result.fail("卖出受限"); +// if (amount != null && amount.compareTo(new BigDecimal("800")) > 0) { +// return Result.fail("买入额度受限"); +// } +// BigDecimal bigDecimal = orderCoinDealDao.sumTodayBuyAmount(memberId, symbol); +// if (bigDecimal == null) { +// bigDecimal = BigDecimal.ZERO; +// } +// amount = amount == null ? BigDecimal.ZERO : amount; +// bigDecimal = bigDecimal.add(amount); +// if (bigDecimal != null && bigDecimal.compareTo(new BigDecimal("800")) > 0) { +// return Result.fail("买入额度受限"); +// } +// // 挂单不能超过800 +// BigDecimal bigDecimal1 = orderCoinDealDao.sumTodayEntrustCntBuyAmount(memberId, symbol); +// if (bigDecimal1 == null) { +// bigDecimal1 = BigDecimal.ZERO; +// } +// bigDecimal1 = bigDecimal1.add(amount); +// if (bigDecimal1 != null && bigDecimal1.compareTo(new BigDecimal("800")) > 0) { +// return Result.fail("买入额度受限"); +// } + } else { + return Result.fail("卖出受限"); } } @@ -414,13 +415,11 @@ entrustAmount = price.multiply(amount); } else { // 市价 - if(OrderCoinsEntity.ORDERTYPE_BUY==type){ + if (OrderCoinsEntity.ORDERTYPE_BUY == type) { closingPrice = entrustAmount.multiply(tradeSetting.getCoinFeeRatio()); totalPayPrice = entrustAmount.add(closingPrice); } } - // BigDecimal totalPayPricCoin = nowPrice.multiply(amount).add(closingPrice); - String walletCode = MemberWalletCoinEnum.WALLETCOINCODE.getValue(); MemberWalletCoinEntity walletCoinUsdt = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, walletCode); if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) { @@ -486,24 +485,12 @@ //冻结相应的资产 if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) { //如果是买入,所对应的币种增加,USDT账户减少金额 -// BigDecimal availableBalance = walletCoinUsdt.getAvailableBalance().subtract(totalPayPrice); -// BigDecimal frozenBalance = walletCoinUsdt.getFrozenBalance().add(totalPayPrice); -// walletCoinUsdt.setAvailableBalance(availableBalance); -// walletCoinUsdt.setFrozenBalance(frozenBalance); -// memberWalletCoinDao.updateById(walletCoinUsdt); - memberWalletCoinDao.updateWalletBalance(walletCoinUsdt.getId(),totalPayPrice.negate(),totalPayPrice.negate(),entrustAmount); + memberWalletCoinDao.updateWalletBalance(walletCoinUsdt.getId(), totalPayPrice.negate(), totalPayPrice.negate(), entrustAmount); } else { //如果是卖出,币种减少,USDT增加 -// BigDecimal availableBalance = walletCoin.getAvailableBalance().subtract(amount); -// BigDecimal frozenBalance = walletCoin.getFrozenBalance().add(amount); -// walletCoin.setAvailableBalance(availableBalance); -// walletCoin.setFrozenBalance(frozenBalance); -// memberWalletCoinDao.updateById(walletCoin); - memberWalletCoinDao.updateWalletBalance(walletCoin.getId(),amount.negate(),amount.negate(),amount); + memberWalletCoinDao.updateWalletBalance(walletCoin.getId(), amount.negate(), amount.negate(), amount); } - // 加入到撮合 TODO 通过消息队列发送到交易撮合 - //CoinTrader trader = factory.getTrader(symbol); - //trader.trade(order); + // 加入到撮合 order.setSymbol(symbol); orderSubmitProducer.sendMsg(JSONObject.toJSONString(order)); return Result.ok(MessageSourceUtils.getString("order_service_0011")); @@ -544,17 +531,28 @@ @Override @Transactional public Result cancelEntrustWalletCoinOrder(String orderId) { + // 将这个取消放入redis + boolean b = redisUtils.setNotExist(AppContants.ORDER_CANCEL_KEY + orderId, orderId, 10); + if (!b) { + return Result.ok(MessageSourceUtils.getString("order_service_0012")); + } //获取用户ID Long memberId = LoginUserUtils.getAppLoginUser().getId(); OrderCoinsEntity orderCoinsEntity = orderCoinsDao.selectById(orderId); - if (ObjectUtil.isNotEmpty(orderCoinsEntity) && orderCoinsEntity.getMemberId().equals(memberId) ) { + if (ObjectUtil.isNotEmpty(orderCoinsEntity) && orderCoinsEntity.getMemberId().equals(memberId)) { // 如果是撮合交易单 if (SymbolsConstats.EXCHANGE_SYMBOLS.contains(orderCoinsEntity.getSymbol())) { + // 这里先更新状态 判断状态 防止消息发送过程中的二次提交 + if (!orderCoinsEntity.getOrderStatus().equals(OrderCoinsEntity.ORDERSTATUS_DODING)) { + // 不是持仓中 返回 + return Result.ok(MessageSourceUtils.getString("order_service_0013")); + } + + // 更新为已取消(可能在这个过程中 这个单已经成交) orderSubmitProducer.sendCancelMsg(orderId); - // return this.cancelEntrustWalletCoinOrderForMatch(orderId); return Result.ok(MessageSourceUtils.getString("order_service_0013")); } - if (orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_CANCEL || orderCoinsEntity.getOrderStatus()==OrderCoinsEntity.ORDERSTATUS_DONE) { + if (orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_CANCEL || orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) { return Result.fail(MessageSourceUtils.getString("order_service_0012")); } orderCoinsEntity.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_CANCEL); @@ -629,21 +627,31 @@ @Override @Transactional public Result cancelEntrustWalletCoinOrderForMatch(String orderId) { - //获取用户ID + //如果redis中没有这个单 则不再往下走 OrderCoinsEntity orderCoinsEntity = orderCoinsDao.selectById(orderId); - if(orderCoinsEntity==null){ + if (orderCoinsEntity == null) { return Result.ok(""); } Long memberId = orderCoinsEntity.getMemberId(); // 取消撮合订单的单 CoinTrader trader = factory.getTrader(orderCoinsEntity.getSymbol()); - trader.cancelOrder(orderCoinsEntity); - if (ObjectUtil.isNotEmpty(orderCoinsEntity) ) { - if (orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_CANCEL || orderCoinsEntity.getOrderStatus()==OrderCoinsEntity.ORDERSTATUS_DONE) { + // 从撮合交易系统得到的已成交的数据 + OrderCoinsEntity coinsEntityCancel = trader.cancelOrder(orderCoinsEntity); + if (coinsEntityCancel == null) { + // 此时说明撮合系统已经没这个单了 不需要继续处理 + return null; + } + + if (ObjectUtil.isNotEmpty(orderCoinsEntity)) { + if (orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) { + // 已完成的直接返回 return Result.fail(MessageSourceUtils.getString("order_service_0012")); } - orderCoinsEntity.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_CANCEL); - orderCoinsDao.updateById(orderCoinsEntity); + OrderCoinsEntity update = new OrderCoinsEntity(); + update.setId(Long.valueOf(orderId)); + update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_CANCEL); + //orderCoinsEntity.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_CANCEL); + orderCoinsDao.updateById(update); String symbol = orderCoinsEntity.getSymbol(); @@ -655,10 +663,10 @@ detail.setTradeType(orderCoinsEntity.getTradeType()); detail.setSymbol(symbol); detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_CANCEL); - detail.setSymbolCnt(orderCoinsEntity.getEntrustCnt()); + detail.setSymbolCnt(BigDecimal.ZERO); detail.setEntrustPrice(orderCoinsEntity.getEntrustPrice()); - detail.setDealPrice(orderCoinsEntity.getDealPrice()); - detail.setDealAmount(orderCoinsEntity.getDealAmount()); + detail.setDealPrice(BigDecimal.ZERO); + detail.setDealAmount(BigDecimal.ZERO); detail.setFeeAmount(orderCoinsEntity.getFeeAmount()); if (OrderCoinsEntity.ORDERTYPE_BUY.equals(orderCoinsEntity.getOrderType())) { //如果是限价买入,撤单将USDT账户冻结金额返回 @@ -669,13 +677,7 @@ //手续费 = 开仓价*数量*手续费率 //返还金额=开仓价*未成交数量+手续费 // 这里根据成交的单计算 - List<OrderCoinsDealEntity> orderCoinsDealEntities = orderCoinDealDao.selectCoinOrderDealByOrderId(Long.valueOf(orderId)); - BigDecimal dealAmount = BigDecimal.ZERO; - if(CollectionUtils.isNotEmpty(orderCoinsDealEntities)){ - for (OrderCoinsDealEntity orderCoinsDealEntity : orderCoinsDealEntities) { - dealAmount = dealAmount.add(orderCoinsDealEntity.getDealAmount()); - } - } + BigDecimal dealAmount = coinsEntityCancel.getDealAmount(); // 市价的按成交额退款 BigDecimal returnBalance = orderCoinsEntity.getEntrustAmount().subtract(dealAmount); @@ -689,7 +691,7 @@ returnFee = orderCoinsEntity.getFeeAmount().subtract(needFee); } BigDecimal avi = returnBalance.add(returnFee); - memberWalletCoinDao.updateWalletBalance(walletCoin.getId(),avi,null,returnBalance.negate()); + memberWalletCoinDao.updateWalletBalance(walletCoin.getId(), avi, null, returnBalance.negate()); walletCoin.setAvailableBalance(walletCoin.getAvailableBalance().add(returnBalance).add(returnFee)); walletCoin.setFrozenBalance(walletCoin.getFrozenBalance().subtract(returnBalance)); //memberWalletCoinDao.updateById(walletCoin); @@ -709,10 +711,11 @@ MemberWalletCoinEntity walletCoin = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, symbol); if (ObjectUtil.isNotEmpty(walletCoin)) { // 卖出按卖出的数量计算手续费 - BigDecimal returnBalance = orderCoinsEntity.getEntrustCnt().subtract(orderCoinsEntity.getDealCnt()); + BigDecimal returnBalance = orderCoinsEntity.getEntrustCnt().subtract(coinsEntityCancel.getDealCnt()); walletCoin.setAvailableBalance(walletCoin.getAvailableBalance().add(returnBalance)); walletCoin.setFrozenBalance(walletCoin.getFrozenBalance().subtract(returnBalance)); - memberWalletCoinDao.updateById(walletCoin); + //memberWalletCoinDao.updateById(walletCoin); + memberWalletCoinDao.updateWalletBalance(walletCoin.getId(), returnBalance, null, returnBalance.negate()); // 流水记录 MemberAccountFlowEntity record = new MemberAccountFlowEntity(); record.setSource(MemberAccountFlowEntity.SOURCE_CANCEL); @@ -972,36 +975,14 @@ BigDecimal price = exchangeTrade.getPrice(); // 卖单 Long sellOrderId = exchangeTrade.getSellOrderId(); + // 买卖单都需要处理 // 买单 OrderCoinsEntity buyOrderCoinsEntity = orderCoinsDao.selectById(buyOrderId); - if(buyOrderCoinsEntity==null){ - return; - } - BigDecimal buyEntrustCnt = buyOrderCoinsEntity.getEntrustCnt(); - if(buyEntrustCnt==null){ - buyEntrustCnt = BigDecimal.ZERO; - } - Long memberId = buyOrderCoinsEntity.getMemberId(); if (buyOrderCoinsEntity != null) { - List<OrderCoinsDealEntity> orderCoinsDealEntities = orderCoinDealDao.selectCoinOrderDealByOrderId(buyOrderId); - // 比较剩余的量 - BigDecimal dealAmount = BigDecimal.ZERO; - BigDecimal dealCnt = BigDecimal.ZERO; - if(CollectionUtils.isNotEmpty(orderCoinsDealEntities)){ - for (OrderCoinsDealEntity orderCoinsDealEntity : orderCoinsDealEntities) { - dealAmount=dealAmount.add(orderCoinsDealEntity.getDealAmount()); - dealCnt = dealCnt.add(orderCoinsDealEntity.getSymbolCnt()); - } - } - // 单的总金额 - BigDecimal entrustAmount = buyOrderCoinsEntity.getEntrustAmount(); - BigDecimal add = dealAmount.add(buyTurnover); BigDecimal closingPrice = buyTurnover.multiply(new BigDecimal("0.002")); - //成交总量 - dealCnt = dealCnt.add(amount); // 创建一个完成的单 OrderCoinsDealEntity detail = new OrderCoinsDealEntity(); detail.setMemberId(buyOrderCoinsEntity.getMemberId()); @@ -1016,36 +997,37 @@ detail.setDealAmount(buyTurnover); detail.setFeeAmount(closingPrice); detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_DONE); + // 如果这个单在取消状态 则不执行 orderCoinDealDao.insert(detail); - // 如果这个单成交完 更改状态 - if (add.compareTo(entrustAmount) >= 0 ||(buyEntrustCnt.compareTo(BigDecimal.ZERO)>0 &&dealCnt.compareTo(buyEntrustCnt)>=0) ) { - OrderCoinsEntity update = new OrderCoinsEntity(); - update.setId(buyOrderId); - update.setDealAmount(entrustAmount); - update.setDealCnt(buyOrderCoinsEntity.getDealCnt().add(amount)); - update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE); - update.setUpdateTime(new Date()); - orderCoinsDao.updateById(update); - // 限价买入时,如果成交价比设置的价格低,需要退还多余的冻结 - OrderCoinsEntity coinsEntity = orderCoinsDao.selectById(buyOrderId); - BigDecimal subtract = coinsEntity.getEntrustAmount().subtract(coinsEntity.getDealAmount()); - if(subtract.compareTo(BigDecimal.ZERO)>=0){ - // 下单扣的比较多 - memberWalletCoinDao.updateWalletBalance(coinsEntity.getId(),subtract,subtract,subtract.negate()); - } - } else { - // 更新买单 - orderCoinsDao.updateDeal(buyOrderId,amount,buyTurnover); + // 更新买单 + //orderCoinsDao.updateDeal(buyOrderId, amount, buyTurnover); + // 买币扣除冻结usdt 增加币种的可用 + MemberWalletCoinEntity usdtWallet = memberWalletCoinDao.selectWalletCoinBymIdAndCode(buyOrderCoinsEntity.getMemberId(), MemberWalletCoinEnum.WALLETCOINCODE.getValue()); + if (usdtWallet != null) { + // 减少usdt冻结 + memberWalletCoinDao.updateWalletBalance(usdtWallet.getId(), null, null, buyTurnover.negate()); } + // 增加买的币 + MemberWalletCoinEntity buySymbolWallet = memberWalletCoinDao.selectWalletCoinBymIdAndCode(buyOrderCoinsEntity.getMemberId(), buyOrderCoinsEntity.getSymbol()); + if (buySymbolWallet != null) { + memberWalletCoinDao.updateWalletBalance(buySymbolWallet.getId(), amount, amount, null); + } + // 流水记录 + MemberAccountFlowEntity record = new MemberAccountFlowEntity(); + record.setMemberId(buyOrderCoinsEntity.getMemberId()); + record.setPrice(buyTurnover.setScale(4, BigDecimal.ROUND_DOWN).negate()); + record.setSource(MemberAccountFlowEntity.SOURCE_BUY + buyOrderCoinsEntity.getSymbol()); + record.setRemark(MemberAccountFlowEntity.REMARK_BUY + buyOrderCoinsEntity.getSymbol() + ":" + amount); + record.setSymbol(buyOrderCoinsEntity.getSymbol()); + record.setBalance(usdtWallet.getAvailableBalance().subtract(buyTurnover)); + memberAccountFlowEntityDao.insert(record); } + // 卖单 OrderCoinsEntity sellOrderCoinsEntity = orderCoinsDao.selectById(sellOrderId); if (sellOrderCoinsEntity != null) { // 比较剩余的量 BigDecimal dealAmount = sellOrderCoinsEntity.getDealCnt(); - // 单的总数量 - BigDecimal entrustCnt = sellOrderCoinsEntity.getEntrustCnt(); - BigDecimal add = dealAmount.add(amount); // 创建一个完成的单 OrderCoinsDealEntity detail = new OrderCoinsDealEntity(); detail.setMemberId(sellOrderCoinsEntity.getMemberId()); @@ -1062,46 +1044,7 @@ detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_DONE); orderCoinDealDao.insert(detail); // 如果这个单成交完 更改状态 - if (add.compareTo(entrustCnt) >= 0) { - OrderCoinsEntity update = new OrderCoinsEntity(); - update.setId(sellOrderId); - // 总成交额 - update.setDealAmount(buyTurnover.add(sellOrderCoinsEntity.getDealAmount())); - update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE); - update.setDealCnt(entrustCnt); - update.setUpdateTime(new Date()); - orderCoinsDao.updateById(update); - } else { - // 未完成 - OrderCoinsEntity update = new OrderCoinsEntity(); - update.setId(sellOrderId); - // 总成交额 - update.setDealAmount(buyTurnover.add(sellOrderCoinsEntity.getDealAmount())); - update.setDealCnt(sellOrderCoinsEntity.getDealCnt().add(amount)); - update.setUpdateTime(new Date()); - orderCoinsDao.updateById(update); - } - // 买币扣除冻结usdt 增加币种的可用 - MemberWalletCoinEntity usdtWallet = memberWalletCoinDao.selectWalletCoinBymIdAndCode(buyOrderCoinsEntity.getMemberId(), MemberWalletCoinEnum.WALLETCOINCODE.getValue()); - if (usdtWallet != null) { - // 减少usdt冻结 - memberWalletCoinDao.updateWalletBalance(usdtWallet.getId(), null, null, buyTurnover.negate()); - } - - // 增加买的币 - MemberWalletCoinEntity buySymbolWallet = memberWalletCoinDao.selectWalletCoinBymIdAndCode(buyOrderCoinsEntity.getMemberId(), buyOrderCoinsEntity.getSymbol()); - if (buySymbolWallet != null) { - memberWalletCoinDao.updateWalletBalance(buySymbolWallet.getId(), amount, amount, null); - } - // 流水记录 - MemberAccountFlowEntity record = new MemberAccountFlowEntity(); - record.setMemberId(buyOrderCoinsEntity.getMemberId()); - record.setPrice(buyTurnover.setScale(4, BigDecimal.ROUND_DOWN).negate()); - record.setSource(MemberAccountFlowEntity.SOURCE_BUY + buyOrderCoinsEntity.getSymbol()); - record.setRemark(MemberAccountFlowEntity.REMARK_BUY + buyOrderCoinsEntity.getSymbol() + ":" + amount); - record.setSymbol(buyOrderCoinsEntity.getSymbol()); - record.setBalance(usdtWallet.getAvailableBalance().subtract(buyTurnover)); - memberAccountFlowEntityDao.insert(record); + //orderCoinsDao.updateDeal(sellOrderId, amount, buyTurnover); // 卖家需要减少冻结的币种 增加usdt MemberWalletCoinEntity memberWalletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(sellOrderCoinsEntity.getMemberId(), sellOrderCoinsEntity.getSymbol()); if (memberWalletCoinEntity != null) { @@ -1117,9 +1060,9 @@ MemberAccountFlowEntity recordSell = new MemberAccountFlowEntity(); recordSell.setMemberId(sellOrderCoinsEntity.getMemberId()); recordSell.setPrice(buyTurnover.setScale(4, BigDecimal.ROUND_DOWN)); - recordSell.setSource(MemberAccountFlowEntity.SOURCE_SALE + buyOrderCoinsEntity.getSymbol()); - recordSell.setRemark(MemberAccountFlowEntity.REMARK_SALE + buyOrderCoinsEntity.getSymbol() + ":" + amount.toPlainString()); - recordSell.setSymbol(buyOrderCoinsEntity.getSymbol()); + recordSell.setSource(MemberAccountFlowEntity.SOURCE_SALE + sellOrderCoinsEntity.getSymbol()); + recordSell.setRemark(MemberAccountFlowEntity.REMARK_SALE + sellOrderCoinsEntity.getSymbol() + ":" + amount.toPlainString()); + recordSell.setSymbol(sellOrderCoinsEntity.getSymbol()); recordSell.setBalance(sellWalletCoinEntity.getAvailableBalance().add(buyTurnover)); memberAccountFlowEntityDao.insert(recordSell); } @@ -1127,6 +1070,36 @@ } @Override + @Transactional + public void completeOrder(List<OrderCoinsEntity> trades) { + // 订单完成 更新他们的状态 + List<Long> ids = new ArrayList<>(); + if (CollectionUtils.isNotEmpty(trades)) { + for (OrderCoinsEntity trade : trades) { + if (trade != null) { + orderCoinsDao.updateStatus(trade.getId(),OrderCoinsEntity.ORDERSTATUS_DONE); + ids.add(trade.getId()); + // 买单 实际成交金额小于委托的 这一部分从冻结扣除 + if(OrderCoinsEntity.ORDERTYPE_BUY==trade.getOrderType()){ + if(trade.getEntrustAmount().compareTo(trade.getDealAmount())>0){ + // 此时退回这部分的差额 + BigDecimal subtract = trade.getEntrustAmount().subtract(trade.getDealAmount()); + System.out.println(subtract); + MemberWalletCoinEntity memberWalletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(trade.getMemberId(), CoinTypeEnum.USDT.name()); + if(memberWalletCoinEntity!=null){ + memberWalletCoinDao.updateWalletBalance(memberWalletCoinEntity.getId(),subtract,null,subtract.negate()); + } + } + } + } + } + } + if (CollectionUtils.isNotEmpty(ids)) { + // orderCoinsDao.batchUpdateStatus(ids, OrderCoinsEntity.ORDERSTATUS_DONE); + } + } + + @Override public void initOrders(String symbol, Integer type, Integer tradeType, BigDecimal price, BigDecimal amount, BigDecimal entrustAmount) { diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java index 2ed81da..9670624 100644 --- a/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java +++ b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject; import com.huobi.client.model.Candlestick; import com.xcong.excoin.configurations.RabbitMqConfig; +import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity; import com.xcong.excoin.modules.coin.service.OrderCoinService; import com.xcong.excoin.modules.exchange.service.HandleKlineService; import com.xcong.excoin.trade.ExchangeTrade; @@ -71,6 +72,8 @@ if(CollectionUtils.isEmpty(exchangeTrades)){ return; } + // 先处理处理用户订单 + orderCoinService.handleOrder(exchangeTrades); // 处理K线 并更新最新价 handleKlineService.handleExchangeOrderToKline(exchangeTrades); // 推送最新K线 @@ -112,8 +115,21 @@ newCandlestick.setTick(model); tradePlateSendWebSocket.sendMessageKline(symbolUsdt,key1,JSONObject.toJSONString(newCandlestick),null); } - // 处理用户订单 - orderCoinService.handleOrder(exchangeTrades); + + } + + /** + * 撮合交易订单全部完成 + * @param content + */ + @RabbitListener(queues = RabbitMqConfig.QUEUE_ROC_ORDER_COMPLETE) + public void doComplete(String content) { + log.info("#完成的订单---->{}#", content); + List<OrderCoinsEntity> exchangeTrades = JSONObject.parseArray(content, OrderCoinsEntity.class); + if(CollectionUtils.isEmpty(exchangeTrades)){ + return; + } + orderCoinService.completeOrder(exchangeTrades); } } diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java index 4f20d62..4e7ee1c 100644 --- a/src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java +++ b/src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java @@ -6,12 +6,16 @@ import com.xcong.excoin.modules.coin.service.OrderCoinService; import com.xcong.excoin.trade.CoinTrader; import com.xcong.excoin.trade.CoinTraderFactory; +import com.xcong.excoin.trade.ExchangeTrade; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.util.Iterator; +import java.util.List; /** * 提交买卖单进入撮合系统 @@ -45,4 +49,6 @@ log.info("#取消的订单---->{}#", content); orderCoinService.cancelEntrustWalletCoinOrderForMatch(content); } + + } diff --git a/src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java b/src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java index 5d6e4f6..77aee3e 100644 --- a/src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java +++ b/src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java @@ -40,6 +40,12 @@ } + public void sendCompleteMsg(String content) { + CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID()); + rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_ROC_ORDER_COMPLETE, RabbitMqConfig.ROUTING_KEY_ROC_ORDER_COMPLETE, content, correlationData); + } + + @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //log.info("#----->{}#", correlationData); diff --git a/src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java b/src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java index d873ca1..1cc749f 100644 --- a/src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java +++ b/src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java @@ -35,6 +35,7 @@ } + @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("#----->{}#", correlationData); diff --git a/src/main/java/com/xcong/excoin/trade/CoinTrader.java b/src/main/java/com/xcong/excoin/trade/CoinTrader.java index 3e249f0..38de6e4 100644 --- a/src/main/java/com/xcong/excoin/trade/CoinTrader.java +++ b/src/main/java/com/xcong/excoin/trade/CoinTrader.java @@ -2,11 +2,8 @@ import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity; -import com.xcong.excoin.modules.coin.service.OrderCoinService; import com.xcong.excoin.rabbit.producer.ExchangeProducer; -import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -313,7 +310,7 @@ if (trade != null) { exchangeTrades.add(trade); } - //判断匹配单是否完成 TODO + //判断匹配单是否完成 if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) { //当前匹配的订单完成交易,删除该订单 orderIterator.remove(); @@ -355,7 +352,7 @@ */ private BigDecimal calculateTradedAmount(OrderCoinsEntity order, BigDecimal dealPrice) { if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && order.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) { - //剩余成交量 TODO ? + //剩余成交量 // 委托量-成交量=剩余量 BigDecimal leftTurnover = order.getEntrustAmount().subtract(order.getDealAmount()); return leftTurnover.divide(dealPrice, coinScale, BigDecimal.ROUND_DOWN); @@ -517,11 +514,11 @@ for (int index = 0; index < size; index += maxSize) { int length = (size - index) > maxSize ? maxSize : size - index; List<OrderCoinsEntity> subOrders = orders.subList(index, index + length); - // TODO 通知订单完成 - //kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(subOrders)); + // 通知订单完成 + exchangeProducer.sendCompleteMsg(JSON.toJSONString(subOrders)); } } else { - // kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(orders)); + exchangeProducer.sendCompleteMsg(JSON.toJSONString(orders)); } } } diff --git a/src/main/resources/application-test.yml b/src/main/resources/application-test.yml index d36e69b..ef4745b 100644 --- a/src/main/resources/application-test.yml +++ b/src/main/resources/application-test.yml @@ -7,9 +7,9 @@ profiles: active: dev datasource: - url: jdbc:mysql://47.114.114.219:3306/db_roc?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8 - username: roc_user - password: roc123pasd!@ + url: jdbc:mysql://47.96.73.250:3306/db_roc?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8 + username: shop_user + password: 123456 driver-class-name: com.mysql.jdbc.Driver type: com.alibaba.druid.pool.DruidDataSource druid: @@ -50,11 +50,11 @@ ## Redis数据库索引(默认为0) database: 1 ## Redis服务器地址 - host: 47.114.114.219 + host: 47.96.73.250 ## Redis服务器连接端口 port: 6379 ## Redis服务器连接密码(默认为空) - password: biyi123 + password: qwer12345678 jedis: pool: ## 连接池最大连接数(使用负值表示没有限制) @@ -72,9 +72,9 @@ ## 连接超时时间(毫秒) timeout: 30000 rabbitmq: - host: 120.27.238.55 + host: 47.96.73.250 port: 5672 - username: ct_rabbit + username: rabbit password: 123456 publisher-confirm-type: correlated @@ -91,17 +91,16 @@ app: - debug: false + debug: true redis_expire: 3000 kline-update-job: false - newest-price-update-job: true - #日线 该任务不能与最新价处于同一个服务器 - trade: true + newest-price-update-job: false + exchange-trade: true day-line: false - other-job: true - loop-job: true + other-job: false + loop-job: false rabbit-consumer: false - block-job: true + block-job: false aliyun: oss: diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index b9fc797..d7ba143 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -5,11 +5,11 @@ spring: profiles: - active: prod + active: test datasource: - url: jdbc:mysql://rm-bp151tw8er79ig9kb5o.mysql.rds.aliyuncs.com:3306/db_biue?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8 - username: ctcoin_data - password: ctcoin_123 + url: jdbc:mysql://rm-bp1i2g5rg5dubo9s40o.mysql.rds.aliyuncs.com:3306/db_roc?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8 + username: roc_user + password: roc123pasd!@ driver-class-name: com.mysql.jdbc.Driver type: com.alibaba.druid.pool.DruidDataSource druid: @@ -48,7 +48,7 @@ ## redis配置 redis: ## Redis数据库索引(默认为0) - database: 2 + database: 1 ## Redis服务器地址 host: 47.114.114.219 ## Redis服务器连接端口 @@ -72,10 +72,10 @@ ## 连接超时时间(毫秒) timeout: 30000 rabbitmq: - host: 120.27.238.55 + host: 47.114.114.219 port: 5672 - username: ct_rabbit - password: 123456 + username: roc_user + password: roc123456 publisher-confirm-type: correlated @@ -93,13 +93,10 @@ app: debug: false redis_expire: 3000 - # k线更新任务控制 kline-update-job: false - #最新价任务控制 newest-price-update-job: false - #日线 该任务不能与最新价处于同一个服务器 + exchange-trade: false day-line: false - #其他任务控制 other-job: false loop-job: false rabbit-consumer: false diff --git a/src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml b/src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml index 16190bd..1c5733f 100644 --- a/src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml +++ b/src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml @@ -8,11 +8,12 @@ </select> <select id="findCoinOrderListByMemberIdAndSysmbol" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsEntity"> - SELECT * FROM coins_order a where a.member_id= #{memberId} and a.order_status = #{status} + SELECT (select sum(symbol_cnt) from coins_order_deal where order_id = a.id) as deal_cnt, a.create_by,a.create_time, a.update_by, a.update_time, a.version id, a.member_id, a.order_no, a.order_type, a.symbol, a.mark_price, a.entrust_cnt, a.entrust_price, a.deal_price, a.deal_amount, a.order_status, a.trade_type, a.fee_amount, a.entrust_amount + FROM coins_order a where a.member_id= #{memberId} and a.order_status = #{status} <if test="symbol != null and symbol !=''"> and a.symbol = #{symbol} </if> - order by create_time desc + order by a.create_time desc </select> <select id="findWalletCoinOrderByOrderNo" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsEntity"> @@ -57,4 +58,17 @@ </set> where id = #{id} </update> + + <update id="batchUpdateStatus" parameterType="map"> + update coins_order set order_status = #{status} + where id in + <foreach collection="list" item="item" separator="," open="(" close=")"> + #{item} + </foreach> + </update> + + <update id="updateStatus" parameterType="map"> + update coins_order set order_status = #{status} + where id =#{id} + </update> </mapper> diff --git a/src/test/java/com/xcong/excoin/TradeTest.java b/src/test/java/com/xcong/excoin/TradeTest.java index b517349..c941e4b 100644 --- a/src/test/java/com/xcong/excoin/TradeTest.java +++ b/src/test/java/com/xcong/excoin/TradeTest.java @@ -5,6 +5,7 @@ import com.xcong.excoin.modules.coin.service.OrderCoinService; import com.xcong.excoin.trade.CoinTrader; import com.xcong.excoin.utils.RedisUtils; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; @@ -13,6 +14,7 @@ import java.math.BigDecimal; import java.text.ParseException; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; @Slf4j @@ -31,4 +33,39 @@ public void buy(){ redisUtils.set("ROC_NEW_PRICE",new BigDecimal("12.33")); } + + public static void main(String[] args) throws InterruptedException { + // 测试两个地方 + List<String> list = new ArrayList<>(); + list.add("1"); + list.add("2"); + list.add("3"); + list.add("4"); + // 开一个线程 + Thread thread = new Thread(new Runnable() { + @SneakyThrows + @Override + public void run() { + Thread.sleep(1000); + synchronized (list){ + Iterator<String> iterator = list.iterator(); + while (iterator.hasNext()){ + System.out.println("线程里"+iterator.next()); + iterator.remove(); + } + } + + } + }); + thread.start(); + synchronized (list){ + Iterator<String> iterator = list.iterator(); + while (iterator.hasNext()){ + Thread.sleep(1000); + System.out.println(iterator.next()); + } + } + + // + } } -- Gitblit v1.9.1