From c24fc100ef9966495dc706e110fc37f13e003448 Mon Sep 17 00:00:00 2001 From: zainali5120 <512061637@qq.com> Date: Thu, 08 Oct 2020 21:34:28 +0800 Subject: [PATCH] 优化usdt同步 --- src/main/java/com/xcong/excoin/quartz/job/BlockCoinUpdateJob.java | 14 +- src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java | 25 ++++ src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java | 33 +++++ src/main/resources/mapper/member/MemberCoinAddressDao.xml | 7 + src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java | 1 src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java | 3 src/main/java/com/xcong/excoin/modules/coin/service/impl/BlockCoinServiceImpl.java | 73 +++++++++-- src/main/java/com/xcong/excoin/modules/coin/service/BlockCoinService.java | 3 src/main/java/com/xcong/excoin/modules/member/dao/MemberCoinAddressDao.java | 1 src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java | 10 src/main/java/com/xcong/excoin/modules/blackchain/model/EthUsdtChargeDto.java | 25 ++++ src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java | 42 +++++++ src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java | 51 ++++++- src/main/java/com/xcong/excoin/quartz/job/UsdtErc20InitJob.java | 31 +++++ src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java | 20 --- 15 files changed, 281 insertions(+), 58 deletions(-) diff --git a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java index 3549a5f..501d615 100644 --- a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java +++ b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java @@ -29,6 +29,14 @@ public static final String EXCHANGE_A = "biue-exchange-A"; + + public static final String EXCHANGE_USDT_UPDATE = "exchange_usdt_update"; + + public static final String QUEUE_USDT_UPDATE = "queue_usdt_update"; + + public static final String ROUTING_KEY_USDT_UPDATE = "routing_key_usdt_update"; + + /** * 撮合交易 */ @@ -131,6 +139,23 @@ } + @Bean + public DirectExchange usdtUpdateExchange() { + return new DirectExchange(EXCHANGE_USDT_UPDATE); + } + + + @Bean + public Queue usdtUpdateQueue() { + return new Queue(QUEUE_USDT_UPDATE, true); + } + + @Bean + public Binding usdtUpdatebinding() { + return BindingBuilder.bind(usdtUpdateQueue()).to(usdtUpdateExchange()).with(ROUTING_KEY_USDT_UPDATE); + } + + /** * 交换器A 可以继续添加交换器B C * diff --git a/src/main/java/com/xcong/excoin/modules/blackchain/model/EthUsdtChargeDto.java b/src/main/java/com/xcong/excoin/modules/blackchain/model/EthUsdtChargeDto.java new file mode 100644 index 0000000..8d07d27 --- /dev/null +++ b/src/main/java/com/xcong/excoin/modules/blackchain/model/EthUsdtChargeDto.java @@ -0,0 +1,25 @@ +package com.xcong.excoin.modules.blackchain.model; + +import lombok.Data; + +import java.math.BigDecimal; + +/** + * 充值扫块dto + */ +@Data +public class EthUsdtChargeDto { + + private String address; + private String hash; + private BigDecimal balance; + + public EthUsdtChargeDto() { + } + + public EthUsdtChargeDto(String address, String hash, BigDecimal balance) { + this.address = address; + this.hash = hash; + this.balance = balance; + } +} diff --git a/src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java b/src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java index cd653c2..33c2c20 100644 --- a/src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java +++ b/src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java @@ -178,6 +178,9 @@ coinAddress.setLabel(uuid); memberMapper.insert(coinAddress); } + if(!UsdtErc20UpdateService.ALL_ADDRESS_LIST.contains(address)){ + UsdtErc20UpdateService.ALL_ADDRESS_LIST.add(address); + } } break; case "ROC": diff --git a/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java b/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java index a67b220..6daf22c 100644 --- a/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java +++ b/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java @@ -1,6 +1,12 @@ package com.xcong.excoin.modules.blackchain.service; +import com.alibaba.fastjson.JSONObject; +import com.xcong.excoin.common.enumerates.CoinTypeEnum; +import com.xcong.excoin.modules.blackchain.model.EthUsdtChargeDto; +import com.xcong.excoin.modules.member.dao.MemberCoinAddressDao; +import com.xcong.excoin.rabbit.producer.UsdtUpdateProducer; import com.xcong.excoin.utils.RedisUtils; +import org.apache.commons.lang.StringUtils; import org.springframework.stereotype.Service; import org.web3j.crypto.Credentials; import org.web3j.protocol.Web3j; @@ -18,11 +24,18 @@ @Service public class UsdtErc20UpdateService { + + @Resource + private UsdtUpdateProducer usdtUpdateProducer; + + @Resource + private MemberCoinAddressDao coinWalletDao; + public final static List<String> ALL_ADDRESS_LIST = new ArrayList<>(); - public final static String USDT_BLOCK_NUM = "USDT_BLOCK_NUM"; + public final static String USDT_BLOCK_NUM_GOLDEN = "USDT_BLOCK_NUM_GOLDEN"; - private final static BigInteger DIVIDE_USDT = new BigInteger("1000000"); + private final static BigDecimal DIVIDE_USDT = new BigDecimal("1000000"); private static Web3j web3; @@ -58,22 +71,39 @@ @Resource private RedisUtils redisUtils; + + public void updateUsdt(){ + // 首先查询所有的钱包地址 + List<String> tdCoinWallets = coinWalletDao.selectAllSymbolAddress(CoinTypeEnum.USDT.toString(),"ERC20"); + if(tdCoinWallets!=null){ + ALL_ADDRESS_LIST.addAll(tdCoinWallets); + } // 获取最新区块 - String string = redisUtils.getString(USDT_BLOCK_NUM); + String string = redisUtils.getString(USDT_BLOCK_NUM_GOLDEN); + if(string==null){ + string = "11014249"; + } BigInteger blockNum = new BigInteger(string); Credentials credentials = Credentials.create(privateKey); - EthUsdtContract contract = EthUsdtContract.load(contractAddr, web3, credentials, getStaticGasProvider()); - EthFilter filter = getFilter(new BigInteger("10943021")); + EthUsdtContract contract = EthUsdtContract.load(contractAddr, getInstance(), credentials, getStaticGasProvider()); + EthFilter filter = getFilter(blockNum); contract.transferEventFlowable(filter).subscribe(e->{ - if(e!=null){ + if(e!=null && StringUtils.isNotBlank(e.to)){ String transactionHash = e.log.getTransactionHash(); + BigInteger blockNumber1 = e.log.getBlockNumber(); String toAddress = e.to; BigInteger tokenBalance = e.tokens; - // 金额 - BigInteger divide = tokenBalance.divide(DIVIDE_USDT); - // 发送消息队列 TODO + if(ALL_ADDRESS_LIST.contains(toAddress)){ + System.out.println("存在本地的地址:"+toAddress); + // 金额 + BigDecimal divide = new BigDecimal(tokenBalance.toString()).divide(DIVIDE_USDT); + // 发送消息队列 + EthUsdtChargeDto dto = new EthUsdtChargeDto(toAddress,transactionHash,divide); + usdtUpdateProducer.sendMsg(JSONObject.toJSONString(dto)); + } + redisUtils.set(USDT_BLOCK_NUM_GOLDEN,blockNumber1.toString()); } }); @@ -88,7 +118,6 @@ return new EthFilter(DefaultBlockParameterName.EARLIEST, DefaultBlockParameterName.LATEST, contractAddr); } - - } + } diff --git a/src/main/java/com/xcong/excoin/modules/coin/service/BlockCoinService.java b/src/main/java/com/xcong/excoin/modules/coin/service/BlockCoinService.java index 1567cce..5113425 100644 --- a/src/main/java/com/xcong/excoin/modules/coin/service/BlockCoinService.java +++ b/src/main/java/com/xcong/excoin/modules/coin/service/BlockCoinService.java @@ -1,5 +1,6 @@ package com.xcong.excoin.modules.coin.service; +import com.xcong.excoin.modules.blackchain.model.EthUsdtChargeDto; import com.xcong.excoin.modules.blackchain.model.RocTransferDetail; public interface BlockCoinService { @@ -20,4 +21,6 @@ public void updateRoc(RocTransferDetail transferDetail); + void updateEthUsdtNew(EthUsdtChargeDto ethUsdtChargeDto); + } diff --git a/src/main/java/com/xcong/excoin/modules/coin/service/impl/BlockCoinServiceImpl.java b/src/main/java/com/xcong/excoin/modules/coin/service/impl/BlockCoinServiceImpl.java index 5d1e101..73d4a1e 100644 --- a/src/main/java/com/xcong/excoin/modules/coin/service/impl/BlockCoinServiceImpl.java +++ b/src/main/java/com/xcong/excoin/modules/coin/service/impl/BlockCoinServiceImpl.java @@ -101,7 +101,7 @@ BigDecimal newBalance = balance.subtract(early); memberWalletCoinDao.updateBlockBalance(walletCoinEntity.getId(), newBalance, balance, 0); - String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.USDT.name(), "ERC20", balance,null); + String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.USDT.name(), "ERC20", balance, null); // 插入财务记录 LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", newBalance, CoinTypeEnum.USDT.name(), 1, 1); @@ -151,7 +151,7 @@ BigDecimal newBalance = balance.subtract(early); memberWalletCoinDao.updateBlockBalance(walletCoin.getId(), newBalance, balance, 0); - String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.ETH.name(), null, balance,null); + String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.ETH.name(), null, balance, null); // 插入财务记录 LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", newBalance, CoinTypeEnum.ETH.name(), 1, 1); @@ -201,7 +201,7 @@ BigDecimal newBalance = balance.subtract(early); memberWalletCoinDao.updateBlockBalance(walletCoin.getId(), newBalance, balance, 0); - String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.USDT.name(), "OMNI", balance,null); + String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.USDT.name(), "OMNI", balance, null); ThreadPoolUtils.sendDingTalk(5); MemberEntity member = memberDao.selectById(memberId); @@ -249,7 +249,7 @@ BigDecimal newBalance = balance.subtract(early); memberWalletCoinDao.updateBlockBalance(walletCoin.getId(), newBalance, balance, 0); - String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.BTC.name(), null, balance,null); + String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.BTC.name(), null, balance, null); LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", newBalance, CoinTypeEnum.BTC.name(), 1, 1); ThreadPoolUtils.sendDingTalk(5); @@ -311,7 +311,7 @@ if (memberCoinAddressEntity != null) { memberWalletCoinDao.updateBlockBalance(memberWalletCoinEntity.getId(), amount, BigDecimal.ZERO, 0); // 添加冲币记录 - String orderNo = insertCoinCharge(EosService.ACCOUNT, memberId, amount, CoinTypeEnum.EOS.name(), memo, BigDecimal.ZERO,null); + String orderNo = insertCoinCharge(EosService.ACCOUNT, memberId, amount, CoinTypeEnum.EOS.name(), memo, BigDecimal.ZERO, null); LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", amount, CoinTypeEnum.EOS.name(), 1, 1); ThreadPoolUtils.sendDingTalk(5); @@ -388,7 +388,7 @@ if (memberCoinAddressEntity != null) { memberWalletCoinDao.updateBlockBalance(memberWalletCoinEntity.getId(), amount, BigDecimal.ZERO, 0); // 添加冲币记录 - String orderNo = insertCoinCharge(XrpService.ACCOUNT, memberId, amount, CoinTypeEnum.XRP.name(), memo, BigDecimal.ZERO,null); + String orderNo = insertCoinCharge(XrpService.ACCOUNT, memberId, amount, CoinTypeEnum.XRP.name(), memo, BigDecimal.ZERO, null); LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", amount, CoinTypeEnum.XRP.name(), 1, 1); ThreadPoolUtils.sendDingTalk(5); @@ -462,7 +462,7 @@ memberWalletCoinDao.updateBlockBalance(memberWalletCoinEntity.getId(), amount, BigDecimal.ZERO, 0); // 添加冲币记录 - String orderNo = insertCoinCharge(address, memberId, amount, CoinTypeEnum.USDT.name(), "TRC20", BigDecimal.ZERO,transactionId); + String orderNo = insertCoinCharge(address, memberId, amount, CoinTypeEnum.USDT.name(), "TRC20", BigDecimal.ZERO, transactionId); LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", amount, CoinTypeEnum.USDT.name(), 1, 1); ThreadPoolUtils.sendDingTalk(5); @@ -488,23 +488,23 @@ String address = transferDetail.getAddress(); BigDecimal balance = transferDetail.getBalance(); String symbol = transferDetail.getSymbol(); - if(org.apache.commons.lang.StringUtils.isBlank(address) || org.apache.commons.lang.StringUtils.isBlank(symbol) || balance ==null ){ + if (org.apache.commons.lang.StringUtils.isBlank(address) || org.apache.commons.lang.StringUtils.isBlank(symbol) || balance == null) { return; } - if(balance.compareTo(new BigDecimal("0.0001"))<=0){ + if (balance.compareTo(new BigDecimal("0.0001")) <= 0) { return; } MemberCoinAddressEntity memberCoinAddress = memberCoinAddressDao.selectCoinAddressByAddressAndSymbol(address, symbol); - if(memberCoinAddress==null){ + if (memberCoinAddress == null) { return; } Long memberId = memberCoinAddress.getMemberId(); // 查询钱包 并更新 MemberWalletCoinEntity walletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, CoinTypeEnum.ROC.name()); if (walletCoinEntity == null) { - // 创建一个钱包 + // 创建一个钱包 // 创建这个钱包 walletCoinEntity = new MemberWalletCoinEntity(); walletCoinEntity.setAvailableBalance(BigDecimal.ZERO); @@ -518,11 +518,11 @@ memberWalletCoinDao.updateBlockBalance(walletCoinEntity.getId(), balance, BigDecimal.ZERO, 0); - String orderNo = insertCoinCharge(address, memberId, balance, CoinTypeEnum.ROC.name(), "", BigDecimal.ZERO,null); + String orderNo = insertCoinCharge(address, memberId, balance, CoinTypeEnum.ROC.name(), "", BigDecimal.ZERO, null); // 插入财务记录 LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", balance, CoinTypeEnum.ROC.name(), 1, 1); - try{ + try { ThreadPoolUtils.sendDingTalk(5); MemberEntity member = memberDao.selectById(memberId); if (StrUtil.isNotBlank(member.getPhone())) { @@ -531,10 +531,53 @@ } else { SubMailSend.sendRechargeMail(member.getEmail(), DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MINUTE_PATTERN), orderNo); } - }catch (Exception e){ + } catch (Exception e) { //e.printStackTrace(); } + } + + @Override + public void updateEthUsdtNew(EthUsdtChargeDto ethUsdtChargeDto) { + String address = ethUsdtChargeDto.getAddress(); + String hash = ethUsdtChargeDto.getHash(); + // hash没有用过 + Map<String,Object> param = new HashMap<>(); + param.put("hash",hash); + param.put("address",address); + List<MemberCoinChargeEntity> memberCoinChargeEntities = memberCoinChargeDao.selectByMap(param); + if(CollectionUtils.isNotEmpty(memberCoinChargeEntities)){ + return; + } + MemberCoinAddressEntity coinAddressEntity = memberCoinAddressDao.selectBlockAddressWithTag(null, CoinTypeEnum.USDT.toString(), "ERC20"); + if (coinAddressEntity == null) { + return; + } + Long memberId = coinAddressEntity.getMemberId(); + BigDecimal balance = ethUsdtChargeDto.getBalance(); + if (balance != null && balance.compareTo(new BigDecimal("0.1")) > 0) { + balance = balance.setScale(8, RoundingMode.CEILING); + BigDecimal early = BigDecimal.ZERO; + + MemberWalletCoinEntity walletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, CoinTypeEnum.USDT.name()); + if (walletCoinEntity == null) { + return; + } + BigDecimal newBalance = balance.subtract(early); + memberWalletCoinDao.updateBlockBalance(walletCoinEntity.getId(), newBalance, balance, 0); + String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.USDT.name(), "ERC20", balance, ethUsdtChargeDto.getHash()); + // 插入财务记录 + LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", newBalance, CoinTypeEnum.USDT.name(), 1, 1); + ThreadPoolUtils.sendDingTalk(5); + MemberEntity member = memberDao.selectById(memberId); + if (StrUtil.isNotBlank(member.getPhone())) { + String amount = newBalance.toPlainString() + "USDT-ERC20"; + Sms106Send.sendRechargeMsg(member.getPhone(), DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MINUTE_PATTERN), orderNo); + } else { + SubMailSend.sendRechargeMail(member.getEmail(), DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MINUTE_PATTERN), orderNo); + } + + } } private String generateNo() { @@ -545,7 +588,7 @@ return String.valueOf(timestamp).substring(2) + random; } - public String insertCoinCharge(String address, Long memberId, BigDecimal newBalance, String symbol, String tag, BigDecimal lastAmount,String hash) { + public String insertCoinCharge(String address, Long memberId, BigDecimal newBalance, String symbol, String tag, BigDecimal lastAmount, String hash) { MemberCoinChargeEntity memberCoinChargeEntity = new MemberCoinChargeEntity(); memberCoinChargeEntity.setAddress(address); memberCoinChargeEntity.setMemberId(memberId); diff --git a/src/main/java/com/xcong/excoin/modules/member/dao/MemberCoinAddressDao.java b/src/main/java/com/xcong/excoin/modules/member/dao/MemberCoinAddressDao.java index a53bba9..baf9016 100644 --- a/src/main/java/com/xcong/excoin/modules/member/dao/MemberCoinAddressDao.java +++ b/src/main/java/com/xcong/excoin/modules/member/dao/MemberCoinAddressDao.java @@ -23,5 +23,6 @@ List<MemberCoinAddressEntity> selectAllBlockAddressBySymbol(@Param("symbol") String symbol); + List<String> selectAllSymbolAddress(@Param("symbol")String symbol,@Param("tag") String tag); } diff --git a/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java b/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java index 3953174..872cae4 100644 --- a/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java +++ b/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java @@ -362,6 +362,7 @@ // 存储昨日K线 if("day".equals(rangeUnit)){ + System.out.println("存储日K线"); redisUtils.set("ROC/USDT",kLine); } } diff --git a/src/main/java/com/xcong/excoin/quartz/job/BlockCoinUpdateJob.java b/src/main/java/com/xcong/excoin/quartz/job/BlockCoinUpdateJob.java index 65e8c1e..f91418b 100644 --- a/src/main/java/com/xcong/excoin/quartz/job/BlockCoinUpdateJob.java +++ b/src/main/java/com/xcong/excoin/quartz/job/BlockCoinUpdateJob.java @@ -24,10 +24,10 @@ /** * ETH_USDT 同步 */ - @Scheduled(cron = "0 0/10 * * * ? ") - public void ethUsdtUpdate() { - blockCoinService.updateEthUsdt(); - } + //@Scheduled(cron = "0 0/10 * * * ? ") +// public void ethUsdtUpdate() { +// blockCoinService.updateEthUsdt(); +// } /** * eth 同步 @@ -40,7 +40,7 @@ /** * BTC_USDT 同步 */ - @Scheduled(cron = "0 2/10 * * * ? ") + //@Scheduled(cron = "0 2/10 * * * ? ") public void btcUsdtUpdate() { blockCoinService.updateBtcUsdt(); } @@ -63,8 +63,8 @@ /** * ETH_USDT 同步 */ - @Scheduled(cron = "0 0/5 * * * ? ") + //Scheduled(cron = "0 0/5 * * * ? ") public void rocUpdate() { - blockCoinService.updateEthUsdt(); + // blockCoinService.updateEthUsdt(); } } diff --git a/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java b/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java index 8fb4f5c..6099153 100644 --- a/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java +++ b/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java @@ -23,26 +23,6 @@ @Resource private CoinProcessorFactory processorFactory; - @Resource - private OrderCoinService orderCoinService; - - - - //@Scheduled(cron = "0/40 * * * * *") - public void test(){ - Random random = new Random(); - Integer type = OrderCoinsDealEntity.ORDERTYPE_BUY; - Integer tradeType = OrderCoinsDealEntity.TRADETYPE_FIXEDPRICE; - double random1 = Math.random(); - BigDecimal price = new BigDecimal(random1).setScale(4, RoundingMode.HALF_UP).multiply(new BigDecimal("2")); - if(price.compareTo(BigDecimal.ZERO)==0){ - price = BigDecimal.ONE; - } - System.out.println(price); - orderCoinService.initOrders("ROC",type,tradeType,price,new BigDecimal(2),null); - orderCoinService.initOrders("ROC",OrderCoinsDealEntity.ORDERTYPE_SELL,tradeType,price,new BigDecimal(2),null); - } - /** * 每分钟定时器,处理分钟K线 diff --git a/src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java b/src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java index 262123e..25b7fdc 100644 --- a/src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java +++ b/src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java @@ -38,11 +38,11 @@ } } - @Scheduled(cron = "0 2/8 * * * ? ") - public void usdtEthPoolCheck() { - log.info("USDTETH归集结果扫描开始"); - usdtEthService.usdtEthPoolCheck(); - } +// @Scheduled(cron = "0 2/8 * * * ? ") +// public void usdtEthPoolCheck() { +// log.info("USDTETH归集结果扫描开始"); +// usdtEthService.usdtEthPoolCheck(); +// } @Scheduled(cron = "0 2/30 * * * ? ") public void poolEth() { diff --git a/src/main/java/com/xcong/excoin/quartz/job/UsdtErc20InitJob.java b/src/main/java/com/xcong/excoin/quartz/job/UsdtErc20InitJob.java new file mode 100644 index 0000000..aacf3ff --- /dev/null +++ b/src/main/java/com/xcong/excoin/quartz/job/UsdtErc20InitJob.java @@ -0,0 +1,31 @@ +package com.xcong.excoin.quartz.job; + +import com.xcong.excoin.modules.blackchain.service.UsdtErc20UpdateService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import javax.annotation.Resource; + +/** + * 开启撮合交易 + * + * @author wzy + * @date 2020-05-28 + **/ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "app", name = "block-job", havingValue = "true") +public class UsdtErc20InitJob { + + + @Resource + private UsdtErc20UpdateService usdtErc20UpdateService; + + @PostConstruct + public void initCoinTrade() { + System.out.println("开启USDT同步"); + usdtErc20UpdateService.updateUsdt(); + } +} diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java new file mode 100644 index 0000000..2e375be --- /dev/null +++ b/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java @@ -0,0 +1,33 @@ +package com.xcong.excoin.rabbit.consumer; + +import com.alibaba.fastjson.JSONObject; +import com.xcong.excoin.configurations.RabbitMqConfig; +import com.xcong.excoin.modules.blackchain.model.EthUsdtChargeDto; +import com.xcong.excoin.modules.coin.service.BlockCoinService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * @author wzy + * @date 2020-05-25 + **/ +@Slf4j +@Component +public class UsdtUpdateConsumer { + + + @Resource + private BlockCoinService blockCoinService; + + + @RabbitListener(queues = RabbitMqConfig.QUEUE_USDT_UPDATE) + public void doSomething(String content) { + log.info("#---->{}#", content); + EthUsdtChargeDto ethUsdtChargeDto = JSONObject.parseObject(content, EthUsdtChargeDto.class); + // 更新这个用户的余额 + blockCoinService.updateEthUsdtNew(ethUsdtChargeDto); + } +} diff --git a/src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java b/src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java new file mode 100644 index 0000000..8f12cda --- /dev/null +++ b/src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java @@ -0,0 +1,42 @@ +package com.xcong.excoin.rabbit.producer; + +import cn.hutool.core.util.IdUtil; +import com.xcong.excoin.configurations.RabbitMqConfig; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * @author wzy + * @date 2020-05-25 + **/ +@Slf4j +@Component +public class UsdtUpdateProducer implements RabbitTemplate.ConfirmCallback { + + private RabbitTemplate rabbitTemplate; + + @Autowired + public UsdtUpdateProducer(RabbitTemplate rabbitTemplate) { + this.rabbitTemplate = rabbitTemplate; + rabbitTemplate.setConfirmCallback(this); + } + + public void sendMsg(String content) { + CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID()); + rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_USDT_UPDATE, RabbitMqConfig.ROUTING_KEY_USDT_UPDATE, content, correlationData); + } + + + @Override + public void confirm(CorrelationData correlationData, boolean ack, String cause) { + log.info("#----->{}#", correlationData); + if (ack) { + log.info("success"); + } else { + log.info("--->{}", cause); + } + } +} diff --git a/src/main/resources/mapper/member/MemberCoinAddressDao.xml b/src/main/resources/mapper/member/MemberCoinAddressDao.xml index edb4a57..0dc07a9 100644 --- a/src/main/resources/mapper/member/MemberCoinAddressDao.xml +++ b/src/main/resources/mapper/member/MemberCoinAddressDao.xml @@ -81,4 +81,11 @@ select * from member_coin_address where symbol=#{symbol} </select> + + <select id="selectAllSymbolAddress" resultType="string" parameterType="map"> + select address from member_coin_address where symbol =#{symbol} + <if test="tag!=null and tag !=''"> + and tag = #{tag} + </if> + </select> </mapper> \ No newline at end of file -- Gitblit v1.9.1