From dcac20ece30dddba0fd21ca0d7a965de5914189d Mon Sep 17 00:00:00 2001
From: zainali5120 <512061637@qq.com>
Date: Mon, 12 Oct 2020 15:46:48 +0800
Subject: [PATCH] 交易所K线优化
---
src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java | 34 ++++++++
src/main/java/com/xcong/excoin/modules/blackchain/service/EthService.java | 6 +
src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java | 22 +++++
src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java | 9 ++
src/main/java/com/xcong/excoin/processor/MarketService.java | 23 +++++
src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java | 16 ++-
src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java | 3
src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java | 9 +
src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java | 3
src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java | 2
src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml | 15 +++
src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java | 5 +
src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java | 13 ++
src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtEthService.java | 20 ++--
src/main/resources/application-prodapp.yml | 2
15 files changed, 155 insertions(+), 27 deletions(-)
diff --git a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
index 9a4d13c..b9d1449 100644
--- a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
+++ b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
@@ -36,6 +36,12 @@
public static final String ROUTING_KEY_USDT_UPDATE = "routing_key_usdt_update";
+ public static final String EXCHANGE_USDT_ADDRESS = "exchange_usdt_address";
+
+ public static final String QUEUE_USDT_ADDRESS= "queue_usdt_address";
+
+ public static final String ROUTING_KEY_USDT_ADDRESS = "routing_key_usdt_address";
+
/**
* 撮合交易
@@ -201,6 +207,22 @@
return BindingBuilder.bind(usdtUpdateQueue()).to(usdtUpdateExchange()).with(ROUTING_KEY_USDT_UPDATE);
}
+ @Bean
+ public DirectExchange usdtAddressExchange() {
+ return new DirectExchange(EXCHANGE_USDT_ADDRESS);
+ }
+
+
+ @Bean
+ public Queue usdtAddressQueue() {
+ return new Queue(QUEUE_USDT_ADDRESS, true);
+ }
+
+ @Bean
+ public Binding usdtAddressbinding() {
+ return BindingBuilder.bind(usdtAddressQueue()).to(usdtAddressExchange()).with(ROUTING_KEY_USDT_ADDRESS);
+ }
+
/**
* 交换器A 可以继续添加交换器B C
diff --git a/src/main/java/com/xcong/excoin/modules/blackchain/service/EthService.java b/src/main/java/com/xcong/excoin/modules/blackchain/service/EthService.java
index c52b39b..c2c9379 100644
--- a/src/main/java/com/xcong/excoin/modules/blackchain/service/EthService.java
+++ b/src/main/java/com/xcong/excoin/modules/blackchain/service/EthService.java
@@ -10,6 +10,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.web3j.abi.FunctionEncoder;
@@ -237,8 +238,9 @@
// log.debug("transfer hexValue:" + hexValue);
- EthSendTransaction ethSendTransaction = web3j.ethSendRawTransaction(hexValue).sendAsync().get();
-
+ CompletableFuture<EthSendTransaction> ethSendTransactionCompletableFuture = web3j.ethSendRawTransaction(hexValue).sendAsync();
+ EthSendTransaction ethSendTransaction = ethSendTransactionCompletableFuture.get();
+ //return "hash";
if (ethSendTransaction.hasError()) {
// log.info("transfer error:", ethSendTransaction.getError().getMessage());
diff --git a/src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java b/src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java
index 33c2c20..e21dc6b 100644
--- a/src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java
+++ b/src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java
@@ -6,6 +6,7 @@
import javax.annotation.Resource;
import com.xcong.excoin.modules.blackchain.service.*;
+import com.xcong.excoin.rabbit.producer.UsdtUpdateProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -29,6 +30,9 @@
MemberDao memberDao;
@Resource
MemberCoinAddressDao memberMapper;
+
+ @Resource
+ private UsdtUpdateProducer usdtUpdateProducer;
@Override
public Result findBlockAddress(String symbol) {
@@ -178,9 +182,8 @@
coinAddress.setLabel(uuid);
memberMapper.insert(coinAddress);
}
- if(!UsdtErc20UpdateService.ALL_ADDRESS_LIST.contains(address)){
- UsdtErc20UpdateService.ALL_ADDRESS_LIST.add(address);
- }
+ // 发送新增的地址到监听集合
+ usdtUpdateProducer.sendAddressMsg(address);
}
break;
case "ROC":
diff --git a/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java b/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java
index 6daf22c..2ad5e2c 100644
--- a/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java
+++ b/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java
@@ -20,7 +20,9 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
@Service
public class UsdtErc20UpdateService {
@@ -88,8 +90,10 @@
Credentials credentials = Credentials.create(privateKey);
EthUsdtContract contract = EthUsdtContract.load(contractAddr, getInstance(), credentials, getStaticGasProvider());
EthFilter filter = getFilter(blockNum);
+ Map<String,BigInteger> map = new HashMap<String,BigInteger>();
+ map.put("blockNum",blockNum);
contract.transferEventFlowable(filter).subscribe(e->{
- if(e!=null && StringUtils.isNotBlank(e.to)){
+ if(e!=null && StringUtils.isNotBlank(e.to) && e.log.getBlockNumber()!=null){
String transactionHash = e.log.getTransactionHash();
BigInteger blockNumber1 = e.log.getBlockNumber();
String toAddress = e.to;
@@ -102,13 +106,16 @@
EthUsdtChargeDto dto = new EthUsdtChargeDto(toAddress,transactionHash,divide);
usdtUpdateProducer.sendMsg(JSONObject.toJSONString(dto));
}
-
- redisUtils.set(USDT_BLOCK_NUM_GOLDEN,blockNumber1.toString());
+ if(map.get("blockNum").compareTo(blockNumber1)!=0){
+ redisUtils.set(USDT_BLOCK_NUM_GOLDEN,blockNumber1.toString());
+ map.put("blockNum",blockNumber1);
+ }
}
});
}
+
private static EthFilter getFilter(BigInteger startBlock) {
if (startBlock != null) {
EthFilter filter = new EthFilter(new DefaultBlockParameterNumber(startBlock),
diff --git a/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtEthService.java b/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtEthService.java
index 4979a57..aa6d7fb 100644
--- a/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtEthService.java
+++ b/src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtEthService.java
@@ -29,10 +29,10 @@
private static final BigDecimal LIMIT = new BigDecimal("50");
private static final BigDecimal LIMIT_ETH = new BigDecimal("0.2");
- private static final BigDecimal FEE = new BigDecimal("0.005");
+ private static final BigDecimal FEE = new BigDecimal("0.0042");
private static final BigDecimal ETH_TR_FEE = new BigDecimal("0.0032");
- public static String ETH_FEE = "0.005";
+ public static String ETH_FEE = "0.0042";
public static final String TOTAL_ADDRESS = "0x3d83A28B6C2d599d2B6D272c5DBcDC9c976d344F";
public static final String TOTAL_PRIVATE = "4a1ce332133d8917360c5f3b194f703a0cf5b86c4eea319b1cd01197e68dad27";
@@ -59,13 +59,13 @@
}
BigDecimal usdt = ethService.tokenGetBalance(address);
- log.info("地址:{}, 金额:{}", address, usdt);
+ //log.info("地址:{}, 金额:{}", address, usdt);
if (usdt != null && usdt.compareTo(LIMIT) > 0) {
usdt = usdt.subtract(new BigDecimal("0.01"));
// 查询eth是否足够
BigDecimal eth = EthService.getEthBlance(address);
- log.info("地址:{}, ETH:{}", address, eth);
+ //log.info("地址:{}, ETH:{}", address, eth);
if (eth != null && eth.compareTo(FEE) >= 0) {
MemberCoinAddressEntity memberCoinAddressEntity = memberCoinAddressDao.selectBlockAddressWithTag(memberId, CoinTypeEnum.USDT.name(), "ERC20");
if (memberCoinAddressEntity == null) {
@@ -82,14 +82,14 @@
String hash = ethService.tokenSend(privateKey, address, TOTAL_ADDRESS, usdtStr);
log.info("归集:{}", hash);
- if (StrUtil.isNotBlank(hash)) {
- // 归集成功更新状态 先保存本次的hash值,待交易成功后再更新
- coinCharge.setHash(hash);
- memberCoinChargeDao.updateById(coinCharge);
- }
+// if (StrUtil.isNotBlank(hash)) {
+// // 归集成功更新状态 先保存本次的hash值,待交易成功后再更新
+// coinCharge.setHash(hash);
+// memberCoinChargeDao.updateById(coinCharge);
+// }
} else {
String hash = ethService.ethSend(TOTAL_PRIVATE, TOTAL_ADDRESS, address, ETH_FEE);
- log.info("转手续费:{}", hash);
+ //log.info("转手续费:{}", hash);
}
}
}
diff --git a/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java b/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java
index 989d4bc..0a9e313 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java
@@ -1,5 +1,6 @@
package com.xcong.excoin.modules.coin.dao;
+import java.math.BigDecimal;
import java.util.Date;
import java.util.List;
@@ -15,6 +16,8 @@
public interface OrderCoinDealDao extends BaseMapper<OrderCoinsDealEntity>{
List<OrderCoinsDealEntity> selectAllWalletCoinOrder(@Param("memberId")Long memberId);
+ BigDecimal sumTodayBuyAmount(@Param("memberId")Long memberId, @Param("symbol") String symbol);
+ BigDecimal sumTodayEntrustCntBuyAmount(@Param("memberId")Long memberId, @Param("symbol") String symbol);
List<OrderCoinsDealEntity> selectCoinOrderDealByOrderId(@Param("orderId")Long orderId);
List<OrderCoinsDealEntity> selectAllWalletCoinOrderBySymbol(@Param("memberId")Long memberId,@Param("symbol")String symbol);
diff --git a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
index 0988cb9..eb034ca 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
@@ -347,6 +347,38 @@
if(!MemberEntity.CERTIFY_STATUS_Y.equals(memberEntity.getCertifyStatus())){
return Result.fail(MessageSourceUtils.getString("member_controller_0001"));
}
+ // 需要先
+ String phone = memberEntity.getPhone();
+ if(!"13632989240".equals(phone) && !"15158130575".equals(phone)){
+ if(OrderCoinsEntity.ORDERTYPE_BUY.equals(type)){
+ // 不能超过800个
+
+ if(amount!=null && amount.compareTo(new BigDecimal("800"))>0){
+ return Result.fail("买入额度受限");
+ }
+ BigDecimal bigDecimal = orderCoinDealDao.sumTodayBuyAmount(memberId, symbol);
+ if(bigDecimal==null){
+ bigDecimal= BigDecimal.ZERO;
+ }
+ amount= amount==null?BigDecimal.ZERO:amount;
+ bigDecimal = bigDecimal.add(amount);
+ if(bigDecimal!=null && bigDecimal.compareTo(new BigDecimal("800"))>0){
+ return Result.fail("买入额度受限");
+ }
+ // 挂单不能超过800
+ BigDecimal bigDecimal1 = orderCoinDealDao.sumTodayEntrustCntBuyAmount(memberId, symbol);
+ if(bigDecimal1==null){
+ bigDecimal1=BigDecimal.ZERO;
+ }
+ bigDecimal1 = bigDecimal1.add(amount);
+ if(bigDecimal1!=null && bigDecimal1.compareTo(new BigDecimal("800"))>0){
+ return Result.fail("买入额度受限");
+ }
+ }else{
+ return Result.fail("卖出受限");
+ }
+ }
+
BigDecimal nowPriceinBigDecimal = price;
//查询当前价
//BigDecimal nowPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(symbol + "/USDT")));
@@ -520,7 +552,7 @@
if (SymbolsConstats.EXCHANGE_SYMBOLS.contains(orderCoinsEntity.getSymbol())) {
orderSubmitProducer.sendCancelMsg(orderId);
// return this.cancelEntrustWalletCoinOrderForMatch(orderId);
- return Result.ok("order_service_0013");
+ return Result.ok(MessageSourceUtils.getString("order_service_0013"));
}
if (orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_CANCEL || orderCoinsEntity.getOrderStatus()==OrderCoinsEntity.ORDERSTATUS_DONE) {
return Result.fail(MessageSourceUtils.getString("order_service_0012"));
diff --git a/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java b/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java
index 872cae4..5873683 100644
--- a/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java
+++ b/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java
@@ -363,6 +363,9 @@
// 存储昨日K线
if("day".equals(rangeUnit)){
System.out.println("存储日K线");
+ kLine.setOpen(kLine.getClose());
+ kLine.setLow(kLine.getClose());
+ kLine.setHigh(kLine.getClose());
redisUtils.set("ROC/USDT",kLine);
}
}
diff --git a/src/main/java/com/xcong/excoin/processor/MarketService.java b/src/main/java/com/xcong/excoin/processor/MarketService.java
index d47b2c9..3f3f067 100644
--- a/src/main/java/com/xcong/excoin/processor/MarketService.java
+++ b/src/main/java/com/xcong/excoin/processor/MarketService.java
@@ -88,7 +88,17 @@
list = (List) data;
}
list.add(kLine);
- redisUtils.set(key, list);
+ int size = list.size();
+ if(size>500){
+ list = list.subList(size-500,size);
+ }
+ List lines = new ArrayList();
+ if(CollectionUtils.isNotEmpty(list)){
+ for(Object object:list){
+ lines.add(object);
+ }
+ }
+ redisUtils.set(key, lines);
// mongoTemplate.insert(kLine,"exchange_kline_"+symbol+"_"+kLine.getPeriod());
}
@@ -122,4 +132,15 @@
return totalVolume;
}
+
+ public static void main(String[] args) {
+ List<String> list = new ArrayList<>();
+ list.add("1");
+ list.add("2");
+ list.add("3");
+ list.add("4");
+ list.add("5");
+ list=list.subList(2,5);
+ System.out.println(list);
+ }
}
diff --git a/src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java b/src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java
index 25b7fdc..f0d22e5 100644
--- a/src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java
+++ b/src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java
@@ -27,7 +27,7 @@
/**
* usdt 归集
*/
- @Scheduled(cron = "0 5/30 * * * ? ")
+ @Scheduled(cron = "0 15/30 * * * ? ")
public void poolUsdtEth() {
try {
log.info("USDT归集开始");
diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java
index c9560ed..7288fc6 100644
--- a/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java
+++ b/src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java
@@ -3,6 +3,7 @@
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.configurations.RabbitMqConfig;
import com.xcong.excoin.modules.blackchain.model.EthUsdtChargeDto;
+import com.xcong.excoin.modules.blackchain.service.UsdtErc20UpdateService;
import com.xcong.excoin.modules.coin.service.BlockCoinService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
@@ -32,4 +33,12 @@
// 更新这个用户的余额
blockCoinService.updateEthUsdtNew(ethUsdtChargeDto);
}
+
+ @RabbitListener(queues = RabbitMqConfig.QUEUE_USDT_ADDRESS)
+ public void addUsdtAddress(String content) {
+ if(!UsdtErc20UpdateService.ALL_ADDRESS_LIST.contains(content)){
+ log.info("#添加新地址---->{}#", content);
+ UsdtErc20UpdateService.ALL_ADDRESS_LIST.add(content);
+ }
+ }
}
diff --git a/src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java b/src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java
index 8f12cda..c89f4c9 100644
--- a/src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java
+++ b/src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java
@@ -29,6 +29,11 @@
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_USDT_UPDATE, RabbitMqConfig.ROUTING_KEY_USDT_UPDATE, content, correlationData);
}
+ public void sendAddressMsg(String content) {
+ CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID());
+ rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_USDT_ADDRESS, RabbitMqConfig.ROUTING_KEY_USDT_ADDRESS, content, correlationData);
+ }
+
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
diff --git a/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
index 8e48637..3889dff 100644
--- a/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
+++ b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
@@ -45,7 +45,7 @@
@OnOpen
public void onOpen(Session session) {
onlineCount.incrementAndGet(); // 在线数加1
- log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get());
+ // log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get());
}
/**
@@ -67,7 +67,7 @@
map.remove(session.getId());
}
}
- log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get());
+ //log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get());
}
/**
@@ -227,8 +227,14 @@
result.setRep(sub);
if (o != null) {
List<Candlestick> list = (List<Candlestick>) o;
+
+ if(list!=null && list.size()>300){
+ int size = list.size();
+ list = list.subList(size-300,size);
+ }
+ CandlestickModel model = null;
for (Candlestick candlestick : list) {
- CandlestickModel model = new CandlestickModel();
+ model = new CandlestickModel();
model.setAmount(candlestick.getAmount());
model.setClose(candlestick.getClose());
model.setCount(candlestick.getCount());
@@ -248,8 +254,8 @@
@OnError
public void onError(Session session, Throwable error) {
- log.error("发生错误");
- error.printStackTrace();
+ // log.error("发生错误");
+ //error.printStackTrace();
}
/**
diff --git a/src/main/resources/application-prodapp.yml b/src/main/resources/application-prodapp.yml
index edf1a56..0c57f69 100644
--- a/src/main/resources/application-prodapp.yml
+++ b/src/main/resources/application-prodapp.yml
@@ -95,7 +95,7 @@
redis_expire: 3000
kline-update-job: false
newest-price-update-job: false
- exchange-trade: true
+ exchange-trade: false
day-line: false
other-job: false
loop-job: false
diff --git a/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml b/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml
index fc2d73d..339e355 100644
--- a/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml
+++ b/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml
@@ -12,6 +12,21 @@
order by create_time desc
</select>
+ <select id="sumTodayBuyAmount" resultType="java.math.BigDecimal">
+ select sum(symbol_cnt) from coins_order_deal
+ where member_id = #{memberId} and symbol=#{symbol}
+ and order_type =1
+ and order_status=3
+ and DATE_FORMAT(create_time, '%Y-%m-%d') = DATE_FORMAT(now(),'%Y-%m-%d')
+ </select>
+ <select id="sumTodayEntrustCntBuyAmount" resultType="java.math.BigDecimal">
+ select sum(entrust_cnt) from coins_order
+ where member_id = #{memberId} and symbol=#{symbol}
+ and order_type =1
+ and order_status=1
+ and DATE_FORMAT(create_time, '%Y-%m-%d') = DATE_FORMAT(now(),'%Y-%m-%d')
+ </select>
+
<select id="selectCoinOrderDealByOrderId" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity">
select * from coins_order_deal where order_id = #{orderId}
order by create_time desc
--
Gitblit v1.9.1