From 5575818556096be1dadaf5ff356b5db4c832aaa2 Mon Sep 17 00:00:00 2001 From: Helius <wangdoubleone@gmail.com> Date: Wed, 26 May 2021 19:27:09 +0800 Subject: [PATCH] modify --- /dev/null | 76 --------------- src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java | 10 +- src/main/java/com/xcong/excoin/modules/member/service/impl/MemberServiceImpl.java | 1 src/main/java/com/xcong/excoin/common/system/controller/CommonController.java | 39 +++---- src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java | 6 src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java | 10 - pom.xml | 14 +- src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java | 71 +++++++------- src/main/resources/application.yml | 2 src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java | 11 + src/test/java/com/xcong/excoin/TrcTest.java | 35 +++--- 11 files changed, 96 insertions(+), 179 deletions(-) diff --git a/pom.xml b/pom.xml index 2b52ce0..822d885 100644 --- a/pom.xml +++ b/pom.xml @@ -293,13 +293,13 @@ <scope>system</scope> <systemPath>${basedir}/lib/taobao-sdk-java.jar</systemPath> </dependency> - <dependency> - <groupId>ztron-sdk</groupId> - <artifactId>ztron-sdk</artifactId> - <version>0.0.1</version> - <scope>system</scope> - <systemPath>${basedir}/lib/tron-sdk.jar</systemPath> - </dependency> +<!-- <dependency>--> +<!-- <groupId>ztron-sdk</groupId>--> +<!-- <artifactId>ztron-sdk</artifactId>--> +<!-- <version>0.0.1</version>--> +<!-- <scope>system</scope>--> +<!-- <systemPath>${basedir}/lib/tron-sdk.jar</systemPath>--> +<!-- </dependency>--> <dependency> <groupId>org.jetbrains</groupId> <artifactId>annotations</artifactId> diff --git a/src/main/java/com/xcong/excoin/common/system/controller/CommonController.java b/src/main/java/com/xcong/excoin/common/system/controller/CommonController.java index bca5a3c..76f20e3 100644 --- a/src/main/java/com/xcong/excoin/common/system/controller/CommonController.java +++ b/src/main/java/com/xcong/excoin/common/system/controller/CommonController.java @@ -11,7 +11,6 @@ import com.xcong.excoin.common.system.service.CommonService; import com.xcong.excoin.common.system.vo.Base64UploadFilesVo; import com.xcong.excoin.configurations.properties.AliOssProperties; -import com.xcong.excoin.modules.blackchain.service.TrxUsdtUpdateService; import com.xcong.excoin.modules.platform.dao.SysExceptionDetailDao; import com.xcong.excoin.utils.MessageSourceUtils; import com.xcong.excoin.utils.OssUtils; @@ -140,25 +139,25 @@ } return Result.ok(MessageSourceUtils.getString("result_success_msg"), base64UploadFilesVo); } - - @Autowired - private TrxUsdtUpdateService trxUsdtUpdateService; - - @ApiOperation(value = "trc20测试") - @Transactional(rollbackFor = Exception.class) - @GetMapping(value = "/getTrc20Test") - public Result getTrc20Test() { - long start = System.currentTimeMillis(); - System.out.println(start); - long getnowblock = trxUsdtUpdateService.getnowblock(); - System.out.println(System.currentTimeMillis()); - Object trc20BlockNum = redisUtils.get("USDT_TRC20_BLOCK_NUM"); - TrxUsdtUpdateService.getblockbynum(BigInteger.valueOf(Long.parseLong(trc20BlockNum.toString()))); - long end = System.currentTimeMillis(); - System.out.println(end); - BigDecimal.ONE.divide(BigDecimal.ZERO); - return Result.ok(end - start); - } +// +// @Autowired +// private TrxUsdtUpdateService trxUsdtUpdateService; +// +// @ApiOperation(value = "trc20测试") +// @Transactional(rollbackFor = Exception.class) +// @GetMapping(value = "/getTrc20Test") +// public Result getTrc20Test() { +// long start = System.currentTimeMillis(); +// System.out.println(start); +// long getnowblock = trxUsdtUpdateService.getnowblock(); +// System.out.println(System.currentTimeMillis()); +// Object trc20BlockNum = redisUtils.get("USDT_TRC20_BLOCK_NUM"); +// TrxUsdtUpdateService.getblockbynum(BigInteger.valueOf(Long.parseLong(trc20BlockNum.toString()))); +// long end = System.currentTimeMillis(); +// System.out.println(end); +// BigDecimal.ONE.divide(BigDecimal.ZERO); +// return Result.ok(end - start); +// } @Autowired private SysExceptionDetailDao sysExceptionDetailDao; diff --git a/src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java b/src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java index 3f547ce..69ceb02 100644 --- a/src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java +++ b/src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java @@ -190,9 +190,9 @@ // 发送新增的地址到监听集合 usdtUpdateProducer.sendAddressMsg(address+","+"ERC20"); } else if ("TRC20".equals(lable)) { - Map<String, String> usdtMap = Trc20Service.createAddress(); - address = usdtMap.get("address"); - key = usdtMap.get("privateKey"); +// Map<String, String> usdtMap = Trc20Service.createAddress(); +// address = usdtMap.get("address"); +// key = usdtMap.get("privateKey"); map.put("address", address); // 发送新增的地址到监听集合 usdtUpdateProducer.sendAddressMsg(address+","+"TRC20"); diff --git a/src/main/java/com/xcong/excoin/modules/blackchain/service/Trc20Service.java b/src/main/java/com/xcong/excoin/modules/blackchain/service/Trc20Service.java deleted file mode 100644 index dca43d1..0000000 --- a/src/main/java/com/xcong/excoin/modules/blackchain/service/Trc20Service.java +++ /dev/null @@ -1,110 +0,0 @@ -package com.xcong.excoin.modules.blackchain.service; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONObject; -import com.sunlight.tronsdk.TrxService; -import com.sunlight.tronsdk.transaction.TransactionResult; -import com.xcong.excoin.modules.blackchain.model.Trc20TransactionsData; -import com.xcong.excoin.modules.blackchain.model.Trc20TransactionsResult; -import org.apache.commons.codec.binary.Hex; -import org.tron.common.crypto.SignInterface; -import org.tron.common.crypto.SignUtils; -import org.tron.common.utils.ByteArray; -import org.tron.common.utils.Utils; -import org.tron.walletserver.WalletApi; - -import java.math.BigDecimal; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * TRC20 服务类 - * https://cn.developers.tron.network/reference - */ -public class Trc20Service { - - private final static String FULL_NODE_URL = "https://api.trongrid.io"; - - public final static String TRX_PRIVATE_KEY = "a3f6e445fb7e8b835f84431cc6f4ffc991745338d61d9b2ca1575b822563e1f4"; - public final static String TRX_ADDRESS = "TUEZxDjAKw6aCwQJVKX9RGWZUKDyzFkunA"; - public final static String POOL_ADDRESS = "TXeawtLhKDPYKFhET1MnHRMXYVKPo5ThxB"; - - public final static String API_KEY="a7b0c96a-cfcd-474d-88c5-75c6277fedbf"; - - - /** - * 创建用户钱包地址 - **/ - public static Map<String,String> createAddress() { -// String url = http + "/wallet/generateaddress"; - SignInterface sign = SignUtils.getGeneratedRandomSign(Utils.getRandom(), true); - byte[] priKey = sign.getPrivateKey(); - byte[] address = sign.getAddress(); - String priKeyStr = Hex.encodeHexString(priKey); - String base58check = WalletApi.encode58Check(address); - String hexString = ByteArray.toHexString(address); - Map<String,String> jsonAddress = new HashMap<>(); - jsonAddress.put("address", base58check); - jsonAddress.put("hexAddress", hexString); - jsonAddress.put("privateKey", priKeyStr); - return jsonAddress; - } - - /** - * 转TRX - * @param sendPrivateKey - * @param receiveAddress - * @param amount - */ - public static void sendTrx(String sendPrivateKey,String receiveAddress,BigDecimal amount) { - TrxService service = new TrxService(); - try { - TransactionResult transactionResult = service.testSendTrxTransaction(sendPrivateKey, receiveAddress, amount,API_KEY); - } catch (Exception e) { - e.printStackTrace(); - } - - } - - /** - * 转TRC20 - * @param sendPrivateKey - * @param receiveAddress - * @param amount - */ - public static void sendTrc20(String sendPrivateKey,String receiveAddress,BigDecimal amount){ - TrxService service = new TrxService(); - try { - TransactionResult transactionResult = service.sendTrc20TransactionTest(sendPrivateKey, receiveAddress, amount,API_KEY); - } catch (Exception e) { - e.printStackTrace(); - } - } - - public static BigDecimal getTrxBalance(String address){ - TrxService service = new TrxService(); - try { - BigDecimal trxBalanceTest = service.getTrxBalanceTest(address,API_KEY); - return trxBalanceTest; - } catch (Exception e) { - e.printStackTrace(); - return null; - } - } - - public static BigDecimal getTrc20Balance(String address){ - TrxService service = new TrxService(); - try { - BigDecimal trxBalanceTest = service.trc20BalanceOfTest(address,API_KEY); - return trxBalanceTest; - } catch (Exception e) { - e.printStackTrace(); - return null; - } - } - public static void main(String[] args) { - System.out.println(createAddress()); - } - -} diff --git a/src/main/java/com/xcong/excoin/modules/blackchain/service/TrxUsdtUpdateService.java b/src/main/java/com/xcong/excoin/modules/blackchain/service/TrxUsdtUpdateService.java deleted file mode 100644 index fbe30a5..0000000 --- a/src/main/java/com/xcong/excoin/modules/blackchain/service/TrxUsdtUpdateService.java +++ /dev/null @@ -1,378 +0,0 @@ -package com.xcong.excoin.modules.blackchain.service; - -import cn.hutool.core.math.MathUtil; -import cn.hutool.http.HttpResponse; -import cn.hutool.http.HttpUtil; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; -import com.xcong.excoin.common.enumerates.CoinTypeEnum; -import com.xcong.excoin.modules.blackchain.model.EthUsdtChargeDto; -import com.xcong.excoin.modules.member.dao.MemberCoinAddressDao; -import com.xcong.excoin.modules.member.entity.MemberCoinAddressEntity; -import com.xcong.excoin.rabbit.producer.UsdtUpdateProducer; -import com.xcong.excoin.utils.RedisUtils; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; -import org.springframework.http.*; -import org.springframework.http.client.SimpleClientHttpRequestFactory; -import org.springframework.stereotype.Service; -import org.springframework.web.client.RestTemplate; -import org.tron.common.utils.ByteArray; -import org.tron.walletserver.WalletApi; - -import javax.annotation.Resource; -import java.math.BigDecimal; -import java.math.BigInteger; -import java.util.*; - -/** - * TRX TRC20服务类 - */ -@Slf4j -@Service -public class TrxUsdtUpdateService { - - public static List<String> addressList = new ArrayList<>(); - private static String http = "https://api.trongrid.io"; - - private static String TRC20_CONTRACT_ADDRESS = "TR7NHqjeKQxGTCi8q8ZY4pL8otSzgjLj6t"; - - /** - * 手续费 - */ - private final static BigDecimal TRX_FEE = new BigDecimal("10"); - - @Resource - private UsdtUpdateProducer usdtUpdateProducer; - - @Resource - private MemberCoinAddressDao memberCoinAddressDao; - - @Resource - RedisUtils redisUtils; - - /** - * 扫块 同步充值USDT-TRC20和TRX - */ - public void monitorCoinListener(Long blockNum) { - if (CollectionUtils.isEmpty(addressList)) { - List<MemberCoinAddressEntity> coinAddressList = memberCoinAddressDao.selectAllBlockAddressBySymbolAndTag(CoinTypeEnum.USDT.name(), "TRC20"); - if (CollectionUtils.isNotEmpty(coinAddressList)) { - coinAddressList.forEach(e -> { - addressList.add(e.getAddress()); - }); - } - } - if (CollectionUtils.isEmpty(addressList)) { - return; - } - - - // 解析区块 - httpTransactionInfo(addressList, blockNum); - - } - - /** - * 解析区块数据 同步用户充值 - * - * @param addressList - * @param num - */ - private void httpTransactionInfo(List<String> addressList, Long num) { - // 查询详情,包含了所有交易信息 - String transactionInfoByBlockNum = getblockbynum(BigInteger.valueOf(num)); - if (StringUtils.isBlank(transactionInfoByBlockNum)) { - return; - } -// log.info("--->{}, {}", num, System.currentTimeMillis()); - // 不用等到扫完再累加 只要进来就加 还有一个条件是必须查询出区块再加 否则当区块超过实际区块 -// redisUtils.set("USDT_TRC20_BLOCK_NUM", (num + 1L)); - JSONArray parseArray = JSON.parseObject(transactionInfoByBlockNum).getJSONArray("transactions"); - if (parseArray != null && parseArray.size() > 0) { - for (Object e : parseArray) { - try { -// String txId = JSON.parseObject(e.toString()).getString("id"); -// String contract_address = JSON.parseObject(e.toString()).getString("contract_address"); -// if(!"41a614f803b6fd780986a42c78ec9c7f77e6ded13c".equals(contract_address)){ -// continue; -// } - //判断 数据库 txId 有 就不用往下继续了 - JSONObject parseObject = JSON.parseObject(e.toString()); - String txId = parseObject.getString("txID"); - String contractRet = parseObject.getJSONArray("ret").getJSONObject(0).getString("contractRet"); - //交易成功 - if ("SUCCESS".equals(contractRet)) { - String type = parseObject.getJSONObject("raw_data").getJSONArray("contract").getJSONObject(0).getString("type"); - if ("TriggerSmartContract".equals(type)) { - //合约地址转账 - triggerSmartContract(addressList, txId, parseObject); - - } else if ("TransferContract".equals(type)) { - //trx 转账 - //transferContract(parseObject); - } - } - } catch (Exception exception) { - exception.printStackTrace(); - } - } - } - } - - - /** - * 比对本地地址 同步TRX充值 - * - * @param parseObject - */ - private void transferContract(JSONObject parseObject) { - //数量 - BigDecimal amount = parseObject.getJSONObject("raw_data").getJSONArray("contract").getJSONObject(0).getJSONObject("parameter").getJSONObject("value").getBigDecimal("amount"); - - //调用者地址 - String owner_address = parseObject.getJSONObject("raw_data").getJSONArray("contract").getJSONObject(0).getJSONObject("parameter").getJSONObject("value").getString("owner_address"); - owner_address = WalletApi.encode58Check(ByteArray.fromHexString(owner_address)); - - //转入地址 - String to_address = parseObject.getJSONObject("raw_data").getJSONArray("contract").getJSONObject(0).getJSONObject("parameter").getJSONObject("value").getString("to_address"); - to_address = WalletApi.encode58Check(ByteArray.fromHexString(to_address)); - - amount = amount.divide(new BigDecimal(1 + TransformUtil.getSeqNumByLong(0L, 6))); - - } - - /** - * 获取特定区块的所有交易 Info 信息 - * - * @param num 区块 - * @return - */ - public static String getTransactionInfoByBlockNum(BigInteger num) { - String url = http + "/wallet/gettransactioninfobyblocknum"; - Map<String, Object> map = new HashMap<>(); - map.put("num", num); - String param = JSON.toJSONString(map); - return postForEntity(url, param).getBody(); - } - - /** - * 获取特定区块的所有交易 Info 信息 - * - * @param num 区块 - * @return - */ - public static String getblockbynum(BigInteger num) { - String url = http + "/wallet/getblockbynum"; - Map<String, Object> map = new HashMap<>(); - map.put("num", num); - String param = JSON.toJSONString(map); -// return postForEntity(url, param).getBody(); - return postForEntityHuTool(url, param).body(); - } - - - /** - * https://cn.developers.tron.network/docs/%E4%BA%A4%E6%98%9311#%E4%BA%A4%E6%98%93%E7%A1%AE%E8%AE%A4%E6%96%B9%E6%B3%95 - * 按交易哈希查询交易 - * - * @param txId 交易id - * @return - */ - public static String getTransactionById(String txId) { - // String url = walletSolidityHttp + "/walletsolidity/gettransactionbyid"; - String url = http + "/wallet/gettransactionbyid"; - Map<String, Object> map = new HashMap<>(); - map.put("value", txId); - String param = JSON.toJSONString(map); - return postForEntity(url, param).getBody(); - } - - /** - * 执行 post 请求 - * - * @param url url - * @param param 请求参数 - * @return - */ - private static ResponseEntity<String> postForEntity(String url, String param) { - SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory(); - factory.setConnectTimeout(20000); - factory.setReadTimeout(20000); - RestTemplate restTemplate = new RestTemplate(factory); - HttpHeaders headers = new HttpHeaders(); - headers.setContentType(MediaType.APPLICATION_JSON); - headers.set("TRON-PRO-API-KEY", Trc20Service.API_KEY); - HttpEntity<String> request = new HttpEntity<>(param, headers); - ResponseEntity<String> result = restTemplate.postForEntity(url, request, String.class); -// System.out.println("url:" + url + ",param:" + param + ",result:" + result.getBody()); - return result; - } - - private static HttpResponse postForEntityHuTool(String url, String param) { - System.setProperty("https.protocols", "TLSv1,TLSv1.1,TLSv1.2"); - return HttpUtil.createPost(url).body(param) - .timeout(20000).contentType("application/json") - .header("TRON-PRO-API-KEY", Trc20Service.API_KEY) - .execute(); - } - - /** - * 比对本地地址 同步充值USDT-TRC20 - * - * @param addressList - * @param txId - * @param parseObject - */ - private void triggerSmartContract(List<String> addressList, String txId, JSONObject parseObject) { - //方法参数 - String data = parseObject.getJSONObject("raw_data").getJSONArray("contract").getJSONObject(0).getJSONObject("parameter").getJSONObject("value").getString("data"); - // 调用者地址 - String owner_address = parseObject.getJSONObject("raw_data").getJSONArray("contract").getJSONObject(0).getJSONObject("parameter").getJSONObject("value").getString("owner_address"); - owner_address = WalletApi.encode58Check(ByteArray.fromHexString(owner_address)); - //System.out.println("owner_address:"+owner_address); - // 合约地址 - String contract_address = parseObject.getJSONObject("raw_data").getJSONArray("contract").getJSONObject(0).getJSONObject("parameter").getJSONObject("value").getString("contract_address"); - contract_address = WalletApi.encode58Check(ByteArray.fromHexString(contract_address)); - - String dataStr = data.substring(8); - List<String> strList = TransformUtil.getStrList(dataStr, 64); - //System.out.println(strList); - if (strList.size() != 2) { - return; - } - - String to_address = TransformUtil.delZeroForNum(strList.get(0)); - - if (!to_address.startsWith("41")) { - to_address = "41" + to_address; - } - - to_address = WalletApi.encode58Check(ByteArray.fromHexString(to_address)); - //System.out.println("to_address:"+to_address); - String amountStr = TransformUtil.delZeroForNum(strList.get(1)); - - if (amountStr.length() > 0) { - amountStr = new BigInteger(amountStr, 16).toString(10); - } - - BigDecimal amount = BigDecimal.ZERO; - //相匹配的合约地址 - if (!TRC20_CONTRACT_ADDRESS.equals(contract_address)) { - return; - } - - //币种 - if (StringUtils.isNotEmpty(amountStr)) { - amount = new BigDecimal(amountStr).divide(new BigDecimal(1 + TransformUtil.getSeqNumByLong(0L, 6))); - } - for (String address : addressList) { - if (address.equals(to_address)) { - log.info("存在本地的地址:" + address); - // 金额 - // 发送消息队列 - EthUsdtChargeDto dto = new EthUsdtChargeDto(address, txId, amount); - dto.setSymbol(EthUsdtChargeDto.Symbol.USDT_TRC20); - usdtUpdateProducer.sendMsg(JSONObject.toJSONString(dto)); - log.info("===to_address:" + to_address + "===amount:" + amount); - } - } - - } - - /** - * 根据地址归集USDT-TRC20 - * - * @param address - * @return - */ - public boolean poolByAddress(String address) { - // 首先查询trx余额 - BigDecimal trxBalance = Trc20Service.getTrxBalance(address); - if (trxBalance == null) { - return false; - } - if (trxBalance.compareTo(TRX_FEE) >= 0) { - // 转 - BigDecimal trc20Balance = Trc20Service.getTrc20Balance(address); - if (trc20Balance == null) { - return false; - } - MemberCoinAddressEntity coinAddressEntity = memberCoinAddressDao.selectCoinAddressByAddressAndSymbolTag(address, "USDT", "TRC20"); - if (coinAddressEntity == null) { - return false; - } - Trc20Service.sendTrc20(coinAddressEntity.getPrivateKey(), Trc20Service.POOL_ADDRESS, trc20Balance); - // 需要将存在redis的待归集地址删除 - Object trc20_pool = redisUtils.get("TRC20_POOL"); - if (trc20_pool != null) { - List<String> poolList = (List) trc20_pool; - Iterator<String> iterator = poolList.iterator(); - while (iterator.hasNext()) { - String next = iterator.next(); - if (address.equals(next)) { - iterator.remove(); - } - } - if (CollectionUtils.isEmpty(poolList)) { - redisUtils.del("TRC20_POOL"); - } else { - redisUtils.set("TRC20_POOL", poolList); - } - } - return true; - } else { - Trc20Service.sendTrx(Trc20Service.TRX_PRIVATE_KEY, address, TRX_FEE); - // 将这个地址记录,后续同步 - Object trc20_pool = redisUtils.get("TRC20_POOL"); - List<String> poolList = new ArrayList<>(); - if (trc20_pool != null) { - poolList = (List) trc20_pool; - } - poolList.add(address); - redisUtils.set("TRC20_POOL", poolList); - return true; - } - } - - // https://api.trongrid.io/wallet/getnowblock - - /** - * 获取最新区块 - * - * @return - */ - public long getnowblock() { - String url = http + "/wallet/getnowblock"; - RestTemplate restTemplate = new RestTemplate(); - HttpHeaders headers = new HttpHeaders(); - headers.setContentType(MediaType.APPLICATION_JSON); - headers.set("TRON-PRO-API-KEY", Trc20Service.API_KEY); - HttpEntity<String> request = new HttpEntity<>(headers); - ResponseEntity<String> exchange = restTemplate.exchange(url, HttpMethod.GET, request, String.class); - String forObject = exchange.getBody(); - //System.out.println(forObject); - // String forObject = restTemplate.getForObject(url, String.class); - String number = JSON.parseObject(forObject).getJSONObject("block_header").getJSONObject("raw_data").getString("number"); - return Long.valueOf(number); - } - - /** - * 从tronscan.io查询最新区块 - * {"whole_block_count":29625671,"whole_pay":3392835760,"last_day_pay":460432,"last_day_block_count":28777} - * @return - */ - public Long getnowblockFromTronScan() { - String roundNum = Math.random() + ""; - String url = "https://apiasia.tronscan.io:5566/api/block/statistic?randomNum=" + roundNum; - SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory(); - factory.setConnectTimeout(20000); - factory.setReadTimeout(20000); - RestTemplate restTemplate = new RestTemplate(factory); - String forObject = restTemplate.getForObject(url, String.class); - String wholeBlockCount = JSON.parseObject(forObject).getString("whole_block_count"); - return Long.valueOf(wholeBlockCount); - } -} diff --git a/src/main/java/com/xcong/excoin/modules/member/service/impl/MemberServiceImpl.java b/src/main/java/com/xcong/excoin/modules/member/service/impl/MemberServiceImpl.java index a0fc791..2672612 100644 --- a/src/main/java/com/xcong/excoin/modules/member/service/impl/MemberServiceImpl.java +++ b/src/main/java/com/xcong/excoin/modules/member/service/impl/MemberServiceImpl.java @@ -41,7 +41,6 @@ import com.xcong.excoin.utils.ThreadPoolUtils; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections4.Put; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; diff --git a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java index 9a296df..d8af5e9 100644 --- a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java +++ b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java @@ -19,9 +19,7 @@ */ @Slf4j @Component("msgDispatch") -public class MsgDispatch implements ApplicationContextAware { - - private ApplicationContext applicationContext; +public class MsgDispatch { @Autowired private MsgLogic msgLogic; @@ -35,11 +33,5 @@ log.info("#websocket json error:{}#", e); ctx.channel().writeAndFlush(NettyTools.webSocketBytes("params error")); } - } - - - @Override - public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { - this.applicationContext = applicationContext; } } diff --git a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java index 2ceb827..a6d7bef 100644 --- a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java +++ b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java @@ -38,14 +38,14 @@ @ChannelHandler.Sharable public class WebSocketServerHandler extends ChannelInboundHandlerAdapter { - private final ConcurrentMap<String, Integer> pingTimes = new ConcurrentHashMap<>(); +// private final ConcurrentMap<String, Integer> pingTimes = new ConcurrentHashMap<>(); private static final int MAX_UN_REC_PING_TIMES = 3; private WebSocketServerHandshaker handshaker; - @Resource(name = "msgDispatch") - private MsgDispatch msgDispatch; +// @Resource(name = "msgDispatch") +// private MsgDispatch msgDispatch; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { @@ -76,7 +76,7 @@ if (content.contains(Contans.HEART_BEAT)) { resetTimes(ctx.channel()); } else { - this.msgDispatch.webSocketDispatch(ctx, content); +// this.msgDispatch.webSocketDispatch(ctx, content); } } catch (ClassCastException e) { content = ((CloseWebSocketFrame) frame).reasonText(); @@ -88,40 +88,40 @@ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { log.info("[触发器触发]"); - if (evt instanceof IdleStateEvent) { - IdleStateEvent event = (IdleStateEvent) evt; - if (event.state() == IdleState.READER_IDLE) { - - } else if (event.state() == IdleState.WRITER_IDLE) { - /*写超时*/ - ctx.channel().writeAndFlush(NettyTools.webSocketBytes(Contans.HEART_BEAT)); - Integer times = pingTimes.get(ctx.channel().id().asShortText()); - if (times == null) { - times = 0; - } - /*读超时*/ - log.info("===服务端===({}写超时, {})", ctx.channel().id().asShortText(), times); - // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连 - if (times >= MAX_UN_REC_PING_TIMES) { - log.info("===服务端===(写超时,关闭chanel)"); - // 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连 - ctx.channel().close(); - } else { - // 失败计数器加1 - times++; - pingTimes.remove(ctx.channel().id().asShortText()); - pingTimes.put(ctx.channel().id().asShortText(), times); - } - } else if (event.state() == IdleState.ALL_IDLE) { - /*总超时*/ - System.out.println("===服务端===(ALL_IDLE 总超时)"); - } - } +// if (evt instanceof IdleStateEvent) { +// IdleStateEvent event = (IdleStateEvent) evt; +// if (event.state() == IdleState.READER_IDLE) { +// +// } else if (event.state() == IdleState.WRITER_IDLE) { +// /*写超时*/ +// ctx.channel().writeAndFlush(NettyTools.webSocketBytes(Contans.HEART_BEAT)); +// Integer times = pingTimes.get(ctx.channel().id().asShortText()); +// if (times == null) { +// times = 0; +// } +// /*读超时*/ +// log.info("===服务端===({}写超时, {})", ctx.channel().id().asShortText(), times); +// // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连 +// if (times >= MAX_UN_REC_PING_TIMES) { +// log.info("===服务端===(写超时,关闭chanel)"); +// // 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连 +// ctx.channel().close(); +// } else { +// // 失败计数器加1 +// times++; +// pingTimes.remove(ctx.channel().id().asShortText()); +// pingTimes.put(ctx.channel().id().asShortText(), times); +// } +// } else if (event.state() == IdleState.ALL_IDLE) { +// /*总超时*/ +// System.out.println("===服务端===(ALL_IDLE 总超时)"); +// } +// } } private void resetTimes(Channel channel) { - pingTimes.remove(channel.id().asShortText()); - pingTimes.put(channel.id().asShortText(), 0); +// pingTimes.remove(channel.id().asShortText()); +// pingTimes.put(channel.id().asShortText(), 0); } @Override @@ -132,6 +132,7 @@ private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { + System.out.println(111111111); // 判断是否关闭链路的指令 if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); diff --git a/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java b/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java index 54cb224..e487aaf 100644 --- a/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java +++ b/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java @@ -1,6 +1,7 @@ package com.xcong.excoin.netty.initalizer; import com.xcong.excoin.netty.handler.WebSocketServerHandler; +import com.xcong.excoin.utils.SpringContextHolder; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.nio.NioSocketChannel; @@ -11,6 +12,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import javax.annotation.Resource; + /** * @author wzy * @email wangdoubleone@gmail.com @@ -19,8 +22,8 @@ @Component public class WebSocketServerInitializer extends ChannelInitializer<NioSocketChannel> { - @Autowired - private WebSocketServerHandler webSocketServerHandler; +// @Autowired +// private WebSocketServerHandler webSocketServerHandler; @Override protected void initChannel(NioSocketChannel ch) throws Exception { @@ -32,8 +35,8 @@ cp.addLast(new HttpObjectAggregator(65536)); cp.addLast(new ChunkedWriteHandler()); // 心跳 - ch.pipeline().addLast(new IdleStateHandler(0, 10, 0)); +// ch.pipeline().addLast(new IdleStateHandler(0, 10, 0)); // 自定义业务handler - cp.addLast(webSocketServerHandler); + cp.addLast(new WebSocketServerHandler()); } } diff --git a/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java b/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java index f40c91d..3b1258a 100644 --- a/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java +++ b/src/main/java/com/xcong/excoin/netty/server/WebSocketServer.java @@ -23,9 +23,9 @@ private EventLoopGroup work = new NioEventLoopGroup(); private ChannelFuture channelFuture; - - @Autowired - private WebSocketServerInitializer webSocketServerInitializer; +// +// @Autowired +// private WebSocketServerInitializer webSocketServerInitializer; @Override public void start() throws Exception { @@ -34,9 +34,9 @@ ServerBootstrap b = new ServerBootstrap(); b.group(boss, work) .channel(NioServerSocketChannel.class) - .childHandler(webSocketServerInitializer); + .childHandler(new WebSocketServerInitializer() ); - channelFuture = b.bind(9998).sync(); + channelFuture = b.bind(9982).sync(); log.info("[websocket服务器启动完成]-->{}", channelFuture.channel().localAddress()); } finally { diff --git a/src/main/java/com/xcong/excoin/quartz/job/BlockCoinUpdateJob.java b/src/main/java/com/xcong/excoin/quartz/job/BlockCoinUpdateJob.java deleted file mode 100644 index cc3bd12..0000000 --- a/src/main/java/com/xcong/excoin/quartz/job/BlockCoinUpdateJob.java +++ /dev/null @@ -1,130 +0,0 @@ -package com.xcong.excoin.quartz.job; - -import cn.hutool.http.HttpException; -import com.xcong.excoin.modules.blackchain.service.TrxUsdtUpdateService; -import com.xcong.excoin.modules.coin.service.BlockCoinService; -import com.xcong.excoin.utils.RedisUtils; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.scheduling.annotation.Async; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; -import org.springframework.web.client.RestClientException; - -import javax.annotation.Resource; -import java.util.concurrent.ConcurrentLinkedQueue; - -/** - * 链上币种同步任务 - * - * @author wzy - * @date 2020-07-02 - **/ -@Slf4j -@Component -@ConditionalOnProperty(prefix = "app", name = "block-job", havingValue = "true") -public class BlockCoinUpdateJob { - - @Resource - private BlockCoinService blockCoinService; - - @Resource - private TrxUsdtUpdateService trxUsdtUpdateService; - - @Resource - RedisUtils redisUtils; - - - public static ConcurrentLinkedQueue<Long> TRC_BLOCK = new ConcurrentLinkedQueue<>(); - - /** - * TRC20_USDT 同步 - */ - @Scheduled(cron = "0/2 * * * * ? ") - @Async - public void usdtTc20Update() { - // 波场3秒出一个块 - Long blocnNum = TRC_BLOCK.poll(); - if (blocnNum == null) { - return; - } - redisUtils.set("USDT_TRC20_CURRENT_BLOCK_NUM", blocnNum); - try { - trxUsdtUpdateService.monitorCoinListener(blocnNum); - } catch (RestClientException | HttpException e) { - // 此时是连接问题 这个块需要重新扫描 - log.info("查询区块超时:" + blocnNum); - TRC_BLOCK.add(blocnNum); - } catch (Exception e) { - e.printStackTrace(); - } - - } - - @Scheduled(cron = "0 0/1 * * * ? ") - public void usdtTc20UpdateQueue() { - // 查询最新区块号 - long getnowblock = trxUsdtUpdateService.getnowblockFromTronScan() - 25; - // 拿到redis里最新区块 - Object trc20BlockNum = redisUtils.get("USDT_TRC20_BLOCK_NUM"); - if (trc20BlockNum == null) { - // 没有则取最新的块 - trc20BlockNum = getnowblock; - redisUtils.set("USDT_TRC20_BLOCK_NUM", getnowblock); - } - Long blockNum = Long.valueOf(trc20BlockNum.toString()); - if (getnowblock <= blockNum) { - // 如果当前区块比最新已确认区块还大,则不继续执行 - return; - } - // 得到最新区块和当前区块的差值 - Long diff = getnowblock-blockNum; - for(long i=1;i<=diff;i++){ - blockNum++; - TRC_BLOCK.add(blockNum); - } - // 将最新的最大区块放入redis - redisUtils.set("USDT_TRC20_BLOCK_NUM", blockNum); - } - - /** - * ETH_USDT 同步 使用扫块 废弃这个定时任务 - */ - //@Scheduled(cron = "0 0/10 * * * ? ") - //@Deprecated - public void ethUsdtUpdate() { - blockCoinService.updateEthUsdt(); - } - - /** - * eth 同步 - */ -// @Scheduled(cron = "0 1/20 * * * ? ") - public void ethUpdate() { - blockCoinService.updateEth(); - } - - /** - * BTC_USDT 同步 - */ -// @Scheduled(cron = "0 2/10 * * * ? ") - public void btcUsdtUpdate() { - blockCoinService.updateBtcUsdt(); - } - - // @Scheduled(cron = "0 3/20 * * * ? ") - public void btcUpdate() { - blockCoinService.updateBtc(); - } - - // @Scheduled(cron = "0 4/20 * * * ? ") - public void eosUpdate() { - blockCoinService.updateEos(); - } - - // @Scheduled(cron = "0 6/20 * * * ? ") - public void xrpUpdate() { - blockCoinService.updateXrp(); - } - -} diff --git a/src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java b/src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java deleted file mode 100644 index bdbe078..0000000 --- a/src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java +++ /dev/null @@ -1,85 +0,0 @@ -package com.xcong.excoin.quartz.job; - -import com.xcong.excoin.modules.blackchain.service.TrxUsdtUpdateService; -import com.xcong.excoin.modules.blackchain.service.UsdtEthService; -import com.xcong.excoin.utils.RedisUtils; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.collections.CollectionUtils; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.scheduling.annotation.Scheduled; -import org.springframework.stereotype.Component; - -import javax.annotation.Resource; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ExecutionException; - -/** - * 归集定时任务 - * - * @author wzy - * @date 2020-07-02 - **/ - -@Slf4j -@Component -@ConditionalOnProperty(prefix = "app", name = "block-job", havingValue = "true") -public class NotionalPoolingJob { - - @Resource - private UsdtEthService usdtEthService; - - @Resource - private RedisUtils redisUtils; - - @Resource - private TrxUsdtUpdateService trxUsdtUpdateService; - - /** - * usdt 归集 - */ - @Scheduled(cron = "0 5/30 * * * ? ") - public void poolUsdtEth() { - try { - log.info("USDT归集开始"); - usdtEthService.pool(); - log.info("USDT归集结束"); - } catch (ExecutionException | InterruptedException e) { - log.error("#usdt归集错误#", e); - } - } - - /** - * 使用扫块 不需要这个 - */ - //@Scheduled(cron = "0 2/8 * * * ? ") - @Deprecated - public void usdtEthPoolCheck() { - log.info("USDTETH归集结果扫描开始"); - usdtEthService.usdtEthPoolCheck(); - } - - @Scheduled(cron = "0 2/30 * * * ? ") - public void poolEth() { - try { - usdtEthService.ethPool(); - } catch (ExecutionException | InterruptedException e) { - log.info("#ETH归集错误#", e); - } - } - - /** - * 归集TRC20 - */ - @Scheduled(cron = "0 1/5 * * * ? ") - public void poolUsdtTrc20() { - Object trc20_pool = redisUtils.get("TRC20_POOL"); - if(trc20_pool==null){ - return; - } - List<String> list = (List)trc20_pool; - for(String address: list){ - trxUsdtUpdateService.poolByAddress(address); - } - } -} diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java deleted file mode 100644 index a6a1158..0000000 --- a/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java +++ /dev/null @@ -1,76 +0,0 @@ -package com.xcong.excoin.rabbit.consumer; - -import com.alibaba.fastjson.JSONObject; -import com.xcong.excoin.configurations.RabbitMqConfig; -import com.xcong.excoin.modules.blackchain.model.EthUsdtChargeDto; -import com.xcong.excoin.modules.blackchain.service.Trc20Service; -import com.xcong.excoin.modules.blackchain.service.TrxUsdtUpdateService; -import com.xcong.excoin.modules.blackchain.service.UsdtErc20UpdateService; -import com.xcong.excoin.modules.coin.service.BlockCoinService; -import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang.StringUtils; -import org.springframework.amqp.rabbit.annotation.RabbitListener; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -import javax.annotation.Resource; -import java.math.BigDecimal; - -/** - * @author wzy - * @date 2020-05-25 - **/ -@Slf4j -@Component -@ConditionalOnProperty(prefix = "app", name = "block-job", havingValue = "true") -public class UsdtUpdateConsumer { - - - @Resource - private BlockCoinService blockCoinService; - - @Resource - TrxUsdtUpdateService trxUsdtUpdateService; - - - @RabbitListener(queues = RabbitMqConfig.QUEUE_USDT_UPDATE) - public void doSomething(String content) { - log.info("#USDT同步---->{}#", content); - EthUsdtChargeDto ethUsdtChargeDto = JSONObject.parseObject(content, EthUsdtChargeDto.class); - // 更新这个用户的余额 - if(EthUsdtChargeDto.Symbol.USDT_ERC20.equals(ethUsdtChargeDto.getSymbol())){ - blockCoinService.updateEthUsdtNew(ethUsdtChargeDto); - } - if(EthUsdtChargeDto.Symbol.USDT_TRC20.equals(ethUsdtChargeDto.getSymbol())){ - blockCoinService.updateTrc20(ethUsdtChargeDto); - // 同步完直接归集 - trxUsdtUpdateService.poolByAddress(ethUsdtChargeDto.getAddress()); - } - - } - - @RabbitListener(queues = RabbitMqConfig.QUEUE_USDT_ADDRESS) - public void addUsdtAddress(String content) { - if(!UsdtErc20UpdateService.ALL_ADDRESS_LIST.contains(content)){ - log.debug("#添加新地址---->{}#", content); - if(StringUtils.isBlank(content)){ - return; - } - String[] split = content.split(","); - if(split.length<2){ - return; - } - String address = split[0]; - String tag = split[1]; - if("ERC20".equals(tag)){ - UsdtErc20UpdateService.ALL_ADDRESS_LIST.add(address); - } - if("TRC20".equals(tag)){ - TrxUsdtUpdateService.addressList.add(address); - // 此时还需要给这个地址转账用于激活及后续手续费 - Trc20Service.sendTrx(Trc20Service.TRX_PRIVATE_KEY,address,new BigDecimal(10)); - } - - } - } -} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index a31813a..bd98de5 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,5 +1,5 @@ server: - port: 8888 + port: 8894 servlet: context-path: / diff --git a/src/test/java/com/xcong/excoin/TrcTest.java b/src/test/java/com/xcong/excoin/TrcTest.java index 85ae927..8e62a43 100644 --- a/src/test/java/com/xcong/excoin/TrcTest.java +++ b/src/test/java/com/xcong/excoin/TrcTest.java @@ -3,7 +3,6 @@ import cn.hutool.http.HttpResponse; import cn.hutool.http.HttpUtil; import com.alibaba.fastjson.JSON; -import com.xcong.excoin.modules.blackchain.service.Trc20Service; import com.xcong.excoin.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; @@ -27,22 +26,22 @@ @Test public void trc20Test() { - String url = "https://api.trongrid.io/wallet/getblockbynum"; - - while(true) { - Object current = redisUtils.get("USDT_TRC20_CURRENT_BLOCK_NUM"); - Map<String, Object> map = new HashMap<>(); - map.put("num", current); - String param = JSON.toJSONString(map); - System.setProperty("https.protocols", "TLSv1,TLSv1.1,TLSv1.2"); - HttpResponse response = HttpUtil.createPost(url).body(param).contentType("application/json").header("TRON-PRO-API-KEY", Trc20Service.API_KEY).execute(); -// log.info(response.body()); - - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - } +// String url = "https://api.trongrid.io/wallet/getblockbynum"; +// +// while(true) { +// Object current = redisUtils.get("USDT_TRC20_CURRENT_BLOCK_NUM"); +// Map<String, Object> map = new HashMap<>(); +// map.put("num", current); +// String param = JSON.toJSONString(map); +// System.setProperty("https.protocols", "TLSv1,TLSv1.1,TLSv1.2"); +// HttpResponse response = HttpUtil.createPost(url).body(param).contentType("application/json").header("TRON-PRO-API-KEY", Trc20Service.API_KEY).execute(); +//// log.info(response.body()); +// +// try { +// Thread.sleep(2000); +// } catch (InterruptedException e) { +// e.printStackTrace(); +// } +// } } } -- Gitblit v1.9.1