From e1048126f8b32d263b92ffd891aaeb56e9de6a50 Mon Sep 17 00:00:00 2001
From: xiaoyong931011 <15274802129@163.com>
Date: Mon, 05 Dec 2022 10:52:16 +0800
Subject: [PATCH] Merge branch 'BNBWEB' of http://120.27.238.55:7000/r/sys-dapp into BNBWEB

---
 src/main/java/cc/mrbird/febs/dapp/chain/EthService.java           |   45 +++++++++
 src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java         |   24 ++++
 src/main/java/cc/mrbird/febs/job/ChainListenerJob.java            |   29 -----
 src/test/java/cc/mrbird/febs/MemberTest.java                      |    8 +
 src/main/java/cc/mrbird/febs/dapp/chain/ContractChainService.java |    4 
 src/main/java/cc/mrbird/febs/dapp/chain/BaseCoinService.java      |   87 +++++++++++++++++
 src/test/java/cc/mrbird/febs/ChainTest.java                       |   56 +++++++++++
 src/main/java/cc/mrbird/febs/dapp/chain/TrxService.java           |   10 ++
 8 files changed, 236 insertions(+), 27 deletions(-)

diff --git a/src/main/java/cc/mrbird/febs/dapp/chain/BaseCoinService.java b/src/main/java/cc/mrbird/febs/dapp/chain/BaseCoinService.java
new file mode 100644
index 0000000..3cf11f5
--- /dev/null
+++ b/src/main/java/cc/mrbird/febs/dapp/chain/BaseCoinService.java
@@ -0,0 +1,87 @@
+package cc.mrbird.febs.dapp.chain;
+
+import cc.mrbird.febs.common.contants.AppContants;
+import cc.mrbird.febs.common.utils.RedisUtils;
+import cc.mrbird.febs.dapp.entity.DappFundFlowEntity;
+import cc.mrbird.febs.dapp.entity.DappTransferRecordEntity;
+import cc.mrbird.febs.dapp.mapper.DappFundFlowDao;
+import cc.mrbird.febs.dapp.mapper.DappWalletCoinDao;
+import cc.mrbird.febs.dapp.utils.OnlineTransferUtil;
+import cc.mrbird.febs.rabbit.producer.ChainProducer;
+import cn.hutool.core.collection.CollUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.web3j.protocol.core.methods.response.Transaction;
+
+import javax.annotation.Resource;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.math.RoundingMode;
+import java.util.List;
+
+@Slf4j
+@Service
+public class BaseCoinService {
+
+    @Resource
+    private RedisUtils redisUtils;
+    @Resource
+    private DappFundFlowDao dappFundFlowDao;
+    @Resource
+    private DappWalletCoinDao dappWalletCoinDao;
+    @Resource
+    private ChainProducer chainProducer;
+
+    public void compile(Transaction e) {
+        redisUtils.set(AppContants.REDIS_KEY_BLOCK_USDT_NUM, e.getBlockNumber());
+
+        // 判断对方打款地址是否为源池地址
+        if (ChainEnum.BSC_USDT.getAddress().toLowerCase().equals(e.getTo())) {
+            log.info("触发BNB监听事件");
+            redisUtils.set(AppContants.REDIS_KEY_BLOCK_ETH_INCREMENT_NUM, e.getBlockNumber());
+
+            // 如果得到触发,则休眠10秒。 因为此处监听器触发可能优先于前端调用transfer接口
+            try {
+                Thread.sleep(10000);
+            } catch (InterruptedException ex) {
+                ex.printStackTrace();
+            }
+
+            BigDecimal amount = new BigDecimal(e.getValue()).divide(BigDecimal.TEN.pow(18), 8, RoundingMode.HALF_DOWN);
+
+            DappFundFlowEntity fundFlow = dappFundFlowDao.selectByFromHash(e.getHash(), null);
+            if (fundFlow != null && fundFlow.getStatus() != 1) {
+                return;
+            }
+
+            if (fundFlow == null) {
+                List<DappFundFlowEntity> flows = dappFundFlowDao.selectFundFlowListByAddress(e.getFrom(), 1);
+                if (CollUtil.isEmpty(flows)) {
+                    OnlineTransferUtil.addTransferRecord(e.getFrom(), e.getTo(), amount, e.getHash(), DappTransferRecordEntity.TRANSFER_SOURCE_FLAG_ONLINE, "USDT");
+                    log.info("本地无交易:{}", e.getHash());
+                    return;
+                }
+
+                for (DappFundFlowEntity flow : flows) {
+                    if (flow.getStatus() == 1) {
+                        if (amount.compareTo(flow.getAmount().multiply(flow.getNewestPrice()).setScale(4, RoundingMode.HALF_UP)) == 0) {
+                            fundFlow = flow;
+                            fundFlow.setFromHash(e.getHash());
+                            break;
+                        }
+                    }
+                }
+            }
+
+            if (fundFlow == null) {
+                return;
+            }
+
+            fundFlow.setAmount(fundFlow.getAmount().negate());
+            // 更改状态为已同步
+            fundFlow.setStatus(2);
+            dappFundFlowDao.updateById(fundFlow);
+
+        }
+    }
+}
diff --git a/src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java b/src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java
index f88fb84..e62286b 100644
--- a/src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java
+++ b/src/main/java/cc/mrbird/febs/dapp/chain/ChainService.java
@@ -7,11 +7,13 @@
 import com.alibaba.fastjson.JSONObject;
 import io.reactivex.Flowable;
 import io.reactivex.disposables.Disposable;
