From dcac20ece30dddba0fd21ca0d7a965de5914189d Mon Sep 17 00:00:00 2001 From: zainali5120 <512061637@qq.com> Date: Mon, 12 Oct 2020 15:46:48 +0800 Subject: [PATCH] 交易所K线优化 --- src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java | 34 ++++++++ src/main/java/com/xcong/excoin/modules/blackchain/service/EthService.java | 6 + src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java | 22 +++++ src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java | 9 ++ src/main/java/com/xcong/excoin/processor/MarketService.java | 23 +++++ src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java | 16 ++- src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java | 3 src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java | 9 + src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java | 3 src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java | 2 src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml | 15 +++ src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java | 5 + src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java | 13 ++ src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtEthService.java | 20 ++-- src/main/resources/application-prodapp.yml | 2 15 files changed, 155 insertions(+), 27 deletions(-) diff --git a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java index 9a4d13c..b9d1449 100644 --- a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java +++ b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java @@ -36,6 +36,12 @@ public static final String ROUTING_KEY_USDT_UPDATE = "routing_key_usdt_update"; + public static final String EXCHANGE_USDT_ADDRESS = "exchange_usdt_address"; + + public static final String QUEUE_USDT_ADDRESS= "queue_usdt_address"; + + public static final String ROUTING_KEY_USDT_ADDRESS = "routing_key_usdt_address"; + /** * 撮合交易 @@ -201,6 +207,22 @@ return BindingBuilder.bind(usdtUpdateQueue()).to(usdtUpdateExchange()).with(ROUTING_KEY_USDT_UPDATE); } + @Bean + public DirectExchange usdtAddressExchange() { + return new DirectExchange(EXCHANGE_USDT_ADDRESS); + } + + + @Bean + public Queue usdtAddressQueue() { + return new Queue(QUEUE_USDT_ADDRESS, true); + } + + @Bean + public Binding usdtAddressbinding() { + return BindingBuilder.bind(usdtAddressQueue()).to(usdtAddressExchange()).with(ROUTING_KEY_USDT_ADDRESS); + } + /** * 交换器A 可以继续添加交换器B C diff --git a/src/main/java/com/xcong/excoin/modules/blackchain/service/EthService.java b/src/main/java/com/xcong/excoin/modules/blackchain/service/EthService.java index c52b39b..c2c9379 100644 --- a/src/main/java/com/xcong/excoin/modules/blackchain/service/EthService.java +++ b/src/main/java/com/xcong/excoin/modules/blackchain/service/EthService.java @@ -10,6 +10,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import org.web3j.abi.FunctionEncoder; @@ -237,8 +238,9 @@ // log.debug("transfer hexValue:" + hexValue); - EthSendTransaction ethSendTransaction = web3j.ethSendRawTransaction(hexValue).sendAsync().get(); - + CompletableFuture<EthSendTransaction> ethSendTransactionCompletableFuture = web3j.ethSendRawTransaction(hexValue).sendAsync(); + EthSendTransaction ethSendTransaction = ethSendTransactionCompletableFuture.get(); + //return "hash"; if (ethSendTransaction.hasError()) { // log.info("transfer error:", ethSendTransaction.getError().getMessage()); diff --git a/src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java b/src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java index 33c2c20..e21dc6b 100644 --- a/src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java +++ b/src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java @@ -6,6 +6,7 @@ import javax.annotation.Resource; import com.xcong.excoin.modules.blackchain.service.*; +import com.xcong.excoin.rabbit.producer.UsdtUpdateProducer; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -29,6 +30,9 @@ MemberDao memberDao; @Resource MemberCoinAddressDao memberMapper; + + @Resource + private UsdtUpdateProducer usdtUpdateProducer; @Override public Result findBlockAddress(String symbol) { @@ -178,9 +182,8 @@ coinAddress.setLabel(uuid); memberMapper.insert(coinAddress); } - if(!UsdtErc20UpdateService.ALL_ADDRESS_LIST.contains(address)){ - UsdtErc20UpdateService.ALL_ADDRESS_LIST.add(address); - } + // 发送新增的地址到监听集合 + usdtUpdateProducer.sendAddressMsg(address); } break; case "ROC": diff --git a/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java b/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java index 6daf22c..2ad5e2c 100644 --- a/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java +++ b/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java @@ -20,7 +20,9 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; @Service public class UsdtErc20UpdateService { @@ -88,8 +90,10 @@ Credentials credentials = Credentials.create(privateKey); EthUsdtContract contract = EthUsdtContract.load(contractAddr, getInstance(), credentials, getStaticGasProvider()); EthFilter filter = getFilter(blockNum); + Map<String,BigInteger> map = new HashMap<String,BigInteger>(); + map.put("blockNum",blockNum); contract.transferEventFlowable(filter).subscribe(e->{ - if(e!=null && StringUtils.isNotBlank(e.to)){ + if(e!=null && StringUtils.isNotBlank(e.to) && e.log.getBlockNumber()!=null){ String transactionHash = e.log.getTransactionHash(); BigInteger blockNumber1 = e.log.getBlockNumber(); String toAddress = e.to; @@ -102,13 +106,16 @@ EthUsdtChargeDto dto = new EthUsdtChargeDto(toAddress,transactionHash,divide); usdtUpdateProducer.sendMsg(JSONObject.toJSONString(dto)); } - - redisUtils.set(USDT_BLOCK_NUM_GOLDEN,blockNumber1.toString()); + if(map.get("blockNum").compareTo(blockNumber1)!=0){ + redisUtils.set(USDT_BLOCK_NUM_GOLDEN,blockNumber1.toString()); + map.put("blockNum",blockNumber1); + } } }); } + private static EthFilter getFilter(BigInteger startBlock) { if (startBlock != null) { EthFilter filter = new EthFilter(new DefaultBlockParameterNumber(startBlock), diff --git a/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtEthService.java b/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtEthService.java index 4979a57..aa6d7fb 100644 --- a/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtEthService.java +++ b/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtEthService.java @@ -29,10 +29,10 @@ private static final BigDecimal LIMIT = new BigDecimal("50"); private static final BigDecimal LIMIT_ETH = new BigDecimal("0.2"); - private static final BigDecimal FEE = new BigDecimal("0.005"); + private static final BigDecimal FEE = new BigDecimal("0.0042"); private static final BigDecimal ETH_TR_FEE = new BigDecimal("0.0032"); - public static String ETH_FEE = "0.005"; + public static String ETH_FEE = "0.0042"; public static final String TOTAL_ADDRESS = "0x3d83A28B6C2d599d2B6D272c5DBcDC9c976d344F"; public static final String TOTAL_PRIVATE = "4a1ce332133d8917360c5f3b194f703a0cf5b86c4eea319b1cd01197e68dad27"; @@ -59,13 +59,13 @@ } BigDecimal usdt = ethService.tokenGetBalance(address); - log.info("地址:{}, 金额:{}", address, usdt); + //log.info("地址:{}, 金额:{}", address, usdt); if (usdt != null && usdt.compareTo(LIMIT) > 0) { usdt = usdt.subtract(new BigDecimal("0.01")); // 查询eth是否足够 BigDecimal eth = EthService.getEthBlance(address); - log.info("地址:{}, ETH:{}", address, eth); + //log.info("地址:{}, ETH:{}", address, eth); if (eth != null && eth.compareTo(FEE) >= 0) { MemberCoinAddressEntity memberCoinAddressEntity = memberCoinAddressDao.selectBlockAddressWithTag(memberId, CoinTypeEnum.USDT.name(), "ERC20"); if (memberCoinAddressEntity == null) { @@ -82,14 +82,14 @@ String hash = ethService.tokenSend(privateKey, address, TOTAL_ADDRESS, usdtStr); log.info("归集:{}", hash); - if (StrUtil.isNotBlank(hash)) { - // 归集成功更新状态 先保存本次的hash值,待交易成功后再更新 - coinCharge.setHash(hash); - memberCoinChargeDao.updateById(coinCharge); - } +// if (StrUtil.isNotBlank(hash)) { +// // 归集成功更新状态 先保存本次的hash值,待交易成功后再更新 +// coinCharge.setHash(hash); +// memberCoinChargeDao.updateById(coinCharge); +// } } else { String hash = ethService.ethSend(TOTAL_PRIVATE, TOTAL_ADDRESS, address, ETH_FEE); - log.info("转手续费:{}", hash); + //log.info("转手续费:{}", hash); } } } diff --git a/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java b/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java index 989d4bc..0a9e313 100644 --- a/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java +++ b/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java @@ -1,5 +1,6 @@ package com.xcong.excoin.modules.coin.dao; +import java.math.BigDecimal; import java.util.Date; import java.util.List; @@ -15,6 +16,8 @@ public interface OrderCoinDealDao extends BaseMapper<OrderCoinsDealEntity>{ List<OrderCoinsDealEntity> selectAllWalletCoinOrder(@Param("memberId")Long memberId); + BigDecimal sumTodayBuyAmount(@Param("memberId")Long memberId, @Param("symbol") String symbol); + BigDecimal sumTodayEntrustCntBuyAmount(@Param("memberId")Long memberId, @Param("symbol") String symbol); List<OrderCoinsDealEntity> selectCoinOrderDealByOrderId(@Param("orderId")Long orderId); List<OrderCoinsDealEntity> selectAllWalletCoinOrderBySymbol(@Param("memberId")Long memberId,@Param("symbol")String symbol); diff --git a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java index 0988cb9..eb034ca 100644 --- a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java +++ b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java @@ -347,6 +347,38 @@ if(!MemberEntity.CERTIFY_STATUS_Y.equals(memberEntity.getCertifyStatus())){ return Result.fail(MessageSourceUtils.getString("member_controller_0001")); } + // 需要先 + String phone = memberEntity.getPhone(); + if(!"13632989240".equals(phone) && !"15158130575".equals(phone)){ + if(OrderCoinsEntity.ORDERTYPE_BUY.equals(type)){ + // 不能超过800个 + + if(amount!=null && amount.compareTo(new BigDecimal("800"))>0){ + return Result.fail("买入额度受限"); + } + BigDecimal bigDecimal = orderCoinDealDao.sumTodayBuyAmount(memberId, symbol); + if(bigDecimal==null){ + bigDecimal= BigDecimal.ZERO; + } + amount= amount==null?BigDecimal.ZERO:amount; + bigDecimal = bigDecimal.add(amount); + if(bigDecimal!=null && bigDecimal.compareTo(new BigDecimal("800"))>0){ + return Result.fail("买入额度受限"); + } + // 挂单不能超过800 + BigDecimal bigDecimal1 = orderCoinDealDao.sumTodayEntrustCntBuyAmount(memberId, symbol); + if(bigDecimal1==null){ + bigDecimal1=BigDecimal.ZERO; + } + bigDecimal1 = bigDecimal1.add(amount); + if(bigDecimal1!=null && bigDecimal1.compareTo(new BigDecimal("800"))>0){ + return Result.fail("买入额度受限"); + } + }else{ + return Result.fail("卖出受限"); + } + } + BigDecimal nowPriceinBigDecimal = price; //查询当前价 //BigDecimal nowPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(symbol + "/USDT"))); @@ -520,7 +552,7 @@ if (SymbolsConstats.EXCHANGE_SYMBOLS.contains(orderCoinsEntity.getSymbol())) { orderSubmitProducer.sendCancelMsg(orderId); // return this.cancelEntrustWalletCoinOrderForMatch(orderId); - return Result.ok("order_service_0013"); + return Result.ok(MessageSourceUtils.getString("order_service_0013")); } if (orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_CANCEL || orderCoinsEntity.getOrderStatus()==OrderCoinsEntity.ORDERSTATUS_DONE) { return Result.fail(MessageSourceUtils.getString("order_service_0012")); diff --git a/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java b/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java index 872cae4..5873683 100644 --- a/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java +++ b/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java @@ -363,6 +363,9 @@ // 存储昨日K线 if("day".equals(rangeUnit)){ System.out.println("存储日K线"); + kLine.setOpen(kLine.getClose()); + kLine.setLow(kLine.getClose()); + kLine.setHigh(kLine.getClose()); redisUtils.set("ROC/USDT",kLine); } } diff --git a/src/main/java/com/xcong/excoin/processor/MarketService.java b/src/main/java/com/xcong/excoin/processor/MarketService.java index d47b2c9..3f3f067 100644 --- a/src/main/java/com/xcong/excoin/processor/MarketService.java +++ b/src/main/java/com/xcong/excoin/processor/MarketService.java @@ -88,7 +88,17 @@ list = (List) data; } list.add(kLine); - redisUtils.set(key, list); + int size = list.size(); + if(size>500){ + list = list.subList(size-500,size); + } + List lines = new ArrayList(); + if(CollectionUtils.isNotEmpty(list)){ + for(Object object:list){ + lines.add(object); + } + } + redisUtils.set(key, lines); // mongoTemplate.insert(kLine,"exchange_kline_"+symbol+"_"+kLine.getPeriod()); } @@ -122,4 +132,15 @@ return totalVolume; } + + public static void main(String[] args) { + List<String> list = new ArrayList<>(); + list.add("1"); + list.add("2"); + list.add("3"); + list.add("4"); + list.add("5"); + list=list.subList(2,5); + System.out.println(list); + } } diff --git a/src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java b/src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java index 25b7fdc..f0d22e5 100644 --- a/src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java +++ b/src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java @@ -27,7 +27,7 @@ /** * usdt 归集 */ - @Scheduled(cron = "0 5/30 * * * ? ") + @Scheduled(cron = "0 15/30 * * * ? ") public void poolUsdtEth() { try { log.info("USDT归集开始"); diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java index c9560ed..7288fc6 100644 --- a/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java +++ b/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java @@ -3,6 +3,7 @@ import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.configurations.RabbitMqConfig; import com.xcong.excoin.modules.blackchain.model.EthUsdtChargeDto; +import com.xcong.excoin.modules.blackchain.service.UsdtErc20UpdateService; import com.xcong.excoin.modules.coin.service.BlockCoinService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; @@ -32,4 +33,12 @@ // 更新这个用户的余额 blockCoinService.updateEthUsdtNew(ethUsdtChargeDto); } + + @RabbitListener(queues = RabbitMqConfig.QUEUE_USDT_ADDRESS) + public void addUsdtAddress(String content) { + if(!UsdtErc20UpdateService.ALL_ADDRESS_LIST.contains(content)){ + log.info("#添加新地址---->{}#", content); + UsdtErc20UpdateService.ALL_ADDRESS_LIST.add(content); + } + } } diff --git a/src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java b/src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java index 8f12cda..c89f4c9 100644 --- a/src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java +++ b/src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java @@ -29,6 +29,11 @@ rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_USDT_UPDATE, RabbitMqConfig.ROUTING_KEY_USDT_UPDATE, content, correlationData); } + public void sendAddressMsg(String content) { + CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID()); + rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_USDT_ADDRESS, RabbitMqConfig.ROUTING_KEY_USDT_ADDRESS, content, correlationData); + } + @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { diff --git a/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java index 8e48637..3889dff 100644 --- a/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java +++ b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java @@ -45,7 +45,7 @@ @OnOpen public void onOpen(Session session) { onlineCount.incrementAndGet(); // 在线数加1 - log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get()); + // log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get()); } /** @@ -67,7 +67,7 @@ map.remove(session.getId()); } } - log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get()); + //log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get()); } /** @@ -227,8 +227,14 @@ result.setRep(sub); if (o != null) { List<Candlestick> list = (List<Candlestick>) o; + + if(list!=null && list.size()>300){ + int size = list.size(); + list = list.subList(size-300,size); + } + CandlestickModel model = null; for (Candlestick candlestick : list) { - CandlestickModel model = new CandlestickModel(); + model = new CandlestickModel(); model.setAmount(candlestick.getAmount()); model.setClose(candlestick.getClose()); model.setCount(candlestick.getCount()); @@ -248,8 +254,8 @@ @OnError public void onError(Session session, Throwable error) { - log.error("发生错误"); - error.printStackTrace(); + // log.error("发生错误"); + //error.printStackTrace(); } /** diff --git a/src/main/resources/application-prodapp.yml b/src/main/resources/application-prodapp.yml index edf1a56..0c57f69 100644 --- a/src/main/resources/application-prodapp.yml +++ b/src/main/resources/application-prodapp.yml @@ -95,7 +95,7 @@ redis_expire: 3000 kline-update-job: false newest-price-update-job: false - exchange-trade: true + exchange-trade: false day-line: false other-job: false loop-job: false diff --git a/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml b/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml index fc2d73d..339e355 100644 --- a/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml +++ b/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml @@ -12,6 +12,21 @@ order by create_time desc </select> + <select id="sumTodayBuyAmount" resultType="java.math.BigDecimal"> + select sum(symbol_cnt) from coins_order_deal + where member_id = #{memberId} and symbol=#{symbol} + and order_type =1 + and order_status=3 + and DATE_FORMAT(create_time, '%Y-%m-%d') = DATE_FORMAT(now(),'%Y-%m-%d') + </select> + <select id="sumTodayEntrustCntBuyAmount" resultType="java.math.BigDecimal"> + select sum(entrust_cnt) from coins_order + where member_id = #{memberId} and symbol=#{symbol} + and order_type =1 + and order_status=1 + and DATE_FORMAT(create_time, '%Y-%m-%d') = DATE_FORMAT(now(),'%Y-%m-%d') + </select> + <select id="selectCoinOrderDealByOrderId" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity"> select * from coins_order_deal where order_id = #{orderId} order by create_time desc -- Gitblit v1.9.1