src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
@@ -29,6 +29,14 @@ public static final String EXCHANGE_A = "biue-exchange-A"; public static final String EXCHANGE_USDT_UPDATE = "exchange_usdt_update"; public static final String QUEUE_USDT_UPDATE = "queue_usdt_update"; public static final String ROUTING_KEY_USDT_UPDATE = "routing_key_usdt_update"; /** * 撮合交易 */ @@ -131,6 +139,23 @@ } @Bean public DirectExchange usdtUpdateExchange() { return new DirectExchange(EXCHANGE_USDT_UPDATE); } @Bean public Queue usdtUpdateQueue() { return new Queue(QUEUE_USDT_UPDATE, true); } @Bean public Binding usdtUpdatebinding() { return BindingBuilder.bind(usdtUpdateQueue()).to(usdtUpdateExchange()).with(ROUTING_KEY_USDT_UPDATE); } /** * 交换器A 可以继续添加交换器B C * src/main/java/com/xcong/excoin/modules/blackchain/model/EthUsdtChargeDto.java
New file @@ -0,0 +1,25 @@ package com.xcong.excoin.modules.blackchain.model; import lombok.Data; import java.math.BigDecimal; /** * 充值扫块dto */ @Data public class EthUsdtChargeDto { private String address; private String hash; private BigDecimal balance; public EthUsdtChargeDto() { } public EthUsdtChargeDto(String address, String hash, BigDecimal balance) { this.address = address; this.hash = hash; this.balance = balance; } } src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java
@@ -178,6 +178,9 @@ coinAddress.setLabel(uuid); memberMapper.insert(coinAddress); } if(!UsdtErc20UpdateService.ALL_ADDRESS_LIST.contains(address)){ UsdtErc20UpdateService.ALL_ADDRESS_LIST.add(address); } } break; case "ROC": src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java
@@ -1,6 +1,12 @@ package com.xcong.excoin.modules.blackchain.service; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.common.enumerates.CoinTypeEnum; import com.xcong.excoin.modules.blackchain.model.EthUsdtChargeDto; import com.xcong.excoin.modules.member.dao.MemberCoinAddressDao; import com.xcong.excoin.rabbit.producer.UsdtUpdateProducer; import com.xcong.excoin.utils.RedisUtils; import org.apache.commons.lang.StringUtils; import org.springframework.stereotype.Service; import org.web3j.crypto.Credentials; import org.web3j.protocol.Web3j; @@ -18,11 +24,18 @@ @Service public class UsdtErc20UpdateService { @Resource private UsdtUpdateProducer usdtUpdateProducer; @Resource private MemberCoinAddressDao coinWalletDao; public final static List<String> ALL_ADDRESS_LIST = new ArrayList<>(); public final static String USDT_BLOCK_NUM = "USDT_BLOCK_NUM"; public final static String USDT_BLOCK_NUM_GOLDEN = "USDT_BLOCK_NUM_GOLDEN"; private final static BigInteger DIVIDE_USDT = new BigInteger("1000000"); private final static BigDecimal DIVIDE_USDT = new BigDecimal("1000000"); private static Web3j web3; @@ -58,22 +71,39 @@ @Resource private RedisUtils redisUtils; public void updateUsdt(){ // 首先查询所有的钱包地址 List<String> tdCoinWallets = coinWalletDao.selectAllSymbolAddress(CoinTypeEnum.USDT.toString(),"ERC20"); if(tdCoinWallets!=null){ ALL_ADDRESS_LIST.addAll(tdCoinWallets); } // 获取最新区块 String string = redisUtils.getString(USDT_BLOCK_NUM); String string = redisUtils.getString(USDT_BLOCK_NUM_GOLDEN); if(string==null){ string = "11014249"; } BigInteger blockNum = new BigInteger(string); Credentials credentials = Credentials.create(privateKey); EthUsdtContract contract = EthUsdtContract.load(contractAddr, web3, credentials, getStaticGasProvider()); EthFilter filter = getFilter(new BigInteger("10943021")); EthUsdtContract contract = EthUsdtContract.load(contractAddr, getInstance(), credentials, getStaticGasProvider()); EthFilter filter = getFilter(blockNum); contract.transferEventFlowable(filter).subscribe(e->{ if(e!=null){ if(e!=null && StringUtils.isNotBlank(e.to)){ String transactionHash = e.log.getTransactionHash(); BigInteger blockNumber1 = e.log.getBlockNumber(); String toAddress = e.to; BigInteger tokenBalance = e.tokens; // 金额 BigInteger divide = tokenBalance.divide(DIVIDE_USDT); // 发送消息队列 TODO if(ALL_ADDRESS_LIST.contains(toAddress)){ System.out.println("存在本地的地址:"+toAddress); // 金额 BigDecimal divide = new BigDecimal(tokenBalance.toString()).divide(DIVIDE_USDT); // 发送消息队列 EthUsdtChargeDto dto = new EthUsdtChargeDto(toAddress,transactionHash,divide); usdtUpdateProducer.sendMsg(JSONObject.toJSONString(dto)); } redisUtils.set(USDT_BLOCK_NUM_GOLDEN,blockNumber1.toString()); } }); @@ -88,7 +118,6 @@ return new EthFilter(DefaultBlockParameterName.EARLIEST, DefaultBlockParameterName.LATEST, contractAddr); } } } src/main/java/com/xcong/excoin/modules/coin/service/BlockCoinService.java
@@ -1,5 +1,6 @@ package com.xcong.excoin.modules.coin.service; import com.xcong.excoin.modules.blackchain.model.EthUsdtChargeDto; import com.xcong.excoin.modules.blackchain.model.RocTransferDetail; public interface BlockCoinService { @@ -20,4 +21,6 @@ public void updateRoc(RocTransferDetail transferDetail); void updateEthUsdtNew(EthUsdtChargeDto ethUsdtChargeDto); } src/main/java/com/xcong/excoin/modules/coin/service/impl/BlockCoinServiceImpl.java
@@ -101,7 +101,7 @@ BigDecimal newBalance = balance.subtract(early); memberWalletCoinDao.updateBlockBalance(walletCoinEntity.getId(), newBalance, balance, 0); String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.USDT.name(), "ERC20", balance,null); String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.USDT.name(), "ERC20", balance, null); // 插入财务记录 LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", newBalance, CoinTypeEnum.USDT.name(), 1, 1); @@ -151,7 +151,7 @@ BigDecimal newBalance = balance.subtract(early); memberWalletCoinDao.updateBlockBalance(walletCoin.getId(), newBalance, balance, 0); String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.ETH.name(), null, balance,null); String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.ETH.name(), null, balance, null); // 插入财务记录 LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", newBalance, CoinTypeEnum.ETH.name(), 1, 1); @@ -201,7 +201,7 @@ BigDecimal newBalance = balance.subtract(early); memberWalletCoinDao.updateBlockBalance(walletCoin.getId(), newBalance, balance, 0); String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.USDT.name(), "OMNI", balance,null); String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.USDT.name(), "OMNI", balance, null); ThreadPoolUtils.sendDingTalk(5); MemberEntity member = memberDao.selectById(memberId); @@ -249,7 +249,7 @@ BigDecimal newBalance = balance.subtract(early); memberWalletCoinDao.updateBlockBalance(walletCoin.getId(), newBalance, balance, 0); String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.BTC.name(), null, balance,null); String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.BTC.name(), null, balance, null); LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", newBalance, CoinTypeEnum.BTC.name(), 1, 1); ThreadPoolUtils.sendDingTalk(5); @@ -311,7 +311,7 @@ if (memberCoinAddressEntity != null) { memberWalletCoinDao.updateBlockBalance(memberWalletCoinEntity.getId(), amount, BigDecimal.ZERO, 0); // 添加冲币记录 String orderNo = insertCoinCharge(EosService.ACCOUNT, memberId, amount, CoinTypeEnum.EOS.name(), memo, BigDecimal.ZERO,null); String orderNo = insertCoinCharge(EosService.ACCOUNT, memberId, amount, CoinTypeEnum.EOS.name(), memo, BigDecimal.ZERO, null); LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", amount, CoinTypeEnum.EOS.name(), 1, 1); ThreadPoolUtils.sendDingTalk(5); @@ -388,7 +388,7 @@ if (memberCoinAddressEntity != null) { memberWalletCoinDao.updateBlockBalance(memberWalletCoinEntity.getId(), amount, BigDecimal.ZERO, 0); // 添加冲币记录 String orderNo = insertCoinCharge(XrpService.ACCOUNT, memberId, amount, CoinTypeEnum.XRP.name(), memo, BigDecimal.ZERO,null); String orderNo = insertCoinCharge(XrpService.ACCOUNT, memberId, amount, CoinTypeEnum.XRP.name(), memo, BigDecimal.ZERO, null); LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", amount, CoinTypeEnum.XRP.name(), 1, 1); ThreadPoolUtils.sendDingTalk(5); @@ -462,7 +462,7 @@ memberWalletCoinDao.updateBlockBalance(memberWalletCoinEntity.getId(), amount, BigDecimal.ZERO, 0); // 添加冲币记录 String orderNo = insertCoinCharge(address, memberId, amount, CoinTypeEnum.USDT.name(), "TRC20", BigDecimal.ZERO,transactionId); String orderNo = insertCoinCharge(address, memberId, amount, CoinTypeEnum.USDT.name(), "TRC20", BigDecimal.ZERO, transactionId); LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", amount, CoinTypeEnum.USDT.name(), 1, 1); ThreadPoolUtils.sendDingTalk(5); @@ -488,23 +488,23 @@ String address = transferDetail.getAddress(); BigDecimal balance = transferDetail.getBalance(); String symbol = transferDetail.getSymbol(); if(org.apache.commons.lang.StringUtils.isBlank(address) || org.apache.commons.lang.StringUtils.isBlank(symbol) || balance ==null ){ if (org.apache.commons.lang.StringUtils.isBlank(address) || org.apache.commons.lang.StringUtils.isBlank(symbol) || balance == null) { return; } if(balance.compareTo(new BigDecimal("0.0001"))<=0){ if (balance.compareTo(new BigDecimal("0.0001")) <= 0) { return; } MemberCoinAddressEntity memberCoinAddress = memberCoinAddressDao.selectCoinAddressByAddressAndSymbol(address, symbol); if(memberCoinAddress==null){ if (memberCoinAddress == null) { return; } Long memberId = memberCoinAddress.getMemberId(); // 查询钱包 并更新 MemberWalletCoinEntity walletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, CoinTypeEnum.ROC.name()); if (walletCoinEntity == null) { // 创建一个钱包 // 创建一个钱包 // 创建这个钱包 walletCoinEntity = new MemberWalletCoinEntity(); walletCoinEntity.setAvailableBalance(BigDecimal.ZERO); @@ -518,11 +518,11 @@ memberWalletCoinDao.updateBlockBalance(walletCoinEntity.getId(), balance, BigDecimal.ZERO, 0); String orderNo = insertCoinCharge(address, memberId, balance, CoinTypeEnum.ROC.name(), "", BigDecimal.ZERO,null); String orderNo = insertCoinCharge(address, memberId, balance, CoinTypeEnum.ROC.name(), "", BigDecimal.ZERO, null); // 插入财务记录 LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", balance, CoinTypeEnum.ROC.name(), 1, 1); try{ try { ThreadPoolUtils.sendDingTalk(5); MemberEntity member = memberDao.selectById(memberId); if (StrUtil.isNotBlank(member.getPhone())) { @@ -531,10 +531,53 @@ } else { SubMailSend.sendRechargeMail(member.getEmail(), DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MINUTE_PATTERN), orderNo); } }catch (Exception e){ } catch (Exception e) { //e.printStackTrace(); } } @Override public void updateEthUsdtNew(EthUsdtChargeDto ethUsdtChargeDto) { String address = ethUsdtChargeDto.getAddress(); String hash = ethUsdtChargeDto.getHash(); // hash没有用过 Map<String,Object> param = new HashMap<>(); param.put("hash",hash); param.put("address",address); List<MemberCoinChargeEntity> memberCoinChargeEntities = memberCoinChargeDao.selectByMap(param); if(CollectionUtils.isNotEmpty(memberCoinChargeEntities)){ return; } MemberCoinAddressEntity coinAddressEntity = memberCoinAddressDao.selectBlockAddressWithTag(null, CoinTypeEnum.USDT.toString(), "ERC20"); if (coinAddressEntity == null) { return; } Long memberId = coinAddressEntity.getMemberId(); BigDecimal balance = ethUsdtChargeDto.getBalance(); if (balance != null && balance.compareTo(new BigDecimal("0.1")) > 0) { balance = balance.setScale(8, RoundingMode.CEILING); BigDecimal early = BigDecimal.ZERO; MemberWalletCoinEntity walletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, CoinTypeEnum.USDT.name()); if (walletCoinEntity == null) { return; } BigDecimal newBalance = balance.subtract(early); memberWalletCoinDao.updateBlockBalance(walletCoinEntity.getId(), newBalance, balance, 0); String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.USDT.name(), "ERC20", balance, ethUsdtChargeDto.getHash()); // 插入财务记录 LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", newBalance, CoinTypeEnum.USDT.name(), 1, 1); ThreadPoolUtils.sendDingTalk(5); MemberEntity member = memberDao.selectById(memberId); if (StrUtil.isNotBlank(member.getPhone())) { String amount = newBalance.toPlainString() + "USDT-ERC20"; Sms106Send.sendRechargeMsg(member.getPhone(), DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MINUTE_PATTERN), orderNo); } else { SubMailSend.sendRechargeMail(member.getEmail(), DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MINUTE_PATTERN), orderNo); } } } private String generateNo() { @@ -545,7 +588,7 @@ return String.valueOf(timestamp).substring(2) + random; } public String insertCoinCharge(String address, Long memberId, BigDecimal newBalance, String symbol, String tag, BigDecimal lastAmount,String hash) { public String insertCoinCharge(String address, Long memberId, BigDecimal newBalance, String symbol, String tag, BigDecimal lastAmount, String hash) { MemberCoinChargeEntity memberCoinChargeEntity = new MemberCoinChargeEntity(); memberCoinChargeEntity.setAddress(address); memberCoinChargeEntity.setMemberId(memberId); src/main/java/com/xcong/excoin/modules/member/dao/MemberCoinAddressDao.java
@@ -23,5 +23,6 @@ List<MemberCoinAddressEntity> selectAllBlockAddressBySymbol(@Param("symbol") String symbol); List<String> selectAllSymbolAddress(@Param("symbol")String symbol,@Param("tag") String tag); } src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java
@@ -362,6 +362,7 @@ // 存储昨日K线 if("day".equals(rangeUnit)){ System.out.println("存储日K线"); redisUtils.set("ROC/USDT",kLine); } } src/main/java/com/xcong/excoin/quartz/job/BlockCoinUpdateJob.java
@@ -24,10 +24,10 @@ /** * ETH_USDT 同步 */ @Scheduled(cron = "0 0/10 * * * ? ") public void ethUsdtUpdate() { blockCoinService.updateEthUsdt(); } //@Scheduled(cron = "0 0/10 * * * ? ") // public void ethUsdtUpdate() { // blockCoinService.updateEthUsdt(); // } /** * eth 同步 @@ -40,7 +40,7 @@ /** * BTC_USDT 同步 */ @Scheduled(cron = "0 2/10 * * * ? ") //@Scheduled(cron = "0 2/10 * * * ? ") public void btcUsdtUpdate() { blockCoinService.updateBtcUsdt(); } @@ -63,8 +63,8 @@ /** * ETH_USDT 同步 */ @Scheduled(cron = "0 0/5 * * * ? ") //Scheduled(cron = "0 0/5 * * * ? ") public void rocUpdate() { blockCoinService.updateEthUsdt(); // blockCoinService.updateEthUsdt(); } } src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
@@ -23,26 +23,6 @@ @Resource private CoinProcessorFactory processorFactory; @Resource private OrderCoinService orderCoinService; //@Scheduled(cron = "0/40 * * * * *") public void test(){ Random random = new Random(); Integer type = OrderCoinsDealEntity.ORDERTYPE_BUY; Integer tradeType = OrderCoinsDealEntity.TRADETYPE_FIXEDPRICE; double random1 = Math.random(); BigDecimal price = new BigDecimal(random1).setScale(4, RoundingMode.HALF_UP).multiply(new BigDecimal("2")); if(price.compareTo(BigDecimal.ZERO)==0){ price = BigDecimal.ONE; } System.out.println(price); orderCoinService.initOrders("ROC",type,tradeType,price,new BigDecimal(2),null); orderCoinService.initOrders("ROC",OrderCoinsDealEntity.ORDERTYPE_SELL,tradeType,price,new BigDecimal(2),null); } /** * 每分钟定时器,处理分钟K线 src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java
@@ -38,11 +38,11 @@ } } @Scheduled(cron = "0 2/8 * * * ? ") public void usdtEthPoolCheck() { log.info("USDTETH归集结果扫描开始"); usdtEthService.usdtEthPoolCheck(); } // @Scheduled(cron = "0 2/8 * * * ? ") // public void usdtEthPoolCheck() { // log.info("USDTETH归集结果扫描开始"); // usdtEthService.usdtEthPoolCheck(); // } @Scheduled(cron = "0 2/30 * * * ? ") public void poolEth() { src/main/java/com/xcong/excoin/quartz/job/UsdtErc20InitJob.java
New file @@ -0,0 +1,31 @@ package com.xcong.excoin.quartz.job; import com.xcong.excoin.modules.blackchain.service.UsdtErc20UpdateService; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.Resource; /** * 开启撮合交易 * * @author wzy * @date 2020-05-28 **/ @Slf4j @Component @ConditionalOnProperty(prefix = "app", name = "block-job", havingValue = "true") public class UsdtErc20InitJob { @Resource private UsdtErc20UpdateService usdtErc20UpdateService; @PostConstruct public void initCoinTrade() { System.out.println("开启USDT同步"); usdtErc20UpdateService.updateUsdt(); } } src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java
New file @@ -0,0 +1,33 @@ package com.xcong.excoin.rabbit.consumer; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.configurations.RabbitMqConfig; import com.xcong.excoin.modules.blackchain.model.EthUsdtChargeDto; import com.xcong.excoin.modules.coin.service.BlockCoinService; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @author wzy * @date 2020-05-25 **/ @Slf4j @Component public class UsdtUpdateConsumer { @Resource private BlockCoinService blockCoinService; @RabbitListener(queues = RabbitMqConfig.QUEUE_USDT_UPDATE) public void doSomething(String content) { log.info("#---->{}#", content); EthUsdtChargeDto ethUsdtChargeDto = JSONObject.parseObject(content, EthUsdtChargeDto.class); // 更新这个用户的余额 blockCoinService.updateEthUsdtNew(ethUsdtChargeDto); } } src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java
New file @@ -0,0 +1,42 @@ package com.xcong.excoin.rabbit.producer; import cn.hutool.core.util.IdUtil; import com.xcong.excoin.configurations.RabbitMqConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * @author wzy * @date 2020-05-25 **/ @Slf4j @Component public class UsdtUpdateProducer implements RabbitTemplate.ConfirmCallback { private RabbitTemplate rabbitTemplate; @Autowired public UsdtUpdateProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setConfirmCallback(this); } public void sendMsg(String content) { CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID()); rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_USDT_UPDATE, RabbitMqConfig.ROUTING_KEY_USDT_UPDATE, content, correlationData); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("#----->{}#", correlationData); if (ack) { log.info("success"); } else { log.info("--->{}", cause); } } } src/main/resources/mapper/member/MemberCoinAddressDao.xml
@@ -81,4 +81,11 @@ select * from member_coin_address where symbol=#{symbol} </select> <select id="selectAllSymbolAddress" resultType="string" parameterType="map"> select address from member_coin_address where symbol =#{symbol} <if test="tag!=null and tag !=''"> and tag = #{tag} </if> </select> </mapper>