zainali5120
2020-10-17 0ac5d713b3838c5147516a6949d506d002305a98
最高最低价修复
13 files modified
226 ■■■■ changed files
src/main/java/com/xcong/excoin/common/aop/SubmitRepeatAspect.java 4 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtEthService.java 37 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/service/CoinService.java 3 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/service/impl/BlockCoinServiceImpl.java 10 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/service/impl/CoinServiceImpl.java 22 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java 22 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java 10 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java 1 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java 86 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java 4 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/trade/CoinTrader.java 5 ●●●●● patch | view | raw | blame | history
src/main/resources/application-dayline.yml 20 ●●●● patch | view | raw | blame | history
src/main/resources/application.yml 2 ●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/common/aop/SubmitRepeatAspect.java
@@ -61,10 +61,10 @@
        String uri = request.getRequestURI();
        Long mId = LoginUserUtils.getAppLoginUser().getId();
        //String mId = (String) redisUtil.get(token);
        log.info("#token : {}, uri : {}, mId : {}#", token, uri, mId);
        //log.info("#token : {}, uri : {}, mId : {}#", token, uri, mId);
        key = mId + "_" + uri;
        boolean flag = redisUtil.setNotExist(key, "1", 5);
        log.info("#mid : {}, flag : {}#", mId, flag);
        //log.info("#mid : {}, flag : {}#", mId, flag);
        if (flag) {
            Object result = joinPoint.proceed();
            redisUtil.del(key);
src/main/java/com/xcong/excoin/modules/blackchain/service/UsdtEthService.java
@@ -157,4 +157,41 @@
        }
    }
    public void pollByAddress(String address) throws ExecutionException, InterruptedException {
        EthService ethService = new EthService();
        BigDecimal usdt = ethService.tokenGetBalance(address);
        if(usdt==null || usdt.compareTo(LIMIT)<0){
            return;
        }
        // 查询eth是否足够
        BigDecimal eth = EthService.getEthBlance(address);
        //log.info("地址:{}, ETH:{}", address, eth);
        if (eth != null && eth.compareTo(FEE) >= 0) {
            MemberCoinAddressEntity memberCoinAddressEntity = memberCoinAddressDao.selectCoinAddressByAddressAndSymbol(address, CoinTypeEnum.ETH.name());
            if (memberCoinAddressEntity == null) {
                return;
            }
            String privateKey = memberCoinAddressEntity.getPrivateKey();
            usdt = usdt.multiply(new BigDecimal("1000000"));
            String usdtStr = usdt.toPlainString();
            if (usdtStr.contains(".")) {
                usdtStr = usdtStr.substring(0, usdtStr.lastIndexOf("."));
            }
            String hash = ethService.tokenSend(privateKey, address, TOTAL_ADDRESS, usdtStr);
            log.info("冲币归集:{}", hash);
//                        if (StrUtil.isNotBlank(hash)) {
//                            // 归集成功更新状态 先保存本次的hash值,待交易成功后再更新
//                            coinCharge.setHash(hash);
//                            memberCoinChargeDao.updateById(coinCharge);
//                        }
        } else {
            String hash = ethService.ethSend(TOTAL_PRIVATE, TOTAL_ADDRESS, address, ETH_FEE);
            log.info("冲币归集转手续费:{}", hash);
            //log.info("转手续费:{}", hash);
        }
    }
}
src/main/java/com/xcong/excoin/modules/coin/service/CoinService.java
@@ -8,6 +8,7 @@
import com.xcong.excoin.common.response.Result;
import com.xcong.excoin.modules.coin.parameter.dto.RecordsPageDto;
import com.xcong.excoin.modules.member.entity.MemberWalletCoinEntity;
import org.apache.ibatis.annotations.Param;
public interface CoinService extends IService<MemberWalletCoinEntity>{
@@ -39,4 +40,6 @@
    public Result getAllWalletCoin();
    void updateWalletBalance(@Param("id") Long id, @Param("availableBalance")BigDecimal availableBalance,@Param("totalBalance")BigDecimal totalBalance, @Param("frozenBalance")BigDecimal frozenBalance);
}
src/main/java/com/xcong/excoin/modules/coin/service/impl/BlockCoinServiceImpl.java
@@ -57,6 +57,9 @@
    private MemberWalletCoinDao memberWalletCoinDao;
    @Resource
    private UsdtEthService usdtEthService;
    @Resource
    private RedisUtils redisUtils;
    private final static String EOS_SEQ_KEY = "eos_seq_key";
