zainali5120
2020-10-08 c24fc100ef9966495dc706e110fc37f13e003448
优化usdt同步
11 files modified
4 files added
307 ■■■■ changed files
src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java 25 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/blackchain/model/EthUsdtChargeDto.java 25 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java 3 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java 49 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/service/BlockCoinService.java 3 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/service/impl/BlockCoinServiceImpl.java 43 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/member/dao/MemberCoinAddressDao.java 1 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java 1 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/quartz/job/BlockCoinUpdateJob.java 14 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java 20 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java 10 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/quartz/job/UsdtErc20InitJob.java 31 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java 33 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java 42 ●●●●● patch | view | raw | blame | history
src/main/resources/mapper/member/MemberCoinAddressDao.xml 7 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
@@ -29,6 +29,14 @@
    public static final String EXCHANGE_A = "biue-exchange-A";
    public static final String EXCHANGE_USDT_UPDATE = "exchange_usdt_update";
    public static final String QUEUE_USDT_UPDATE = "queue_usdt_update";
    public static final String ROUTING_KEY_USDT_UPDATE = "routing_key_usdt_update";
    /**
     * 撮合交易
     */
@@ -131,6 +139,23 @@
    }
   @Bean
    public DirectExchange usdtUpdateExchange() {
        return new DirectExchange(EXCHANGE_USDT_UPDATE);
    }
    @Bean
    public Queue usdtUpdateQueue() {
        return new Queue(QUEUE_USDT_UPDATE, true);
    }
    @Bean
    public Binding usdtUpdatebinding() {
        return BindingBuilder.bind(usdtUpdateQueue()).to(usdtUpdateExchange()).with(ROUTING_KEY_USDT_UPDATE);
    }
    /**
     * 交换器A 可以继续添加交换器B C
     *
src/main/java/com/xcong/excoin/modules/blackchain/model/EthUsdtChargeDto.java
New file
@@ -0,0 +1,25 @@
package com.xcong.excoin.modules.blackchain.model;
import lombok.Data;
import java.math.BigDecimal;
/**
 *  充值扫块dto
 */
@Data
public class EthUsdtChargeDto {
    private String address;
    private String hash;
    private BigDecimal balance;
    public EthUsdtChargeDto() {
    }
    public EthUsdtChargeDto(String address, String hash, BigDecimal balance) {
        this.address = address;
        this.hash = hash;
        this.balance = balance;
    }
}
src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java
@@ -178,6 +178,9 @@
                                coinAddress.setLabel(uuid);
                                memberMapper.insert(coinAddress);
                            }
                            if(!UsdtErc20UpdateService.ALL_ADDRESS_LIST.contains(address)){
                                UsdtErc20UpdateService.ALL_ADDRESS_LIST.add(address);
                            }
                        }
                        break;
                    case "ROC":
