src/main/java/com/xcong/excoin/common/aop/SubmitRepeatAspect.java
@@ -1,7 +1,13 @@ package com.xcong.excoin.common.aop; import cn.hutool.core.util.StrUtil; import cn.hutool.crypto.asymmetric.KeyType; import cn.hutool.crypto.asymmetric.RSA; import com.xcong.excoin.common.LoginUserUtils; import com.xcong.excoin.common.annotations.SubmitRepeat; import com.xcong.excoin.common.contants.AppContants; import com.xcong.excoin.common.response.Result; import com.xcong.excoin.configurations.properties.SecurityProperties; import com.xcong.excoin.utils.MessageSourceUtils; import com.xcong.excoin.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; @@ -27,6 +33,8 @@ @Resource private RedisUtils redisUtil; @Resource private SecurityProperties securityProperties; private String key; @@ -44,9 +52,15 @@ ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes(); HttpServletRequest request = attributes.getRequest(); String token = request.getHeader("token"); //String token = request.getHeader("token"); String bearerToken = request.getHeader(AppContants.TOKEN_HEADER); String rsaToken = bearerToken.replace(AppContants.TOKEN_START_WITH, ""); RSA rsa = new RSA(securityProperties.getPrivateKey(), null); String[] tokens = StrUtil.split(rsa.decryptStr(rsaToken, KeyType.PrivateKey), "_"); String token = tokens[0]; String uri = request.getRequestURI(); String mId = (String) redisUtil.get(token); Long mId = LoginUserUtils.getAppLoginUser().getId(); //String mId = (String) redisUtil.get(token); log.info("#token : {}, uri : {}, mId : {}#", token, uri, mId); key = mId + "_" + uri; boolean flag = redisUtil.setNotExist(key, "1", 5); src/main/java/com/xcong/excoin/common/contants/AppContants.java
@@ -76,4 +76,9 @@ public static final String TIME_OUT = "time_out"; /** * 取消订单id */ public static final String ORDER_CANCEL_KEY = "COIN_ORDER_CANCEL_"; } src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
@@ -121,6 +121,12 @@ public static final String ROUTING_KEY_ROC_ORDER_CANCEL = "roc-order-routingKey-cancel"; public static final String EXCHANGE_ROC_ORDER_COMPLETE = "roc-exchange-order-complete"; public static final String QUEUE_ROC_ORDER_COMPLETE = "roc-order-queue-complete"; public static final String ROUTING_KEY_ROC_ORDER_COMPLETE = "roc-order-routingKey-complete"; @Resource private ConnectionFactory connectionFactory; @@ -189,6 +195,22 @@ public Binding bindingCancelOrder() { return BindingBuilder.bind(ordereCancelQueue()).to(orderCancelExchange()).with(ROUTING_KEY_ROC_ORDER_CANCEL); } // 交易订单 @Bean public DirectExchange orderCompleteExchange() { return new DirectExchange(EXCHANGE_ROC_ORDER_COMPLETE); } @Bean public Queue ordereCompleteQueue() { return new Queue(QUEUE_ROC_ORDER_COMPLETE, true); } @Bean public Binding bindingCompleteOrder() { return BindingBuilder.bind(ordereCompleteQueue()).to(orderCompleteExchange()).with(ROUTING_KEY_ROC_ORDER_COMPLETE); } @Bean src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java
@@ -6,6 +6,7 @@ import javax.validation.Valid; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.common.annotations.SubmitRepeat; import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity; import com.xcong.excoin.modules.symbols.constants.SymbolsConstats; import org.springframework.validation.annotation.Validated; @@ -61,8 +62,9 @@ */ @ApiOperation(value = "提交买卖订单", notes = "提交买卖订单") @PostMapping(value="/submitSalesWalletCoinOrder") @SubmitRepeat public Result submitSalesWalletCoinOrder(@RequestBody @Valid SubmitSalesWalletCoinOrderDto submitSalesWalletCoinOrderDto) { log.info("买卖单参数[{}]", JSONObject.toJSONString(submitSalesWalletCoinOrderDto)); log.debug("买卖单参数[{}]", JSONObject.toJSONString(submitSalesWalletCoinOrderDto)); String symbol = submitSalesWalletCoinOrderDto.getSymbol(); Integer type = submitSalesWalletCoinOrderDto.getType(); Integer tradeType = submitSalesWalletCoinOrderDto.getTradeType(); @@ -97,6 +99,7 @@ */ @ApiOperation(value = "撤销委托订单", notes = "撤销委托订单") @PostMapping(value="/cancelEntrustWalletCoinOrder") @SubmitRepeat public Result cancelEntrustWalletCoinOrder(@RequestBody @Valid CancelEntrustWalletCoinOrderDto cancelEntrustWalletCoinOrderDto) { String orderId = cancelEntrustWalletCoinOrderDto.getOrderId(); // 根据不同币种 src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java
@@ -21,4 +21,7 @@ List<OrderCoinsEntity> selectCoinOrderOnTrade(List<String> list); void updateDeal(@Param("id") Long id, @Param("dealCnt")BigDecimal dealCnt,@Param("dealAmount")BigDecimal dealAmount); void batchUpdateStatus(@Param("list")List<Long> list,@Param("status") Integer status); void updateStatus(@Param("id")Long id,@Param("status") Integer status); } src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java
@@ -52,6 +52,8 @@ public void handleOrder(List<ExchangeTrade> trades); public void completeOrder(List<OrderCoinsEntity> trades); void initOrders(String symbol, Integer type, Integer tradeType, BigDecimal price, BigDecimal amount,BigDecimal entrustAmount); @@ -62,4 +64,5 @@ */ public Result cancelEntrustWalletCoinOrderForMatch(String orderId); } src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
@@ -8,6 +8,7 @@ import javax.annotation.Resource; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.common.contants.AppContants; import com.xcong.excoin.common.enumerates.CoinTypeEnum; import com.xcong.excoin.modules.blackchain.service.RocService; import com.xcong.excoin.modules.coin.mapper.OrderCoinsDealMapper; @@ -349,10 +350,10 @@ } // 需要先 String phone = memberEntity.getPhone(); // if(!"13632989240".equals(phone) && !"15158130575".equals(phone)){ // if(OrderCoinsEntity.ORDERTYPE_BUY.equals(type)){ // // 不能超过800个 // if (!"13632989240".equals(phone) && !"15158130575".equals(phone)) { if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) { // 不能超过800个 // if(amount!=null && amount.compareTo(new BigDecimal("800"))>0){ // return Result.fail("买入额度受限"); // } @@ -374,10 +375,10 @@ // if(bigDecimal1!=null && bigDecimal1.compareTo(new BigDecimal("800"))>0){ // return Result.fail("买入额度受限"); // } // }else{ // return Result.fail("卖出受限"); // } // } } else { return Result.fail("卖出受限"); } } BigDecimal nowPriceinBigDecimal = price; //查询当前价 @@ -419,8 +420,6 @@ totalPayPrice = entrustAmount.add(closingPrice); } } // BigDecimal totalPayPricCoin = nowPrice.multiply(amount).add(closingPrice); String walletCode = MemberWalletCoinEnum.WALLETCOINCODE.getValue(); MemberWalletCoinEntity walletCoinUsdt = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, walletCode); if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) { @@ -486,24 +485,12 @@ //冻结相应的资产 if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) { //如果是买入,所对应的币种增加,USDT账户减少金额 // BigDecimal availableBalance = walletCoinUsdt.getAvailableBalance().subtract(totalPayPrice); // BigDecimal frozenBalance = walletCoinUsdt.getFrozenBalance().add(totalPayPrice); // walletCoinUsdt.setAvailableBalance(availableBalance); // walletCoinUsdt.setFrozenBalance(frozenBalance); // memberWalletCoinDao.updateById(walletCoinUsdt); memberWalletCoinDao.updateWalletBalance(walletCoinUsdt.getId(),totalPayPrice.negate(),totalPayPrice.negate(),entrustAmount); } else { //如果是卖出,币种减少,USDT增加 // BigDecimal availableBalance = walletCoin.getAvailableBalance().subtract(amount); // BigDecimal frozenBalance = walletCoin.getFrozenBalance().add(amount); // walletCoin.setAvailableBalance(availableBalance); // walletCoin.setFrozenBalance(frozenBalance); // memberWalletCoinDao.updateById(walletCoin); memberWalletCoinDao.updateWalletBalance(walletCoin.getId(),amount.negate(),amount.negate(),amount); } // 加入到撮合 TODO 通过消息队列发送到交易撮合 //CoinTrader trader = factory.getTrader(symbol); //trader.trade(order); // 加入到撮合 order.setSymbol(symbol); orderSubmitProducer.sendMsg(JSONObject.toJSONString(order)); return Result.ok(MessageSourceUtils.getString("order_service_0011")); @@ -544,14 +531,25 @@ @Override @Transactional public Result cancelEntrustWalletCoinOrder(String orderId) { // 将这个取消放入redis boolean b = redisUtils.setNotExist(AppContants.ORDER_CANCEL_KEY + orderId, orderId, 10); if (!b) { return Result.ok(MessageSourceUtils.getString("order_service_0012")); } //获取用户ID Long memberId = LoginUserUtils.getAppLoginUser().getId(); OrderCoinsEntity orderCoinsEntity = orderCoinsDao.selectById(orderId); if (ObjectUtil.isNotEmpty(orderCoinsEntity) && orderCoinsEntity.getMemberId().equals(memberId) ) { // 如果是撮合交易单 if (SymbolsConstats.EXCHANGE_SYMBOLS.contains(orderCoinsEntity.getSymbol())) { // 这里先更新状态 判断状态 防止消息发送过程中的二次提交 if (!orderCoinsEntity.getOrderStatus().equals(OrderCoinsEntity.ORDERSTATUS_DODING)) { // 不是持仓中 返回 return Result.ok(MessageSourceUtils.getString("order_service_0013")); } // 更新为已取消(可能在这个过程中 这个单已经成交) orderSubmitProducer.sendCancelMsg(orderId); // return this.cancelEntrustWalletCoinOrderForMatch(orderId); return Result.ok(MessageSourceUtils.getString("order_service_0013")); } if (orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_CANCEL || orderCoinsEntity.getOrderStatus()==OrderCoinsEntity.ORDERSTATUS_DONE) { @@ -629,7 +627,7 @@ @Override @Transactional public Result cancelEntrustWalletCoinOrderForMatch(String orderId) { //获取用户ID //如果redis中没有这个单 则不再往下走 OrderCoinsEntity orderCoinsEntity = orderCoinsDao.selectById(orderId); if(orderCoinsEntity==null){ return Result.ok(""); @@ -637,13 +635,23 @@ Long memberId = orderCoinsEntity.getMemberId(); // 取消撮合订单的单 CoinTrader trader = factory.getTrader(orderCoinsEntity.getSymbol()); trader.cancelOrder(orderCoinsEntity); // 从撮合交易系统得到的已成交的数据 OrderCoinsEntity coinsEntityCancel = trader.cancelOrder(orderCoinsEntity); if (coinsEntityCancel == null) { // 此时说明撮合系统已经没这个单了 不需要继续处理 return null; } if (ObjectUtil.isNotEmpty(orderCoinsEntity) ) { if (orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_CANCEL || orderCoinsEntity.getOrderStatus()==OrderCoinsEntity.ORDERSTATUS_DONE) { if (orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) { // 已完成的直接返回 return Result.fail(MessageSourceUtils.getString("order_service_0012")); } orderCoinsEntity.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_CANCEL); orderCoinsDao.updateById(orderCoinsEntity); OrderCoinsEntity update = new OrderCoinsEntity(); update.setId(Long.valueOf(orderId)); update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_CANCEL); //orderCoinsEntity.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_CANCEL); orderCoinsDao.updateById(update); String symbol = orderCoinsEntity.getSymbol(); @@ -655,10 +663,10 @@ detail.setTradeType(orderCoinsEntity.getTradeType()); detail.setSymbol(symbol); detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_CANCEL); detail.setSymbolCnt(orderCoinsEntity.getEntrustCnt()); detail.setSymbolCnt(BigDecimal.ZERO); detail.setEntrustPrice(orderCoinsEntity.getEntrustPrice()); detail.setDealPrice(orderCoinsEntity.getDealPrice()); detail.setDealAmount(orderCoinsEntity.getDealAmount()); detail.setDealPrice(BigDecimal.ZERO); detail.setDealAmount(BigDecimal.ZERO); detail.setFeeAmount(orderCoinsEntity.getFeeAmount()); if (OrderCoinsEntity.ORDERTYPE_BUY.equals(orderCoinsEntity.getOrderType())) { //如果是限价买入,撤单将USDT账户冻结金额返回 @@ -669,13 +677,7 @@ //手续费 = 开仓价*数量*手续费率 //返还金额=开仓价*未成交数量+手续费 // 这里根据成交的单计算 List<OrderCoinsDealEntity> orderCoinsDealEntities = orderCoinDealDao.selectCoinOrderDealByOrderId(Long.valueOf(orderId)); BigDecimal dealAmount = BigDecimal.ZERO; if(CollectionUtils.isNotEmpty(orderCoinsDealEntities)){ for (OrderCoinsDealEntity orderCoinsDealEntity : orderCoinsDealEntities) { dealAmount = dealAmount.add(orderCoinsDealEntity.getDealAmount()); } } BigDecimal dealAmount = coinsEntityCancel.getDealAmount(); // 市价的按成交额退款 BigDecimal returnBalance = orderCoinsEntity.getEntrustAmount().subtract(dealAmount); @@ -709,10 +711,11 @@ MemberWalletCoinEntity walletCoin = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, symbol); if (ObjectUtil.isNotEmpty(walletCoin)) { // 卖出按卖出的数量计算手续费 BigDecimal returnBalance = orderCoinsEntity.getEntrustCnt().subtract(orderCoinsEntity.getDealCnt()); BigDecimal returnBalance = orderCoinsEntity.getEntrustCnt().subtract(coinsEntityCancel.getDealCnt()); walletCoin.setAvailableBalance(walletCoin.getAvailableBalance().add(returnBalance)); walletCoin.setFrozenBalance(walletCoin.getFrozenBalance().subtract(returnBalance)); memberWalletCoinDao.updateById(walletCoin); //memberWalletCoinDao.updateById(walletCoin); memberWalletCoinDao.updateWalletBalance(walletCoin.getId(), returnBalance, null, returnBalance.negate()); // 流水记录 MemberAccountFlowEntity record = new MemberAccountFlowEntity(); record.setSource(MemberAccountFlowEntity.SOURCE_CANCEL); @@ -972,36 +975,14 @@ BigDecimal price = exchangeTrade.getPrice(); // 卖单 Long sellOrderId = exchangeTrade.getSellOrderId(); // 买卖单都需要处理 // 买单 OrderCoinsEntity buyOrderCoinsEntity = orderCoinsDao.selectById(buyOrderId); if(buyOrderCoinsEntity==null){ return; } BigDecimal buyEntrustCnt = buyOrderCoinsEntity.getEntrustCnt(); if(buyEntrustCnt==null){ buyEntrustCnt = BigDecimal.ZERO; } Long memberId = buyOrderCoinsEntity.getMemberId(); if (buyOrderCoinsEntity != null) { List<OrderCoinsDealEntity> orderCoinsDealEntities = orderCoinDealDao.selectCoinOrderDealByOrderId(buyOrderId); // 比较剩余的量 BigDecimal dealAmount = BigDecimal.ZERO; BigDecimal dealCnt = BigDecimal.ZERO; if(CollectionUtils.isNotEmpty(orderCoinsDealEntities)){ for (OrderCoinsDealEntity orderCoinsDealEntity : orderCoinsDealEntities) { dealAmount=dealAmount.add(orderCoinsDealEntity.getDealAmount()); dealCnt = dealCnt.add(orderCoinsDealEntity.getSymbolCnt()); } } // 单的总金额 BigDecimal entrustAmount = buyOrderCoinsEntity.getEntrustAmount(); BigDecimal add = dealAmount.add(buyTurnover); BigDecimal closingPrice = buyTurnover.multiply(new BigDecimal("0.002")); //成交总量 dealCnt = dealCnt.add(amount); // 创建一个完成的单 OrderCoinsDealEntity detail = new OrderCoinsDealEntity(); detail.setMemberId(buyOrderCoinsEntity.getMemberId()); @@ -1016,36 +997,37 @@ detail.setDealAmount(buyTurnover); detail.setFeeAmount(closingPrice); detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_DONE); // 如果这个单在取消状态 则不执行 orderCoinDealDao.insert(detail); // 如果这个单成交完 更改状态 if (add.compareTo(entrustAmount) >= 0 ||(buyEntrustCnt.compareTo(BigDecimal.ZERO)>0 &&dealCnt.compareTo(buyEntrustCnt)>=0) ) { OrderCoinsEntity update = new OrderCoinsEntity(); update.setId(buyOrderId); update.setDealAmount(entrustAmount); update.setDealCnt(buyOrderCoinsEntity.getDealCnt().add(amount)); update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE); update.setUpdateTime(new Date()); orderCoinsDao.updateById(update); // 限价买入时,如果成交价比设置的价格低,需要退还多余的冻结 OrderCoinsEntity coinsEntity = orderCoinsDao.selectById(buyOrderId); BigDecimal subtract = coinsEntity.getEntrustAmount().subtract(coinsEntity.getDealAmount()); if(subtract.compareTo(BigDecimal.ZERO)>=0){ // 下单扣的比较多 memberWalletCoinDao.updateWalletBalance(coinsEntity.getId(),subtract,subtract,subtract.negate()); } } else { // 更新买单 orderCoinsDao.updateDeal(buyOrderId,amount,buyTurnover); //orderCoinsDao.updateDeal(buyOrderId, amount, buyTurnover); // 买币扣除冻结usdt 增加币种的可用 MemberWalletCoinEntity usdtWallet = memberWalletCoinDao.selectWalletCoinBymIdAndCode(buyOrderCoinsEntity.getMemberId(), MemberWalletCoinEnum.WALLETCOINCODE.getValue()); if (usdtWallet != null) { // 减少usdt冻结 memberWalletCoinDao.updateWalletBalance(usdtWallet.getId(), null, null, buyTurnover.negate()); } // 增加买的币 MemberWalletCoinEntity buySymbolWallet = memberWalletCoinDao.selectWalletCoinBymIdAndCode(buyOrderCoinsEntity.getMemberId(), buyOrderCoinsEntity.getSymbol()); if (buySymbolWallet != null) { memberWalletCoinDao.updateWalletBalance(buySymbolWallet.getId(), amount, amount, null); } // 流水记录 MemberAccountFlowEntity record = new MemberAccountFlowEntity(); record.setMemberId(buyOrderCoinsEntity.getMemberId()); record.setPrice(buyTurnover.setScale(4, BigDecimal.ROUND_DOWN).negate()); record.setSource(MemberAccountFlowEntity.SOURCE_BUY + buyOrderCoinsEntity.getSymbol()); record.setRemark(MemberAccountFlowEntity.REMARK_BUY + buyOrderCoinsEntity.getSymbol() + ":" + amount); record.setSymbol(buyOrderCoinsEntity.getSymbol()); record.setBalance(usdtWallet.getAvailableBalance().subtract(buyTurnover)); memberAccountFlowEntityDao.insert(record); } // 卖单 OrderCoinsEntity sellOrderCoinsEntity = orderCoinsDao.selectById(sellOrderId); if (sellOrderCoinsEntity != null) { // 比较剩余的量 BigDecimal dealAmount = sellOrderCoinsEntity.getDealCnt(); // 单的总数量 BigDecimal entrustCnt = sellOrderCoinsEntity.getEntrustCnt(); BigDecimal add = dealAmount.add(amount); // 创建一个完成的单 OrderCoinsDealEntity detail = new OrderCoinsDealEntity(); detail.setMemberId(sellOrderCoinsEntity.getMemberId()); @@ -1062,46 +1044,7 @@ detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_DONE); orderCoinDealDao.insert(detail); // 如果这个单成交完 更改状态 if (add.compareTo(entrustCnt) >= 0) { OrderCoinsEntity update = new OrderCoinsEntity(); update.setId(sellOrderId); // 总成交额 update.setDealAmount(buyTurnover.add(sellOrderCoinsEntity.getDealAmount())); update.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE); update.setDealCnt(entrustCnt); update.setUpdateTime(new Date()); orderCoinsDao.updateById(update); } else { // 未完成 OrderCoinsEntity update = new OrderCoinsEntity(); update.setId(sellOrderId); // 总成交额 update.setDealAmount(buyTurnover.add(sellOrderCoinsEntity.getDealAmount())); update.setDealCnt(sellOrderCoinsEntity.getDealCnt().add(amount)); update.setUpdateTime(new Date()); orderCoinsDao.updateById(update); } // 买币扣除冻结usdt 增加币种的可用 MemberWalletCoinEntity usdtWallet = memberWalletCoinDao.selectWalletCoinBymIdAndCode(buyOrderCoinsEntity.getMemberId(), MemberWalletCoinEnum.WALLETCOINCODE.getValue()); if (usdtWallet != null) { // 减少usdt冻结 memberWalletCoinDao.updateWalletBalance(usdtWallet.getId(), null, null, buyTurnover.negate()); } // 增加买的币 MemberWalletCoinEntity buySymbolWallet = memberWalletCoinDao.selectWalletCoinBymIdAndCode(buyOrderCoinsEntity.getMemberId(), buyOrderCoinsEntity.getSymbol()); if (buySymbolWallet != null) { memberWalletCoinDao.updateWalletBalance(buySymbolWallet.getId(), amount, amount, null); } // 流水记录 MemberAccountFlowEntity record = new MemberAccountFlowEntity(); record.setMemberId(buyOrderCoinsEntity.getMemberId()); record.setPrice(buyTurnover.setScale(4, BigDecimal.ROUND_DOWN).negate()); record.setSource(MemberAccountFlowEntity.SOURCE_BUY + buyOrderCoinsEntity.getSymbol()); record.setRemark(MemberAccountFlowEntity.REMARK_BUY + buyOrderCoinsEntity.getSymbol() + ":" + amount); record.setSymbol(buyOrderCoinsEntity.getSymbol()); record.setBalance(usdtWallet.getAvailableBalance().subtract(buyTurnover)); memberAccountFlowEntityDao.insert(record); //orderCoinsDao.updateDeal(sellOrderId, amount, buyTurnover); // 卖家需要减少冻结的币种 增加usdt MemberWalletCoinEntity memberWalletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(sellOrderCoinsEntity.getMemberId(), sellOrderCoinsEntity.getSymbol()); if (memberWalletCoinEntity != null) { @@ -1117,9 +1060,9 @@ MemberAccountFlowEntity recordSell = new MemberAccountFlowEntity(); recordSell.setMemberId(sellOrderCoinsEntity.getMemberId()); recordSell.setPrice(buyTurnover.setScale(4, BigDecimal.ROUND_DOWN)); recordSell.setSource(MemberAccountFlowEntity.SOURCE_SALE + buyOrderCoinsEntity.getSymbol()); recordSell.setRemark(MemberAccountFlowEntity.REMARK_SALE + buyOrderCoinsEntity.getSymbol() + ":" + amount.toPlainString()); recordSell.setSymbol(buyOrderCoinsEntity.getSymbol()); recordSell.setSource(MemberAccountFlowEntity.SOURCE_SALE + sellOrderCoinsEntity.getSymbol()); recordSell.setRemark(MemberAccountFlowEntity.REMARK_SALE + sellOrderCoinsEntity.getSymbol() + ":" + amount.toPlainString()); recordSell.setSymbol(sellOrderCoinsEntity.getSymbol()); recordSell.setBalance(sellWalletCoinEntity.getAvailableBalance().add(buyTurnover)); memberAccountFlowEntityDao.insert(recordSell); } @@ -1127,6 +1070,35 @@ } @Override @Transactional public void completeOrder(List<OrderCoinsEntity> trades) { // 订单完成 更新他们的状态 List<Long> ids = new ArrayList<>(); if (CollectionUtils.isNotEmpty(trades)) { for (OrderCoinsEntity trade : trades) { if (trade != null) { //orderCoinsDao.updateStatus(trade.getId(),OrderCoinsEntity.ORDERSTATUS_DONE); ids.add(trade.getId()); // 买单 实际成交金额小于委托的 这一部分从冻结扣除 if(OrderCoinsEntity.ORDERTYPE_BUY==trade.getOrderType()){ if(trade.getEntrustAmount().compareTo(trade.getDealAmount())>0){ // 此时退回这部分的差额 BigDecimal subtract = trade.getEntrustAmount().subtract(trade.getDealAmount()); MemberWalletCoinEntity memberWalletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(trade.getMemberId(), CoinTypeEnum.USDT.name()); if(memberWalletCoinEntity!=null){ memberWalletCoinDao.updateWalletBalance(memberWalletCoinEntity.getId(),subtract,null,subtract.negate()); } } } } } } if (CollectionUtils.isNotEmpty(ids)) { orderCoinsDao.batchUpdateStatus(ids, OrderCoinsEntity.ORDERSTATUS_DONE); } } @Override public void initOrders(String symbol, Integer type, Integer tradeType, BigDecimal price, BigDecimal amount, BigDecimal entrustAmount) { src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
@@ -4,6 +4,7 @@ import com.alibaba.fastjson.JSONObject; import com.huobi.client.model.Candlestick; import com.xcong.excoin.configurations.RabbitMqConfig; import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity; import com.xcong.excoin.modules.coin.service.OrderCoinService; import com.xcong.excoin.modules.exchange.service.HandleKlineService; import com.xcong.excoin.trade.ExchangeTrade; @@ -71,6 +72,8 @@ if(CollectionUtils.isEmpty(exchangeTrades)){ return; } // 先处理处理用户订单 orderCoinService.handleOrder(exchangeTrades); // 处理K线 并更新最新价 handleKlineService.handleExchangeOrderToKline(exchangeTrades); // 推送最新K线 @@ -112,8 +115,21 @@ newCandlestick.setTick(model); tradePlateSendWebSocket.sendMessageKline(symbolUsdt,key1,JSONObject.toJSONString(newCandlestick),null); } // 处理用户订单 orderCoinService.handleOrder(exchangeTrades); } /** * 撮合交易订单全部完成 * @param content */ @RabbitListener(queues = RabbitMqConfig.QUEUE_ROC_ORDER_COMPLETE) public void doComplete(String content) { log.debug("#完成的订单---->{}#", content); List<OrderCoinsEntity> exchangeTrades = JSONObject.parseArray(content, OrderCoinsEntity.class); if(CollectionUtils.isEmpty(exchangeTrades)){ return; } orderCoinService.completeOrder(exchangeTrades); } } src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java
@@ -6,12 +6,16 @@ import com.xcong.excoin.modules.coin.service.OrderCoinService; import com.xcong.excoin.trade.CoinTrader; import com.xcong.excoin.trade.CoinTraderFactory; import com.xcong.excoin.trade.ExchangeTrade; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Iterator; import java.util.List; /** * 提交买卖单进入撮合系统 @@ -42,7 +46,9 @@ */ @RabbitListener(queues = RabbitMqConfig.QUEUE_ROC_ORDER_CANCEL) public void doCancel(String content) { log.info("#取消的订单---->{}#", content); log.debug("#取消的订单---->{}#", content); orderCoinService.cancelEntrustWalletCoinOrderForMatch(content); } } src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java
@@ -37,7 +37,7 @@ @RabbitListener(queues = RabbitMqConfig.QUEUE_USDT_ADDRESS) public void addUsdtAddress(String content) { if(!UsdtErc20UpdateService.ALL_ADDRESS_LIST.contains(content)){ log.info("#添加新地址---->{}#", content); log.debug("#添加新地址---->{}#", content); UsdtErc20UpdateService.ALL_ADDRESS_LIST.add(content); } } src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java
@@ -40,6 +40,12 @@ } public void sendCompleteMsg(String content) { CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID()); rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_ROC_ORDER_COMPLETE, RabbitMqConfig.ROUTING_KEY_ROC_ORDER_COMPLETE, content, correlationData); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { //log.info("#----->{}#", correlationData); src/main/java/com/xcong/excoin/rabbit/producer/OrderSubmitProducer.java
@@ -35,6 +35,7 @@ } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("#----->{}#", correlationData); src/main/java/com/xcong/excoin/trade/CoinTrader.java
@@ -2,11 +2,8 @@ import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity; import com.xcong.excoin.modules.coin.service.OrderCoinService; import com.xcong.excoin.rabbit.producer.ExchangeProducer; import org.apache.commons.collections.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -313,7 +310,7 @@ if (trade != null) { exchangeTrades.add(trade); } //判断匹配单是否完成 TODO //判断匹配单是否完成 if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) { //当前匹配的订单完成交易,删除该订单 orderIterator.remove(); @@ -355,7 +352,7 @@ */ private BigDecimal calculateTradedAmount(OrderCoinsEntity order, BigDecimal dealPrice) { if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && order.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) { //剩余成交量 TODO ? //剩余成交量 // 委托量-成交量=剩余量 BigDecimal leftTurnover = order.getEntrustAmount().subtract(order.getDealAmount()); return leftTurnover.divide(dealPrice, coinScale, BigDecimal.ROUND_DOWN); @@ -517,11 +514,11 @@ for (int index = 0; index < size; index += maxSize) { int length = (size - index) > maxSize ? maxSize : size - index; List<OrderCoinsEntity> subOrders = orders.subList(index, index + length); // TODO 通知订单完成 //kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(subOrders)); // 通知订单完成 exchangeProducer.sendCompleteMsg(JSON.toJSONString(subOrders)); } } else { // kafkaTemplate.send("exchange-order-completed", JSON.toJSONString(orders)); exchangeProducer.sendCompleteMsg(JSON.toJSONString(orders)); } } } src/main/resources/application-test.yml
@@ -7,9 +7,9 @@ profiles: active: dev datasource: url: jdbc:mysql://47.114.114.219:3306/db_roc?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8 username: roc_user password: roc123pasd!@ url: jdbc:mysql://47.96.73.250:3306/db_roc?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8 username: shop_user password: 123456 driver-class-name: com.mysql.jdbc.Driver type: com.alibaba.druid.pool.DruidDataSource druid: @@ -50,11 +50,11 @@ ## Redis数据库索引(默认为0) database: 1 ## Redis服务器地址 host: 47.114.114.219 host: 47.96.73.250 ## Redis服务器连接端口 port: 6379 ## Redis服务器连接密码(默认为空) password: biyi123 password: qwer12345678 jedis: pool: ## 连接池最大连接数(使用负值表示没有限制) @@ -72,9 +72,9 @@ ## 连接超时时间(毫秒) timeout: 30000 rabbitmq: host: 120.27.238.55 host: 47.96.73.250 port: 5672 username: ct_rabbit username: rabbit password: 123456 publisher-confirm-type: correlated @@ -91,17 +91,16 @@ app: debug: false debug: true redis_expire: 3000 kline-update-job: false newest-price-update-job: true #日线 该任务不能与最新价处于同一个服务器 trade: true newest-price-update-job: false exchange-trade: true day-line: false other-job: true loop-job: true other-job: false loop-job: false rabbit-consumer: false block-job: true block-job: false aliyun: oss: src/main/resources/application.yml
@@ -5,11 +5,11 @@ spring: profiles: active: prod active: prodapp datasource: url: jdbc:mysql://rm-bp151tw8er79ig9kb5o.mysql.rds.aliyuncs.com:3306/db_biue?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8 username: ctcoin_data password: ctcoin_123 url: jdbc:mysql://rm-bp1i2g5rg5dubo9s40o.mysql.rds.aliyuncs.com:3306/db_roc?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8 username: roc_user password: roc123pasd!@ driver-class-name: com.mysql.jdbc.Driver type: com.alibaba.druid.pool.DruidDataSource druid: @@ -48,7 +48,7 @@ ## redis配置 redis: ## Redis数据库索引(默认为0) database: 2 database: 1 ## Redis服务器地址 host: 47.114.114.219 ## Redis服务器连接端口 @@ -72,10 +72,10 @@ ## 连接超时时间(毫秒) timeout: 30000 rabbitmq: host: 120.27.238.55 host: 47.114.114.219 port: 5672 username: ct_rabbit password: 123456 username: roc_user password: roc123456 publisher-confirm-type: correlated @@ -93,13 +93,10 @@ app: debug: false redis_expire: 3000 # k线更新任务控制 kline-update-job: false #最新价任务控制 newest-price-update-job: false #日线 该任务不能与最新价处于同一个服务器 exchange-trade: false day-line: false #其他任务控制 other-job: false loop-job: false rabbit-consumer: false src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml
@@ -8,11 +8,12 @@ </select> <select id="findCoinOrderListByMemberIdAndSysmbol" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsEntity"> SELECT * FROM coins_order a where a.member_id= #{memberId} and a.order_status = #{status} SELECT (select sum(symbol_cnt) from coins_order_deal where order_id = a.id) as deal_cnt, a.create_by,a.create_time, a.update_by, a.update_time, a.version,a.id, a.member_id, a.order_no, a.order_type, a.symbol, a.mark_price, a.entrust_cnt, a.entrust_price, a.deal_price, a.deal_amount, a.order_status, a.trade_type, a.fee_amount, a.entrust_amount FROM coins_order a where a.member_id= #{memberId} and a.order_status = #{status} <if test="symbol != null and symbol !=''"> and a.symbol = #{symbol} </if> order by create_time desc order by a.create_time desc </select> <select id="findWalletCoinOrderByOrderNo" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsEntity"> @@ -57,4 +58,17 @@ </set> where id = #{id} </update> <update id="batchUpdateStatus" parameterType="map"> update coins_order set order_status = #{status} where id in <foreach collection="list" item="item" separator="," open="(" close=")"> #{item} </foreach> </update> <update id="updateStatus" parameterType="map"> update coins_order set order_status = #{status} where id =#{id} </update> </mapper> src/test/java/com/xcong/excoin/TradeTest.java
@@ -5,6 +5,7 @@ import com.xcong.excoin.modules.coin.service.OrderCoinService; import com.xcong.excoin.trade.CoinTrader; import com.xcong.excoin.utils.RedisUtils; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; @@ -13,6 +14,7 @@ import java.math.BigDecimal; import java.text.ParseException; import java.util.ArrayList; import java.util.Iterator; import java.util.List; @Slf4j @@ -31,4 +33,39 @@ public void buy(){ redisUtils.set("ROC_NEW_PRICE",new BigDecimal("12.33")); } public static void main(String[] args) throws InterruptedException { // 测试两个地方 List<String> list = new ArrayList<>(); list.add("1"); list.add("2"); list.add("3"); list.add("4"); // 开一个线程 Thread thread = new Thread(new Runnable() { @SneakyThrows @Override public void run() { Thread.sleep(1000); synchronized (list){ Iterator<String> iterator = list.iterator(); while (iterator.hasNext()){ System.out.println("线程里"+iterator.next()); iterator.remove(); } } } }); thread.start(); synchronized (list){ Iterator<String> iterator = list.iterator(); while (iterator.hasNext()){ Thread.sleep(1000); System.out.println(iterator.next()); } } // } }