From edadefcc724fa0ec190ba7b6bf63ae49ac80c545 Mon Sep 17 00:00:00 2001 From: zainali5120 <512061637@qq.com> Date: Wed, 14 Oct 2020 20:34:29 +0800 Subject: [PATCH] cpv配置 --- src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java | 3 src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java | 239 +++++++++++++++------------------ src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java | 1 src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java | 22 +++ src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java | 2 src/main/java/com/xcong/excoin/common/aop/SubmitRepeatAspect.java | 18 ++ src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml | 18 ++ src/main/java/com/xcong/excoin/quartz/job/KlineDataUpdateJob.java | 3 src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java | 3 src/main/java/com/xcong/excoin/common/contants/AppContants.java | 5 src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java | 6 src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java | 5 src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java | 5 src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java | 43 +++-- src/main/java/com/xcong/excoin/common/system/controller/LoginController.java | 2 src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java | 20 ++ src/main/java/com/xcong/excoin/trade/CoinTrader.java | 13 - src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java | 8 + 18 files changed, 245 insertions(+), 171 deletions(-) diff --git a/src/main/java/com/xcong/excoin/common/aop/SubmitRepeatAspect.java b/src/main/java/com/xcong/excoin/common/aop/SubmitRepeatAspect.java index 9d00c82..f7f4596 100644 --- a/src/main/java/com/xcong/excoin/common/aop/SubmitRepeatAspect.java +++ b/src/main/java/com/xcong/excoin/common/aop/SubmitRepeatAspect.java @@ -1,7 +1,13 @@ package com.xcong.excoin.common.aop; +import cn.hutool.core.util.StrUtil; +import cn.hutool.crypto.asymmetric.KeyType; +import cn.hutool.crypto.asymmetric.RSA; +import com.xcong.excoin.common.LoginUserUtils; import com.xcong.excoin.common.annotations.SubmitRepeat; +import com.xcong.excoin.common.contants.AppContants; import com.xcong.excoin.common.response.Result; +import com.xcong.excoin.configurations.properties.SecurityProperties; import com.xcong.excoin.utils.MessageSourceUtils; import com.xcong.excoin.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; @@ -27,6 +33,8 @@ @Resource private RedisUtils redisUtil; + @Resource + private SecurityProperties securityProperties; private String key; @@ -44,9 +52,15 @@ ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); HttpServletRequest request = attributes.getRequest(); - String token = request.getHeader("token"); + //String token = request.getHeader("token"); + String bearerToken = request.getHeader(AppContants.TOKEN_HEADER); + String rsaToken = bearerToken.replace(AppContants.TOKEN_START_WITH, ""); + RSA rsa = new RSA(securityProperties.getPrivateKey(), null); + String[] tokens = StrUtil.split(rsa.decryptStr(rsaToken, KeyType.PrivateKey), "_"); + String token = tokens[0]; String uri = request.getRequestURI(); - String mId = (String) redisUtil.get(token); + Long mId = LoginUserUtils.getAppLoginUser().getId(); + //String mId = (String) redisUtil.get(token); log.info("#token : {}, uri : {}, mId : {}#", token, uri, mId); key = mId + "_" + uri; boolean flag = redisUtil.setNotExist(key, "1", 5); diff --git a/src/main/java/com/xcong/excoin/common/contants/AppContants.java b/src/main/java/com/xcong/excoin/common/contants/AppContants.java index 40a7065..89959cc 100644 --- a/src/main/java/com/xcong/excoin/common/contants/AppContants.java +++ b/src/main/java/com/xcong/excoin/common/contants/AppContants.java @@ -76,4 +76,9 @@ public static final String TIME_OUT = "time_out"; + /** + * 取消订单id + */ + public static final String ORDER_CANCEL_KEY = "COIN_ORDER_CANCEL_"; + } diff --git a/src/main/java/com/xcong/excoin/common/system/controller/LoginController.java b/src/main/java/com/xcong/excoin/common/system/controller/LoginController.java index 4754bb5..5e64a2f 100644 --- a/src/main/java/com/xcong/excoin/common/system/controller/LoginController.java +++ b/src/main/java/com/xcong/excoin/common/system/controller/LoginController.java @@ -111,7 +111,7 @@ return rsa.encryptBase64(token + "_" + System.currentTimeMillis(), KeyType.PublicKey); } - @SubmitRepeat + //@SubmitRepeat @ApiOperation(value = "app注册接口", notes = "app注册接口,验证码必须输入可默认为123456") @PostMapping(value = "/register") public Result register(@RequestBody @Validated RegisterDto registerDto) { diff --git a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java index b9d1449..0a590b2 100644 --- a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java +++ b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java @@ -121,6 +121,12 @@ public static final String ROUTING_KEY_ROC_ORDER_CANCEL = "roc-order-routingKey-cancel"; + public static final String EXCHANGE_ROC_ORDER_COMPLETE = "roc-exchange-order-complete"; + + public static final String QUEUE_ROC_ORDER_COMPLETE = "roc-order-queue-complete"; + + public static final String ROUTING_KEY_ROC_ORDER_COMPLETE = "roc-order-routingKey-complete"; + @Resource private ConnectionFactory connectionFactory; @@ -189,6 +195,22 @@ public Binding bindingCancelOrder() { return BindingBuilder.bind(ordereCancelQueue()).to(orderCancelExchange()).with(ROUTING_KEY_ROC_ORDER_CANCEL); } + // 交易订单 + @Bean + public DirectExchange orderCompleteExchange() { + return new DirectExchange(EXCHANGE_ROC_ORDER_COMPLETE); + } + + + @Bean + public Queue ordereCompleteQueue() { + return new Queue(QUEUE_ROC_ORDER_COMPLETE, true); + } + + @Bean + public Binding bindingCompleteOrder() { + return BindingBuilder.bind(ordereCompleteQueue()).to(orderCompleteExchange()).with(ROUTING_KEY_ROC_ORDER_COMPLETE); + } @Bean diff --git a/src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java b/src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java index 796bf4a..004f851 100644 --- a/src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java +++ b/src/main/java/com/xcong/excoin/configurations/security/WebSecurityConfig.java @@ -53,8 +53,9 @@ .antMatchers("/api/member/getAppVersionInfo").permitAll() .antMatchers("/api/orderCoin/searchSymbolResultList").permitAll() .antMatchers("/api/orderCoin/findCollect").permitAll() - .antMatchers("/trade/**").permitAll() + .antMatchers("/api/orderCoin/deal/list").permitAll() .antMatchers("/api/helpCenter/**").permitAll() + .antMatchers("/trade/**").permitAll() .anyRequest().authenticated() .and().apply(securityConfiguereAdapter()); } diff --git a/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java b/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java index 2d42a21..792d40d 100644 --- a/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java +++ b/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java @@ -6,6 +6,7 @@ import javax.validation.Valid; import com.alibaba.fastjson.JSONObject; +import com.xcong.excoin.common.annotations.SubmitRepeat; import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity; import com.xcong.excoin.modules.symbols.constants.SymbolsConstats; import org.springframework.validation.annotation.Validated; @@ -61,8 +62,9 @@ */ @ApiOperation(value = "提交买卖订单", notes = "提交买卖订单") @PostMapping(value="/submitSalesWalletCoinOrder") + @SubmitRepeat public Result submitSalesWalletCoinOrder(@RequestBody @Valid SubmitSalesWalletCoinOrderDto submitSalesWalletCoinOrderDto) { - log.info("买卖单参数[{}]", JSONObject.toJSONString(submitSalesWalletCoinOrderDto)); + log.debug("买卖单参数[{}]", JSONObject.toJSONString(submitSalesWalletCoinOrderDto)); String symbol = submitSalesWalletCoinOrderDto.getSymbol(); Integer type = submitSalesWalletCoinOrderDto.getType(); Integer tradeType = submitSalesWalletCoinOrderDto.getTradeType(); @@ -97,6 +99,7 @@ */ @ApiOperation(value = "撤销委托订单", notes = "撤销委托订单") @PostMapping(value="/cancelEntrustWalletCoinOrder") + @SubmitRepeat public Result cancelEntrustWalletCoinOrder(@RequestBody @Valid CancelEntrustWalletCoinOrderDto cancelEntrustWalletCoinOrderDto) { String orderId = cancelEntrustWalletCoinOrderDto.getOrderId(); // 根据不同币种 diff --git a/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java b/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java index dd71072..80bae11 100644 --- a/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java +++ b/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java @@ -21,4 +21,7 @@ List<OrderCoinsEntity> selectCoinOrderOnTrade(List<String> list); void updateDeal(@Param("id") Long id, @Param("dealCnt")BigDecimal dealCnt,@Param("dealAmount")BigDecimal dealAmount); + + void batchUpdateStatus(@Param("list")List<Long> list,@Param("status") Integer status); + void updateStatus(@Param("id")Long id,@Param("status") Integer status); } diff --git a/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java b/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java index 960fda6..a1e3301 100644 --- a/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java +++ b/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java @@ -52,6 +52,10 @@ public void handleOrder(List<ExchangeTrade> trades); + public void completeOrder(List<OrderCoinsEntity> trades); + + void initOrders(String symbol, Integer type, Integer tradeType, BigDecimal price, + BigDecimal amount,BigDecimal entrustAmount); /** * 撮合交易单的撤销方法 @@ -60,4 +64,5 @@ */ public Result cancelEntrustWalletCoinOrderForMatch(String orderId); + } diff --git a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java index 35ee332..4583faf 100644 --- a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java +++ b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java @@ -8,6 +8,7 @@ import javax.annotation.Resource; import com.alibaba.fastjson.JSONObject; +import com.xcong.excoin.common.contants.AppContants; import com.xcong.excoin.common.enumerates.CoinTypeEnum; import com.xcong.excoin.modules.blackchain.service.RocService; import com.xcong.excoin.modules.coin.mapper.OrderCoinsDealMapper; @@ -344,7 +345,7 @@ Long memberId = LoginUserUtils.getAppLoginUser().getId(); // 需要实名 MemberEntity memberEntity = memberDao.selectById(memberId); - if(!MemberEntity.CERTIFY_STATUS_Y.equals(memberEntity.getCertifyStatus())){ + if (!MemberEntity.CERTIFY_STATUS_Y.equals(memberEntity.getCertifyStatus())) { return Result.fail(MessageSourceUtils.getString("member_controller_0001")); } BigDecimal nowPriceinBigDecimal = price; @@ -382,13 +383,11 @@ entrustAmount = price.multiply(amount); } else { // 市价 - if(OrderCoinsEntity.ORDERTYPE_BUY==type){ + if (OrderCoinsEntity.ORDERTYPE_BUY == type) { closingPrice = entrustAmount.multiply(tradeSetting.getCoinFeeRatio()); totalPayPrice = entrustAmount.add(closingPrice); } } - // BigDecimal totalPayPricCoin = nowPrice.multiply(amount).add(closingPrice); - String walletCode = MemberWalletCoinEnum.WALLETCOINCODE.getValue(); MemberWalletCoinEntity walletCoinUsdt = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, walletCode); if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) { @@ -454,24 +453,12 @@ //冻结相应的资产 if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) { //如果是买入,所对应的币种增加,USDT账户减少金额 -// BigDecimal availableBalance = walletCoinUsdt.getAvailableBalance().subtract(totalPayPrice); -// BigDecimal frozenBalance = walletCoinUsdt.getFrozenBalance().add(totalPayPrice); -// walletCoinUsdt.setAvailableBalance(availableBalance); -// walletCoinUsdt.setFrozenBalance(frozenBalance); -// memberWalletCoinDao.updateById(walletCoinUsdt); - memberWalletCoinDao.updateWalletBalance(walletCoinUsdt.getId(),totalPayPrice.negate(),totalPayPrice.negate(),entrustAmount); + memberWalletCoinDao.updateWalletBalance(walletCoinUsdt.getId(), totalPayPrice.negate(), totalPayPrice.negate(), entrustAmount); } else { //如果是卖出,币种减少,USDT增加 -// BigDecimal availableBalance = walletCoin.getAvailableBalance().subtract(amount); -// BigDecimal frozenBalance = walletCoin.getFrozenBalance().add(amount); -// walletCoin.setAvailableBalance(availableBalance); -// walletCoin.setFrozenBalance(frozenBalance); -// memberWalletCoinDao.updateById(walletCoin); - memberWalletCoinDao.updateWalletBalance(walletCoin.getId(),amount.negate(),amount.negate(),amount); + memberWalletCoinDao.updateWalletBalance(walletCoin.getId(), amount.negate(), amount.negate(), amount); } - // 加入到撮合 TODO 通过消息队列发送到交易撮合 - //CoinTrader trader = factory.getTrader(symbol); - //trader.trade(order); + // 加入到撮合 order.setSymbol(symbol); orderSubmitProducer.sendMsg(JSONObject.toJSONString(order)); return Result.ok(MessageSourceUtils.getString("order_service_0011")); @@ -512,17 +499,28 @@ @Override @Transactional public Result cancelEntrustWalletCoinOrder(String orderId) { + // 将这个取消放入redis + boolean b = redisUtils.setNotExist(AppContants.ORDER_CANCEL_KEY + orderId, orderId, 10); + if (!b) { + return Result.ok(MessageSourceUtils.getString("order_service_0012")); + } //获取用户ID Long memberId = LoginUserUtils.getAppLoginUser().getId(); OrderCoinsEntity orderCoinsEntity = orderCoinsDao.selectById(orderId); - if (ObjectUtil.isNotEmpty(orderCoinsEntity) && orderCoinsEntity.getMemberId().equals(memberId) ) { + if (ObjectUtil.isNotEmpty(orderCoinsEntity) && orderCoinsEntity.getMemberId().equals(memberId)) { // 如果是撮合交易单 if (SymbolsConstats.EXCHANGE_SYMBOLS.contains(orderCoinsEntity.getSymbol())) { + // 这里先更新状态 判断状态 防止消息发送过程中的二次提交 + if (!orderCoinsEntity.getOrderStatus().equals(OrderCoinsEntity.ORDERSTATUS_DODING)) { + // 不是持仓中 返回 + return Result.ok(MessageSourceUtils.getString("order_service_0013")); + } + + // 更新为已取消(可能在这个过程中 这个单已经成交) orderSubmitProducer.sendCancelMsg(orderId); - // return this.cancelEntrustWalletCoinOrderForMatch(orderId); return Result.ok(MessageSourceUtils.getString("order_service_0013")); } - if (orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_CANCEL || orderCoinsEntity.getOrderStatus()==OrderCoinsEntity.ORDERSTATUS_DONE) { + if (orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_CANCEL || orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) { return Result.fail(MessageSourceUtils.getString("order_service_0012")); } orderCoinsEntity.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_CANCEL); @@ -597,21 +595,31 @@ @Override @Transactional public Result cancelEntrustWalletCoinOrderForMatch(String orderId) { - //获取用户ID + //如果redis中没有这个单 则不再往下走 OrderCoinsEntity orderCoinsEntity = orderCoinsDao.selectById(orderId); - if(orderCoinsEntity==null){ + if (orderCoinsEntity == null) { return Result.ok(""); } Long memberId = orderCoinsEntity.getMemberId(); // 取消撮合订单的单 CoinTrader trader = factory.getTrader(orderCoinsEntity.getSymbol()); - trader.cancelOrder(orderCoinsEntity); - if (ObjectUtil.isNotEmpty(orderCoinsEntity) ) { - if (orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_CANCEL || orderCoinsEntity.getOrderStatus()==OrderCoinsEntity.ORDERSTATUS_DONE) { + // 从撮合交易系统得到的已成交的数据 + OrderCoinsEntity coinsEntityCancel = trader.cancelOrder(orderCoinsEntity); + if (coinsEntityCancel == null) { + // 此时说明撮合系统已经没这个单了 不需要继续处理 + return null; + } + + if (ObjectUtil.isNotEmpty(orderCoinsEntity)) { + if (orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) { + // 已完成的直接返回 return Result.fail(MessageSourceUtils.getString("order_service_0012")); } - orderCoinsEntity.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_CANCEL); - orderCoinsDao.updateById(orderCoinsEntity); + OrderCoinsEntity update = new OrderCoinsEntity(); + update.setId(Long.valueOf(orderId)); + update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_CANCEL); + //orderCoinsEntity.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_CANCEL); + orderCoinsDao.updateById(update); String symbol = orderCoinsEntity.getSymbol(); @@ -623,10 +631,10 @@ detail.setTradeType(orderCoinsEntity.getTradeType()); detail.setSymbol(symbol); detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_CANCEL); - detail.setSymbolCnt(orderCoinsEntity.getEntrustCnt()); + detail.setSymbolCnt(BigDecimal.ZERO); detail.setEntrustPrice(orderCoinsEntity.getEntrustPrice()); - detail.setDealPrice(orderCoinsEntity.getDealPrice()); - detail.setDealAmount(orderCoinsEntity.getDealAmount()); + detail.setDealPrice(BigDecimal.ZERO); + detail.setDealAmount(BigDecimal.ZERO); detail.setFeeAmount(orderCoinsEntity.getFeeAmount()); if (OrderCoinsEntity.ORDERTYPE_BUY.equals(orderCoinsEntity.getOrderType())) { //如果是限价买入,撤单将USDT账户冻结金额返回 @@ -637,13 +645,7 @@ //手续费 = 开仓价*数量*手续费率 //返还金额=开仓价*未成交数量+手续费 // 这里根据成交的单计算 - List<OrderCoinsDealEntity> orderCoinsDealEntities = orderCoinDealDao.selectCoinOrderDealByOrderId(Long.valueOf(orderId)); - BigDecimal dealAmount = BigDecimal.ZERO; - if(CollectionUtils.isNotEmpty(orderCoinsDealEntities)){ - for (OrderCoinsDealEntity orderCoinsDealEntity : orderCoinsDealEntities) { - dealAmount = dealAmount.add(orderCoinsDealEntity.getDealAmount()); - } - } + BigDecimal dealAmount = coinsEntityCancel.getDealAmount(); // 市价的按成交额退款 BigDecimal returnBalance = orderCoinsEntity.getEntrustAmount().subtract(dealAmount); @@ -657,7 +659,7 @@ returnFee = orderCoinsEntity.getFeeAmount().subtract(needFee); } BigDecimal avi = returnBalance.add(returnFee); - memberWalletCoinDao.updateWalletBalance(walletCoin.getId(),avi,null,returnBalance.negate()); + memberWalletCoinDao.updateWalletBalance(walletCoin.getId(), avi, null, returnBalance.negate()); walletCoin.setAvailableBalance(walletCoin.getAvailableBalance().add(returnBalance).add(returnFee)); walletCoin.setFrozenBalance(walletCoin.getFrozenBalance().subtract(returnBalance)); //memberWalletCoinDao.updateById(walletCoin); @@ -677,10 +679,11 @@ MemberWalletCoinEntity walletCoin = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, symbol); if (ObjectUtil.isNotEmpty(walletCoin)) { // 卖出按卖出的数量计算手续费 - BigDecimal returnBalance = orderCoinsEntity.getEntrustCnt().subtract(orderCoinsEntity.getDealCnt()); + BigDecimal returnBalance = orderCoinsEntity.getEntrustCnt().subtract(coinsEntityCancel.getDealCnt()); walletCoin.setAvailableBalance(walletCoin.getAvailableBalance().add(returnBalance)); walletCoin.setFrozenBalance(walletCoin.getFrozenBalance().subtract(returnBalance)); - memberWalletCoinDao.updateById(walletCoin); + //memberWalletCoinDao.updateById(walletCoin); + memberWalletCoinDao.updateWalletBalance(walletCoin.getId(), returnBalance, null, returnBalance.negate()); // 流水记录 MemberAccountFlowEntity record = new MemberAccountFlowEntity(); record.setSource(MemberAccountFlowEntity.SOURCE_CANCEL); @@ -940,36 +943,14 @@ BigDecimal price = exchangeTrade.getPrice(); // 卖单 Long sellOrderId = exchangeTrade.getSellOrderId(); + // 买卖单都需要处理 // 买单 OrderCoinsEntity buyOrderCoinsEntity = orderCoinsDao.selectById(buyOrderId); - if(buyOrderCoinsEntity==null){ - return; - } - BigDecimal buyEntrustCnt = buyOrderCoinsEntity.getEntrustCnt(); - if(buyEntrustCnt==null){ - buyEntrustCnt = BigDecimal.ZERO; - } - Long memberId = buyOrderCoinsEntity.getMemberId(); if (buyOrderCoinsEntity != null) { - List<OrderCoinsDealEntity> orderCoinsDealEntities = orderCoinDealDao.selectCoinOrderDealByOrderId(buyOrderId); - // 比较剩余的量 - BigDecimal dealAmount = BigDecimal.ZERO; - BigDecimal dealCnt = BigDecimal.ZERO; - if(CollectionUtils.isNotEmpty(orderCoinsDealEntities)){ - for (OrderCoinsDealEntity orderCoinsDealEntity : orderCoinsDealEntities) { - dealAmount=dealAmount.add(orderCoinsDealEntity.getDealAmount()); - dealCnt = dealCnt.add(orderCoinsDealEntity.getSymbolCnt()); - } - } - // 单的总金额 - BigDecimal entrustAmount = buyOrderCoinsEntity.getEntrustAmount(); - BigDecimal add = dealAmount.add(buyTurnover); BigDecimal closingPrice = buyTurnover.multiply(new BigDecimal("0.002")); - //成交总量 - dealCnt = dealCnt.add(amount); // 创建一个完成的单 OrderCoinsDealEntity detail = new OrderCoinsDealEntity(); detail.setMemberId(buyOrderCoinsEntity.getMemberId()); @@ -984,36 +965,37 @@ detail.setDealAmount(buyTurnover); detail.setFeeAmount(closingPrice); detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_DONE); + // 如果这个单在取消状态 则不执行 orderCoinDealDao.insert(detail); - // 如果这个单成交完 更改状态 - if (add.compareTo(entrustAmount) >= 0 ||(buyEntrustCnt.compareTo(BigDecimal.ZERO)>0 &&dealCnt.compareTo(buyEntrustCnt)>=0) ) { - OrderCoinsEntity update = new OrderCoinsEntity(); - update.setId(buyOrderId); - update.setDealAmount(entrustAmount); - update.setDealCnt(buyOrderCoinsEntity.getDealCnt().add(amount)); - update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE); - update.setUpdateTime(new Date()); - orderCoinsDao.updateById(update); - // 限价买入时,如果成交价比设置的价格低,需要退还多余的冻结 - OrderCoinsEntity coinsEntity = orderCoinsDao.selectById(buyOrderId); - BigDecimal subtract = coinsEntity.getEntrustAmount().subtract(coinsEntity.getDealAmount()); - if(subtract.compareTo(BigDecimal.ZERO)>=0){ - // 下单扣的比较多 - memberWalletCoinDao.updateWalletBalance(coinsEntity.getId(),subtract,subtract,subtract.negate()); - } - } else { - // 更新买单 - orderCoinsDao.updateDeal(buyOrderId,amount,buyTurnover); + // 更新买单 + //orderCoinsDao.updateDeal(buyOrderId, amount, buyTurnover); + // 买币扣除冻结usdt 增加币种的可用 + MemberWalletCoinEntity usdtWallet = memberWalletCoinDao.selectWalletCoinBymIdAndCode(buyOrderCoinsEntity.getMemberId(), MemberWalletCoinEnum.WALLETCOINCODE.getValue()); + if (usdtWallet != null) { + // 减少usdt冻结 + memberWalletCoinDao.updateWalletBalance(usdtWallet.getId(), null, null, buyTurnover.negate()); } + // 增加买的币 + MemberWalletCoinEntity buySymbolWallet = memberWalletCoinDao.selectWalletCoinBymIdAndCode(buyOrderCoinsEntity.getMemberId(), buyOrderCoinsEntity.getSymbol()); + if (buySymbolWallet != null) { + memberWalletCoinDao.updateWalletBalance(buySymbolWallet.getId(), amount, amount, null); + } + // 流水记录 + MemberAccountFlowEntity record = new MemberAccountFlowEntity(); + record.setMemberId(buyOrderCoinsEntity.getMemberId()); + record.setPrice(buyTurnover.setScale(4, BigDecimal.ROUND_DOWN).negate()); + record.setSource(MemberAccountFlowEntity.SOURCE_BUY + buyOrderCoinsEntity.getSymbol()); + record.setRemark(MemberAccountFlowEntity.REMARK_BUY + buyOrderCoinsEntity.getSymbol() + ":" + amount); + record.setSymbol(buyOrderCoinsEntity.getSymbol()); + record.setBalance(usdtWallet.getAvailableBalance().subtract(buyTurnover)); + memberAccountFlowEntityDao.insert(record); } + // 卖单 OrderCoinsEntity sellOrderCoinsEntity = orderCoinsDao.selectById(sellOrderId); if (sellOrderCoinsEntity != null) { // 比较剩余的量 BigDecimal dealAmount = sellOrderCoinsEntity.getDealCnt(); - // 单的总数量 - BigDecimal entrustCnt = sellOrderCoinsEntity.getEntrustCnt(); - BigDecimal add = dealAmount.add(amount); // 创建一个完成的单 OrderCoinsDealEntity detail = new OrderCoinsDealEntity(); detail.setMemberId(sellOrderCoinsEntity.getMemberId()); @@ -1030,46 +1012,7 @@ detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_DONE); orderCoinDealDao.insert(detail); // 如果这个单成交完 更改状态 - if (add.compareTo(entrustCnt) >= 0) { - OrderCoinsEntity update = new OrderCoinsEntity(); - update.setId(sellOrderId); - // 总成交额 - update.setDealAmount(buyTurnover.add(sellOrderCoinsEntity.getDealAmount())); - update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE); - update.setDealCnt(entrustCnt); - update.setUpdateTime(new Date()); - orderCoinsDao.updateById(update); - } else { - // 未完成 - OrderCoinsEntity update = new OrderCoinsEntity(); - update.setId(sellOrderId); - // 总成交额 - update.setDealAmount(buyTurnover.add(sellOrderCoinsEntity.getDealAmount())); - update.setDealCnt(sellOrderCoinsEntity.getDealCnt().add(amount)); - update.setUpdateTime(new Date()); - orderCoinsDao.updateById(update); - } - // 买币扣除冻结usdt 增加币种的可用 - MemberWalletCoinEntity usdtWallet = memberWalletCoinDao.selectWalletCoinBymIdAndCode(buyOrderCoinsEntity.getMemberId(), MemberWalletCoinEnum.WALLETCOINCODE.getValue()); - if (usdtWallet != null) { - // 减少usdt冻结 - memberWalletCoinDao.updateWalletBalance(usdtWallet.getId(), null, null, buyTurnover.negate()); - } - - // 增加买的币 - MemberWalletCoinEntity buySymbolWallet = memberWalletCoinDao.selectWalletCoinBymIdAndCode(buyOrderCoinsEntity.getMemberId(), buyOrderCoinsEntity.getSymbol()); - if (buySymbolWallet != null) { - memberWalletCoinDao.updateWalletBalance(buySymbolWallet.getId(), amount, amount, null); - } - // 流水记录 - MemberAccountFlowEntity record = new MemberAccountFlowEntity(); - record.setMemberId(buyOrderCoinsEntity.getMemberId()); - record.setPrice(buyTurnover.setScale(4, BigDecimal.ROUND_DOWN).negate()); - record.setSource(MemberAccountFlowEntity.SOURCE_BUY + buyOrderCoinsEntity.getSymbol()); - record.setRemark(MemberAccountFlowEntity.REMARK_BUY + buyOrderCoinsEntity.getSymbol() + ":" + amount); - record.setSymbol(buyOrderCoinsEntity.getSymbol()); - record.setBalance(usdtWallet.getAvailableBalance().subtract(buyTurnover)); - memberAccountFlowEntityDao.insert(record); + //orderCoinsDao.updateDeal(sellOrderId, amount, buyTurnover); // 卖家需要减少冻结的币种 增加usdt MemberWalletCoinEntity memberWalletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(sellOrderCoinsEntity.getMemberId(), sellOrderCoinsEntity.getSymbol()); if (memberWalletCoinEntity != null) { @@ -1085,13 +1028,49 @@ MemberAccountFlowEntity recordSell = new MemberAccountFlowEntity(); recordSell.setMemberId(sellOrderCoinsEntity.getMemberId()); recordSell.setPrice(buyTurnover.setScale(4, BigDecimal.ROUND_DOWN)); - recordSell.setSource(MemberAccountFlowEntity.SOURCE_SALE + buyOrderCoinsEntity.getSymbol()); - recordSell.setRemark(MemberAccountFlowEntity.REMARK_SALE + buyOrderCoinsEntity.getSymbol() + ":" + amount.toPlainString()); - recordSell.setSymbol(buyOrderCoinsEntity.getSymbol()); + recordSell.setSource(MemberAccountFlowEntity.SOURCE_SALE + sellOrderCoinsEntity.getSymbol()); + recordSell.setRemark(MemberAccountFlowEntity.REMARK_SALE + sellOrderCoinsEntity.getSymbol() + ":" + amount.toPlainString()); + recordSell.setSymbol(sellOrderCoinsEntity.getSymbol()); recordSell.setBalance(sellWalletCoinEntity.getAvailableBalance().add(buyTurnover)); memberAccountFlowEntityDao.insert(recordSell); } } } + @Override + @Transactional + public void completeOrder(List<OrderCoinsEntity> trades) { + // 订单完成 更新他们的状态 + List<Long> ids = new ArrayList<>(); + if (CollectionUtils.isNotEmpty(trades)) { + for (OrderCoinsEntity trade : trades) { + if (trade != null) { + //orderCoinsDao.updateStatus(trade.getId(),OrderCoinsEntity.ORDERSTATUS_DONE); + ids.add(trade.getId()); + // 买单 实际成交金额小于委托的 这一部分从冻结扣除 + if(OrderCoinsEntity.ORDERTYPE_BUY==trade.getOrderType()){ + if(trade.getEntrustAmount().compareTo(trade.getDealAmount())>0){ + // 此时退回这部分的差额 + BigDecimal subtract = trade.getEntrustAmount().subtract(trade.getDealAmount()); + MemberWalletCoinEntity memberWalletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(trade.getMemberId(), CoinTypeEnum.USDT.name()); + if(memberWalletCoinEntity!=null){ + memberWalletCoinDao.updateWalletBalance(memberWalletCoinEntity.getId(),subtract,null,subtract.negate()); + } + } + } + } + } + } + if (CollectionUtils.isNotEmpty(ids)) { + orderCoinsDao.batchUpdateStatus(ids, OrderCoinsEntity.ORDERSTATUS_DONE); + } + } + + @Override + public void initOrders(String symbol, Integer type, Integer tradeType, BigDecimal price, + BigDecimal amount, BigDecimal entrustAmount) { + + } + + } diff --git a/src/main/java/com/xcong/excoin/quartz/job/KlineDataUpdateJob.java b/src/main/java/com/xcong/excoin/quartz/job/KlineDataUpdateJob.java index c2d7d84..ca41425 100644 --- a/src/main/java/com/xcong/excoin/quartz/job/KlineDataUpdateJob.java +++ b/src/main/java/com/xcong/excoin/quartz/job/KlineDataUpdateJob.java @@ -150,7 +150,4 @@ } } - public void updateTodayLine(){ - symbolsService.updateSymbolsKine("1day"); - } } diff --git a/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java b/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java index b849903..99fdc25 100644 --- a/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java +++ b/src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java @@ -38,27 +38,32 @@ @PostConstruct public void initNewestPrice() { - log.info("#=======价格更新开启=======#"); - SubscriptionOptions subscriptionOptions = new SubscriptionOptions(); - subscriptionOptions.setConnectionDelayOnFailure(5); - subscriptionOptions.setUri("wss://api.hadax.com/ws"); - SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions); - subscriptionClient.subscribeTradeEvent("btcusdt,ethusdt,xrpusdt,ltcusdt,bchusdt,eosusdt,etcusdt", tradeEvent -> { - String symbol = tradeEvent.getSymbol(); - // 根据symbol判断做什么操作 - symbol = CoinTypeConvert.convert(symbol); - if (null != symbol) { - String price = tradeEvent.getTradeList().get(0).getPrice().toPlainString(); - // TODO 测试环境关闭这个插入redis - redisUtils.set(CoinTypeConvert.convertToKey(symbol), price); - // 比较 - //websocketPriceService.comparePriceAsc(symbol, price); - //websocketPriceService.comparePriceDesc(symbol, price); - //System.out.println("比较完毕:"+symbol+"-"+price); + try{ + log.info("#=======价格更新开启=======#"); + SubscriptionOptions subscriptionOptions = new SubscriptionOptions(); + subscriptionOptions.setConnectionDelayOnFailure(5); + subscriptionOptions.setUri("wss://api.hadax.com/ws"); + SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions); + subscriptionClient.subscribeTradeEvent("btcusdt,ethusdt,xrpusdt,ltcusdt,bchusdt,eosusdt,etcusdt", tradeEvent -> { + String symbol = tradeEvent.getSymbol(); + // 根据symbol判断做什么操作 + symbol = CoinTypeConvert.convert(symbol); + if (null != symbol) { + String price = tradeEvent.getTradeList().get(0).getPrice().toPlainString(); + // TODO 测试环境关闭这个插入redis + redisUtils.set(CoinTypeConvert.convertToKey(symbol), price); + // 比较 + //websocketPriceService.comparePriceAsc(symbol, price); + //websocketPriceService.comparePriceDesc(symbol, price); + //System.out.println("比较完毕:"+symbol+"-"+price); - } + } - }); + }); + }catch (Exception e){ + + } + // subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> { // Candlestick data = candlestickEvent.getData(); // redisUtils.set(CoinTypeConvert.convert(candlestickEvent.getSymbol()), data); diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java index a1a2d51..00646d6 100644 --- a/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java +++ b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java @@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject; import com.huobi.client.model.Candlestick; import com.xcong.excoin.configurations.RabbitMqConfig; +import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity; import com.xcong.excoin.modules.coin.service.OrderCoinService; import com.xcong.excoin.modules.exchange.service.HandleKlineService; import com.xcong.excoin.trade.ExchangeTrade; @@ -71,6 +72,8 @@ if(CollectionUtils.isEmpty(exchangeTrades)){ return; } + // 先处理处理用户订单 + orderCoinService.handleOrder(exchangeTrades); // 处理K线 并更新最新价 handleKlineService.handleExchangeOrderToKline(exchangeTrades); // 推送最新K线 @@ -112,8 +115,21 @@ newCandlestick.setTick(model); tradePlateSendWebSocket.sendMessageKline(symbolUsdt,key1,JSONObject.toJSONString(newCandlestick),null); } - // 处理用户订单 - orderCoinService.handleOrder(exchangeTrades); + + } + + /** + * 撮合交易订单全部完成 + * @param content + */ + @RabbitListener(queues = RabbitMqConfig.QUEUE_ROC_ORDER_COMPLETE) + public void doComplete(String content) { + log.debug("#完成的订单---->{}#", content); + List<OrderCoinsEntity> exchangeTrades = JSONObject.parseArray(content, OrderCoinsEntity.class); + if(CollectionUtils.isEmpty(exchangeTrades)){ + return; + } + orderCoinService.completeOrder(exchangeTrades); } } diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java index 4f20d62..10f5118 100644 --- a/src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java +++ b/src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java @@ -6,12 +6,16 @@ import com.xcong.excoin.modules.coin.service.OrderCoinService; import com.xcong.excoin.trade.CoinTrader; import com.xcong.excoin.trade.CoinTraderFactory; +import com.xcong.excoin.trade.ExchangeTrade; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.collections.CollectionUtils; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.util.Iterator; +import java.util.List; /** * 提交买卖单进入撮合系统 @@ -42,7 +46,9 @@ */ @RabbitListener(queues = RabbitMqConfig.QUEUE_ROC_ORDER_CANCEL) public void doCancel(String content) { - log.info("#取消的订单---->{}#", content); + log.debug("#取消的订单---->{}#", content); orderCoinService.cancelEntrustWalletCoinOrderForMatch(content); } + + } diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java index 7288fc6..62c25c7 100644 --- a/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java +++ b/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java @@ -37,7 +37,7 @@ @RabbitListener(queues = RabbitMqConfig.QUEUE_USDT_ADDRESS) public void addUsdtAddress(String content) { if(!UsdtErc20UpdateService.ALL_ADDRESS_LIST.contains(content)){ - log.info("#添加新地址---->{}#", content); + log.debug("#添加新地址---->{}#", content); UsdtErc20UpdateService.ALL_ADDRESS_LIST.add(content); } } diff --git a/src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java b/src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java index 5d6e4f6..77aee3e 100644 --- a/src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java +++ b/src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java @@ -40,6 +40,12 @@ } + public void sendCompleteMsg(String content) { + CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID()); + rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_ROC_ORDER_COMPLETE, RabbitMqConfig.ROUTING_KEY_ROC_ORDER_COMPLETE, content, correlationData); + } + + @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //log.info("#----->{}#", correlationData); diff --git a/src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java b/src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java index d873ca1..1cc749f 100644 --- a/src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java +++ b/src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java @@ -35,6 +35,7 @@ } + @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("#----->{}#", correlationData); diff --git a/src/main/java/com/xcong/excoin/trade/CoinTrader.java b/src/main/java/com/xcong/excoin/trade/CoinTrader.java index 3e249f0..38de6e4 100644 --- a/src/main/java/com/xcong/excoin/trade/CoinTrader.java +++ b/src/main/java/com/xcong/excoin/trade/CoinTrader.java @@ -2,11 +2,8 @@ import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity; -import com.xcong.excoin.modules.coin.service.OrderCoinService; import com.xcong.excoin.rabbit.producer.ExchangeProducer; -import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -313,7 +310,7 @@ if (trade != null) { exchangeTrades.add(trade); } - //判断匹配单是否完成 TODO + //判断匹配单是否完成 if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) { //当前匹配的订单完成交易,删除该订单 orderIterator.remove(); @@ -355,7 +352,7 @@ */ private BigDecimal calculateTradedAmount(OrderCoinsEntity order, BigDecimal dealPrice) { if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && order.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) { - //剩余成交量 TODO ? + //剩余成交量 // 委托量-成交量=剩余量 BigDecimal leftTurnover = order.getEntrustAmount().subtract(order.getDealAmount()); return leftTurnover.divide(dealPrice, coinScale, BigDecimal.ROUND_DOWN); @@ -517,11 +514,11 @@ for (int index = 0; index < size; index += maxSize) { int length = (size - index) > maxSize ? maxSize : size - index; List<OrderCoinsEntity> subOrders = orders.subList(index, index + length); - // TODO 通知订单完成 - //kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(subOrders)); + // 通知订单完成 + exchangeProducer.sendCompleteMsg(JSON.toJSONString(subOrders)); } } else { - // kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(orders)); + exchangeProducer.sendCompleteMsg(JSON.toJSONString(orders)); } } } diff --git a/src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml b/src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml index 16190bd..b443d3d 100644 --- a/src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml +++ b/src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml @@ -8,11 +8,12 @@ </select> <select id="findCoinOrderListByMemberIdAndSysmbol" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsEntity"> - SELECT * FROM coins_order a where a.member_id= #{memberId} and a.order_status = #{status} + SELECT (select sum(symbol_cnt) from coins_order_deal where order_id = a.id) as deal_cnt, a.create_by,a.create_time, a.update_by, a.update_time, a.version,a.id, a.member_id, a.order_no, a.order_type, a.symbol, a.mark_price, a.entrust_cnt, a.entrust_price, a.deal_price, a.deal_amount, a.order_status, a.trade_type, a.fee_amount, a.entrust_amount + FROM coins_order a where a.member_id= #{memberId} and a.order_status = #{status} <if test="symbol != null and symbol !=''"> and a.symbol = #{symbol} </if> - order by create_time desc + order by a.create_time desc </select> <select id="findWalletCoinOrderByOrderNo" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsEntity"> @@ -57,4 +58,17 @@ </set> where id = #{id} </update> + + <update id="batchUpdateStatus" parameterType="map"> + update coins_order set order_status = #{status} + where id in + <foreach collection="list" item="item" separator="," open="(" close=")"> + #{item} + </foreach> + </update> + + <update id="updateStatus" parameterType="map"> + update coins_order set order_status = #{status} + where id =#{id} + </update> </mapper> -- Gitblit v1.9.1