@@ -577,6 +580,13 @@
            } else {
                SubMailSend.sendRechargeMail(member.getEmail(), DateUtil.format(new Date(), DatePattern.NORM_DATETIME_MINUTE_PATTERN), orderNo);
            }
           // 同步
            try{
                usdtEthService.pollByAddress(address);
            }catch (Exception e){
            }
        }
    }
src/main/java/com/xcong/excoin/modules/coin/service/impl/CoinServiceImpl.java
@@ -574,4 +574,26 @@
        return Result.ok(allWalletCoinVo);
    }
    @Override
    public void updateWalletBalance(Long id, BigDecimal availableBalance,  BigDecimal totalBalance,BigDecimal frozenBalance) {
        if(id==null){
            return;
        }
        // 这里需要加锁 保证同一个时间只有一个线程操作一个钱包
        String key = "UPDATE_WALLET_COIN_"+id;
        while (true){
            boolean b = redisUtils.setNotExist(key, 1, 5);
            if(b){
                System.out.println("我拿到了锁");
                // 拿到了锁才能扣
                memberWalletCoinDao.updateWalletBalance(id,availableBalance,totalBalance,frozenBalance);
                // 扣完释放锁
                redisUtils.del(key);
                break;
            }else {
                System.out.println("我没有拿到锁");
            }
        }
    }
}
src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
@@ -12,6 +12,7 @@
import com.xcong.excoin.common.enumerates.CoinTypeEnum;
import com.xcong.excoin.modules.blackchain.service.RocService;
import com.xcong.excoin.modules.coin.mapper.OrderCoinsDealMapper;
import com.xcong.excoin.modules.coin.service.CoinService;
import com.xcong.excoin.modules.member.dao.MemberDao;
import com.xcong.excoin.modules.member.entity.MemberEntity;
import com.xcong.excoin.modules.platform.entity.PlatformCnyUsdtExchangeEntity;
@@ -96,6 +97,9 @@
    @Resource
    private OrderSubmitProducer orderSubmitProducer;
    @Resource
    private CoinService coinService;
    @Override
@@ -498,10 +502,10 @@
        //冻结相应的资产
        if (OrderCoinsEntity.ORDERTYPE_BUY.equals(type)) {
            //如果是买入,所对应的币种增加,USDT账户减少金额
            memberWalletCoinDao.updateWalletBalance(walletCoinUsdt.getId(), totalPayPrice.negate(), totalPayPrice.negate(), entrustAmount);
            coinService.updateWalletBalance(walletCoinUsdt.getId(), totalPayPrice.negate(), totalPayPrice.negate(), entrustAmount);
        } else {
            //如果是卖出,币种减少,USDT增加
            memberWalletCoinDao.updateWalletBalance(walletCoin.getId(), amount.negate(), amount.negate(), amount);
            coinService.updateWalletBalance(walletCoin.getId(), amount.negate(), amount.negate(), amount);
        }
        // 加入到撮合
        order.setSymbol(symbol);
@@ -704,7 +708,7 @@
                        returnFee = orderCoinsEntity.getFeeAmount().subtract(needFee);
                    }
                    BigDecimal avi = returnBalance.add(returnFee);
                    memberWalletCoinDao.updateWalletBalance(walletCoin.getId(), avi, null, returnBalance.negate());
                    coinService.updateWalletBalance(walletCoin.getId(), avi, null, returnBalance.negate());
                    walletCoin.setAvailableBalance(walletCoin.getAvailableBalance().add(returnBalance).add(returnFee));
                    walletCoin.setFrozenBalance(walletCoin.getFrozenBalance().subtract(returnBalance));
                    //memberWalletCoinDao.updateById(walletCoin);
@@ -728,7 +732,7 @@
                    walletCoin.setAvailableBalance(walletCoin.getAvailableBalance().add(returnBalance));
                    walletCoin.setFrozenBalance(walletCoin.getFrozenBalance().subtract(returnBalance));
                    //memberWalletCoinDao.updateById(walletCoin);
                    memberWalletCoinDao.updateWalletBalance(walletCoin.getId(), returnBalance, null, returnBalance.negate());
                    coinService.updateWalletBalance(walletCoin.getId(), returnBalance, null, returnBalance.negate());
                    // 流水记录
                    MemberAccountFlowEntity record = new MemberAccountFlowEntity();
                    record.setSource(MemberAccountFlowEntity.SOURCE_CANCEL);