+import io.reactivex.functions.Function;
 import lombok.extern.slf4j.Slf4j;
 import okhttp3.Interceptor;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
 import okhttp3.Response;
+import org.reactivestreams.Subscription;
 import org.springframework.data.repository.query.ParameterOutOfBoundsException;
 import org.springframework.util.Base64Utils;
 import org.web3j.abi.FunctionReturnDecoder;
@@ -25,15 +27,19 @@
 import org.web3j.protocol.core.DefaultBlockParameterName;
 import org.web3j.protocol.core.DefaultBlockParameterNumber;
 import org.web3j.protocol.core.methods.request.EthFilter;
+import org.web3j.protocol.core.methods.response.Log;
 import org.web3j.protocol.core.methods.response.TransactionReceipt;
 import org.web3j.protocol.http.HttpService;
 import org.web3j.protocol.websocket.WebSocketClient;
 import org.web3j.protocol.websocket.WebSocketService;
+import org.web3j.tx.Contract;
 import org.web3j.tx.gas.StaticGasProvider;
+import org.web3j.utils.Async;
 
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.math.RoundingMode;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
 import java.rmi.activation.UnknownObjectException;
@@ -126,6 +132,24 @@
 
     }
 
+    public static void wssBaseCoinEventListener(BigInteger startBlock, BaseCoinService event) {
+        WebSocketService ws = null;
+        WebSocketClient webSocketClient = null;
+        Web3j web3j = null;
+
+        try {
+            webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/78074065950e4915aef4f12b6f357d16"));
+            ws = new WebSocketService(webSocketClient, true);
+            ws.connect();
+            web3j = Web3j.build(ws);
+
+            Disposable subscribe = web3j.replayPastAndFutureTransactionsFlowable(new DefaultBlockParameterNumber(startBlock)).subscribe(event::compile, error ->{
+                log.error("监听链上异常", error);
+            });
+        }catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
 
     private static EthUsdtContract contract(String privateKey, String contractAddress, String url) {
         Credentials credentials = Credentials.create(privateKey);
diff --git a/src/main/java/cc/mrbird/febs/dapp/chain/ContractChainService.java b/src/main/java/cc/mrbird/febs/dapp/chain/ContractChainService.java
index 7e11eb8..8ab7dc6 100644
--- a/src/main/java/cc/mrbird/febs/dapp/chain/ContractChainService.java
+++ b/src/main/java/cc/mrbird/febs/dapp/chain/ContractChainService.java
@@ -28,4 +28,8 @@
     BigInteger totalSupplyNFT();
 
     String safeMintNFT(String address);
+
+    String transferBaseToken(String address, BigDecimal amount);
+
+    BigDecimal balanceOfBaseToken(String address);
 }
diff --git a/src/main/java/cc/mrbird/febs/dapp/chain/EthService.java b/src/main/java/cc/mrbird/febs/dapp/chain/EthService.java
index 57ae327..0ea611c 100644
--- a/src/main/java/cc/mrbird/febs/dapp/chain/EthService.java
+++ b/src/main/java/cc/mrbird/febs/dapp/chain/EthService.java
@@ -426,4 +426,49 @@
             return "";
         }
     }
