src/main/java/com/xcong/excoin/common/enumerates/CoinTypeEnum.java
@@ -6,5 +6,5 @@ * @author wzy */ public enum CoinTypeEnum { USDT, BTC, ETH, LTC, EOS, XRP, BCH, ETC,GBZ USDT, BTC, ETH, LTC, EOS, XRP, BCH, ETC,BZZ } src/main/java/com/xcong/excoin/common/enumerates/SymbolEnum.java
@@ -7,7 +7,7 @@ */ @Getter public enum SymbolEnum { GBZ("GBZ", "GBZ/USDT") BZZ("BZZ", "BZZ/USDT") ,BTC("BTC", "BTC/USDT") ,ETH("ETH", "ETH/USDT") ,LTC("LTC", "LTC/USDT") src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
@@ -127,6 +127,8 @@ public static final String ROUTING_KEY_ROC_ORDER_COMPLETE = "roc-order-routingKey-complete"; public static final String QUEUE_TRC20_BLOCK = "QUEUE_TRC20_BLOCK"; public static final String ROUTING_TRC20_BLOCK = "ROUTING_TRC20_BLOCK"; @Resource private ConnectionFactory connectionFactory; @@ -245,6 +247,16 @@ return BindingBuilder.bind(usdtAddressQueue()).to(usdtAddressExchange()).with(ROUTING_KEY_USDT_ADDRESS); } @Bean public Queue trc20Queue() { return new Queue(QUEUE_TRC20_BLOCK); } @Bean public Binding trc20Binding() { return BindingBuilder.bind(trc20Queue()).to(defaultExchange()).with(ROUTING_TRC20_BLOCK); } /** * 交换器A 可以继续添加交换器B C src/main/java/com/xcong/excoin/configurations/ScheduleConfig.java
New file
@@ -0,0 +1,18 @@
package com.xcong.excoin.configurations;

import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/**
 * Registers a dedicated 50-thread scheduled pool as the scheduler used by the
 * {@link ScheduledTaskRegistrar}, and shuts that pool down when this
 * configuration bean is destroyed.
 *
 * <p>The pool is created by this class rather than by Spring, so nothing else
 * would stop it: without the explicit shutdown its worker threads keep running
 * after the application context has been closed.
 */
@Configuration
public class ScheduleConfig implements SchedulingConfigurer, DisposableBean {

    /** Number of threads available to scheduled tasks. */
    private static final int SCHEDULER_POOL_SIZE = 50;

    /** Pool handed to the registrar; owned (and therefore closed) by this class. */
    private final ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(SCHEDULER_POOL_SIZE);