src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java
@@ -1,6 +1,12 @@
package com.xcong.excoin.modules.blackchain.service;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.common.enumerates.CoinTypeEnum;
import com.xcong.excoin.modules.blackchain.model.EthUsdtChargeDto;
import com.xcong.excoin.modules.member.dao.MemberCoinAddressDao;
import com.xcong.excoin.rabbit.producer.UsdtUpdateProducer;
import com.xcong.excoin.utils.RedisUtils;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Service;
import org.web3j.crypto.Credentials;
import org.web3j.protocol.Web3j;
@@ -18,11 +24,18 @@
@Service
public class UsdtErc20UpdateService {
    @Resource
    private UsdtUpdateProducer usdtUpdateProducer;
    @Resource
    private MemberCoinAddressDao  coinWalletDao;
    public final static List<String> ALL_ADDRESS_LIST = new ArrayList<>();
    public final static String USDT_BLOCK_NUM = "USDT_BLOCK_NUM";
    public final static String USDT_BLOCK_NUM_GOLDEN = "USDT_BLOCK_NUM_GOLDEN";
    private final static BigInteger DIVIDE_USDT = new BigInteger("1000000");
    private final static BigDecimal DIVIDE_USDT = new BigDecimal("1000000");
    private static Web3j web3;
@@ -58,22 +71,39 @@
    @Resource
    private RedisUtils redisUtils;
    public void updateUsdt(){
        // 首先查询所有的钱包地址
        List<String> tdCoinWallets = coinWalletDao.selectAllSymbolAddress(CoinTypeEnum.USDT.toString(),"ERC20");
        if(tdCoinWallets!=null){
            ALL_ADDRESS_LIST.addAll(tdCoinWallets);
        }
        // 获取最新区块
        String string = redisUtils.getString(USDT_BLOCK_NUM);
        String string = redisUtils.getString(USDT_BLOCK_NUM_GOLDEN);
        if(string==null){
            string = "11014249";
        }
        BigInteger blockNum = new BigInteger(string);
        Credentials credentials = Credentials.create(privateKey);
        EthUsdtContract contract = EthUsdtContract.load(contractAddr, web3, credentials, getStaticGasProvider());
        EthFilter filter = getFilter(new BigInteger("10943021"));
        EthUsdtContract contract = EthUsdtContract.load(contractAddr, getInstance(), credentials, getStaticGasProvider());
        EthFilter filter = getFilter(blockNum);
        contract.transferEventFlowable(filter).subscribe(e->{
            if(e!=null){
            if(e!=null && StringUtils.isNotBlank(e.to)){
                String transactionHash = e.log.getTransactionHash();
                BigInteger blockNumber1 = e.log.getBlockNumber();
                String toAddress = e.to;
                BigInteger tokenBalance = e.tokens;
                if(ALL_ADDRESS_LIST.contains(toAddress)){
                    System.out.println("存在本地的地址:"+toAddress);
                // 金额
                BigInteger divide = tokenBalance.divide(DIVIDE_USDT);
                // 发送消息队列 TODO
                    BigDecimal divide = new BigDecimal(tokenBalance.toString()).divide(DIVIDE_USDT);
                    // 发送消息队列
                    EthUsdtChargeDto dto = new EthUsdtChargeDto(toAddress,transactionHash,divide);
                    usdtUpdateProducer.sendMsg(JSONObject.toJSONString(dto));
                }
                redisUtils.set(USDT_BLOCK_NUM_GOLDEN,blockNumber1.toString());
            }
        });
@@ -88,7 +118,6 @@
            return new EthFilter(DefaultBlockParameterName.EARLIEST,
                    DefaultBlockParameterName.LATEST, contractAddr);
        }
    }
}
src/main/java/com/xcong/excoin/modules/coin/service/BlockCoinService.java
@@ -1,5 +1,6 @@
package com.xcong.excoin.modules.coin.service;
import com.xcong.excoin.modules.blackchain.model.EthUsdtChargeDto;
import com.xcong.excoin.modules.blackchain.model.RocTransferDetail;
public interface BlockCoinService {
@@ -20,4 +21,6 @@
    public void updateRoc(RocTransferDetail transferDetail);
    void updateEthUsdtNew(EthUsdtChargeDto ethUsdtChargeDto);
}
src/main/java/com/xcong/excoin/modules/coin/service/impl/BlockCoinServiceImpl.java
@@ -537,6 +537,49 @@
    }
    @Override
    public void updateEthUsdtNew(EthUsdtChargeDto ethUsdtChargeDto) {
        String address = ethUsdtChargeDto.getAddress();
        String hash = ethUsdtChargeDto.getHash();
        // hash没有用过
        Map<String,Object> param = new HashMap<>();
        param.put("hash",hash);
        param.put("address",address);
        List<MemberCoinChargeEntity> memberCoinChargeEntities = memberCoinChargeDao.selectByMap(param);
        if(CollectionUtils.isNotEmpty(memberCoinChargeEntities)){
            return;
        }
        MemberCoinAddressEntity coinAddressEntity = memberCoinAddressDao.selectBlockAddressWithTag(null, CoinTypeEnum.USDT.toString(), "ERC20");
        if (coinAddressEntity == null) {
            return;
        }
        Long memberId = coinAddressEntity.getMemberId();
        BigDecimal balance = ethUsdtChargeDto.getBalance();
        if (balance != null && balance.compareTo(new BigDecimal("0.1")) > 0) {
            balance = balance.setScale(8, RoundingMode.CEILING);
            BigDecimal early = BigDecimal.ZERO;
            MemberWalletCoinEntity walletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, CoinTypeEnum.USDT.name());
            if (walletCoinEntity == null) {
                return;
            }
            BigDecimal newBalance = balance.subtract(early);
            memberWalletCoinDao.updateBlockBalance(walletCoinEntity.getId(), newBalance, balance, 0);
            String orderNo = insertCoinCharge(address, memberId, newBalance, CoinTypeEnum.USDT.name(), "ERC20", balance, ethUsdtChargeDto.getHash());
            // 插入财务记录
            LogRecordUtils.insertMemberAccountMoneyChange(memberId, "转入", newBalance, CoinTypeEnum.USDT.name(), 1, 1);
            ThreadPoolUtils.sendDingTalk(5);
            MemberEntity member = memberDao.selectById(memberId);
            if (StrUtil.isNotBlank(member.getPhone())) {
                String amount = newBalance.toPlainString() + "USDT-ERC20";
                Sms106Send.sendRechargeMsg(member.getPhone(), DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MINUTE_PATTERN), orderNo);
            } else {
                SubMailSend.sendRechargeMail(member.getEmail(), DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MINUTE_PATTERN), orderNo);
            }
        }
    }
    private String generateNo() {
        // 生成订单号
        Long timestamp = System.currentTimeMillis();
src/main/java/com/xcong/excoin/modules/member/dao/MemberCoinAddressDao.java
@@ -23,5 +23,6 @@
    List<MemberCoinAddressEntity> selectAllBlockAddressBySymbol(@Param("symbol") String symbol);
    List<String> selectAllSymbolAddress(@Param("symbol")String symbol,@Param("tag") String tag);
}
src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java
@@ -362,6 +362,7 @@
        // 存储昨日K线
        if("day".equals(rangeUnit)){
            System.out.println("存储日K线");
            redisUtils.set("ROC/USDT",kLine);
        }
    }