+
+    @Override
+    public String transferBaseToken(String address, BigDecimal amount) {
+        String gas = getGas();
+        try {
+            Credentials credentials = Credentials.create(privateKey);
+
+            EthGetTransactionCount ethGetTransactionCount = web3j
+                    .ethGetTransactionCount(ownerAddress, DefaultBlockParameterName.LATEST).sendAsync().get();
+
+            BigInteger nonce = ethGetTransactionCount.getTransactionCount();
+            BigInteger value = Convert.toWei(amount, Convert.Unit.ETHER).toBigInteger();
+            RawTransaction rawTransaction = RawTransaction.createEtherTransaction(nonce,
+                    Convert.toWei(gas, Convert.Unit.GWEI).toBigInteger(),
+                    Convert.toWei("100000", Convert.Unit.WEI).toBigInteger(), address, value);
+            byte[] signedMessage = TransactionEncoder.signMessage(rawTransaction, credentials);
+            String hexValue = Numeric.toHexString(signedMessage);
+
+            EthSendTransaction ethSendTransaction = web3j.ethSendRawTransaction(hexValue).sendAsync().get();
+            if (ethSendTransaction.hasError()) {
+                return "";
+            } else {
+                String transactionHash = ethSendTransaction.getTransactionHash();
+                return transactionHash;
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+            return "";
+        }
+    }
+
+    @Override
+    public BigDecimal balanceOfBaseToken(String address) {
+        EthGetBalance balanceWei;
+        try {
+            balanceWei = web3j.ethGetBalance(address, DefaultBlockParameterName.LATEST).send();
+            if (balanceWei.getResult() == null) {
+                return null;
+            }
+            return Convert.fromWei(balanceWei.getBalance().toString(), Convert.Unit.ETHER);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return null;
+    }
 }
diff --git a/src/main/java/cc/mrbird/febs/dapp/chain/TrxService.java b/src/main/java/cc/mrbird/febs/dapp/chain/TrxService.java
index 61c6595..7de4a6c 100644
--- a/src/main/java/cc/mrbird/febs/dapp/chain/TrxService.java
+++ b/src/main/java/cc/mrbird/febs/dapp/chain/TrxService.java
@@ -123,4 +123,14 @@
     public String safeMintNFT(String address) {
         return null;
     }
+
+    @Override
+    public String transferBaseToken(String address, BigDecimal amount) {
+        return null;
+    }
+
+    @Override
+    public BigDecimal balanceOfBaseToken(String address) {
+        return null;
+    }
 }
diff --git a/src/main/java/cc/mrbird/febs/job/ChainListenerJob.java b/src/main/java/cc/mrbird/febs/job/ChainListenerJob.java
index c59f2d3..cc656a5 100644
--- a/src/main/java/cc/mrbird/febs/job/ChainListenerJob.java
+++ b/src/main/java/cc/mrbird/febs/job/ChainListenerJob.java
@@ -2,6 +2,7 @@
 
 import cc.mrbird.febs.common.contants.AppContants;
 import cc.mrbird.febs.common.utils.RedisUtils;
+import cc.mrbird.febs.dapp.chain.BaseCoinService;
 import cc.mrbird.febs.dapp.chain.ChainEnum;
 import cc.mrbird.febs.dapp.chain.ChainService;
 import cc.mrbird.febs.dapp.chain.ContractEventService;