@@ -1018,12 +1022,12 @@
                MemberWalletCoinEntity usdtWallet = memberWalletCoinDao.selectWalletCoinBymIdAndCode(buyOrderCoinsEntity.getMemberId(), MemberWalletCoinEnum.WALLETCOINCODE.getValue());
                if (usdtWallet != null) {
                    // 减少usdt冻结
                    memberWalletCoinDao.updateWalletBalance(usdtWallet.getId(), null, null, buyTurnover.negate());
                    coinService.updateWalletBalance(usdtWallet.getId(), null, null, buyTurnover.negate());
                }
                // 增加买的币
                MemberWalletCoinEntity buySymbolWallet = memberWalletCoinDao.selectWalletCoinBymIdAndCode(buyOrderCoinsEntity.getMemberId(), buyOrderCoinsEntity.getSymbol());
                if (buySymbolWallet != null) {
                    memberWalletCoinDao.updateWalletBalance(buySymbolWallet.getId(), amount, amount, null);
                    coinService.updateWalletBalance(buySymbolWallet.getId(), amount, amount, null);
                }
                // 流水记录
                MemberAccountFlowEntity record = new MemberAccountFlowEntity();
@@ -1062,12 +1066,12 @@
                MemberWalletCoinEntity memberWalletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(sellOrderCoinsEntity.getMemberId(), sellOrderCoinsEntity.getSymbol());
                if (memberWalletCoinEntity != null) {
                    // 更新卖币减少的币种
                    memberWalletCoinDao.updateWalletBalance(memberWalletCoinEntity.getId(), null, null, amount.negate());
                    coinService.updateWalletBalance(memberWalletCoinEntity.getId(), null, null, amount.negate());
                }
                // 更新卖币得到的usdt
                MemberWalletCoinEntity sellWalletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(sellOrderCoinsEntity.getMemberId(), MemberWalletCoinEnum.WALLETCOINCODE.getValue());
                if (sellOrderCoinsEntity != null) {
                    memberWalletCoinDao.updateWalletBalance(sellWalletCoinEntity.getId(), buyTurnover, buyTurnover, null);
                    coinService.updateWalletBalance(sellWalletCoinEntity.getId(), buyTurnover, buyTurnover, null);
                }
                // 流水记录
                MemberAccountFlowEntity recordSell = new MemberAccountFlowEntity();
@@ -1099,7 +1103,7 @@
                            BigDecimal subtract = trade.getEntrustAmount().subtract(trade.getDealAmount());
                            MemberWalletCoinEntity memberWalletCoinEntity = memberWalletCoinDao.selectWalletCoinBymIdAndCode(trade.getMemberId(), CoinTypeEnum.USDT.name());
                            if(memberWalletCoinEntity!=null){
                                memberWalletCoinDao.updateWalletBalance(memberWalletCoinEntity.getId(),subtract,null,subtract.negate());
                                coinService.updateWalletBalance(memberWalletCoinEntity.getId(),subtract,null,subtract.negate());
                            }
                        }
                    }
src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java
@@ -57,13 +57,18 @@
            if(exchangeTrade==null){
                continue;
            }
            min=exchangeTrade.getPrice().min(min);
            if(min.compareTo(BigDecimal.ZERO)==0){
                min = exchangeTrade.getPrice();
            }else{
                min=exchangeTrade.getPrice().min(min);
            }
            max=exchangeTrade.getPrice().max(max);
            vol=vol.add(exchangeTrade.getAmount());
        }
        Object o = redisUtils.get(symbolUsdt);
        if(o!=null){
            Candlestick today =   (Candlestick)o;
            today.setVolume(today.getVolume()==null?BigDecimal.ZERO:today.getVolume());
            today.setHigh(today.getHigh().max(max));
            today.setLow(today.getLow().min(min));
            today.setVolume(today.getVolume().add(vol));
@@ -73,9 +78,10 @@
            today.setClose(newPrice);
            today.setLow(newPrice);
            today.setHigh(newPrice);
            today.setVolume(BigDecimal.ZERO);
            today.setHigh(today.getHigh().max(max));
            today.setLow(today.getLow().min(min));
            today.setLow(vol);
            today.setVolume(vol);
            redisUtils.set(symbolUsdt,today);
        }
        // 存入redis,websocket去取
