wzy
2022-12-04 b8670e655a7b0aa581e49f448f3b0661ada77324
增加BNB监听逻辑、转账方法
7 files modified
1 files added
273 ■■■■ changed files
src/main/java/cc/mrbird/febs/dapp/chain/BaseCoinService.java 87 ●●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java 24 ●●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/dapp/chain/ContractChainService.java 4 ●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/dapp/chain/EthService.java 45 ●●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/dapp/chain/TrxService.java 10 ●●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/job/ChainListenerJob.java 39 ●●●● patch | view | raw | blame | history
src/test/java/cc/mrbird/febs/ChainTest.java 56 ●●●●● patch | view | raw | blame | history
src/test/java/cc/mrbird/febs/MemberTest.java 8 ●●●● patch | view | raw | blame | history
src/main/java/cc/mrbird/febs/dapp/chain/BaseCoinService.java
New file
@@ -0,0 +1,87 @@
package cc.mrbird.febs.dapp.chain;
import cc.mrbird.febs.common.contants.AppContants;
import cc.mrbird.febs.common.utils.RedisUtils;
import cc.mrbird.febs.dapp.entity.DappFundFlowEntity;
import cc.mrbird.febs.dapp.entity.DappTransferRecordEntity;
import cc.mrbird.febs.dapp.mapper.DappFundFlowDao;
import cc.mrbird.febs.dapp.mapper.DappWalletCoinDao;
import cc.mrbird.febs.dapp.utils.OnlineTransferUtil;
import cc.mrbird.febs.rabbit.producer.ChainProducer;
import cn.hutool.core.collection.CollUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.web3j.protocol.core.methods.response.Transaction;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.util.List;
@Slf4j
@Service
public class BaseCoinService {
    @Resource
    private RedisUtils redisUtils;
    @Resource
    private DappFundFlowDao dappFundFlowDao;
    @Resource
    private DappWalletCoinDao dappWalletCoinDao;
    @Resource
    private ChainProducer chainProducer;
    public void compile(Transaction e) {
        redisUtils.set(AppContants.REDIS_KEY_BLOCK_USDT_NUM, e.getBlockNumber());
        // 判断对方打款地址是否为源池地址
        if (ChainEnum.BSC_USDT.getAddress().toLowerCase().equals(e.getTo())) {
            log.info("触发BNB监听事件");
            redisUtils.set(AppContants.REDIS_KEY_BLOCK_ETH_INCREMENT_NUM, e.getBlockNumber());
            // 如果得到触发,则休眠10秒。 因为此处监听器触发可能优先于前端调用transfer接口
            try {
                Thread.sleep(10000);
            } catch (InterruptedException ex) {
                ex.printStackTrace();
            }
            BigDecimal amount = new BigDecimal(e.getValue()).divide(BigDecimal.TEN.pow(18), 8, RoundingMode.HALF_DOWN);
            DappFundFlowEntity fundFlow = dappFundFlowDao.selectByFromHash(e.getHash(), null);
            if (fundFlow != null && fundFlow.getStatus() != 1) {
                return;
            }
            if (fundFlow == null) {
                List<DappFundFlowEntity> flows = dappFundFlowDao.selectFundFlowListByAddress(e.getFrom(), 1);
                if (CollUtil.isEmpty(flows)) {
                    OnlineTransferUtil.addTransferRecord(e.getFrom(), e.getTo(), amount, e.getHash(), DappTransferRecordEntity.TRANSFER_SOURCE_FLAG_ONLINE, "USDT");
                    log.info("本地无交易:{}", e.getHash());
                    return;
                }
                for (DappFundFlowEntity flow : flows) {
                    if (flow.getStatus() == 1) {
                        if (amount.compareTo(flow.getAmount().multiply(flow.getNewestPrice()).setScale(4, RoundingMode.HALF_UP)) == 0) {
                            fundFlow = flow;
                            fundFlow.setFromHash(e.getHash());
                            break;
                        }
                    }
                }
            }
            if (fundFlow == null) {
                return;
            }
            fundFlow.setAmount(fundFlow.getAmount().negate());
            // 更改状态为已同步
            fundFlow.setStatus(2);
            dappFundFlowDao.updateById(fundFlow);
        }
    }
}
src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java
@@ -7,11 +7,13 @@
import com.alibaba.fastjson.JSONObject;
import io.reactivex.Flowable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import org.reactivestreams.Subscription;
import org.springframework.data.repository.query.ParameterOutOfBoundsException;
import org.springframework.util.Base64Utils;
import org.web3j.abi.FunctionReturnDecoder;
@@ -25,15 +27,19 @@
import org.web3j.protocol.core.DefaultBlockParameterName;
import org.web3j.protocol.core.DefaultBlockParameterNumber;
import org.web3j.protocol.core.methods.request.EthFilter;
import org.web3j.protocol.core.methods.response.Log;
import org.web3j.protocol.core.methods.response.TransactionReceipt;
import org.web3j.protocol.http.HttpService;
import org.web3j.protocol.websocket.WebSocketClient;
import org.web3j.protocol.websocket.WebSocketService;
import org.web3j.tx.Contract;
import org.web3j.tx.gas.StaticGasProvider;
import org.web3j.utils.Async;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.rmi.activation.UnknownObjectException;
@@ -126,6 +132,24 @@
    }
    public static void wssBaseCoinEventListener(BigInteger startBlock, BaseCoinService event) {
        WebSocketService ws = null;
        WebSocketClient webSocketClient = null;
        Web3j web3j = null;
        try {
            webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/78074065950e4915aef4f12b6f357d16"));
            ws = new WebSocketService(webSocketClient, true);
            ws.connect();
            web3j = Web3j.build(ws);
            Disposable subscribe = web3j.replayPastAndFutureTransactionsFlowable(new DefaultBlockParameterNumber(startBlock)).subscribe(event::compile, error ->{
                log.error("监听链上异常", error);
            });
        }catch (Exception e) {
            e.printStackTrace();
        }
    }
    private static EthUsdtContract contract(String privateKey, String contractAddress, String url) {
        Credentials credentials = Credentials.create(privateKey);
src/main/java/cc/mrbird/febs/dapp/chain/ContractChainService.java
@@ -28,4 +28,8 @@
    /**
     * Returns the total NFT supply.
     * NOTE(review): presumably read from the NFT contract — implementations are not visible here; confirm.
     */
    BigInteger totalSupplyNFT();
    /**
     * Mints an NFT for the given address.
     * NOTE(review): the return value is presumably a transaction hash; the TRX
     * implementation is a stub that returns {@code null} — confirm the contract.
     *
     * @param address recipient address
     */
    String safeMintNFT(String address);
    /**
     * Transfers the chain's base (native) coin to the given address.
     *
     * @param address recipient address
     * @param amount  amount in whole-coin units (the ETH-style implementation converts it to wei)
     * @return the transaction hash; the ETH-style implementation returns an empty
     *         string on failure, and the TRX stub returns {@code null}
     */
    String transferBaseToken(String address, BigDecimal amount);
    /**
     * Returns the base (native) coin balance of the given address.
     *
     * @param address address to query
     * @return the balance in whole-coin units, or {@code null} when it could not
     *         be obtained (the TRX stub always returns {@code null})
     */
    BigDecimal balanceOfBaseToken(String address);
}
src/main/java/cc/mrbird/febs/dapp/chain/EthService.java
@@ -426,4 +426,49 @@
            return "";
        }
    }
    @Override
    public String transferBaseToken(String address, BigDecimal amount) {
        String gas = getGas();
        try {
            Credentials credentials = Credentials.create(privateKey);
            EthGetTransactionCount ethGetTransactionCount = web3j
                    .ethGetTransactionCount(ownerAddress, DefaultBlockParameterName.LATEST).sendAsync().get();
            BigInteger nonce = ethGetTransactionCount.getTransactionCount();
            BigInteger value = Convert.toWei(amount, Convert.Unit.ETHER).toBigInteger();
            RawTransaction rawTransaction = RawTransaction.createEtherTransaction(nonce,
                    Convert.toWei(gas, Convert.Unit.GWEI).toBigInteger(),
                    Convert.toWei("100000", Convert.Unit.WEI).toBigInteger(), address, value);
            byte[] signedMessage = TransactionEncoder.signMessage(rawTransaction, credentials);
            String hexValue = Numeric.toHexString(signedMessage);
            EthSendTransaction ethSendTransaction = web3j.ethSendRawTransaction(hexValue).sendAsync().get();
            if (ethSendTransaction.hasError()) {
                return "";
            } else {
                String transactionHash = ethSendTransaction.getTransactionHash();
                return transactionHash;
            }
        } catch (Exception e) {
            e.printStackTrace();
            return "";
        }
    }
    @Override
    public BigDecimal balanceOfBaseToken(String address) {
        EthGetBalance balanceWei;
        try {
            balanceWei = web3j.ethGetBalance(address, DefaultBlockParameterName.LATEST).send();
            if (balanceWei.getResult() == null) {
                return null;
            }
            return Convert.fromWei(balanceWei.getBalance().toString(), Convert.Unit.ETHER);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}
src/main/java/cc/mrbird/febs/dapp/chain/TrxService.java
@@ -123,4 +123,14 @@
    // Stub: this service performs no NFT minting and always returns null.
    public String safeMintNFT(String address) {
        return null;
    }
    /**
     * Stub: base-coin transfer is not implemented in this service.
     *
     * @return always {@code null}
     */
    @Override
    public String transferBaseToken(String address, BigDecimal amount) {
        return null;
    }
    /**
     * Stub: base-coin balance lookup is not implemented in this service.
     *
     * @return always {@code null}
     */
    @Override
    public BigDecimal balanceOfBaseToken(String address) {
        return null;
    }
}
src/main/java/cc/mrbird/febs/job/ChainListenerJob.java
@@ -2,6 +2,7 @@
import cc.mrbird.febs.common.contants.AppContants;
import cc.mrbird.febs.common.utils.RedisUtils;
import cc.mrbird.febs.dapp.chain.BaseCoinService;
import cc.mrbird.febs.dapp.chain.ChainEnum;
import cc.mrbird.febs.dapp.chain.ChainService;
import cc.mrbird.febs.dapp.chain.ContractEventService;
@@ -23,9 +24,7 @@
public class ChainListenerJob implements ApplicationRunner {
    @Autowired
    private ContractEventService bscUsdtContractEvent;
    @Autowired
    private ContractEventService bscCoinContractEvent;
    private BaseCoinService baseCoinService;
    @Autowired
    private RedisUtils redisUtils;
@@ -70,34 +69,14 @@
        log.info("区块链监听开始启动");
        Object incrementObj = redisUtils.get(AppContants.REDIS_KEY_BLOCK_ETH_INCREMENT_NUM);
        BigInteger newest = ChainService.getInstance(ChainEnum.BSC_TFC.name()).blockNumber();
        BigInteger block;
        if (incrementObj == null) {
            block = newest;
        } else {
            block = (BigInteger) incrementObj;
        }
        ChainService.wssContractEventListener(block, bscUsdtContractEvent, ChainEnum.BSC_USDT_LISTENER.name());
        ChainService.wssContractEventListener(block, bscCoinContractEvent, ChainEnum.BSC_TFC_LISTENER.name());
//        BigInteger section = BigInteger.valueOf(5000);
//        while (newest.subtract(block).compareTo(section) > -1) {
//            BigInteger end = block.add(section);
//
//            BigInteger finalBlock = block;
//            new Thread(() -> {
//                log.info("监听:[{} - {}]", finalBlock, end);
//                ChainService.contractEventListener(finalBlock, end, bscUsdtContractEvent, ChainEnum.BSC_USDT_LISTENER.name());
//                ChainService.contractEventListener(finalBlock, end, bscCoinContractEvent, ChainEnum.BSC_TFC_LISTENER.name());
//            }).start();
//
//            block = block.add(section);
//            if (block.compareTo(newest) > 0) {
//                block = newest;
//            }
        BigInteger block = new BigInteger("23591200");
//        if (incrementObj == null) {
//            block = newest;
//        } else {
//            block = (BigInteger) incrementObj;
//        }
//        ChainService.contractEventListener(block, bscUsdtContractEvent, ChainEnum.BSC_USDT_LISTENER.name());
//        ChainService.contractEventListener(block, bscCoinContractEvent, ChainEnum.BSC_TFC_LISTENER.name());
        ChainService.wssBaseCoinEventListener(block, baseCoinService);
        long end = System.currentTimeMillis();
        log.info("区块链监听启动完成, 消耗时间{}", end - start);
src/test/java/cc/mrbird/febs/ChainTest.java
@@ -15,13 +15,20 @@
import cc.mrbird.febs.job.ProfitDailyJob;
import cc.mrbird.febs.rabbit.producer.ChainProducer;
import com.alibaba.fastjson.JSONObject;
import io.reactivex.disposables.Disposable;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.web3j.protocol.Web3j;
import org.web3j.protocol.core.DefaultBlockParameterNumber;
import org.web3j.protocol.http.HttpService;
import org.web3j.protocol.websocket.WebSocketClient;
import org.web3j.protocol.websocket.WebSocketService;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -50,7 +57,54 @@
    /**
     * Manually starts the websocket contract-event listener for the BSC USDT
     * chain entry from a fixed start block.
     *
     * NOTE(review): this view shows two consecutive calls that differ only in
     * the start block; they look like the old and new lines of a diff rather
     * than two intended calls — confirm against the actual file.
     */
    @Test
    public void wssChainListener(){
        ChainService.wssContractEventListener(new BigInteger("22819014"), bscUsdtContractEvent, ChainEnum.BSC_USDT.name());
        ChainService.wssContractEventListener(new BigInteger("23591200"), bscUsdtContractEvent, ChainEnum.BSC_USDT.name());
    }
    @Test
    public void wssBaseChainListener(){
//        WebSocketService ws = null;
//        WebSocketClient webSocketClient = null;
//        Web3j web3j = null;
        try {
//            webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/78074065950e4915aef4f12b6f357d16"));
//            ws = new WebSocketService(webSocketClient, true);
//            ws.connect();
//            web3j = Web3j.build(ws);
            HttpService service = new HttpService("https://bsc-dataseed1.ninicoin.io");
            Web3j web3j = Web3j.build(service);
//            EthFilter filter = getFilter(startBlock, null, "0xd9076245473060dda1a65f5f3d89a4d0598995e6");
//
//            Flowable<Log> eventFlowable = web3j.ethLogFlowable(filter);
//            Disposable subscribe = eventFlowable.subscribe(log -> {
//                System.out.println(1111111);
//                System.out.println(log);
//            }, error -> {
//                log.error("币安监听异常", error);
//            });
            System.out.println(1111111);
            Disposable subscription = web3j.blockFlowable(true).subscribe(block -> {
                System.out.println(111);
            }, error -> {
                System.out.println(2222);
            });
//            Disposable subscribe = web3j.replayPastAndFutureTransactionsFlowable(new DefaultBlockParameterNumber(startBlock)).subscribe(tx -> {
//                System.out.println(1111);
//            });
//            Disposable subscribe = web3j.transactionFlowable().subscribe(tx -> {
//                System.out.println(1111);
//            });
//
//            subscription.dispose();
        } catch (Exception e) {
            System.out.println(2222222);
            e.printStackTrace();
        }
    }
//    @Autowired
src/test/java/cc/mrbird/febs/MemberTest.java
@@ -42,7 +42,7 @@
    public void tranfer(){
        String address = "0xD9076245473060DDa1a65f5f3D89a4D0598995E6";
        BigDecimal amount = new BigDecimal("0.0009");
        String hash = ChainService.getInstance(ChainEnum.BNB.name()).transfer(address, amount);
        String hash = ChainService.getInstance(ChainEnum.BNB.name()).transferBaseToken(address, amount);
////        if(StrUtil.isEmpty(hash)){
////            return;
////        }
@@ -52,6 +52,12 @@
    }
    @Test
    public void balance() {
        BigDecimal hash = ChainService.getInstance(ChainEnum.BNB.name()).balanceOfBaseToken(ChainEnum.BNB.getAddress());
        System.out.println(hash);
    }
    @Test
    public void refererIdsTest() {
        DappMemberEntity member = dappMemberDao.selectById(24);