@@ -23,9 +24,7 @@
 public class ChainListenerJob implements ApplicationRunner {
 
     @Autowired
-    private ContractEventService bscUsdtContractEvent;
-    @Autowired
-    private ContractEventService bscCoinContractEvent;
+    private BaseCoinService baseCoinService;
 
     @Autowired
     private RedisUtils redisUtils;
@@ -70,34 +69,14 @@
         log.info("区块链监听开始启动");
         Object incrementObj = redisUtils.get(AppContants.REDIS_KEY_BLOCK_ETH_INCREMENT_NUM);
         BigInteger newest = ChainService.getInstance(ChainEnum.BSC_TFC.name()).blockNumber();
-        BigInteger block;
+        BigInteger block = null;
         if (incrementObj == null) {
             block = newest;
         } else {
             block = (BigInteger) incrementObj;
         }
 
-        ChainService.wssContractEventListener(block, bscUsdtContractEvent, ChainEnum.BSC_USDT_LISTENER.name());
-        ChainService.wssContractEventListener(block, bscCoinContractEvent, ChainEnum.BSC_TFC_LISTENER.name());
-
-//        BigInteger section = BigInteger.valueOf(5000);
-//        while (newest.subtract(block).compareTo(section) > -1) {
-//            BigInteger end = block.add(section);
-//
-//            BigInteger finalBlock = block;
-//            new Thread(() -> {
-//                log.info("监听:[{} - {}]", finalBlock, end);
-//                ChainService.contractEventListener(finalBlock, end, bscUsdtContractEvent, ChainEnum.BSC_USDT_LISTENER.name());
-//                ChainService.contractEventListener(finalBlock, end, bscCoinContractEvent, ChainEnum.BSC_TFC_LISTENER.name());
-//            }).start();
-//
-//            block = block.add(section);
-//            if (block.compareTo(newest) > 0) {
-//                block = newest;
-//            }
-//        }
-//        ChainService.contractEventListener(block, bscUsdtContractEvent, ChainEnum.BSC_USDT_LISTENER.name());
-//        ChainService.contractEventListener(block, bscCoinContractEvent, ChainEnum.BSC_TFC_LISTENER.name());
+        ChainService.wssBaseCoinEventListener(block, baseCoinService);
 
         long end = System.currentTimeMillis();
         log.info("区块链监听启动完成, 消耗时间{}", end - start);
diff --git a/src/test/java/cc/mrbird/febs/ChainTest.java b/src/test/java/cc/mrbird/febs/ChainTest.java
index 136bc99..84f0521 100644
--- a/src/test/java/cc/mrbird/febs/ChainTest.java
+++ b/src/test/java/cc/mrbird/febs/ChainTest.java
@@ -15,13 +15,20 @@
 import cc.mrbird.febs.job.ProfitDailyJob;
 import cc.mrbird.febs.rabbit.producer.ChainProducer;
 import com.alibaba.fastjson.JSONObject;
+import io.reactivex.disposables.Disposable;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
+import org.web3j.protocol.Web3j;
+import org.web3j.protocol.core.DefaultBlockParameterNumber;
+import org.web3j.protocol.http.HttpService;
+import org.web3j.protocol.websocket.WebSocketClient;
+import org.web3j.protocol.websocket.WebSocketService;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.math.RoundingMode;
+import java.net.URI;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -50,7 +57,54 @@
 
     @Test
     public void wssChainListener(){
-        ChainService.wssContractEventListener(new BigInteger("22819014"), bscUsdtContractEvent, ChainEnum.BSC_USDT.name());
+        ChainService.wssContractEventListener(new BigInteger("23591200"), bscUsdtContractEvent, ChainEnum.BSC_USDT.name());
+    }
+
+
+    @Test
+    public void wssBaseChainListener(){
+//        WebSocketService ws = null;
+//        WebSocketClient webSocketClient = null;
+//        Web3j web3j = null;
+
+        try {
+//            webSocketClient = new WebSocketClient(new URI("wss://bsc-mainnet.nodereal.io/ws/v1/78074065950e4915aef4f12b6f357d16"));
+//            ws = new WebSocketService(webSocketClient, true);
+//            ws.connect();
+//            web3j = Web3j.build(ws);
+
+
+            HttpService service = new HttpService("https://bsc-dataseed1.ninicoin.io");
+            Web3j web3j = Web3j.build(service);
+
+//            EthFilter filter = getFilter(startBlock, null, "0xd9076245473060dda1a65f5f3d89a4d0598995e6");
+//
+//            Flowable<Log> eventFlowable = web3j.ethLogFlowable(filter);
+//            Disposable subscribe = eventFlowable.subscribe(log -> {
+//                System.out.println(1111111);
+//                System.out.println(log);
+//            }, error -> {
+//                log.error("币安监听异常", error);
+//            });
+            System.out.println(1111111);
+            Disposable subscription = web3j.blockFlowable(true).subscribe(block -> {
+                System.out.println(111);
+            }, error -> {
+                System.out.println(2222);
+            });
+
+//            Disposable subscribe = web3j.replayPastAndFutureTransactionsFlowable(new DefaultBlockParameterNumber(startBlock)).subscribe(tx -> {
+//                System.out.println(1111);
+//            });
+//            Disposable subscribe = web3j.transactionFlowable().subscribe(tx -> {
+//                System.out.println(1111);
+//            });
+//
+//            subscription.dispose();
+        } catch (Exception e) {
+            System.out.println(2222222);
+            e.printStackTrace();
+        }
     }
 
 //    @Autowired
diff --git a/src/test/java/cc/mrbird/febs/MemberTest.java b/src/test/java/cc/mrbird/febs/MemberTest.java
index 50933d3..85d9dca 100644
--- a/src/test/java/cc/mrbird/febs/MemberTest.java
+++ b/src/test/java/cc/mrbird/febs/MemberTest.java
@@ -42,7 +42,7 @@
     public void tranfer(){
         String address = "0xD9076245473060DDa1a65f5f3D89a4D0598995E6";
         BigDecimal amount = new BigDecimal("0.0009");
-        String hash = ChainService.getInstance(ChainEnum.BNB.name()).transfer(address, amount);
+        String hash = ChainService.getInstance(ChainEnum.BNB.name()).transferBaseToken(address, amount);
 ////        if(StrUtil.isEmpty(hash)){
 ////            return;
 ////        }
@@ -52,6 +52,12 @@
     }
 
     @Test
+    public void balance() {
+        BigDecimal hash = ChainService.getInstance(ChainEnum.BNB.name()).balanceOfBaseToken(ChainEnum.BNB.getAddress());
+        System.out.println(hash);
+    }
+
+    @Test
     public void refererIdsTest() {
         DappMemberEntity member = dappMemberDao.selectById(24);
 

--
Gitblit v1.9.1