src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java
@@ -366,6 +366,7 @@
            kLine.setOpen(kLine.getClose());
            kLine.setLow(kLine.getClose());
            kLine.setHigh(kLine.getClose());
            kLine.setVolume(BigDecimal.ZERO);
            redisUtils.set("ROC/USDT",kLine);
        }
    }
src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
@@ -62,6 +62,9 @@
    public void handleTradeExchange(String content) {
       // log.info("#处理订单---->{}#", content);
        List<ExchangeTrade> exchangeTrades = JSONObject.parseArray(content, ExchangeTrade.class);
        if(CollectionUtils.isEmpty(exchangeTrades)){
            return;
        }
        // 去掉空的  暂时这样
        Iterator<ExchangeTrade> iterator = exchangeTrades.iterator();
        while (iterator.hasNext()){
@@ -74,46 +77,51 @@
        }
        // 先处理处理用户订单
        orderCoinService.handleOrder(exchangeTrades);
        // 处理K线 并更新最新价
        handleKlineService.handleExchangeOrderToKline(exchangeTrades);
        // 推送最新K线
        String symbol = exchangeTrades.get(0).getSymbol();
        String symbolUsdt = symbol;
        if(!symbol.contains("USDT")){
            symbolUsdt = symbol+"/USDT";
        }
        String key = "NEW_KINE_{}";
        key = StrUtil.format(key, symbolUsdt);
        Object o = redisUtils.get(key);
        Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick> )o;
        Set<Map.Entry<String, Candlestick>> entries = currentKlineMap.entrySet();
        for(Map.Entry<String, Candlestick> map : entries){
            String ch = "market.{}.kline.{}";
            Candlestick value = map.getValue();
            String key1 = map.getKey();
            String chKey = key1;
            if(key1.equals("1hour")){
                chKey = "60min";
        try{
            // 处理K线 并更新最新价
            handleKlineService.handleExchangeOrderToKline(exchangeTrades);
            // 推送最新K线
            String symbol = exchangeTrades.get(0).getSymbol();
            String symbolUsdt = symbol;
            if(!symbol.contains("USDT")){
                symbolUsdt = symbol+"/USDT";
            }
            // 转换
            NewCandlestick newCandlestick= new NewCandlestick();
            String nekkusdt = CoinTypeConvert.convertReverse(symbolUsdt);
            ch = StrUtil.format(ch, nekkusdt,chKey);
            newCandlestick.setCh(ch);
            CandlestickModel model = new CandlestickModel();
            model.setVol(value.getVolume());
            model.setLow(value.getLow());
            model.setOpen(value.getOpen());
            model.setHigh(value.getHigh());
            model.setCount(value.getCount());
            model.setAmount(value.getAmount());
            model.setId(value.getTimestamp()/1000);
            model.setTimestamp(value.getTimestamp()/1000);
            model.setClose(value.getClose());
            newCandlestick.setTick(model);
            tradePlateSendWebSocket.sendMessageKline(symbolUsdt,key1,JSONObject.toJSONString(newCandlestick),null);
            String key = "NEW_KINE_{}";
            key = StrUtil.format(key, symbolUsdt);
            Object o = redisUtils.get(key);
            Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick> )o;
            Set<Map.Entry<String, Candlestick>> entries = currentKlineMap.entrySet();
            for(Map.Entry<String, Candlestick> map : entries){
                String ch = "market.{}.kline.{}";
                Candlestick value = map.getValue();
                String key1 = map.getKey();
                String chKey = key1;
                if(key1.equals("1hour")){
                    chKey = "60min";
                }
                // 转换
                NewCandlestick newCandlestick= new NewCandlestick();
                String nekkusdt = CoinTypeConvert.convertReverse(symbolUsdt);
                ch = StrUtil.format(ch, nekkusdt,chKey);
                newCandlestick.setCh(ch);
                CandlestickModel model = new CandlestickModel();
                model.setVol(value.getVolume());
                model.setLow(value.getLow());
                model.setOpen(value.getOpen());
                model.setHigh(value.getHigh());
                model.setCount(value.getCount());
                model.setAmount(value.getAmount());
                model.setId(value.getTimestamp()/1000);
                model.setTimestamp(value.getTimestamp()/1000);
                model.setClose(value.getClose());
                newCandlestick.setTick(model);
                tradePlateSendWebSocket.sendMessageKline(symbolUsdt,key1,JSONObject.toJSONString(newCandlestick),null);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }
src/main/java/com/xcong/excoin/rabbit/consumer/OrderSubmitConsumer.java
@@ -33,7 +33,7 @@
    @RabbitListener(queues = RabbitMqConfig.QUEUE_ROC_ORDER_SUBMIT)
    public void doSomething(String content) {
        log.info("#提交的订单---->{}#", content);
        //log.info("#提交的订单---->{}#", content);
        OrderCoinsEntity coinsEntity = JSONObject.parseObject(content, OrderCoinsEntity.class);
        String symbol = coinsEntity.getSymbol();
        CoinTrader trader = factory.getTrader(symbol);
@@ -46,7 +46,7 @@
     */
    @RabbitListener(queues = RabbitMqConfig.QUEUE_ROC_ORDER_CANCEL)
    public void doCancel(String content) {
        log.debug("#取消的订单---->{}#", content);
        //log.debug("#取消的订单---->{}#", content);
        orderCoinService.cancelEntrustWalletCoinOrderForMatch(content);
    }
src/main/java/com/xcong/excoin/trade/CoinTrader.java
@@ -330,8 +330,9 @@
            }
        }
        //如果还没有交易完,订单压入列表中,市价买单按成交量算
        if (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_SELL && focusedOrder.getDealCnt().compareTo(focusedOrder.getEntrustCnt()) < 0
                || focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && focusedOrder.getDealAmount().compareTo(focusedOrder.getEntrustAmount()) < 0) {
        if ((focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_SELL && focusedOrder.getDealCnt().compareTo(focusedOrder.getEntrustCnt()) < 0)
                || (focusedOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && focusedOrder.getDealAmount().compareTo(focusedOrder.getEntrustAmount()) < 0)) {
            logger.info("市价单未交易完成:#{}"+JSON.toJSONString(focusedOrder));
            addMarketPriceOrder(focusedOrder);
        }
        //每个订单的匹配批量推送
src/main/resources/application-dayline.yml
@@ -5,11 +5,11 @@
spring:
  profiles:
    active: dayline
    active: dev
  datasource:
    url: jdbc:mysql://rm-bp151tw8er79ig9kb5o.mysql.rds.aliyuncs.com:3306/db_biue?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8
    username: ctcoin_data
    password: ctcoin_123
    url: jdbc:mysql://rm-bp1i2g5rg5dubo9s40o.mysql.rds.aliyuncs.com:3306/db_roc?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8
    username: roc_user
    password: roc123pasd!@
    driver-class-name: com.mysql.jdbc.Driver
    type: com.alibaba.druid.pool.DruidDataSource
    druid:
@@ -48,7 +48,7 @@
  ## redis配置
  redis:
    ## Redis数据库索引(默认为0)
    database: 2
    database: 1
    ## Redis服务器地址
    host: 47.114.114.219
    ## Redis服务器连接端口
@@ -72,10 +72,10 @@
    ## 连接超时时间(毫秒)
    timeout: 30000
  rabbitmq:
    host: 120.55.86.146
    host: 47.114.114.219
    port: 5672
    username: biyict
    password: biyict123
    username: roc_user
    password: roc123456
    publisher-confirm-type: correlated
@@ -95,11 +95,11 @@
  redis_expire: 3000
  kline-update-job: false
  newest-price-update-job: false
  #日线 该任务不能与最新价处于同一个服务器
  exchange-trade: false
  day-line: true
  other-job: false
  loop-job: false
  rabbit-consumer: true
  rabbit-consumer: false
  block-job: false
aliyun:
src/main/resources/application.yml
@@ -5,7 +5,7 @@
spring:
  profiles:
    active: prodapp
    active: test
  datasource:
    url: jdbc:mysql://rm-bp1i2g5rg5dubo9s40o.mysql.rds.aliyuncs.com:3306/db_roc?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8
    username: roc_user