src/main/java/com/xcong/excoin/quartz/job/BlockCoinUpdateJob.java
@@ -24,10 +24,10 @@
    /**
     * ETH_USDT 同步
     */
    @Scheduled(cron = "0 0/10 * * * ? ")
    public void ethUsdtUpdate() {
        blockCoinService.updateEthUsdt();
    }
    //@Scheduled(cron = "0 0/10 * * * ? ")
//    public void ethUsdtUpdate() {
//        blockCoinService.updateEthUsdt();
//    }
    /**
     * eth 同步
@@ -40,7 +40,7 @@
    /**
     * BTC_USDT 同步
     */
    @Scheduled(cron = "0 2/10 * * * ? ")
    //@Scheduled(cron = "0 2/10 * * * ? ")
    public void btcUsdtUpdate() {
        blockCoinService.updateBtcUsdt();
    }
@@ -63,8 +63,8 @@
    /**
     * ETH_USDT 同步
     */
    @Scheduled(cron = "0 0/5 * * * ? ")
    //Scheduled(cron = "0 0/5 * * * ? ")
    public void rocUpdate() {
        blockCoinService.updateEthUsdt();
       // blockCoinService.updateEthUsdt();
    }
}
src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
@@ -23,26 +23,6 @@
    @Resource
    private CoinProcessorFactory processorFactory;
    @Resource
    private OrderCoinService orderCoinService;
    //@Scheduled(cron = "0/40 * * * * *")
    public void test(){
        Random random = new Random();
        Integer type = OrderCoinsDealEntity.ORDERTYPE_BUY;
        Integer tradeType = OrderCoinsDealEntity.TRADETYPE_FIXEDPRICE;
        double random1 = Math.random();
        BigDecimal price = new BigDecimal(random1).setScale(4, RoundingMode.HALF_UP).multiply(new BigDecimal("2"));
        if(price.compareTo(BigDecimal.ZERO)==0){
            price = BigDecimal.ONE;
        }
        System.out.println(price);
        orderCoinService.initOrders("ROC",type,tradeType,price,new BigDecimal(2),null);
        orderCoinService.initOrders("ROC",OrderCoinsDealEntity.ORDERTYPE_SELL,tradeType,price,new BigDecimal(2),null);
    }
    /**
     * 每分钟定时器,处理分钟K线
src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java
@@ -38,11 +38,11 @@
        }
    }
    @Scheduled(cron = "0 2/8 * * * ? ")
    public void usdtEthPoolCheck() {
        log.info("USDTETH归集结果扫描开始");
        usdtEthService.usdtEthPoolCheck();
    }
//    @Scheduled(cron = "0 2/8 * * * ? ")
//    public void usdtEthPoolCheck() {
//        log.info("USDTETH归集结果扫描开始");
//        usdtEthService.usdtEthPoolCheck();
//    }
    @Scheduled(cron = "0 2/30 * * * ? ")
    public void poolEth() {
src/main/java/com/xcong/excoin/quartz/job/UsdtErc20InitJob.java
New file
@@ -0,0 +1,31 @@
package com.xcong.excoin.quartz.job;
import com.xcong.excoin.modules.blackchain.service.UsdtErc20UpdateService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
/**
 *  开启撮合交易
 *
 * @author wzy
 * @date 2020-05-28
 **/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "app", name = "block-job", havingValue = "true")
