zainali5120
2020-10-12 dcac20ece30dddba0fd21ca0d7a965de5914189d
交易所K线优化
15 files modified
182 ■■■■ changed files
src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java 22 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/blackchain/service/EthService.java 6 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java 9 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java 13 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtEthService.java 20 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java 3 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java 34 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java 3 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/processor/MarketService.java 23 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java 2 ●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java 9 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java 5 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java 16 ●●●●● patch | view | raw | blame | history
src/main/resources/application-prodapp.yml 2 ●●● patch | view | raw | blame | history
src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml 15 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
@@ -36,6 +36,12 @@
    public static final String ROUTING_KEY_USDT_UPDATE = "routing_key_usdt_update";
    public static final String EXCHANGE_USDT_ADDRESS = "exchange_usdt_address";
    public static final String QUEUE_USDT_ADDRESS= "queue_usdt_address";
    public static final String ROUTING_KEY_USDT_ADDRESS = "routing_key_usdt_address";
    /**
     * 撮合交易
@@ -201,6 +207,22 @@
        return BindingBuilder.bind(usdtUpdateQueue()).to(usdtUpdateExchange()).with(ROUTING_KEY_USDT_UPDATE);
    }
    @Bean
    public DirectExchange usdtAddressExchange() {
        return new DirectExchange(EXCHANGE_USDT_ADDRESS);
    }
    @Bean
    public Queue usdtAddressQueue() {
        return new Queue(QUEUE_USDT_ADDRESS, true);
    }
    @Bean
    public Binding usdtAddressbinding() {
        return BindingBuilder.bind(usdtAddressQueue()).to(usdtAddressExchange()).with(ROUTING_KEY_USDT_ADDRESS);
    }
    /**
     * 交换器A 可以继续添加交换器B C
src/main/java/com/xcong/excoin/modules/blackchain/service/EthService.java
@@ -10,6 +10,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.web3j.abi.FunctionEncoder;
@@ -237,8 +238,9 @@
        // log.debug("transfer hexValue:" + hexValue);
        EthSendTransaction ethSendTransaction = web3j.ethSendRawTransaction(hexValue).sendAsync().get();
        CompletableFuture<EthSendTransaction> ethSendTransactionCompletableFuture = web3j.ethSendRawTransaction(hexValue).sendAsync();
        EthSendTransaction ethSendTransaction = ethSendTransactionCompletableFuture.get();
        //return "hash";
        if (ethSendTransaction.hasError()) {
            // log.info("transfer error:", ethSendTransaction.getError().getMessage());
src/main/java/com/xcong/excoin/modules/blackchain/service/Impl/BlockSeriveImpl.java
@@ -6,6 +6,7 @@
import javax.annotation.Resource;
import com.xcong.excoin.modules.blackchain.service.*;
import com.xcong.excoin.rabbit.producer.UsdtUpdateProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -29,6 +30,9 @@
    MemberDao memberDao;
    @Resource
    MemberCoinAddressDao memberMapper;
    @Resource
    private UsdtUpdateProducer usdtUpdateProducer;
    @Override
    public Result findBlockAddress(String symbol) {
@@ -178,9 +182,8 @@
                                coinAddress.setLabel(uuid);
                                memberMapper.insert(coinAddress);
                            }
                            if(!UsdtErc20UpdateService.ALL_ADDRESS_LIST.contains(address)){
                                UsdtErc20UpdateService.ALL_ADDRESS_LIST.add(address);
                            }
                           // 发送新增的地址到监听集合
                            usdtUpdateProducer.sendAddressMsg(address);
                        }
                        break;
                    case "ROC":
src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtErc20UpdateService.java
@@ -20,7 +20,9 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Service
public class UsdtErc20UpdateService {
@@ -88,8 +90,10 @@
        Credentials credentials = Credentials.create(privateKey);
        EthUsdtContract contract = EthUsdtContract.load(contractAddr, getInstance(), credentials, getStaticGasProvider());
        EthFilter filter = getFilter(blockNum);
        Map<String,BigInteger> map = new HashMap<String,BigInteger>();
        map.put("blockNum",blockNum);
        contract.transferEventFlowable(filter).subscribe(e->{
            if(e!=null && StringUtils.isNotBlank(e.to)){
            if(e!=null && StringUtils.isNotBlank(e.to) &&  e.log.getBlockNumber()!=null){
                String transactionHash = e.log.getTransactionHash();
                BigInteger blockNumber1 = e.log.getBlockNumber();
                String toAddress = e.to;
@@ -102,13 +106,16 @@
                    EthUsdtChargeDto dto = new EthUsdtChargeDto(toAddress,transactionHash,divide);
                    usdtUpdateProducer.sendMsg(JSONObject.toJSONString(dto));
                }
                redisUtils.set(USDT_BLOCK_NUM_GOLDEN,blockNumber1.toString());
                if(map.get("blockNum").compareTo(blockNumber1)!=0){
                    redisUtils.set(USDT_BLOCK_NUM_GOLDEN,blockNumber1.toString());
                    map.put("blockNum",blockNumber1);
                }
            }
        });
    }
    private static EthFilter getFilter(BigInteger startBlock) {
        if (startBlock != null) {
            EthFilter filter = new EthFilter(new DefaultBlockParameterNumber(startBlock),
src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtEthService.java
@@ -29,10 +29,10 @@
    private static final BigDecimal LIMIT = new BigDecimal("50");
    private static final BigDecimal LIMIT_ETH = new BigDecimal("0.2");
    private static final BigDecimal FEE = new BigDecimal("0.005");
    private static final BigDecimal FEE = new BigDecimal("0.0042");
    private static final BigDecimal ETH_TR_FEE = new BigDecimal("0.0032");
    public static String ETH_FEE = "0.005";
    public static String ETH_FEE = "0.0042";
    public static final String TOTAL_ADDRESS = "0x3d83A28B6C2d599d2B6D272c5DBcDC9c976d344F";
    public static final String TOTAL_PRIVATE = "4a1ce332133d8917360c5f3b194f703a0cf5b86c4eea319b1cd01197e68dad27";
@@ -59,13 +59,13 @@
                }
                BigDecimal usdt = ethService.tokenGetBalance(address);
                log.info("地址:{}, 金额:{}", address, usdt);
                //log.info("地址:{}, 金额:{}", address, usdt);
                if (usdt != null && usdt.compareTo(LIMIT) > 0) {
                    usdt = usdt.subtract(new BigDecimal("0.01"));
                    // 查询eth是否足够
                    BigDecimal eth = EthService.getEthBlance(address);
                    log.info("地址:{}, ETH:{}", address, eth);
                    //log.info("地址:{}, ETH:{}", address, eth);
                    if (eth != null && eth.compareTo(FEE) >= 0) {
                        MemberCoinAddressEntity memberCoinAddressEntity = memberCoinAddressDao.selectBlockAddressWithTag(memberId, CoinTypeEnum.USDT.name(), "ERC20");
                        if (memberCoinAddressEntity == null) {
@@ -82,14 +82,14 @@
                        String hash = ethService.tokenSend(privateKey, address, TOTAL_ADDRESS, usdtStr);
                        log.info("归集:{}", hash);
                        if (StrUtil.isNotBlank(hash)) {
                            // 归集成功更新状态 先保存本次的hash值,待交易成功后再更新
                            coinCharge.setHash(hash);
                            memberCoinChargeDao.updateById(coinCharge);
                        }
//                        if (StrUtil.isNotBlank(hash)) {
//                            // 归集成功更新状态 先保存本次的hash值,待交易成功后再更新
//                            coinCharge.setHash(hash);
//                            memberCoinChargeDao.updateById(coinCharge);
//                        }
                    } else {
                        String hash = ethService.ethSend(TOTAL_PRIVATE, TOTAL_ADDRESS, address, ETH_FEE);
                        log.info("转手续费:{}", hash);
                        //log.info("转手续费:{}", hash);
                    }
                }
            }
src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinDealDao.java
@@ -1,5 +1,6 @@
package com.xcong.excoin.modules.coin.dao;
import java.math.BigDecimal;
import java.util.Date;
import java.util.List;
@@ -15,6 +16,8 @@
public interface OrderCoinDealDao  extends BaseMapper<OrderCoinsDealEntity>{
    
    List<OrderCoinsDealEntity> selectAllWalletCoinOrder(@Param("memberId")Long memberId);
    BigDecimal sumTodayBuyAmount(@Param("memberId")Long memberId, @Param("symbol") String symbol);
    BigDecimal sumTodayEntrustCntBuyAmount(@Param("memberId")Long memberId, @Param("symbol") String symbol);
    List<OrderCoinsDealEntity> selectCoinOrderDealByOrderId(@Param("orderId")Long orderId);
    List<OrderCoinsDealEntity> selectAllWalletCoinOrderBySymbol(@Param("memberId")Long memberId,@Param("symbol")String symbol);
src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
@@ -347,6 +347,38 @@
        if(!MemberEntity.CERTIFY_STATUS_Y.equals(memberEntity.getCertifyStatus())){
            return Result.fail(MessageSourceUtils.getString("member_controller_0001"));
        }
        // 需要先
        String phone = memberEntity.getPhone();
        if(!"13632989240".equals(phone) && !"15158130575".equals(phone)){
            if(OrderCoinsEntity.ORDERTYPE_BUY.equals(type)){
                // 不能超过800个
                if(amount!=null && amount.compareTo(new BigDecimal("800"))>0){
                    return Result.fail("买入额度受限");
                }
                BigDecimal bigDecimal = orderCoinDealDao.sumTodayBuyAmount(memberId, symbol);
                if(bigDecimal==null){
                    bigDecimal= BigDecimal.ZERO;
                }
                amount= amount==null?BigDecimal.ZERO:amount;
                bigDecimal = bigDecimal.add(amount);
                if(bigDecimal!=null && bigDecimal.compareTo(new BigDecimal("800"))>0){
                    return Result.fail("买入额度受限");
                }
                // 挂单不能超过800
                BigDecimal bigDecimal1 = orderCoinDealDao.sumTodayEntrustCntBuyAmount(memberId, symbol);
                if(bigDecimal1==null){
                    bigDecimal1=BigDecimal.ZERO;
                }
                bigDecimal1 = bigDecimal1.add(amount);
                if(bigDecimal1!=null && bigDecimal1.compareTo(new BigDecimal("800"))>0){
                    return Result.fail("买入额度受限");
                }
            }else{
                return Result.fail("卖出受限");
            }
        }
        BigDecimal nowPriceinBigDecimal = price;
        //查询当前价
        //BigDecimal nowPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(symbol + "/USDT")));
@@ -520,7 +552,7 @@
            if (SymbolsConstats.EXCHANGE_SYMBOLS.contains(orderCoinsEntity.getSymbol())) {
                orderSubmitProducer.sendCancelMsg(orderId);
               // return this.cancelEntrustWalletCoinOrderForMatch(orderId);
                return Result.ok("order_service_0013");
                return Result.ok(MessageSourceUtils.getString("order_service_0013"));
            }
            if (orderCoinsEntity.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_CANCEL || orderCoinsEntity.getOrderStatus()==OrderCoinsEntity.ORDERSTATUS_DONE) {
                return Result.fail(MessageSourceUtils.getString("order_service_0012"));
src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java
@@ -363,6 +363,9 @@
        // 存储昨日K线
        if("day".equals(rangeUnit)){
            System.out.println("存储日K线");
            kLine.setOpen(kLine.getClose());
            kLine.setLow(kLine.getClose());
            kLine.setHigh(kLine.getClose());
            redisUtils.set("ROC/USDT",kLine);
        }
    }
src/main/java/com/xcong/excoin/processor/MarketService.java
@@ -88,7 +88,17 @@
            list = (List) data;
        }
        list.add(kLine);
        redisUtils.set(key, list);
        int size = list.size();
        if(size>500){
            list = list.subList(size-500,size);
        }
        List lines = new ArrayList();
        if(CollectionUtils.isNotEmpty(list)){
            for(Object object:list){
                lines.add(object);
            }
        }
        redisUtils.set(key, lines);
        //  mongoTemplate.insert(kLine,"exchange_kline_"+symbol+"_"+kLine.getPeriod());
    }
@@ -122,4 +132,15 @@
        return totalVolume;
    }
    public static void main(String[] args) {
        List<String> list = new ArrayList<>();
        list.add("1");
        list.add("2");
        list.add("3");
        list.add("4");
        list.add("5");
        list=list.subList(2,5);
        System.out.println(list);
    }
}
src/main/java/com/xcong/excoin/quartz/job/NotionalPoolingJob.java
@@ -27,7 +27,7 @@
    /**
     * usdt 归集
     */
   @Scheduled(cron = "0 5/30 * * * ? ")
   @Scheduled(cron = "0 15/30 * * * ? ")
    public void poolUsdtEth() {
        try {
            log.info("USDT归集开始");
src/main/java/com/xcong/excoin/rabbit/consumer/UsdtUpdateConsumer.java
@@ -3,6 +3,7 @@
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.configurations.RabbitMqConfig;
import com.xcong.excoin.modules.blackchain.model.EthUsdtChargeDto;
import com.xcong.excoin.modules.blackchain.service.UsdtErc20UpdateService;
import com.xcong.excoin.modules.coin.service.BlockCoinService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
@@ -32,4 +33,12 @@
        // 更新这个用户的余额
        blockCoinService.updateEthUsdtNew(ethUsdtChargeDto);
    }
    @RabbitListener(queues = RabbitMqConfig.QUEUE_USDT_ADDRESS)
    public void addUsdtAddress(String content) {
        if(!UsdtErc20UpdateService.ALL_ADDRESS_LIST.contains(content)){
            log.info("#添加新地址---->{}#", content);
            UsdtErc20UpdateService.ALL_ADDRESS_LIST.add(content);
        }
    }
}
src/main/java/com/xcong/excoin/rabbit/producer/UsdtUpdateProducer.java
@@ -29,6 +29,11 @@
        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_USDT_UPDATE, RabbitMqConfig.ROUTING_KEY_USDT_UPDATE, content, correlationData);
    }
    public void sendAddressMsg(String content) {
        CorrelationData correlationData = new CorrelationData(IdUtil.simpleUUID());
        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_USDT_ADDRESS, RabbitMqConfig.ROUTING_KEY_USDT_ADDRESS, content, correlationData);
    }
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
@@ -45,7 +45,7 @@
    @OnOpen
    public void onOpen(Session session) {
        onlineCount.incrementAndGet(); // 在线数加1
        log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get());
       // log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get());
    }
    /**
@@ -67,7 +67,7 @@
                map.remove(session.getId());
            }
        }
        log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get());
        //log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get());
    }
    /**
@@ -227,8 +227,14 @@
            result.setRep(sub);
            if (o != null) {
                List<Candlestick> list = (List<Candlestick>) o;
                if(list!=null && list.size()>300){
                    int size = list.size();
                    list = list.subList(size-300,size);
                }
                CandlestickModel model = null;
                for (Candlestick candlestick : list) {
                    CandlestickModel model = new CandlestickModel();
                    model = new CandlestickModel();
                    model.setAmount(candlestick.getAmount());
                    model.setClose(candlestick.getClose());
                    model.setCount(candlestick.getCount());
@@ -248,8 +254,8 @@
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("发生错误");
        error.printStackTrace();
       // log.error("发生错误");
        //error.printStackTrace();
    }
    /**
src/main/resources/application-prodapp.yml
@@ -95,7 +95,7 @@
  redis_expire: 3000
  kline-update-job: false
  newest-price-update-job: false
  exchange-trade: true
  exchange-trade: false
  day-line: false
  other-job: false
  loop-job: false
src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml
@@ -12,6 +12,21 @@
        order by create_time desc
    </select>
    <select id="sumTodayBuyAmount" resultType="java.math.BigDecimal">
        select sum(symbol_cnt) from coins_order_deal
        where   member_id = #{memberId} and symbol=#{symbol}
                and order_type =1
                and order_status=3
                and DATE_FORMAT(create_time, '%Y-%m-%d') = DATE_FORMAT(now(),'%Y-%m-%d')
    </select>
    <select id="sumTodayEntrustCntBuyAmount" resultType="java.math.BigDecimal">
        select sum(entrust_cnt) from coins_order
        where   member_id = #{memberId} and symbol=#{symbol}
                and order_type =1
                and order_status=1
                and DATE_FORMAT(create_time, '%Y-%m-%d') = DATE_FORMAT(now(),'%Y-%m-%d')
    </select>
    <select id="selectCoinOrderDealByOrderId" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity">
        select * from coins_order_deal where order_id = #{orderId}
        order by create_time desc