    /**
     * Installs the dedicated pool as the scheduler for all registered tasks.
     *
     * @param taskRegistrar the registrar supplied by Spring's scheduling support
     */
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.setScheduler(scheduler);
    }

    /**
     * Stops accepting new scheduled work and lets the pool's threads terminate
     * once the bean is destroyed.
     */
    @Override
    public void destroy() {
        scheduler.shutdown();
    }
}
src/main/java/com/xcong/excoin/modules/blackchain/model/EthUsdtChargeDto.java
@@ -13,6 +13,12 @@ private String address; private String hash; private BigDecimal balance; private String symbol; public interface Symbol{ String USDT_ERC20 = "USDT_ERC20"; String USDT_TRC20 = "USDT_TRC20"; } public EthUsdtChargeDto() { } src/main/java/com/xcong/excoin/modules/coin/service/BlockCoinService.java
@@ -17,7 +17,7 @@ public void updateXrp(); public void updateTrc20(); public void updateTrc20(EthUsdtChargeDto dto); public void updateRoc(RocTransferDetail transferDetail); src/main/java/com/xcong/excoin/modules/coin/service/impl/BlockCoinServiceImpl.java
@@ -411,77 +411,42 @@ } @Override public void updateTrc20() { // 首先去查redis上的上次同步时间 Object lastUpdateTime = redisUtils.get(trc20_update_key); SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm"); String start = null; if (lastUpdateTime == null) { // 没有 说明是第一次同步 此时从第一天开始同步2020 0905开始 start = "2020-09-05'T'00:00"; } else { // 有上次时间 start = lastUpdateTime.toString(); } // 去查询上次同步时间后的所有记录 //Trc20Service.getAddressTransactions() // 写入本次更新时间 String updateTime = format.format(new Date()); redisUtils.set(trc20_update_key, updateTime); public void updateTrc20(EthUsdtChargeDto dto) { String address = dto.getAddress(); BigDecimal amount = dto.getBalance(); String hash = dto.getHash(); // 判断有无 List<MemberCoinAddressEntity> addressList = memberCoinAddressDao.selectAllBlockAddressBySymbolAndTag(CoinTypeEnum.USDT.name(), "TRC20"); if (CollectionUtils.isNotEmpty(addressList)) { //List<MemberCoinAddressEntity> addressList = memberCoinAddressDao.selectAllBlockAddressBySymbolAndTag(CoinTypeEnum.USDT.name(), "TRC20"); Map<String, Object> hashParam = new HashMap<>(); for (MemberCoinAddressEntity coinAddressEntity : addressList) { String address = coinAddressEntity.getAddress(); List<Trc20TransactionsData> addressTransactions = Trc20Service.getAddressTransactions(address, start); if (CollectionUtils.isNotEmpty(addressTransactions)) { for (Trc20TransactionsData trc20TransactionsData : addressTransactions) { String transactionId = trc20TransactionsData.getTransaction_id(); String value = trc20TransactionsData.getValue(); // 本次转账金额 BigDecimal amount = new BigDecimal(value).divide(new BigDecimal("1000000")); // 校验token是否为trc20USD if (trc20TransactionsData.getToken_info() != null && trc20TransactionsData.getToken_info().containsKey("address")) { String tokenTrc = trc20TransactionsData.getToken_info().get("address").toString(); if (!"TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t".equals(tokenTrc)) { continue; } } else { continue; } // 校验hash是否已同步过 hashParam.put("hash", transactionId); 
hashParam.put("hash", hash); List<MemberCoinChargeEntity> memberCoinChargeEntities = memberCoinChargeDao.selectByMap(hashParam); if (CollectionUtils.isNotEmpty(memberCoinChargeEntities)) { // 若已同步过 continue; return; } // 添加钱包余额 // 用户ID Long memberId = coinAddressEntity.getMemberId(); MemberCoinAddressEntity memberCoinAddress = memberCoinAddressDao.selectCoinAddressByAddressAndSymbolTag(address, CoinTypeEnum.USDT.name(),"TRC20"); if (memberCoinAddress == null) { return; } Long memberId = memberCoinAddress.getMemberId(); // 查询钱包 并更新 MemberWalletCoinEntity memberWalletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, CoinTypeEnum.USDT.name()); memberWalletCoinDao.updateBlockBalance(memberWalletCoinEntity.getId(), amount, BigDecimal.ZERO, 0); // 添加冲币记录 String orderNo = insertCoinCharge(address, memberId, amount, CoinTypeEnum.USDT.name(), "TRC20", BigDecimal.ZERO, transactionId); LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", amount, CoinTypeEnum.USDT.name(), 1, 1); String orderNo = insertCoinCharge(address, memberId, amount, CoinTypeEnum.USDT.name(), "TRC20", BigDecimal.ZERO, hash); // LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", amount, CoinTypeEnum.USDT.name(), 1, 1); ThreadPoolUtils.sendDingTalk(5); MemberEntity member = memberDao.selectById(memberId); if (StrUtil.isNotBlank(member.getPhone())) { //String amountEos = amount + "XRP"; Sms106Send.sendRechargeMsg(member.getPhone(), DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MINUTE_PATTERN), orderNo); // Sms106Send.sendRechargeMsg(member.getPhone(), new Date(), amount+"U", orderNo); } else { SubMailSend.sendRechargeMail(member.getEmail(), DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MINUTE_PATTERN), orderNo); } } } } } } @Override @@ -505,7 +470,7 @@ } Long memberId = memberCoinAddress.getMemberId(); // 查询钱包 并更新 MemberWalletCoinEntity walletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, CoinTypeEnum.GBZ.name()); 
MemberWalletCoinEntity walletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, CoinTypeEnum.BZZ.name()); if (walletCoinEntity == null) { // 创建一个钱包 // 创建这个钱包 @@ -521,9 +486,9 @@ memberWalletCoinDao.updateBlockBalance(walletCoinEntity.getId(), balance, BigDecimal.ZERO, 0); String orderNo = insertCoinCharge(address, memberId, balance, CoinTypeEnum.GBZ.name(), "", BigDecimal.ZERO, null); String orderNo = insertCoinCharge(address, memberId, balance, CoinTypeEnum.BZZ.name(), "", BigDecimal.ZERO, null); // 插入财务记录 LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", balance, CoinTypeEnum.GBZ.name(), 1, 1); LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", balance, CoinTypeEnum.BZZ.name(), 1, 1); try { ThreadPoolUtils.sendDingTalk(5); src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
@@ -809,7 +809,7 @@ @Override public Result findAllWalletCoinOrder() { List<OrderCoinsDealEntity> orderCoinsDealEntities = orderCoinDealDao.selectAllCoinDealsOrderBySymbol(CoinTypeEnum.GBZ.toString()); List<OrderCoinsDealEntity> orderCoinsDealEntities = orderCoinDealDao.selectAllCoinDealsOrderBySymbol(CoinTypeEnum.BZZ.toString()); return Result.ok(orderCoinsDealEntities); } src/main/java/com/xcong/excoin/modules/symbols/constants/SymbolsConstats.java
@@ -5,8 +5,8 @@ public class SymbolsConstats { public final static List<String> EXCHANGE_SYMBOLS = new ArrayList<>(); public final static String ROC = "GBZ"; public final static String ROC = "BZZ"; static { EXCHANGE_SYMBOLS.add("GBZ"); EXCHANGE_SYMBOLS.add("BZZ"); } } src/main/java/com/xcong/excoin/quartz/job/BlockCoinUpdateJob.java
@@ -1,6 +1,8 @@ package com.xcong.excoin.quartz.job; import com.xcong.excoin.modules.blackchain.service.TrxUsdtUpdateService; import com.xcong.excoin.modules.coin.service.BlockCoinService; import com.xcong.excoin.rabbit.producer.UsdtUpdateProducer; import com.xcong.excoin.utils.RedisUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; @@ -26,7 +28,11 @@ @Autowired private RedisUtils redisUtils; private @Autowired private TrxUsdtUpdateService trxUsdtUpdateService; @Autowired private UsdtUpdateProducer usdtUpdateProducer; public static ConcurrentLinkedQueue<Long> TRC_BLOCK = new ConcurrentLinkedQueue<>(); src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java
@@ -1,16 +1,25 @@ package com.xcong.excoin.rabbit.consumer; import cn.hutool.http.HttpException; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.configurations.RabbitMqConfig; import com.xcong.excoin.modules.blackchain.model.EthUsdtChargeDto; import com.xcong.excoin.modules.blackchain.service.Trc20Service; import com.xcong.excoin.modules.blackchain.service.TrxUsdtUpdateService; import com.xcong.excoin.modules.blackchain.service.UsdtErc20UpdateService; import com.xcong.excoin.modules.coin.service.BlockCoinService; import com.xcong.excoin.quartz.job.BlockCoinUpdateJob; import com.xcong.excoin.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.StringUtils; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import org.springframework.web.client.RestClientException; import javax.annotation.Resource; import java.math.BigDecimal; /** * @author wzy @@ -25,20 +34,61 @@ @Resource private BlockCoinService blockCoinService; @Resource TrxUsdtUpdateService trxUsdtUpdateService; @RabbitListener(queues = RabbitMqConfig.QUEUE_USDT_UPDATE) public void doSomething(String content) { log.info("#USDT同步---->{}#", content); EthUsdtChargeDto ethUsdtChargeDto = JSONObject.parseObject(content, EthUsdtChargeDto.class); // 更新这个用户的余额 if(EthUsdtChargeDto.Symbol.USDT_ERC20.equals(ethUsdtChargeDto.getSymbol())){ blockCoinService.updateEthUsdtNew(ethUsdtChargeDto); } if(EthUsdtChargeDto.Symbol.USDT_TRC20.equals(ethUsdtChargeDto.getSymbol())){ blockCoinService.updateTrc20(ethUsdtChargeDto); // 同步完直接归集 trxUsdtUpdateService.poolByAddress(ethUsdtChargeDto.getAddress()); } } @RabbitListener(queues = RabbitMqConfig.QUEUE_USDT_ADDRESS) public void addUsdtAddress(String content) { if(!UsdtErc20UpdateService.ALL_ADDRESS_LIST.contains(content)){ log.debug("#添加新地址---->{}#", content); 
UsdtErc20UpdateService.ALL_ADDRESS_LIST.add(content); log.info("#添加新地址---->{}#", content); if(StringUtils.isBlank(content)){ return; } String[] split = content.split(","); if(split.length<2){ return; } String address = split[0]; String tag = split[1]; if("ERC20".equals(tag)){ UsdtErc20UpdateService.ALL_ADDRESS_LIST.add(address); } if("TRC20".equals(tag)){ TrxUsdtUpdateService.addressList.add(address); // 此时还需要给这个地址转账用于激活及后续手续费 Trc20Service.sendTrx(Trc20Service.TRX_PRIVATE_KEY,address,new BigDecimal(10)); } } } @RabbitListener(queues = RabbitMqConfig.QUEUE_TRC20_BLOCK) public void trc20BlockMsg(String content) { Long blocnNum = Long.parseLong(content); try { trxUsdtUpdateService.monitorCoinListener(blocnNum); } catch (RestClientException | HttpException e) { // 此时是连接问题 这个块需要重新扫描 log.info("查询区块超时:" + blocnNum); BlockCoinUpdateJob.TRC_BLOCK.add(blocnNum); } catch (Exception e) { e.printStackTrace(); } } } src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java
@@ -34,12 +34,17 @@ rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_USDT_ADDRESS, RabbitMqConfig.ROUTING_KEY_USDT_ADDRESS, content, correlationData); } public void sendTrc20BlockMsg(String content) { CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID()); rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_ONE, RabbitMqConfig.ROUTING_TRC20_BLOCK, content, correlationData); } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("#----->{}#", correlationData); // log.info("#----->{}#", correlationData); if (ack) { log.info("success"); // log.info("success"); } else { log.info("--->{}", cause); } src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java
@@ -22,8 +22,8 @@
                return "EOS/USDT";
            case "etcusdt":
                return "ETC/USDT";
            case "gbzusdt":
                return "GBZ/USDT";
            // Key is all lowercase, like every sibling case ("etcusdt", "gbzusdt")
            case "bzzusdt":
                return "BZZ/USDT";
            default:
                return null;
        }
@@ -33,8 +33,8 @@
        switch (symbol) {
            case "BTC/USDT":
                return "btcusdt";
            case "GBZ/USDT":
                return "gbzusdt";
            // Returned key is all lowercase, like "btcusdt" above, so it round-trips
            // through the lowercase-keyed switch
            case "BZZ/USDT":
                return "bzzusdt";
            default:
                return null;
        }
@@ -56,8 +56,8 @@
                return "EOS_NEW_PRICE";
            case "ETC/USDT":
                return "ETC_NEW_PRICE";
            case "GBZ/USDT":
                return "GBZ_NEW_PRICE";
            case "BZZ/USDT":
                return "BZZ_NEW_PRICE";
            default:
                return null;
        }
src/main/resources/mapper/member/MemberCoinAddressDao.xml
@@ -88,4 +88,21 @@
            and tag = #{tag}
        </if>
    </select>

    <!--
        Selects rows from member_coin_address where is_biyict = 1.
        The symbol, address and tag filters are each applied only when the
        corresponding parameter is neither null nor an empty string.
    -->
    <select id="selectCoinAddressByAddressAndSymbolTag" resultType="com.xcong.excoin.modules.member.entity.MemberCoinAddressEntity">
        select * from member_coin_address
        <where>
            is_biyict = 1
            <if test="symbol != null and symbol != ''">
                and symbol = #{symbol}
            </if>
            <if test="address != null and address != ''">
                and address = #{address}
            </if>
            <if test="tag != null and tag != ''">
                and tag = #{tag}
            </if>
        </where>
    </select>
</mapper>