public class UsdtErc20InitJob {
    @Resource
    private UsdtErc20UpdateService usdtErc20UpdateService;
    @PostConstruct
    public void initCoinTrade() {
        System.out.println("开启USDT同步");
        usdtErc20UpdateService.updateUsdt();
    }
}
src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java
New file
@@ -0,0 +1,33 @@
package com.xcong.excoin.rabbit.consumer;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.configurations.RabbitMqConfig;
import com.xcong.excoin.modules.blackchain.model.EthUsdtChargeDto;
import com.xcong.excoin.modules.coin.service.BlockCoinService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
/**
 * @author wzy
 * @date 2020-05-25
 **/
@Slf4j
@Component
public class UsdtUpdateConsumer {
    @Resource
    private BlockCoinService blockCoinService;
    @RabbitListener(queues = RabbitMqConfig.QUEUE_USDT_UPDATE)
    public void doSomething(String content) {
        log.info("#---->{}#", content);
        EthUsdtChargeDto ethUsdtChargeDto = JSONObject.parseObject(content, EthUsdtChargeDto.class);
        // 更新这个用户的余额
        blockCoinService.updateEthUsdtNew(ethUsdtChargeDto);
    }
}
src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java
New file
@@ -0,0 +1,42 @@
package com.xcong.excoin.rabbit.producer;
import cn.hutool.core.util.IdUtil;
import com.xcong.excoin.configurations.RabbitMqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * @author wzy
 * @date 2020-05-25
 **/
@Slf4j
@Component
public class UsdtUpdateProducer implements RabbitTemplate.ConfirmCallback {
    private RabbitTemplate rabbitTemplate;
    @Autowired
    public UsdtUpdateProducer(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        rabbitTemplate.setConfirmCallback(this);
    }
    public void sendMsg(String content) {
        CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID());
        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_USDT_UPDATE, RabbitMqConfig.ROUTING_KEY_USDT_UPDATE, content, correlationData);
    }
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("#----->{}#", correlationData);
        if (ack) {
            log.info("success");
        } else {
            log.info("--->{}", cause);
        }
    }
}
src/main/resources/mapper/member/MemberCoinAddressDao.xml
@@ -81,4 +81,11 @@
        select * from member_coin_address
        where symbol=#{symbol}
    </select>
    <select id="selectAllSymbolAddress" resultType="string" parameterType="map">
        select address from member_coin_address where symbol =#{symbol}
        <if test="tag!=null and tag !=''">
            and tag = #{tag}
        </if>
    </select>
</mapper>