src/main/java/com/xcong/excoin/common/contants/AppContants.java
@@ -87,4 +87,6 @@ public static final String WHOLE_BOMB_PREFIX = "whole_bomb_"; public static final String HOLD_BOND_RATIO = "hold_bond_ratio"; } src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
@@ -57,6 +57,9 @@ // 全仓爆仓 public static final String QUEUE_WHOLE_BOMB = "QUEUE_WHOLE_BOMB_NEW"; // 全仓价格操作 public static final String QUEUE_WHOLE_PRICE = "QUEUE_WHOLE_PRCE"; // 开多止盈路由键 public static final String ROUTINGKEY_MOREPRO = "ROUTINGKEY_MOREPRO"; @@ -83,6 +86,9 @@ // 全仓爆仓 public static final String ROUTINGKEY_WHOLE_BOMB = "ROUTINGKEY_WHOLE_BOMB"; // 全仓价格操作 public static final String ROUTINGKEY_WHOLE_PRICE = "ROUTINGKEY_WHOLE_PRICE"; @Resource private ConnectionFactory connectionFactory; @@ -224,6 +230,10 @@ return new Queue(QUEUE_CLOSETRADE, true); } @Bean public Queue queueWholePrice() { return new Queue(QUEUE_WHOLE_PRICE, true); } /** * 全仓爆仓 * @return @@ -233,6 +243,12 @@ return new Queue(QUEUE_WHOLE_BOMB, true); } @Bean public Binding bindingWholePrice() { return BindingBuilder.bind(queueWholePrice()).to(orderExchange()).with(RabbitMqConfig.ROUTINGKEY_WHOLE_PRICE); } /** * 开多止盈 * src/main/java/com/xcong/excoin/modules/contract/entity/ContractHoldOrderEntity.java
@@ -163,4 +163,9 @@ */ // @TableField(exist = false) private BigDecimal rewardAmount; /** * 维持保证金 */ private BigDecimal holdBond; } src/main/java/com/xcong/excoin/modules/contract/entity/ContractOrderEntity.java
@@ -205,4 +205,9 @@ private int operateNo; /** * 维持保证金 */ private BigDecimal holdBond; } src/main/java/com/xcong/excoin/modules/contract/service/RabbitOrderService.java
@@ -3,6 +3,7 @@ import com.xcong.excoin.modules.member.entity.MemberEntity; import com.xcong.excoin.rabbit.pricequeue.OrderModel; import com.xcong.excoin.rabbit.pricequeue.whole.WholePriceDataModel; import java.util.List; @@ -16,4 +17,6 @@ public void entrustCloseOrder(List<OrderModel> list); public void wholeBombOrder(List<OrderModel> list); public void wholeBombOrder(WholePriceDataModel wholePriceData); } src/main/java/com/xcong/excoin/modules/contract/service/impl/ContractHoldOrderServiceImpl.java
@@ -192,6 +192,7 @@ wholeHoldOrder.setSymbolCntSale(wholeHoldOrder.getSymbolCntSale() + submitOrderDto.getSymbolCnt()); wholeHoldOrder.setPrePaymentAmount(prePaymentAmount.add(wholeHoldOrder.getPrePaymentAmount())); wholeHoldOrder.setOperateNo(wholeHoldOrder.getOperateNo()); wholeHoldOrder.setHoldBond(CalculateUtil.calMemberHoldBond(wholeHoldOrder)); int i = contractHoldOrderDao.updateById(wholeHoldOrder); if (i > 0) { memberWalletContractDao.increaseWalletContractBalanceById(prePaymentAmount.negate(), openFeePrice.negate(), null, walletContract.getId()); @@ -238,6 +239,8 @@ ContractOrderEntity contractOrderEntity = ContractHoldOrderEntityMapper.INSTANCE.holdOrderToOrder(holdOrderEntity); contractOrderEntity.setOpeningTime(new Date()); holdOrderEntity.setHoldBond(CalculateUtil.calMemberHoldBond(holdOrderEntity)); contractHoldOrderDao.insert(holdOrderEntity); int i = contractOrderDao.insert(contractOrderEntity); src/main/java/com/xcong/excoin/modules/contract/service/impl/RabbitOrderServiceImpl.java
@@ -35,6 +35,7 @@ import com.xcong.excoin.modules.member.entity.MemberWalletContractEntity; import com.xcong.excoin.modules.platform.entity.PlatformTradeSettingEntity; import com.xcong.excoin.rabbit.pricequeue.OrderModel; import com.xcong.excoin.rabbit.pricequeue.whole.WholePriceDataModel; import com.xcong.excoin.utils.*; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -471,6 +472,10 @@ ThreadPoolUtils.calReturnMoney(memberEntity.getId(), fee, contractOrderEntity, AgentReturnEntity.ORDER_TYPE_CLOSE); } @Override public void wholeBombOrder(WholePriceDataModel wholePriceData) { } @Transactional(rollbackFor = Exception.class) @Override src/main/java/com/xcong/excoin/modules/platform/entity/PlatformTradeSettingEntity.java
@@ -65,4 +65,9 @@ * 手续费系数 */ private BigDecimal feeSpreadRatio; /** * 维持保证金率 */ private BigDecimal holdBondRatio; } src/main/java/com/xcong/excoin/quartz/job/NewestPriceUpdateJob.java
@@ -52,7 +52,7 @@ // TODO 测试环境关闭这个插入redis redisUtils.set(CoinTypeConvert.convertToKey(symbol), price); if ("ETC/USDT".equalsIgnoreCase(symbol)) { websocketPriceService.wholeBomb(symbol, price); websocketPriceService.wholeBomb(); } // 比较 websocketPriceService.comparePriceAsc(symbol, price); src/main/java/com/xcong/excoin/rabbit/consumer/OperateOrderPriceConsumer.java
@@ -5,6 +5,8 @@ import com.xcong.excoin.configurations.RabbitMqConfig; import com.xcong.excoin.rabbit.pricequeue.OrderModel; import com.xcong.excoin.rabbit.pricequeue.OrderOperatePriceService; import com.xcong.excoin.rabbit.pricequeue.whole.WholePriceDataModel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -16,6 +18,7 @@ * 后台打包开启 APP 不开启 * @author helius */ @Slf4j @Component @ConditionalOnProperty(prefix = "app", name = "newest-price-update-job", havingValue = "true") public class OperateOrderPriceConsumer { @@ -31,7 +34,7 @@ @RabbitListener(queues = RabbitMqConfig.QUEUE_PRICEOPERATE) public void onMessageMorePro(Message message, Channel channel) { String content = new String(message.getBody()); System.out.println("我收到了用户的订单操作消息:" + content); log.info("我收到了用户的订单操作消息:{}", content); // 操作前的map // 转为model OrderModel orderModel = JSONObject.parseObject(content, OrderModel.class); @@ -40,5 +43,15 @@ } @RabbitListener(queues = RabbitMqConfig.QUEUE_WHOLE_PRICE) public void onMessageWholePrice(Message message, Channel channel) { log.info("我收到了用户的全仓价格消息"); String content = new String(message.getBody()); WholePriceDataModel wholePriceData = JSONObject.parseObject(content, WholePriceDataModel.class); OrderOperatePriceService.wholePriceDataOperation(wholePriceData); } } src/main/java/com/xcong/excoin/rabbit/consumer/WebsocketPriceConsumer.java
@@ -6,6 +6,7 @@ import com.xcong.excoin.modules.contract.service.RabbitOrderService; import com.xcong.excoin.modules.contract.service.impl.OrderWebsocketServiceImpl; import com.xcong.excoin.rabbit.pricequeue.OrderModel; import com.xcong.excoin.rabbit.pricequeue.whole.WholePriceDataModel; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; @@ -169,8 +170,8 @@ @RabbitListener(queues = RabbitMqConfig.QUEUE_WHOLE_BOMB) public void onMessageWholeBomb(Message message, Channel channel) { String content = new String(message.getBody()); log.info("==message-price-consumer==我收到消息了全仓爆仓: {}", content); List<OrderModel> list = JSONArray.parseArray(content, OrderModel.class); orderService.wholeBombOrder(list); WholePriceDataModel wholePriceDataModel = JSONArray.parseObject(content, WholePriceDataModel.class); log.info("==message-price-consumer==我收到消息了全仓爆仓: {}", wholePriceDataModel.getMemberId()); orderService.wholeBombOrder(wholePriceDataModel); } } src/main/java/com/xcong/excoin/rabbit/pricequeue/OrderOperatePriceService.java
@@ -1,6 +1,8 @@ package com.xcong.excoin.rabbit.pricequeue; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.rabbit.pricequeue.whole.WholeDataQueue; import com.xcong.excoin.rabbit.pricequeue.whole.WholePriceDataModel; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; @@ -129,4 +131,15 @@ } } public static void wholePriceDataOperation(WholePriceDataModel wholePriceData) { Map<String, WholePriceDataModel> dataModelMap = WholeDataQueue.MAP; WholePriceDataModel isExistData = dataModelMap.get(wholePriceData.getMemberId().toString()); if (isExistData != null) { dataModelMap.remove(wholePriceData.getMemberId().toString()); } dataModelMap.put(wholePriceData.getMemberId().toString(), wholePriceData); } } src/main/java/com/xcong/excoin/rabbit/pricequeue/WebsocketPriceService.java
@@ -12,6 +12,8 @@ import com.xcong.excoin.modules.member.dao.MemberWalletContractDao; import com.xcong.excoin.modules.member.entity.MemberEntity; import com.xcong.excoin.modules.member.entity.MemberWalletContractEntity; import com.xcong.excoin.rabbit.pricequeue.whole.WholeDataQueue; import com.xcong.excoin.rabbit.pricequeue.whole.WholePriceDataModel; import com.xcong.excoin.rabbit.producer.OrderProducer; import com.xcong.excoin.utils.*; import lombok.extern.slf4j.Slf4j; @@ -279,6 +281,47 @@ } } public void wholeBomb() { Map<String, WholePriceDataModel> dataModelMap = WholeDataQueue.MAP; if (CollUtil.isEmpty(dataModelMap)) { return; } for (Map.Entry<String, WholePriceDataModel> entry : dataModelMap.entrySet()) { WholePriceDataModel wholePriceData = entry.getValue(); List<ContractHoldOrderEntity> list = wholePriceData.getList(); if (CollUtil.isNotEmpty(list)) { BigDecimal totalProfitOrLoss = BigDecimal.ZERO; Map<String, BigDecimal> prices = new HashMap<>(); for (ContractHoldOrderEntity holdOrderEntity : list) { BigDecimal newPrice = (BigDecimal) redisUtils.get(CoinTypeConvert.convertToKey(holdOrderEntity.getSymbol())); BigDecimal rewardRatio = null; if (ContractHoldOrderEntity.OPENING_TYPE_MORE == holdOrderEntity.getOpeningType()) { // (最新价-开仓价)*规格*张数 rewardRatio = newPrice.subtract(holdOrderEntity.getOpeningPrice()).multiply(holdOrderEntity.getSymbolSku()).multiply(new BigDecimal(holdOrderEntity.getSymbolCntSale())); // 开空 } else { // (开仓价-最新价)*规格*张数 rewardRatio = holdOrderEntity.getOpeningPrice().subtract(newPrice).multiply(holdOrderEntity.getSymbolSku()).multiply(new BigDecimal(holdOrderEntity.getSymbolCntSale())); } totalProfitOrLoss = totalProfitOrLoss.add(rewardRatio); prices.put(holdOrderEntity.getSymbol(), newPrice); } BigDecimal holdBond = wholePriceData.getHoldBond(); BigDecimal balance = wholePriceData.getBalance(); if (balance.add(totalProfitOrLoss).compareTo(holdBond) > 0) { continue; } wholePriceData.setPrices(prices); orderProducer.sendWholeBomb(JSONObject.toJSONString(wholePriceData)); } } } public void wholeBomb(String symbol, String price) { List<Long> memberIds = contractHoldOrderDao.selectMemberHasWholeOrder(); src/main/java/com/xcong/excoin/rabbit/pricequeue/whole/WholeDataQueue.java
New file @@ -0,0 +1,13 @@ package com.xcong.excoin.rabbit.pricequeue.whole; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * @author wzy * @date 2021-01-27 **/ public class WholeDataQueue { public static Map<String, WholePriceDataModel> MAP = new ConcurrentHashMap<String, WholePriceDataModel>(); } src/main/java/com/xcong/excoin/rabbit/pricequeue/whole/WholePriceDataModel.java
New file @@ -0,0 +1,44 @@ package com.xcong.excoin.rabbit.pricequeue.whole; import com.xcong.excoin.modules.contract.entity.ContractHoldOrderEntity; import lombok.Data; import java.io.Serializable; import java.math.BigDecimal; import java.util.List; import java.util.Map; /** * @author wzy * @date 2021-01-27 **/ @Data public class WholePriceDataModel implements Serializable { private static final long serialVersionUID = -2933657807376740142L; /** * 会员ID */ private Long memberId; /** * 当前持仓列表 */ private List<ContractHoldOrderEntity> list; /** * 维持保证金 */ private BigDecimal holdBond; /** * 权益 */ private BigDecimal balance; /** * 爆仓时,各币种价格 */ private Map<String, BigDecimal> prices; } src/main/java/com/xcong/excoin/rabbit/producer/OrderProducer.java
@@ -161,4 +161,12 @@ rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_A, RabbitMqConfig.ROUTINGKEY_WHOLE_BOMB, content, correlationData); } public void sendWholePrice(String content) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); log.info("发送全仓价格操作 == pid : {}", correlationData.getId()); rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_A, RabbitMqConfig.ROUTINGKEY_WHOLE_PRICE, content, correlationData); } } src/main/java/com/xcong/excoin/utils/CalculateUtil.java
@@ -4,6 +4,7 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.common.contants.AppContants; import com.xcong.excoin.common.enumerates.CoinTypeEnum; import com.xcong.excoin.common.enumerates.RabbitPriceTypeEnum; import com.xcong.excoin.common.exception.GlobalException; @@ -14,6 +15,7 @@ import com.xcong.excoin.modules.member.entity.MemberEntity; import com.xcong.excoin.modules.member.entity.MemberSettingEntity; import com.xcong.excoin.modules.member.entity.MemberWalletContractEntity; import com.xcong.excoin.modules.platform.dao.TradeSettingDao; import com.xcong.excoin.modules.platform.entity.PlatformTradeSettingEntity; import com.xcong.excoin.rabbit.pricequeue.OrderModel; import com.xcong.excoin.rabbit.producer.OrderProducer; @@ -269,4 +271,24 @@ return profitOrLess; } /** * 全仓模式下,维持保证金 * 维持保证金 = 持仓价值*维持保证金率= 面值*张数*开仓价格*维持保证金率 * @param contractHoldOrder * @return */ public static BigDecimal calMemberHoldBond(ContractHoldOrderEntity contractHoldOrder) { TradeSettingDao tradeSettingDao = SpringContextHolder.getBean(TradeSettingDao.class); RedisUtils redisUtils = SpringContextHolder.getBean(RedisUtils.class); PlatformTradeSettingEntity tradeSetting = tradeSettingDao.findTradeSetting(); BigDecimal holdBondRatio = (BigDecimal) redisUtils.get(AppContants.HOLD_BOND_RATIO); if (holdBondRatio == null) { holdBondRatio = tradeSetting.getHoldBondRatio(); redisUtils.set(AppContants.HOLD_BOND_RATIO, tradeSetting.getHoldBondRatio()); } return contractHoldOrder.getOpeningPrice().multiply(new BigDecimal(contractHoldOrder.getSymbolCntSale())).multiply(holdBondRatio).multiply(contractHoldOrder.getSymbolSku()); } } src/main/java/com/xcong/excoin/utils/ThreadPoolUtils.java
@@ -1,14 +1,25 @@ package com.xcong.excoin.utils; import cn.hutool.core.collection.CollUtil; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.common.enumerates.CoinTypeEnum; import com.xcong.excoin.modules.contract.dao.ContractHoldOrderDao; import com.xcong.excoin.modules.contract.dao.ContractOrderDao; import com.xcong.excoin.modules.contract.entity.ContractHoldOrderEntity; import com.xcong.excoin.modules.contract.entity.ContractOrderEntity; import com.xcong.excoin.modules.contract.service.impl.OrderWebsocketServiceImpl; import com.xcong.excoin.modules.documentary.service.FollowOrderOperationService; import com.xcong.excoin.modules.member.dao.MemberWalletContractDao; import com.xcong.excoin.modules.member.entity.MemberEntity; import com.xcong.excoin.modules.member.entity.MemberWalletContractEntity; import com.xcong.excoin.rabbit.pricequeue.whole.WholePriceDataModel; import com.xcong.excoin.rabbit.producer.OrderProducer; import com.xcong.excoin.utils.dingtalk.DingTalkUtils; import lombok.extern.slf4j.Slf4j; import javax.validation.constraints.NotNull; import java.math.BigDecimal; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -85,4 +96,43 @@ } }); } /** * 发送全仓价格操作 * * @param memberId */ public static void sendWholePrice(@NotNull Long memberId) { EXECUTOR.execute(new Runnable() { @Override public void run() { log.info("全仓操作价格"); ContractHoldOrderDao contractHoldOrderDao = SpringContextHolder.getBean(ContractHoldOrderDao.class); MemberWalletContractDao memberWalletContractDao = SpringContextHolder.getBean(MemberWalletContractDao.class); List<ContractHoldOrderEntity> holdOrders = contractHoldOrderDao.selectHoldOrderListByMemberId(memberId); if (CollUtil.isEmpty(holdOrders)) { return; } MemberWalletContractEntity wallet = memberWalletContractDao.findWalletContractByMemberIdAndSymbol(memberId, CoinTypeEnum.USDT.name()); WholePriceDataModel wholePriceData = new WholePriceDataModel(); wholePriceData.setList(holdOrders); BigDecimal totalHoldBond = BigDecimal.ZERO; for (ContractHoldOrderEntity holdOrder : holdOrders) { totalHoldBond = totalHoldBond.add(holdOrder.getHoldBond() == null ? BigDecimal.ZERO : holdOrder.getHoldBond()); } wholePriceData.setHoldBond(totalHoldBond); wholePriceData.setBalance(wallet.getTotalBalance()); wholePriceData.setMemberId(memberId); OrderProducer orderProducer = SpringContextHolder.getBean(OrderProducer.class); orderProducer.sendWholePrice(JSONObject.toJSONString(wholePriceData)); } }); } }