16 files modified
	
		
		4 files added
	
	
 
	
	
	
	
	
	
	
	
 |  |  | 
 |  |  | import javax.annotation.Resource;
 | 
 |  |  | import javax.validation.Valid;
 | 
 |  |  | 
 | 
 |  |  | import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
 | 
 |  |  | import org.springframework.validation.annotation.Validated;
 | 
 |  |  | import org.springframework.web.bind.annotation.GetMapping;
 | 
 |  |  | import org.springframework.web.bind.annotation.PostMapping;
 | 
 |  |  | 
 |  |  |             return orderCoinService.submitSalesWalletCoinOrderWithMatch(symbol,type,tradeType,price,amount,submitSalesWalletCoinOrderDto.getEntrustAmount());
 | 
 |  |  | 
 | 
 |  |  |         }else{
 | 
 |  |  |             return orderCoinService.submitSalesWalletCoinOrder(symbol,type,tradeType,price,amount);
 | 
 |  |  |             return orderCoinService.submitSalesWalletCoinOrder(symbol,type,tradeType,price,amount,submitSalesWalletCoinOrderDto.getEntrustAmount());
 | 
 |  |  |         }
 | 
 |  |  |     }
 | 
 |  |  |     
 | 
 
 |  |  | 
 |  |  |     Result enterTransactionPageOfWalletCoin(String symbol);
 | 
 |  |  | 
 | 
 |  |  |     Result submitSalesWalletCoinOrder(String symbol, Integer type, Integer tradeType, BigDecimal price,
 | 
 |  |  |             BigDecimal amount);
 | 
 |  |  |             BigDecimal amount,BigDecimal entrustAmount);
 | 
 |  |  | 
 | 
 |  |  |     /**
 | 
 |  |  |      *  需要撮合交易的币种提交买卖单
 | 
 |  |  | 
 |  |  | 
 | 
 |  |  |     public void handleOrder(List<ExchangeTrade> trades);
 | 
 |  |  | 
 | 
 |  |  |     void initOrders(String symbol, Integer type, Integer tradeType, BigDecimal price,
 | 
 |  |  |                     BigDecimal amount,BigDecimal entrustAmount);
 | 
 |  |  | 
 | 
 |  |  | }
 | 
 
 |  |  | 
 |  |  | 
 | 
 |  |  |     @Override
 | 
 |  |  |     @Transactional
 | 
 |  |  |     public Result submitSalesWalletCoinOrder(String symbol, Integer type, Integer tradeType, BigDecimal price, BigDecimal amount) {
 | 
 |  |  |     public Result submitSalesWalletCoinOrder(String symbol, Integer type, Integer tradeType, BigDecimal price, BigDecimal amount,BigDecimal entrustAmount) {
 | 
 |  |  | 
 | 
 |  |  | 
 | 
 |  |  |         //获取用户ID
 | 
 |  |  |         Long memberId = LoginUserUtils.getAppLoginUser().getId();
 | 
 |  |  |         BigDecimal nowPriceinBigDecimal = price;
 | 
 |  |  |         //查询当前价
 | 
 |  |  |         BigDecimal nowPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(symbol + "/USDT")));
 | 
 |  |  | 
 | 
 |  |  |         if(OrderCoinsEntity.ORDERTYPE_BUY.equals(type) && OrderCoinsEntity.TRADETYPE_MARKETPRICE.equals(tradeType)){
 | 
 |  |  |             amount = entrustAmount.divide(nowPrice,BigDecimal.ROUND_DOWN);
 | 
 |  |  |         }
 | 
 |  |  |         // 处理市价
 | 
 |  |  |         // 获取交易管理的杠杠倍率,手续费率等信息,由平台进行设置
 | 
 |  |  |         symbol = symbol.toUpperCase();
 | 
 |  |  |         MemberWalletCoinEntity walletCoin = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, symbol);
 | 
 |  |  | 
 |  |  |     @Override
 | 
 |  |  |     public Result submitSalesWalletCoinOrderWithMatch(String symbol, Integer type, Integer tradeType, BigDecimal price, BigDecimal amount, BigDecimal entrustAmount) {
 | 
 |  |  |         //获取用户ID
 | 
 |  |  |         Long memberId = 13L;
 | 
 |  |  |         Long memberId = LoginUserUtils.getAppLoginUser().getId();
 | 
 |  |  |         BigDecimal nowPriceinBigDecimal = price;
 | 
 |  |  |         //查询当前价
 | 
 |  |  |         //BigDecimal nowPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(symbol + "/USDT")));
 | 
 |  |  | 
 |  |  |             if(exchangeTrade==null){
 | 
 |  |  |                 continue;
 | 
 |  |  |             }
 | 
 |  |  |             // 量
 | 
 |  |  |             BigDecimal amount = exchangeTrade.getAmount();
 | 
 |  |  |             // 买单ID
 | 
 |  |  |             Long buyOrderId = exchangeTrade.getBuyOrderId();
 | 
 |  |  |             // 成交金额(usdt)
 | 
 |  |  |             BigDecimal buyTurnover = exchangeTrade.getBuyTurnover();
 | 
 |  |  |             int direction = exchangeTrade.getDirection();
 | 
 |  |  |             // 成交价
 | 
 |  |  |             BigDecimal price = exchangeTrade.getPrice();
 | 
 |  |  |             // 卖单
 | 
 |  |  |             Long sellOrderId = exchangeTrade.getSellOrderId();
 | 
 |  |  |             // 买卖单都需要处理
 | 
 |  |  |             // 买单
 | 
 |  |  | 
 |  |  |             }
 | 
 |  |  |         }
 | 
 |  |  |     }
 | 
 |  |  | 
 | 
 |  |  |     @Override
 | 
 |  |  |     public void initOrders(String symbol, Integer type, Integer tradeType, BigDecimal price,
 | 
 |  |  |                            BigDecimal amount,BigDecimal entrustAmount) {
 | 
 |  |  |         //获取用户ID
 | 
 |  |  |         Long memberId = 10L;
 | 
 |  |  |         BigDecimal nowPriceinBigDecimal = price;
 | 
 |  |  |         //查询当前价
 | 
 |  |  |         //BigDecimal nowPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(symbol + "/USDT")));
 | 
 |  |  | 
 | 
 |  |  |         // 获取交易管理的杠杠倍率,手续费率等信息,由平台进行设置
 | 
 |  |  |         symbol = symbol.toUpperCase();
 | 
 |  |  | 
 | 
 |  |  |         // 手续费用(手续费=建仓价X数量X手续费率)
 | 
 |  |  |         BigDecimal closingPrice = BigDecimal.ZERO ;
 | 
 |  |  | 
 | 
 |  |  |         // BigDecimal totalPayPricCoin = nowPrice.multiply(amount).add(closingPrice);
 | 
 |  |  |         // 首先将单插入到数据库主表(委托表)
 | 
 |  |  |         // 创建订单
 | 
 |  |  |         OrderCoinsEntity order = new OrderCoinsEntity();
 | 
 |  |  |         //根据委托类型生成不同数据
 | 
 |  |  |         // 如果是限价交易直接插入主表数据
 | 
 |  |  |         order.setMemberId(memberId);
 | 
 |  |  |         order.setOrderNo(generateSimpleSerialno(memberId.toString()));
 | 
 |  |  |         order.setOrderType(type);
 | 
 |  |  |         order.setSymbol(symbol);
 | 
 |  |  |         //order.setMarkPrice(nowPrice);
 | 
 |  |  | 
 | 
 |  |  |         // 成交量 先设置为0
 | 
 |  |  |         order.setDealCnt(BigDecimal.ZERO);
 | 
 |  |  |         // 成交价
 | 
 |  |  |         //order.setDealPrice(price);
 | 
 |  |  |         // 成交金额
 | 
 |  |  |         order.setDealAmount(BigDecimal.ZERO);
 | 
 |  |  |         order.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DODING);
 | 
 |  |  |         order.setTradeType(tradeType);
 | 
 |  |  |         // 手续费
 | 
 |  |  |         order.setFeeAmount(closingPrice);
 | 
 |  |  |         if(OrderCoinsEntity.TRADETYPE_FIXEDPRICE.equals(tradeType)){
 | 
 |  |  |             // 限价 是需要价格和数量 可以得到成交金额
 | 
 |  |  |             // 下单量
 | 
 |  |  |             order.setEntrustCnt(amount);
 | 
 |  |  |             // 下单价格
 | 
 |  |  |             order.setEntrustPrice(price);
 | 
 |  |  |             order.setEntrustAmount(amount.multiply(price));
 | 
 |  |  |         }else{
 | 
 |  |  |             if(OrderCoinsEntity.ORDERTYPE_BUY.equals(type)){
 | 
 |  |  |                 // 市价 只有金额
 | 
 |  |  |                 order.setEntrustAmount(entrustAmount);
 | 
 |  |  |             }else{
 | 
 |  |  |                 // 下单量
 | 
 |  |  |                 order.setEntrustCnt(amount);
 | 
 |  |  |                 // 下单价格
 | 
 |  |  |                 order.setEntrustPrice(price);
 | 
 |  |  |                 order.setEntrustAmount(amount.multiply(price));
 | 
 |  |  |             }
 | 
 |  |  | 
 | 
 |  |  |         }
 | 
 |  |  |         orderCoinsDao.insert(order);
 | 
 |  |  |         // 加入到撮合
 | 
 |  |  |         CoinTrader trader = factory.getTrader(symbol);
 | 
 |  |  |         trader.trade(order);
 | 
 |  |  |     }
 | 
 |  |  | }
 | 
 
 |  |  | 
 |  |  |         calendar.set(Calendar.HOUR_OF_DAY, 0); | 
 |  |  |         long firstTimeOfToday = calendar.getTimeInMillis(); | 
 |  |  |         String period = "1min"; | 
 |  |  |         logger.info("initializeThumb from {} to {}", firstTimeOfToday, nowTime); | 
 |  |  |         //logger.info("initializeThumb from {} to {}", firstTimeOfToday, nowTime); | 
 |  |  |         List<Candlestick> lines = service.findAllKLine(this.symbol, firstTimeOfToday, nowTime, period); | 
 |  |  |         coinThumb = new CoinThumb(); | 
 |  |  |         synchronized (coinThumb) { | 
 |  |  | 
 |  |  |                     //处理K线 | 
 |  |  |                     processTrade(currentKLine, exchangeTrade); | 
 |  |  |                     //处理今日概况信息 | 
 |  |  |                     logger.debug("处理今日概况信息"); | 
 |  |  |                     //logger.debug("处理今日概况信息"); | 
 |  |  |                     handleThumb(exchangeTrade); | 
 |  |  |                     //存储并推送成交信息 | 
 |  |  |                     handleTradeStorage(exchangeTrade); | 
 |  |  | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     public void processTrade(Candlestick kLine, ExchangeTrade exchangeTrade) { | 
 |  |  |         if (kLine.getClose().compareTo(BigDecimal.ZERO) == 0) { | 
 |  |  |         if (kLine.getClose()==null || kLine.getClose().compareTo(BigDecimal.ZERO)==0) { | 
 |  |  |             //第一次设置K线值 | 
 |  |  |             kLine.setOpen(exchangeTrade.getPrice()); | 
 |  |  |             kLine.setHigh(exchangeTrade.getPrice()); | 
 |  |  | 
 |  |  |             kLine.setClose(exchangeTrade.getPrice()); | 
 |  |  |         } | 
 |  |  |         kLine.setCount(kLine.getCount() + 1); | 
 |  |  |         kLine.setVolume(kLine.getVolume().add(exchangeTrade.getAmount())); | 
 |  |  |         if(kLine.getVolume()==null){ | 
 |  |  |             kLine.setVolume(BigDecimal.ZERO); | 
 |  |  |         } | 
 |  |  |         kLine.setAmount(kLine.getVolume().add(exchangeTrade.getAmount())); | 
 |  |  |         BigDecimal turnover = exchangeTrade.getPrice().multiply(exchangeTrade.getAmount()); | 
 |  |  |        // kLine.setTurnover(kLine.getTurnover().add(turnover)); | 
 |  |  |         kLine.setVolume(kLine.getVolume().add(turnover)); | 
 |  |  |         //kLine.setTimestamp(System.currentTimeMillis()); | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     public void handleTradeStorage(ExchangeTrade exchangeTrade) { | 
 |  |  |         for (MarketHandler storage : handlers) { | 
 |  |  |             storage.handleTrade(symbol, exchangeTrade, coinThumb); | 
 |  |  |         } | 
 |  |  | //        for (MarketHandler storage : handlers) { | 
 |  |  | //            storage.handleTrade(symbol, exchangeTrade, coinThumb); | 
 |  |  | //        } | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     public void handleKLineStorage(Candlestick kLine) { | 
 |  |  | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     public void handleThumb(ExchangeTrade exchangeTrade) { | 
 |  |  |         logger.info("handleThumb symbol = {}", this.symbol); | 
 |  |  |         //logger.info("handleThumb symbol = {}", this.symbol); | 
 |  |  |         synchronized (coinThumb) { | 
 |  |  |             if (coinThumb.getOpen().compareTo(BigDecimal.ZERO) == 0) { | 
 |  |  |                 //第一笔交易记为开盘价 | 
 |  |  | 
 |  |  |                 coinThumb.setChg(change.divide(coinThumb.getLow(), 4, BigDecimal.ROUND_UP)); | 
 |  |  |             } | 
 |  |  |             if ("USDT".equalsIgnoreCase(baseCoin)) { | 
 |  |  |                 logger.info("setUsdRate", exchangeTrade.getPrice()); | 
 |  |  |                // logger.info("setUsdRate", exchangeTrade.getPrice()); | 
 |  |  |                 coinThumb.setUsdRate(exchangeTrade.getPrice()); | 
 |  |  |             } else { | 
 |  |  |  | 
 |  |  | 
 |  |  |         calendar.add(field, -range); | 
 |  |  |         String fromTime = df.format(calendar.getTime()); | 
 |  |  |         long startTick = calendar.getTimeInMillis(); | 
 |  |  |         System.out.println("time range from " + fromTime + " to " + endTime); | 
 |  |  |         //System.out.println("time range from " + fromTime + " to " + endTime); | 
 |  |  |         List<ExchangeTrade> exchangeTrades = service.findTradeByTimeRange(this.symbol, startTick, endTick); | 
 |  |  |  | 
 |  |  |         Candlestick kLine = new Candlestick(); | 
 |  |  | 
 |  |  |             kLine.setLow(coinThumb.getClose()); | 
 |  |  |             kLine.setHigh(coinThumb.getClose()); | 
 |  |  |         } | 
 |  |  |         logger.info("generate " + range + rangeUnit + " kline in {},data={}", df.format(new Date(kLine.getTimestamp())), JSON.toJSONString(kLine)); | 
 |  |  |         //logger.info("generate " + range + rangeUnit + " kline in {},data={}", df.format(new Date(kLine.getTimestamp())), JSON.toJSONString(kLine)); | 
 |  |  |         service.saveKLine(symbol,period, kLine); | 
 |  |  |         // 生成一个对应的新K线 后续的交易会更新这个最新K线数据 | 
 |  |  |         Candlestick newKline = new Candlestick(); | 
 |  |  | 
 |  |  |         newKline.setOpen(kLine.getClose()); | 
 |  |  |         newKline.setVolume(BigDecimal.ZERO); | 
 |  |  |         newKline.setHigh(kLine.getClose()); | 
 |  |  |         calendar.add(field, 2*range); | 
 |  |  |         newKline.setTimestamp(calendar.getTimeInMillis()); | 
 |  |  |         currentKlineMap.put(period,newKline); | 
 |  |  |  | 
 |  |  |         // 存储昨日K线 | 
 
 |  |  | 
 |  |  | //        Query query = new Query(criteria).with(sort); | 
 |  |  | // | 
 |  |  | //        return mongoTemplate.find(query,ExchangeTrade.class,"exchange_trade_"+symbol); | 
 |  |  |         return orderCoinDealDao.selectOrderCoinDealByTime(symbol, new Date(timeStart), new Date(timeStart)); | 
 |  |  |         return orderCoinDealDao.selectOrderCoinDealByTime(symbol, new Date(timeStart), new Date(timeEnd)); | 
 |  |  |         // return null; | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     public void saveKLine(String symbol, String period, Candlestick kLine) { | 
 |  |  |         // 先获取 | 
 |  |  |         String key = "KINE_" + symbol + "_" + period; | 
 |  |  |         String key = "KINE_" + symbol + "/USDT_" + period; | 
 |  |  |         Object data = redisUtils.get(key); | 
 |  |  |         List list = new ArrayList(); | 
 |  |  |         if (data != null) { | 
 |  |  |             list = (List) data; | 
 |  |  |         } | 
 |  |  |         list.add(kLine); | 
 |  |  |         redisUtils.set("KINE_" + symbol + "_" + period, list); | 
 |  |  |         redisUtils.set(key, list); | 
 |  |  |         //  mongoTemplate.insert(kLine,"exchange_kline_"+symbol+"_"+kLine.getPeriod()); | 
 |  |  |     } | 
 |  |  |  | 
 
 |  |  | 
 |  |  | package com.xcong.excoin.quartz.job; | 
 |  |  |  | 
 |  |  | import cn.hutool.core.util.StrUtil; | 
 |  |  | import com.alibaba.fastjson.JSON; | 
 |  |  | import com.huobi.client.SubscriptionClient; | 
 |  |  | import com.huobi.client.SubscriptionOptions; | 
 |  |  | import com.huobi.client.model.Candlestick; | 
 |  |  | import com.huobi.client.model.enums.CandlestickInterval; | 
 |  |  | import com.xcong.excoin.modules.coin.dao.OrderCoinDealDao; | 
 |  |  | import com.xcong.excoin.modules.coin.dao.OrderCoinsDao; | 
 |  |  | import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity; | 
 |  |  | import com.xcong.excoin.modules.coin.service.OrderCoinService; | 
 |  |  | 
 |  |  | import com.xcong.excoin.rabbit.producer.ExchangeProducer; | 
 |  |  | import com.xcong.excoin.trade.CoinTrader; | 
 |  |  | import com.xcong.excoin.trade.CoinTraderFactory; | 
 |  |  | import com.xcong.excoin.trade.ExchangeTrade; | 
 |  |  | import com.xcong.excoin.utils.CoinTypeConvert; | 
 |  |  | import com.xcong.excoin.utils.RedisUtils; | 
 |  |  | import lombok.extern.slf4j.Slf4j; | 
 |  |  | 
 |  |  | import java.text.ParseException; | 
 |  |  | import java.util.ArrayList; | 
 |  |  | import java.util.List; | 
 |  |  | import java.util.Map; | 
 |  |  |  | 
 |  |  | /** | 
 |  |  |  *  开启撮合交易 | 
 |  |  | 
 |  |  |  | 
 |  |  |     @Resource | 
 |  |  |     private OrderCoinsDao orderCoinsDao; | 
 |  |  |     @Resource | 
 |  |  |     private OrderCoinDealDao orderCoinDealDao; | 
 |  |  |  | 
 |  |  |     @Resource | 
 |  |  |     private CoinTraderFactory factory; | 
 |  |  |  | 
 |  |  |     @Resource | 
 |  |  |     private OrderCoinService coinService; | 
 |  |  |     private RedisUtils redisUtils; | 
 |  |  |  | 
 |  |  |     @Resource | 
 |  |  |     private MarketService marketService; | 
 |  |  | 
 |  |  |         processor.initializeThumb(); | 
 |  |  |         //processor.initializeUsdRate(); | 
 |  |  |         processor.setIsHalt(false); | 
 |  |  |         List<ExchangeTrade> nekk = orderCoinDealDao.selectOrderCoinDealByTime("NEKK", null, null); | 
 |  |  |         processor.process(nekk); | 
 |  |  |         String symbolUsdt = symbol; | 
 |  |  |         if(!symbol.contains("USDT")){ | 
 |  |  |             symbolUsdt = symbol+"/USDT"; | 
 |  |  |         } | 
 |  |  |         String key = "NEW_KINE_{}"; | 
 |  |  |         key = StrUtil.format(key, symbolUsdt); | 
 |  |  |         Object o = redisUtils.get(key); | 
 |  |  |         if(o!=null){ | 
 |  |  |             Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick> )o; | 
 |  |  |             ((DefaultCoinProcessor) processor).setCurrentKlineMap(currentKlineMap); | 
 |  |  |         } | 
 |  |  |  | 
 |  |  |         processorFactory.addProcessor(symbol, processor); | 
 |  |  |  | 
 |  |  |     } | 
 |  |  | } | 
 
 |  |  | 
 |  |  | package com.xcong.excoin.quartz.job; | 
 |  |  |  | 
 |  |  | import com.alibaba.fastjson.JSON; | 
 |  |  | import com.alibaba.fastjson.JSONObject; | 
 |  |  | import com.huobi.client.model.Candlestick; | 
 |  |  | import com.xcong.excoin.modules.coin.dao.OrderCoinDealDao; | 
 |  |  | import com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity; | 
 |  |  | import com.xcong.excoin.modules.coin.service.OrderCoinService; | 
 |  |  | import com.xcong.excoin.processor.CoinProcessorFactory; | 
 |  |  | import com.xcong.excoin.trade.TradePlateModel; | 
 |  |  | import com.xcong.excoin.utils.RedisUtils; | 
 |  |  | import com.xcong.excoin.websocket.CandlestickModel; | 
 |  |  | import com.xcong.excoin.websocket.NewCandlestick; | 
 |  |  | import com.xcong.excoin.websocket.TradePlateSendWebSocket; | 
 |  |  | import lombok.extern.slf4j.Slf4j; | 
 |  |  | import org.springframework.beans.factory.annotation.Autowired; | 
 |  |  | 
 |  |  |  | 
 |  |  | import javax.annotation.Resource; | 
 |  |  | import java.math.BigDecimal; | 
 |  |  | import java.math.RoundingMode; | 
 |  |  | import java.util.ArrayList; | 
 |  |  | import java.util.Calendar; | 
 |  |  | import java.util.List; | 
 |  |  | import java.util.Random; | 
 |  |  |  | 
 |  |  | /** | 
 |  |  |  * 生成各时间段的K线信息 | 
 |  |  | 
 |  |  |     private CoinProcessorFactory processorFactory; | 
 |  |  |  | 
 |  |  |     @Resource | 
 |  |  |     private TradePlateSendWebSocket plateSendWebSocket; | 
 |  |  |     private OrderCoinService orderCoinService; | 
 |  |  |  | 
 |  |  |     @Resource | 
 |  |  |     private RedisUtils redisUtils; | 
 |  |  |  | 
 |  |  |     @Scheduled(cron = "0/10 * * * * *") | 
 |  |  |     public void tradePlate(){ | 
 |  |  |         redisUtils.set("NEKK_NEW_PRICE",new BigDecimal(Math.random()*20)); | 
 |  |  |         Candlestick candlestick = new Candlestick(); | 
 |  |  |         candlestick.setOpen(new BigDecimal("10.33")); | 
 |  |  |         candlestick.setHigh(new BigDecimal("15.23")); | 
 |  |  |         candlestick.setVolume(new BigDecimal("12121.34")); | 
 |  |  |         candlestick.setLow(new BigDecimal("8.234")); | 
 |  |  |         candlestick.setAmount(new BigDecimal("1199")); | 
 |  |  |         candlestick.setTimestamp(1599840000); | 
 |  |  |         candlestick.setId(1599840000L); | 
 |  |  |         candlestick.setCount(100002); | 
 |  |  |         candlestick.setClose(new BigDecimal("12.2323")); | 
 |  |  |         //redisUtils.set("NEKK/USDT",candlestick); | 
 |  |  |         // [[10244.21, 0.000855], [10243.7, 0.008777], [10243.59, 0.14], [10243.37, 0.467663]] | 
 |  |  |         TradePlateModel tradePlateModel = new TradePlateModel(); | 
 |  |  |         List<BigDecimal> buy; | 
 |  |  |         List<BigDecimal> sell; | 
 |  |  |         for(int i=0;i<5;i++){ | 
 |  |  |             buy = new ArrayList<>(2); | 
 |  |  |             buy.add(new BigDecimal(Math.random()*i)); | 
 |  |  |             buy.add(new BigDecimal(Math.random()*i)); | 
 |  |  |             tradePlateModel.getBuy().add(buy); | 
 |  |  |             sell = new ArrayList<>(2); | 
 |  |  |             sell.add(new BigDecimal(Math.random()*i*2)); | 
 |  |  |             sell.add(new BigDecimal(Math.random()*i*2)); | 
 |  |  |             tradePlateModel.getSell().add(sell); | 
 |  |  |  | 
 |  |  |     @Scheduled(cron = "0/1 * * * * *") | 
 |  |  |     public void test(){ | 
 |  |  | //        for(int i=1;i<=2;i++){ | 
 |  |  | //            OrderCoinsDealEntity detail = new OrderCoinsDealEntity(); | 
 |  |  | //            detail.setMemberId(13L); | 
 |  |  | //            //detail.setOrderId(111); | 
 |  |  | //            detail.setOrderNo("tete"); | 
 |  |  | //            detail.setOrderType(1); | 
 |  |  | //            detail.setTradeType(1); | 
 |  |  | //            detail.setSymbol("NEKK"); | 
 |  |  | //            detail.setSymbolCnt(new BigDecimal(10)); | 
 |  |  | //            detail.setEntrustPrice(new BigDecimal(10)); | 
 |  |  | //            detail.setDealPrice(new BigDecimal(i*10*Math.random())); | 
 |  |  | //            detail.setDealAmount(new BigDecimal(50)); | 
 |  |  | //            detail.setFeeAmount(new BigDecimal(1)); | 
 |  |  | //            detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_DONE); | 
 |  |  | //            orderCoinDealDao.insert(detail); | 
 |  |  | //        } | 
 |  |  |         Random random = new Random(); | 
 |  |  |         Integer type = OrderCoinsDealEntity.ORDERTYPE_BUY; | 
 |  |  |         Integer tradeType = OrderCoinsDealEntity.TRADETYPE_FIXEDPRICE; | 
 |  |  |         int i = random.nextInt(100); | 
 |  |  |         if(i==0){ | 
 |  |  |             i=10; | 
 |  |  |         } | 
 |  |  |         log.info("准备发送消息"); | 
 |  |  |         plateSendWebSocket.sendMessagePlate("NEKK/USDT",JSON.toJSONString(tradePlateModel),null); | 
 |  |  |         plateSendWebSocket.sendMessageKline("NEKK/USDT","1min","{amount: 114419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 311958.06091543}",null); | 
 |  |  |         plateSendWebSocket.sendMessageKline("NEKK/USDT","5min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null); | 
 |  |  |         plateSendWebSocket.sendMessageKline("NEKK/USDT","15min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null); | 
 |  |  |         BigDecimal price =   new BigDecimal(i); | 
 |  |  |         orderCoinService.initOrders("NEKK",type,tradeType,price,new BigDecimal(2),null); | 
 |  |  |         orderCoinService.initOrders("NEKK",OrderCoinsDealEntity.ORDERTYPE_SELL,tradeType,price,new BigDecimal(2),null); | 
 |  |  |  | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |  | 
 |  |  |     /** | 
 |  |  |      * 每分钟定时器,处理分钟K线 | 
 |  |  | 
 |  |  |     public void handle5minKLine(){ | 
 |  |  |  | 
 |  |  |         Calendar calendar = Calendar.getInstance(); | 
 |  |  |         log.debug("分钟K线:{}",calendar.getTime()); | 
 |  |  |         //log.debug("分钟K线:{}",calendar.getTime()); | 
 |  |  |         //将秒、微秒字段置为0 | 
 |  |  |         calendar.set(Calendar.SECOND,0); | 
 |  |  |         calendar.set(Calendar.MILLISECOND,0); | 
 |  |  | 
 |  |  |         int hour = calendar.get(Calendar.HOUR_OF_DAY); | 
 |  |  |         processorFactory.getProcessorMap().forEach((symbol,processor)->{ | 
 |  |  |             if(!processor.isStopKline()) { | 
 |  |  |                 log.debug("生成{}分钟k线:{}",symbol); | 
 |  |  |                 //log.debug("生成{}分钟k线:{}",symbol); | 
 |  |  |                 //生成1分钟K线 | 
 |  |  |                 processor.autoGenerate(); | 
 |  |  |                 processor.generateKLine(1, Calendar.MINUTE, time); | 
 |  |  |                 //更新24H成交量 | 
 |  |  |                 processor.update24HVolume(time); | 
 |  |  |                 if(minute%5 == 0) { | 
 
 |  |  | 
 |  |  | import com.xcong.excoin.modules.coin.service.OrderCoinService; | 
 |  |  | import com.xcong.excoin.modules.exchange.service.HandleKlineService; | 
 |  |  | import com.xcong.excoin.trade.ExchangeTrade; | 
 |  |  | import com.xcong.excoin.utils.CoinTypeConvert; | 
 |  |  | import com.xcong.excoin.utils.RedisUtils; | 
 |  |  | import com.xcong.excoin.websocket.CandlestickModel; | 
 |  |  | import com.xcong.excoin.websocket.NewCandlestick; | 
 |  |  | import com.xcong.excoin.websocket.TradePlateSendWebSocket; | 
 |  |  | import lombok.extern.slf4j.Slf4j; | 
 |  |  | import org.apache.commons.collections.CollectionUtils; | 
 |  |  | import org.springframework.amqp.rabbit.annotation.RabbitListener; | 
 |  |  | import org.springframework.stereotype.Component; | 
 |  |  |  | 
 |  |  | 
 |  |  |      */ | 
 |  |  |     @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE) | 
 |  |  |     public void tradePlate(String content) { | 
 |  |  |         log.info("#盘口信息消费者---->{}#", content); | 
 |  |  |         //log.info("#盘口信息消费者---->{}#", content); | 
 |  |  |         tradePlateSendWebSocket.sendMessagePlate("NEKK/USDT",content,null); | 
 |  |  |     } | 
 |  |  |  | 
 |  |  | 
 |  |  |      */ | 
 |  |  |     @RabbitListener(queues = RabbitMqConfig.QUEUE_HANDLE_TRADE) | 
 |  |  |     public void handleTradeExchange(String content) { | 
 |  |  |         log.info("#---->{}#", content); | 
 |  |  |        // log.info("#处理订单---->{}#", content); | 
 |  |  |         List<ExchangeTrade> exchangeTrades = JSONObject.parseArray(content, ExchangeTrade.class); | 
 |  |  |         // 去掉空的  暂时这样 | 
 |  |  |         Iterator<ExchangeTrade> iterator = exchangeTrades.iterator(); | 
 |  |  | 
 |  |  |             if(iterator.next()==null){ | 
 |  |  |                 iterator.remove(); | 
 |  |  |             } | 
 |  |  |         } | 
 |  |  |         if(CollectionUtils.isEmpty(exchangeTrades)){ | 
 |  |  |             return; | 
 |  |  |         } | 
 |  |  |         // 处理K线 并更新最新价 | 
 |  |  |         handleKlineService.handleExchangeOrderToKline(exchangeTrades); | 
 |  |  | 
 |  |  |         Object o = redisUtils.get(key); | 
 |  |  |         Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick> )o; | 
 |  |  |         Set<Map.Entry<String, Candlestick>> entries = currentKlineMap.entrySet(); | 
 |  |  |  | 
 |  |  |  | 
 |  |  |         for(Map.Entry<String, Candlestick> map : entries){ | 
 |  |  |             tradePlateSendWebSocket.sendMessageKline(symbolUsdt,map.getKey(),JSONObject.toJSONString(map.getValue()),null); | 
 |  |  |             String ch = "market.{}.kline.{}"; | 
 |  |  |             Candlestick value = map.getValue(); | 
 |  |  |             String key1 = map.getKey(); | 
 |  |  |             String chKey = key1; | 
 |  |  |             if(key1.equals("1hour")){ | 
 |  |  |                 chKey = "60min"; | 
 |  |  |             } | 
 |  |  |             // 转换 | 
 |  |  |             NewCandlestick newCandlestick= new NewCandlestick(); | 
 |  |  |             String nekkusdt = CoinTypeConvert.convertReverse(symbolUsdt); | 
 |  |  |             ch = StrUtil.format(ch, nekkusdt,chKey); | 
 |  |  |             newCandlestick.setCh(ch); | 
 |  |  |             CandlestickModel model = new CandlestickModel(); | 
 |  |  |             model.setVol(value.getVolume()); | 
 |  |  |             model.setLow(value.getLow()); | 
 |  |  |             model.setOpen(value.getOpen()); | 
 |  |  |             model.setHigh(value.getHigh()); | 
 |  |  |             model.setCount(value.getCount()); | 
 |  |  |             model.setAmount(value.getAmount()); | 
 |  |  |             model.setId(value.getTimestamp()/1000); | 
 |  |  |             model.setTimestamp(value.getTimestamp()/1000); | 
 |  |  |             model.setClose(value.getClose()); | 
 |  |  |             newCandlestick.setTick(model); | 
 |  |  |             tradePlateSendWebSocket.sendMessageKline(symbolUsdt,key1,JSONObject.toJSONString(newCandlestick),null); | 
 |  |  |         } | 
 |  |  |         // 处理用户订单 | 
 |  |  |         orderCoinService.handleOrder(exchangeTrades); | 
 
 |  |  | 
 |  |  |  | 
 |  |  |     @Override | 
 |  |  |     public void confirm(CorrelationData correlationData, boolean ack, String cause) { | 
 |  |  |         log.info("#----->{}#", correlationData); | 
 |  |  |         //log.info("#----->{}#", correlationData); | 
 |  |  |         if (ack) { | 
 |  |  |             log.info("success"); | 
 |  |  |             //log.info("success"); | 
 |  |  |         } else { | 
 |  |  |             log.info("--->{}", cause); | 
 |  |  |         } | 
 
 |  |  | 
 |  |  |      * 初始化交易线程 | 
 |  |  |      */ | 
 |  |  |     public void initialize() { | 
 |  |  |         logger.info("init CoinTrader for symbol {}", symbol); | 
 |  |  |         //logger.info("init CoinTrader for symbol {}", symbol); | 
 |  |  |         //买单队列价格降序排列 | 
 |  |  |         buyLimitPriceQueue = new TreeMap<>(Comparator.reverseOrder()); | 
 |  |  |         //卖单队列价格升序排列 | 
 |  |  | 
 |  |  |         if (exchangeOrder.getTradeType() != OrderCoinsEntity.TRADETYPE_MARKETPRICE) { | 
 |  |  |             return; | 
 |  |  |         } | 
 |  |  |         logger.info("addMarketPriceOrder,orderId = {}", exchangeOrder.getId()); | 
 |  |  |         //logger.info("addMarketPriceOrder,orderId = {}", exchangeOrder.getId()); | 
 |  |  |         LinkedList<OrderCoinsEntity> list = exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY ? buyMarketQueue : sellMarketQueue; | 
 |  |  |         synchronized (list) { | 
 |  |  |             list.addLast(exchangeOrder); | 
 |  |  | 
 |  |  |         } | 
 |  |  |         //logger.info("trade order={}",exchangeOrder); | 
 |  |  |         if (!symbol.equalsIgnoreCase(exchangeOrder.getSymbol())) { | 
 |  |  |             logger.info("unsupported symbol,coin={},base={}", exchangeOrder.getSymbol(), "USDT"); | 
 |  |  |             //logger.info("unsupported symbol,coin={},base={}", exchangeOrder.getSymbol(), "USDT"); | 
 |  |  |             return; | 
 |  |  |         } | 
 |  |  |         // 如果 | 
 |  |  | 
 |  |  |                     OrderCoinsEntity matchOrder = orderIterator.next(); | 
 |  |  |                     //处理匹配 | 
 |  |  |                     ExchangeTrade trade = processMatch(focusedOrder, matchOrder); | 
 |  |  |                     exchangeTrades.add(trade); | 
 |  |  |                     if(trade!=null){ | 
 |  |  |                         exchangeTrades.add(trade); | 
 |  |  |                     } | 
 |  |  |  | 
 |  |  |                     //判断匹配单是否完成 | 
 |  |  |                     if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) { | 
 |  |  |                         //当前匹配的订单完成交易,删除该订单 | 
 |  |  | 
 |  |  |                     if (trade != null) { | 
 |  |  |                         exchangeTrades.add(trade); | 
 |  |  |                     } | 
 |  |  |                     //判断匹配单是否完成 | 
 |  |  |                     //判断匹配单是否完成 TODO | 
 |  |  |                     if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) { | 
 |  |  |                         //当前匹配的订单完成交易,删除该订单 | 
 |  |  |                         orderIterator.remove(); | 
 |  |  | 
 |  |  |      */ | 
 |  |  |     private BigDecimal adjustMarketOrderTurnover(OrderCoinsEntity order, BigDecimal dealPrice) { | 
 |  |  |         // For a market-price buy order, returns the unspent part of the entrusted amount | 
 |  |  |         // (entrustAmount - dealAmount) when that remainder can no longer buy any quantity | 
 |  |  |         // at dealPrice, i.e. remainder / dealPrice rounds down to zero at coinScale decimals. | 
 |  |  |         // Returns zero for every other order and whenever the remainder is still tradable. | 
 |  |  |         // | 
 |  |  |         // The type codes are compared with equals(): `==` on boxed Integer values compares | 
 |  |  |         // object references and is only reliable for values inside the Integer cache. | 
 |  |  |         boolean marketBuy = OrderCoinsEntity.ORDERTYPE_BUY.equals(order.getOrderType()) | 
 |  |  |                 && OrderCoinsEntity.TRADETYPE_MARKETPRICE.equals(order.getTradeType()); | 
 |  |  |         if (marketBuy) { | 
 |  |  |             BigDecimal leftTurnover = order.getEntrustAmount().subtract(order.getDealAmount()); | 
 |  |  |             BigDecimal buyableQuantity = leftTurnover.divide(dealPrice, coinScale, BigDecimal.ROUND_DOWN); | 
 |  |  |             if (buyableQuantity.compareTo(BigDecimal.ZERO) == 0) { | 
 |  |  |                 return leftTurnover; | 
 |  |  |             } | 
 |  |  |         } | 
 |  |  |         return BigDecimal.ZERO; | 
 |  |  |     } | 
 |  |  | 
 |  |  |         availAmount = calculateTradedAmount(matchOrder, dealPrice); | 
 |  |  |         //计算成交量 取少的 | 
 |  |  |         BigDecimal tradedAmount = (availAmount.compareTo(needAmount) >= 0 ? needAmount : availAmount); | 
 |  |  |         logger.info("dealPrice={},amount={}", dealPrice, tradedAmount); | 
 |  |  |         //logger.info("dealPrice={},amount={}", dealPrice, tradedAmount); | 
 |  |  |         //如果成交额为0说明剩余额度无法成交,退出 | 
 |  |  |         if (tradedAmount.compareTo(BigDecimal.ZERO) == 0) { | 
 |  |  |             return null; | 
 |  |  | 
 |  |  |         focusedOrder.setDealCnt(focusedOrder.getDealCnt().add(tradedAmount)); | 
 |  |  |         // 用户单成交金额 | 
 |  |  |         focusedOrder.setDealAmount(focusedOrder.getDealAmount().add(turnover)); | 
 |  |  |  | 
 |  |  |         // 判断两个单是否完成 | 
 |  |  |         if(matchOrder.getEntrustAmount()!=null && matchOrder.getEntrustAmount().compareTo(matchOrder.getDealAmount())<=0){ | 
 |  |  |             matchOrder.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE); | 
 |  |  |         } | 
 |  |  |         if(matchOrder.getEntrustCnt()!=null && matchOrder.getEntrustCnt().compareTo(matchOrder.getDealCnt())<=0){ | 
 |  |  |             matchOrder.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE); | 
 |  |  |         } | 
 |  |  |  | 
 |  |  |         if(focusedOrder.getEntrustAmount()!=null && focusedOrder.getEntrustAmount().compareTo(focusedOrder.getDealAmount())<=0){ | 
 |  |  |             focusedOrder.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE); | 
 |  |  |         } | 
 |  |  |         if(focusedOrder.getEntrustCnt()!=null && focusedOrder.getEntrustCnt().compareTo(focusedOrder.getDealCnt())<=0){ | 
 |  |  |             focusedOrder.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE); | 
 |  |  |         } | 
 |  |  |  | 
 |  |  |         //创建成交记录 | 
 |  |  |         ExchangeTrade exchangeTrade = new ExchangeTrade(); | 
 |  |  | 
 |  |  |                     //orderCoinService.handleOrder(subTrades); | 
 |  |  |                 } | 
 |  |  |             } else { | 
 |  |  |                 trades.forEach(e -> { | 
 |  |  |                     System.out.println(e); | 
 |  |  |                 }); | 
 |  |  |                 exchangeProducer.sendHandleTrade(JSON.toJSONString(trades)); | 
 |  |  |                 //orderCoinService.handleOrder(trades); | 
 |  |  |                 // kafkaTemplate.send("exchange-trade", JSON.toJSONString(trades)); | 
 |  |  | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     /** | 
 |  |  |      * 发送盘口变化消息 | 
 |  |  |      * | 
 |  |  |      * @param | 
 |  |  |      */ | 
 |  |  |     public String sendTradePlateMessage() { | 
 |  |  |         //防止并发引起数组越界,造成盘口倒挂 TODO | 
 |  |  |         List<List<BigDecimal>> plate; | 
 |  |  |         List<BigDecimal> plateItem; | 
 |  |  |         TradePlateModel tradePlateModel = new TradePlateModel(); | 
 |  |  |         // 转换格式 | 
 |  |  |         if (buyTradePlate != null && buyTradePlate.getItems() != null) { | 
 |  |  |             plate = new ArrayList<>(); | 
 |  |  |             LinkedList<TradePlateItem> items = buyTradePlate.getItems(); | 
 |  |  |             for (TradePlateItem item : items) { | 
 |  |  |                 plateItem = new ArrayList<>(2); | 
 |  |  |                 BigDecimal price = item.getPrice(); | 
 |  |  |                 BigDecimal amount = item.getAmount(); | 
 |  |  |                 plateItem.add(price); | 
 |  |  |                 plateItem.add(amount); | 
 |  |  |                 plate.add(plateItem); | 
 |  |  |             } | 
 |  |  |             tradePlateModel.setBuy(plate); | 
 |  |  |         } | 
 |  |  |  | 
 |  |  |         if (sellTradePlate != null && sellTradePlate.getItems() != null) { | 
 |  |  |             plate = new ArrayList<>(); | 
 |  |  |             LinkedList<TradePlateItem> items = sellTradePlate.getItems(); | 
 |  |  |             for (TradePlateItem item : items) { | 
 |  |  |                 plateItem = new ArrayList<>(2); | 
 |  |  |                 BigDecimal price = item.getPrice(); | 
 |  |  |                 BigDecimal amount = item.getAmount(); | 
 |  |  |                 plateItem.add(price); | 
 |  |  |                 plateItem.add(amount); | 
 |  |  |                 plate.add(plateItem); | 
 |  |  |             } | 
 |  |  |             tradePlateModel.setSell(plate); | 
 |  |  |         } | 
 |  |  |  | 
 |  |  |         // 盘口发生变化通知TODO | 
 |  |  |         return JSON.toJSONString(tradePlateModel); | 
 |  |  |         //exchangeProducer.sendPlateMsg(JSON.toJSONString(tradePlateModel)); | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     /** | 
 |  |  |      * 取消委托订单 | 
 |  |  |      * | 
 |  |  |      * @param exchangeOrder | 
 |  |  |      * @return | 
 |  |  |      */ | 
 |  |  |     public OrderCoinsEntity cancelOrder(OrderCoinsEntity exchangeOrder) { | 
 |  |  |         logger.info("cancelOrder,orderId={}", exchangeOrder.getId()); | 
 |  |  |         //logger.info("cancelOrder,orderId={}", exchangeOrder.getId()); | 
 |  |  |         if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) { | 
 |  |  |             //处理市价单 | 
 |  |  |             Iterator<OrderCoinsEntity> orderIterator; | 
 
 |  |  | 
 |  |  |  | 
 |  |  | import java.io.Serializable; | 
 |  |  | import java.math.BigDecimal; | 
 |  |  | import java.util.Date; | 
 |  |  |  | 
 |  |  | /** | 
 |  |  |  * 撮合交易信息 | 
 
 |  |  | 
 |  |  |         } | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     public static String convertReverse(String symbol) { | 
 |  |  |         switch (symbol) { | 
 |  |  |             case "BTC/USDT": | 
 |  |  |                 return "btcusdt"; | 
 |  |  |             case "NEKK/USDT": | 
 |  |  |                 return "nekkusdt"; | 
 |  |  |             default: | 
 |  |  |                 return null; | 
 |  |  |         } | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     public static String convertToKey(String symbol) { | 
 |  |  |         switch (symbol) { | 
 |  |  |             case "BTC/USDT": | 
 
| New file | 
 |  |  | 
 |  |  | package com.xcong.excoin.websocket; | 
 |  |  |  | 
 |  |  | import lombok.Data; | 
 |  |  |  | 
 |  |  | import java.math.BigDecimal; | 
 |  |  |  | 
         /**
          * One candlestick (K-line) entry as serialized to websocket clients.
          * The kline history handler fills instances from the exchange client's
          * Candlestick objects; Lombok's @Data generates the accessors.
          */
 |  |  | @Data | 
 |  |  | public class CandlestickModel { | 
             // Set by the history handler to the source candlestick's timestamp / 1000
             // (presumably epoch seconds -- confirm against the front-end contract).
 |  |  |     private Long id; | 
             // Copied from the source candlestick's getAmount().
 |  |  |     private BigDecimal amount; | 
             // Copied from the source candlestick's getCount().
 |  |  |     private long count; | 
             // Copied from the source candlestick's getOpen().
 |  |  |     private BigDecimal open; | 
             // Copied from the source candlestick's getClose().
 |  |  |     private BigDecimal close; | 
             // Copied from the source candlestick's getLow().
 |  |  |     private BigDecimal low; | 
             // Copied from the source candlestick's getHigh().
 |  |  |     private BigDecimal high; | 
             // Copied from the source candlestick's getVolume().
 |  |  |     private BigDecimal vol; | 
             // NOTE(review): not populated by the history conversion; stays 0 there -- confirm intended use.
 |  |  |     private long timestamp; | 
 |  |  | } | 
 
| New file | 
 |  |  | 
 |  |  | package com.xcong.excoin.websocket; | 
 |  |  |  | 
 |  |  | import lombok.Data; | 
 |  |  |  | 
 |  |  | import java.util.List; | 
 |  |  |  | 
         /**
          * Response envelope for a kline history request sent over the websocket.
          * Lombok's @Data generates the accessors.
          */
 |  |  | @Data | 
 |  |  | public class CandlestickResult { | 
             // Set by the history handler to the request string it is answering.
 |  |  |     private String rep; | 
             // Converted candlesticks; left null when no cached kline data was found.
 |  |  |     private List<CandlestickModel> data; | 
 |  |  | } | 
 
| New file | 
 |  |  | 
 |  |  | package com.xcong.excoin.websocket; | 
 |  |  |  | 
 |  |  | import lombok.Data; | 
 |  |  |  | 
         /**
          * Pairs a channel string with a single candlestick.
          * NOTE(review): presumably the push message for the latest kline of a
          * subscribed channel -- no producer is visible here, confirm.
          * Lombok's @Data generates the accessors.
          */
 |  |  | @Data | 
 |  |  | public class NewCandlestick { | 
             // Channel identifier (presumably the subscribed topic string -- confirm).
 |  |  |     private String ch; | 
             // The candlestick carried by this message.
 |  |  |     private CandlestickModel tick; | 
 |  |  | } | 
 
| New file | 
 |  |  | 
 |  |  | package com.xcong.excoin.websocket; | 
 |  |  |  | 
 |  |  | import lombok.Data; | 
 |  |  |  | 
         /**
          * Acknowledgement returned to a websocket client after a subscription request.
          * The depth subscription handler serializes one of these to JSON and sends it
          * before the first trade-plate snapshot. Lombok's @Data generates the accessors.
          */
 |  |  | @Data | 
 |  |  | public class SubResultModel { | 
             // Identifier echoed to the client; the depth handler currently sets "nekkusdt".
 |  |  |     private String id; | 
             // Defaults to "ok"; nothing visible sets any other value.
 |  |  |     private String status = "ok"; | 
             // The subscription string that was accepted.
 |  |  |     private String subbed; | 
 |  |  | } | 
 
 |  |  | 
 |  |  |  | 
 |  |  | import cn.hutool.core.util.StrUtil; | 
 |  |  | import com.alibaba.fastjson.JSON; | 
 |  |  | import com.alibaba.fastjson.JSONArray; | 
 |  |  | import com.alibaba.fastjson.JSONObject; | 
 |  |  | import com.huobi.client.model.Candlestick; | 
 |  |  | import com.xcong.excoin.common.contants.AppContants; | 
 |  |  | import com.xcong.excoin.trade.CoinTraderFactory; | 
 |  |  | import com.xcong.excoin.utils.CoinTypeConvert; | 
 |  |  | import com.xcong.excoin.utils.RedisUtils; | 
 |  |  | import com.xcong.excoin.utils.SpringContextHolder; | 
 |  |  | 
 |  |  | import javax.websocket.*; | 
 |  |  | import javax.websocket.server.PathParam; | 
 |  |  | import javax.websocket.server.ServerEndpoint; | 
 |  |  | import java.util.Collection; | 
 |  |  | import java.util.HashMap; | 
 |  |  | import java.util.Map; | 
 |  |  | import java.io.IOException; | 
 |  |  | import java.util.*; | 
 |  |  | import java.util.concurrent.ConcurrentHashMap; | 
 |  |  | import java.util.concurrent.atomic.AtomicInteger; | 
 |  |  |  | 
 |  |  | 
 |  |  |     @OnClose | 
 |  |  |     public void onClose(Session session) { | 
 |  |  |         onlineCount.decrementAndGet(); // 在线数减1 | 
 |  |  | //        Collection<Map<String, Session>> values = tradeplateClients.values(); | 
 |  |  | //        if(CollectionUtils.isNotEmpty(values)){ | 
 |  |  | //            for(Map<String,Session> map : values){ | 
 |  |  | //                map.remove(session.getId()); | 
 |  |  | //            } | 
 |  |  | //        } | 
 |  |  |         Collection<Map<String, Session>> values = tradeplateClients.values(); | 
 |  |  |         if(CollectionUtils.isNotEmpty(values)){ | 
 |  |  |             for(Map<String,Session> map : values){ | 
 |  |  |                 map.remove(session.getId()); | 
 |  |  |             } | 
 |  |  |         } | 
 |  |  |  | 
 |  |  |         Collection<Map<String, Session>> klineClientsValues = klineClients.values(); | 
 |  |  |         if(CollectionUtils.isNotEmpty(klineClientsValues)){ | 
 |  |  |             for(Map<String,Session> map : klineClientsValues){ | 
 |  |  |                 map.remove(session.getId()); | 
 |  |  |             } | 
 |  |  |         } | 
 |  |  |         log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get()); | 
 |  |  |     } | 
 |  |  |  | 
 |  |  | 
 |  |  |      */ | 
 |  |  |     @OnMessage | 
 |  |  |     public void onMessage(String message, Session session) { | 
 |  |  |         log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message); | 
 |  |  |         //log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message); | 
 |  |  |         // 订阅方法 {sub: 'market.' + symbol + '.depth.' + this._caculateType(),id: symbol | 
 |  |  |         //} | 
 |  |  |         JSONObject jsonObject = JSON.parseObject(message); | 
 |  |  |         // 盘口的判断 | 
 |  |  |         if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) { | 
 |  |  |             log.info("订阅盘口消息:{}", session.getId()); | 
 |  |  |             //log.info("订阅盘口消息:{}", session.getId()); | 
 |  |  |             String sub = jsonObject.get("sub").toString(); | 
 |  |  |             String symbol = sub.split("\\.")[1]; | 
 |  |  |             symbol = CoinTypeConvert.convert(symbol); | 
 |  |  | 
 |  |  |                 map.put(session.getId(), session); | 
 |  |  |                 tradeplateClients.put(symbol, map); | 
 |  |  |             } | 
 |  |  |             // 发送一次盘口 | 
 |  |  |             CoinTraderFactory factory = SpringContextHolder.getBean(CoinTraderFactory.class); | 
 |  |  |             // 发送订阅消息 | 
 |  |  |             String nekk = factory.getTrader("NEKK").sendTradePlateMessage(); | 
 |  |  |             SubResultModel subResultModel = new SubResultModel(); | 
 |  |  |             subResultModel.setId("nekkusdt"); | 
 |  |  |             subResultModel.setSubbed(sub); | 
 |  |  |             synchronized (session){ | 
 |  |  |                 try { | 
 |  |  |                     session.getBasicRemote().sendText(JSONObject.toJSONString(subResultModel)); | 
 |  |  |                 } catch (IOException e) { | 
 |  |  |                     e.printStackTrace(); | 
 |  |  |                 } | 
 |  |  |             } | 
 |  |  |            synchronized (session){ | 
 |  |  |                try { | 
 |  |  |                    session.getBasicRemote().sendText(nekk); | 
 |  |  |                } catch (IOException e) { | 
 |  |  |                    e.printStackTrace(); | 
 |  |  |                } | 
 |  |  |            } | 
 |  |  |  | 
 |  |  |  | 
 |  |  |         } | 
 |  |  |         // 取消盘口订阅 | 
 |  |  |         if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) { | 
 |  |  |             // `market.${symbol}.kline.${strPeriod} | 
 |  |  |             log.info("取消订阅盘口消息:{}", session.getId()); | 
 |  |  |             //log.info("取消订阅盘口消息:{}", session.getId()); | 
 |  |  |             String unsub = jsonObject.get("unsub").toString(); | 
 |  |  |             String[] split = unsub.split("\\."); | 
 |  |  |             String symbol = split[1]; | 
 |  |  | 
 |  |  |         // 取消订阅 {unsub: xxx(标识)} | 
 |  |  |         if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) { | 
 |  |  |             // 订阅 | 
 |  |  |             log.info("订阅最新K线消息:{}", session.getId()); | 
 |  |  |             String sub = jsonObject.get("sub").toString(); | 
 |  |  |             log.info("订阅最新K线消息:{}", sub); | 
 |  |  |             String[] split = sub.split("\\."); | 
 |  |  |             String symbol = split[1]; | 
 |  |  |             symbol = CoinTypeConvert.convert(symbol); | 
 |  |  |             String period = split[3]; | 
 |  |  |             if("60min".equals(period)){ | 
 |  |  |                 period = "1hour"; | 
 |  |  |             } | 
 |  |  |             String key = symbol + "-" + period; | 
 |  |  |             log.info("最新K线key:{}", key); | 
 |  |  |             if (klineClients.containsKey(key)) { | 
 |  |  |                 // 有这个币种K线 | 
 |  |  |                 Map<String, Session> stringSessionMap = klineClients.get(key); | 
 |  |  |                 if (!stringSessionMap.containsKey(session.getId())) { | 
 |  |  |                     stringSessionMap.put(session.getId(), session); | 
 |  |  |                     log.info("放入最新K线Map:{}", key); | 
 |  |  |                 } | 
 |  |  |             } else { | 
 |  |  |                 Map<String, Session> stringSessionMap = new HashMap<>(); | 
 |  |  |                 stringSessionMap.put(session.getId(), session); | 
 |  |  |                 klineClients.put(key, stringSessionMap); | 
 |  |  |                 log.info("放入最新K线Map:{}", key); | 
 |  |  |             } | 
 |  |  |         } | 
 |  |  |  | 
 |  |  |         // 取消订阅 | 
 |  |  |         if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) { | 
 |  |  |             // `market.${symbol}.kline.${strPeriod} | 
 |  |  |             log.info("取消订阅最新K消息:{}", session.getId()); | 
 |  |  |             //log.info("取消订阅最新K消息:{}", session.getId()); | 
 |  |  |             String unsub = jsonObject.get("unsub").toString(); | 
 |  |  |             String[] split = unsub.split("\\."); | 
 |  |  |             String strPeriod = split[3]; | 
 |  |  |             if("60min".equals(strPeriod)){ | 
 |  |  |                 strPeriod = "1hour"; | 
 |  |  |             } | 
 |  |  |             String symbol = split[1]; | 
 |  |  |             symbol = CoinTypeConvert.convert(symbol); | 
 |  |  |             String key = symbol + "-" + strPeriod; | 
 |  |  |             if (klineClients.containsKey(key)) { | 
 |  |  |                 klineClients.get(key).remove(session.getId()); | 
 |  |  |                 //session.getAsyncRemote().sendText(message); | 
 |  |  |             } | 
 |  |  |         } | 
 |  |  |  | 
 |  |  | 
 |  |  |             String symbol = split[1]; | 
 |  |  |             symbol = CoinTypeConvert.convert(symbol); | 
 |  |  |             String period = split[3]; | 
 |  |  |             if("60min".equals(period)){ | 
 |  |  |                 period = "1hour"; | 
 |  |  |             } | 
 |  |  |             //String key = symbol+"-"+period; | 
 |  |  |             // String key = "KINE_BCH/USDT_1week"; | 
 |  |  |             String key = "KINE_{}_{}"; | 
 |  |  |             // 币币k线数据 | 
 |  |  |             key = StrUtil.format(key, symbol, period); | 
 |  |  |             //key = StrUtil.format(key, symbol, period); | 
 |  |  |             key = StrUtil.format(key, "NEKK/USDT", period); | 
 |  |  |             RedisUtils bean = SpringContextHolder.getBean(RedisUtils.class); | 
 |  |  |             Object o = bean.get(key); | 
 |  |  |             sendMessageHistory(JSON.toJSONString(o), session); | 
 |  |  |             List<CandlestickModel> candlestickModels = new ArrayList<>(); | 
 |  |  |             CandlestickResult result = new CandlestickResult(); | 
 |  |  |             result.setRep(sub); | 
 |  |  |             if(o!=null){ | 
 |  |  |                 List<Candlestick> list = (List<Candlestick>)o; | 
 |  |  |                 for(Candlestick candlestick : list){ | 
 |  |  |                     CandlestickModel model = new CandlestickModel(); | 
 |  |  |                     model.setAmount(candlestick.getAmount()); | 
 |  |  |                     model.setClose(candlestick.getClose()); | 
 |  |  |                     model.setCount(candlestick.getCount()); | 
 |  |  |                     model.setHigh(candlestick.getHigh()); | 
 |  |  |                     model.setId(candlestick.getTimestamp()/1000); | 
 |  |  |                     model.setOpen(candlestick.getOpen()); | 
 |  |  |                     model.setLow(candlestick.getLow()); | 
 |  |  |                     model.setVol(candlestick.getVolume()); | 
 |  |  |                     candlestickModels.add(model); | 
 |  |  |                 } | 
 |  |  |                 result.setData(candlestickModels); | 
 |  |  |             } | 
 |  |  |  | 
 |  |  |             sendMessageHistory(JSON.toJSONString(result), session); | 
 |  |  |         } | 
 |  |  |     } | 
 |  |  |  | 
 |  |  | 
 |  |  |                 Session toSession = sessionEntry.getValue(); | 
 |  |  |                 // 排除掉自己 | 
 |  |  |                 //if (!fromSession.getId().equals(toSession.getId())) { | 
 |  |  |                 log.info("服务端给客户端[{}]发送盘口消息{}", toSession.getId(), message); | 
 |  |  |                // log.info("服务端给客户端[{}]发送盘口消息{}", toSession.getId(), message); | 
 |  |  |                 boolean open = toSession.isOpen(); | 
 |  |  |                 if (open) { | 
 |  |  |                     toSession.getAsyncRemote().sendText(message); | 
 |  |  | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     public void sendMessageHistory(String message, Session toSession) { | 
 |  |  |         log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message); | 
 |  |  |       //  log.info("服务端给客户端[{}]发送历史K线", toSession.getId()); | 
 |  |  |         boolean open = toSession.isOpen(); | 
 |  |  |         if (open) { | 
 |  |  |             toSession.getAsyncRemote().sendText(message); | 
 |  |  | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     public void sendMessageKline(String symbol, String period, String message, Session fromSession) { | 
 |  |  |  | 
 |  |  |         String key = symbol + "-" + period; | 
 |  |  |         //log.info("发送最新K线[{}],数据[{}]",key,message); | 
 |  |  |         if (klineClients.containsKey(key)) { | 
 |  |  |             Map<String, Session> stringSessionMap = klineClients.get(key); | 
 |  |  |             for (Map.Entry<String, Session> sessionEntry : stringSessionMap.entrySet()) { | 
 |  |  |                 Session toSession = sessionEntry.getValue(); | 
 |  |  |                 // 排除掉自己 | 
 |  |  |                 //if (!fromSession.getId().equals(toSession.getId())) { | 
 |  |  |                 log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message); | 
 |  |  |                 boolean open = toSession.isOpen(); | 
 |  |  |                 if (open) { | 
 |  |  |                     log.info("服务端给客户端[{}]发送最新K线消息{}", toSession.getId(), message); | 
 |  |  |                     toSession.getAsyncRemote().sendText(message); | 
 |  |  |                 } | 
 |  |  |                 //  } | 
 |  |  |             } | 
 |  |  |         } | 
 |  |  |     } | 
 
 |  |  | 
 |  |  |  | 
 |  |  |  | 
 |  |  | app: | 
 |  |  |   debug: false | 
 |  |  |   debug: true | 
 |  |  |   redis_expire: 3000 | 
 |  |  |   kline-update-job: false | 
 |  |  |   newest-price-update-job: true | 
 |  |  |   newest-price-update-job: false | 
 |  |  |   #日线 该任务不能与最新价处于同一个服务器 | 
 |  |  |   day-line: true | 
 |  |  |   other-job: true | 
 |  |  |   loop-job: true | 
 |  |  |   trade: false | 
 |  |  |   day-line: false | 
 |  |  |   other-job: false | 
 |  |  |   loop-job: false | 
 |  |  |   rabbit-consumer: true | 
 |  |  |   block-job: true | 
 |  |  |   block-job: false | 
 |  |  |  | 
 |  |  | aliyun: | 
 |  |  |   oss: | 
 
 |  |  | 
 |  |  |  | 
 |  |  | spring: | 
 |  |  |   profiles: | 
 |  |  |     active: dev | 
 |  |  |     active: test | 
 |  |  |   datasource: | 
 |  |  |     url: jdbc:mysql://rm-bp151tw8er79ig9kb5o.mysql.rds.aliyuncs.com:3306/db_biue?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8 | 
 |  |  |     username: ctcoin_data | 
 
 |  |  | 
 |  |  |             deal_amount buyTurnover,
 | 
 |  |  |             deal_amount sellTurnover,
 | 
 |  |  |             order_type direction,
 | 
 |  |  |             create_time time
 | 
 |  |  |             UNIX_TIMESTAMP(create_time) time
 | 
 |  |  |           from coins_order_deal
 | 
 |  |  |           where symbol = #{symbol}
 | 
 |  |  |           and order_type  = 1
 | 
 |  |  |           and order_status = 3
 | 
 |  |  |           and create_time between #{startTime} and #{endTime}
 | 
 |  |  |           <if test="startTime != null and endTime != null">
 | 
 |  |  |               and create_time between #{startTime} and #{endTime}
 | 
 |  |  |           </if>
 | 
 |  |  | 
 | 
 |  |  |     </select>
 | 
 |  |  |     <select id="selectAllWalletCoinOrderBySymbol"
 | 
 |  |  |             resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity">
 |