package com.xcong.excoin.processor; import com.alibaba.fastjson.JSON; import com.huobi.client.model.Candlestick; import com.xcong.excoin.common.contants.AppContants; import com.xcong.excoin.trade.ExchangeTrade; import com.xcong.excoin.utils.RedisUtils; import lombok.ToString; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.math.BigDecimal; import java.math.RoundingMode; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.*; import java.util.concurrent.ConcurrentHashMap; /** * 默认交易处理器,产生1mK线信息 */ @ToString public class DefaultCoinProcessor implements CoinProcessor { private Logger logger = LoggerFactory.getLogger(DefaultCoinProcessor.class); private String symbol; private String baseCoin; private Candlestick currentKLine; private List handlers; private CoinThumb coinThumb; private MarketService service; private RedisUtils redisUtils; //private CoinExchangeRate coinExchangeRate; //是否暂时处理 private Boolean isHalt = true; //是否停止K线生成 private Boolean stopKLine = false; /** * 每个时间段的K线在生成后,生成一个最新的K线 */ private Map currentKlineMap = new ConcurrentHashMap<>(); public DefaultCoinProcessor(String symbol, String baseCoin) { //handlers = new ArrayList<>(); createNewKLine(); this.baseCoin = baseCoin; this.symbol = symbol; } public String getSymbol() { return symbol; } @Override public void initializeThumb() { Calendar calendar = Calendar.getInstance(); //将秒、微秒字段置为0 calendar.set(Calendar.SECOND, 0); calendar.set(Calendar.MILLISECOND, 0); long nowTime = calendar.getTimeInMillis(); calendar.set(Calendar.MINUTE, 0); calendar.set(Calendar.HOUR_OF_DAY, 0); long firstTimeOfToday = calendar.getTimeInMillis(); String period = "1min"; //logger.info("initializeThumb from {} to {}", firstTimeOfToday, nowTime); List lines = service.findAllKLine(this.symbol, firstTimeOfToday, nowTime, period); coinThumb = new CoinThumb(); synchronized (coinThumb) { coinThumb.setSymbol(symbol); for (Candlestick kline : lines) { if (kline.getOpen().compareTo(BigDecimal.ZERO) == 0) { continue; } if (coinThumb.getOpen().compareTo(BigDecimal.ZERO) == 0) { coinThumb.setOpen(kline.getOpen()); } if (coinThumb.getHigh().compareTo(kline.getHigh()) < 0) { coinThumb.setHigh(kline.getHigh()); } if (kline.getLow().compareTo(BigDecimal.ZERO) > 0 && coinThumb.getLow().compareTo(kline.getLow()) > 0) { coinThumb.setLow(kline.getLow()); } if (kline.getClose().compareTo(BigDecimal.ZERO) > 0) { coinThumb.setClose(kline.getClose()); } coinThumb.setVolume(coinThumb.getVolume().add(kline.getVolume())); // TODO coinThumb.setTurnover(coinThumb.getTurnover().add(kline.getAmount())); } coinThumb.setChange(coinThumb.getClose().subtract(coinThumb.getOpen())); // 此处计算涨幅并没有以开盘价为标准,而是以最低价 if (coinThumb.getLow().compareTo(BigDecimal.ZERO) > 0) { coinThumb.setChg(coinThumb.getChange().divide(coinThumb.getLow(), 4, RoundingMode.UP)); } } } public void createNewKLine() { currentKLine = new Candlestick(); synchronized (currentKLine) { Calendar calendar = Calendar.getInstance(); calendar.set(Calendar.SECOND, 0); calendar.set(Calendar.MILLISECOND, 0); //1Min时间要是下一整分钟的 calendar.add(Calendar.MINUTE, 1); currentKLine.setTimestamp(calendar.getTimeInMillis()); // K线类型 //currentKLine.setPeriod("1min"); currentKLine.setCount(0); } } /** * 00:00:00 时重置CoinThumb */ @Override public void resetThumb() { logger.info("reset coinThumb"); synchronized (coinThumb) { coinThumb.setOpen(BigDecimal.ZERO); coinThumb.setHigh(BigDecimal.ZERO); //设置昨收价格 coinThumb.setLastDayClose(coinThumb.getClose()); //coinThumb.setClose(BigDecimal.ZERO); coinThumb.setLow(BigDecimal.ZERO); coinThumb.setChg(BigDecimal.ZERO); coinThumb.setChange(BigDecimal.ZERO); } } // @Override // public void setExchangeRate(CoinExchangeRate coinExchangeRate) { // this.coinExchangeRate = coinExchangeRate; // } @Override public void update24HVolume(long time) { if(coinThumb!=null) { synchronized (coinThumb) { Calendar calendar = Calendar.getInstance(); calendar.setTimeInMillis(time); calendar.add(Calendar.HOUR_OF_DAY, -24); long timeStart = calendar.getTimeInMillis(); // TODO BigDecimal volume = service.findTradeVolume(this.symbol, timeStart, time); coinThumb.setVolume(volume.setScale(4, RoundingMode.DOWN)); } } } // @Override // public void initializeUsdRate() { // //logger.info("symbol = {} ,baseCoin = {}",this.symbol,this.baseCoin); // BigDecimal baseUsdRate = coinExchangeRate.getUsdRate(baseCoin); // coinThumb.setBaseUsdRate(baseUsdRate); // //logger.info("setBaseUsdRate = ",baseUsdRate); // BigDecimal multiply = coinThumb.getClose().multiply(coinExchangeRate.getUsdRate(baseCoin)); // //logger.info("setUsdRate = ",multiply); // coinThumb.setUsdRate(multiply); // } @Override public void autoGenerate() { DateFormat df = new SimpleDateFormat("HH:mm:ss"); //logger.info("auto generate 1min kline in {},data={}", df.format(new Date(currentKLine.getTime())), JSON.toJSONString(currentKLine)); if(coinThumb != null) { synchronized (currentKLine) { //没有成交价时存储上一笔成交价 if(currentKLine.getOpen()==null){ currentKLine.setOpen(BigDecimal.ZERO); } if (currentKLine.getOpen().compareTo(BigDecimal.ZERO) == 0) { currentKLine.setOpen(coinThumb.getClose()); currentKLine.setLow(coinThumb.getClose()); currentKLine.setHigh(coinThumb.getClose()); currentKLine.setClose(coinThumb.getClose()); } Calendar calendar = Calendar.getInstance(); calendar.set(Calendar.SECOND, 0); calendar.set(Calendar.MILLISECOND, 0); currentKLine.setTimestamp(calendar.getTimeInMillis()); handleKLineStorage(currentKLine); createNewKLine(); } } } @Override public void setIsHalt(boolean status) { this.isHalt = status; } @Override public void process(List trades) { if (!isHalt) { if (trades == null || trades.size() == 0) { return; } synchronized (currentKLine) { for (ExchangeTrade exchangeTrade : trades) { //处理K线 processTrade(currentKLine, exchangeTrade); //处理今日概况信息 //logger.debug("处理今日概况信息"); handleThumb(exchangeTrade); //存储并推送成交信息 handleTradeStorage(exchangeTrade); } } } } public void processTrade(Candlestick kLine, ExchangeTrade exchangeTrade) { if (kLine.getClose()==null || kLine.getClose().compareTo(BigDecimal.ZERO)==0) { //第一次设置K线值 kLine.setOpen(exchangeTrade.getPrice()); kLine.setHigh(exchangeTrade.getPrice()); kLine.setLow(exchangeTrade.getPrice()); kLine.setClose(exchangeTrade.getPrice()); } else { kLine.setHigh(exchangeTrade.getPrice().max(kLine.getHigh())); kLine.setLow(exchangeTrade.getPrice().min(kLine.getLow())); kLine.setClose(exchangeTrade.getPrice()); } kLine.setCount(kLine.getCount() + 1); if(kLine.getVolume()==null){ kLine.setVolume(BigDecimal.ZERO); } kLine.setAmount(kLine.getVolume().add(exchangeTrade.getAmount())); BigDecimal turnover = exchangeTrade.getPrice().multiply(exchangeTrade.getAmount()); kLine.setVolume(kLine.getVolume().add(turnover)); //kLine.setTimestamp(System.currentTimeMillis()); } public void handleTradeStorage(ExchangeTrade exchangeTrade) { // for (MarketHandler storage : handlers) { // storage.handleTrade(symbol, exchangeTrade, coinThumb); // } } public void handleKLineStorage(Candlestick kLine) { // 存储交易信息 TODO 发送最新的一根K线 // for (MarketHandler storage : handlers) { // storage.handleKLine(symbol, kLine); // } } public void handleThumb(ExchangeTrade exchangeTrade) { //logger.info("handleThumb symbol = {}", this.symbol); synchronized (coinThumb) { if (coinThumb.getOpen().compareTo(BigDecimal.ZERO) == 0) { //第一笔交易记为开盘价 coinThumb.setOpen(exchangeTrade.getPrice()); } coinThumb.setHigh(exchangeTrade.getPrice().max(coinThumb.getHigh())); if (coinThumb.getLow().compareTo(BigDecimal.ZERO) == 0) { coinThumb.setLow(exchangeTrade.getPrice()); } else { coinThumb.setLow(exchangeTrade.getPrice().min(coinThumb.getLow())); } coinThumb.setClose(exchangeTrade.getPrice()); coinThumb.setVolume(coinThumb.getVolume().add(exchangeTrade.getAmount()).setScale(4, RoundingMode.UP)); BigDecimal turnover = exchangeTrade.getPrice().multiply(exchangeTrade.getAmount()).setScale(4, RoundingMode.UP); coinThumb.setTurnover(coinThumb.getTurnover().add(turnover)); BigDecimal change = coinThumb.getClose().subtract(coinThumb.getOpen()); coinThumb.setChange(change); if (coinThumb.getLow().compareTo(BigDecimal.ZERO) > 0) { coinThumb.setChg(change.divide(coinThumb.getLow(), 4, BigDecimal.ROUND_UP)); } if ("USDT".equalsIgnoreCase(baseCoin)) { // logger.info("setUsdRate", exchangeTrade.getPrice()); coinThumb.setUsdRate(exchangeTrade.getPrice()); } else { } //coinThumb.setBaseUsdRate(coinExchangeRate.getUsdRate(baseCoin)); //coinThumb.setUsdRate(exchangeTrade.getPrice().multiply(coinExchangeRate.getUsdRate(baseCoin))); //logger.info("setUsdRate", exchangeTrade.getPrice().multiply(coinExchangeRate.getUsdRate(baseCoin))); //logger.info("thumb = {}", coinThumb); } } @Override public void addHandler(MarketHandler storage) { handlers.add(storage); } @Override public CoinThumb getThumb() { return coinThumb; } @Override public void setMarketService(MarketService service) { this.service = service; } @Override public void generateKLine(int range, int field, long time) { Calendar calendar = Calendar.getInstance(); calendar.setTimeInMillis(time); DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); long endTick = calendar.getTimeInMillis(); String endTime = df.format(calendar.getTime()); //往前推range个时间单位 calendar.add(field, -range); String fromTime = df.format(calendar.getTime()); long startTick = calendar.getTimeInMillis(); //System.out.println("time range from " + fromTime + " to " + endTime); List exchangeTrades = service.findTradeByTimeRange(this.symbol, startTick, endTick); Candlestick kLine = new Candlestick(); kLine.setTimestamp(endTick); kLine.setAmount(BigDecimal.ZERO); kLine.setClose(AppContants.DEFAULT_PRICE); kLine.setLow(AppContants.DEFAULT_PRICE); kLine.setOpen(AppContants.DEFAULT_PRICE); kLine.setVolume(BigDecimal.ZERO); kLine.setHigh(AppContants.DEFAULT_PRICE); String rangeUnit = ""; if (field == Calendar.MINUTE) { rangeUnit = "min"; } else if (field == Calendar.HOUR_OF_DAY) { rangeUnit = "hour"; } else if (field == Calendar.DAY_OF_WEEK) { rangeUnit = "week"; } else if (field == Calendar.DAY_OF_YEAR) { rangeUnit = "day"; } else if (field == Calendar.MONTH) { rangeUnit = "month"; } // kLine.setPeriod(range + rangeUnit); String period = range + rangeUnit; // 处理K线信息 for (ExchangeTrade exchangeTrade : exchangeTrades) { processTrade(kLine, exchangeTrade); } // 如果开盘价为0,则设置为前一个价格 if(kLine.getOpen().compareTo(BigDecimal.ZERO) == 0) { // 查询前一根K线 TODO String key = "KINE_" + symbol + "/USDT_" + period; Object data = redisUtils.get(key); List list = new ArrayList(); if (data != null) { list = (List) data; Candlestick o = (Candlestick)list.get(list.size() - 1); kLine.setOpen(o.getClose()); kLine.setClose(o.getClose()); kLine.setLow(o.getClose()); kLine.setHigh(o.getClose()); }else{ kLine.setOpen(coinThumb.getClose()); kLine.setClose(coinThumb.getClose()); kLine.setLow(coinThumb.getClose()); kLine.setHigh(coinThumb.getClose()); } } //logger.info("generate " + range + rangeUnit + " kline in {},data={}", df.format(new Date(kLine.getTimestamp())), JSON.toJSONString(kLine)); service.saveKLine(symbol,period, kLine); // 生成一个对应的新K线 后续的交易会更新这个最新K线数据 Candlestick newKline = new Candlestick(); //kLine.setTimestamp(endTick); newKline.setAmount(BigDecimal.ZERO); newKline.setClose(kLine.getClose()); newKline.setLow(kLine.getClose()); // 开盘价是上个K线的收盘价 newKline.setOpen(kLine.getClose()); newKline.setVolume(BigDecimal.ZERO); newKline.setHigh(kLine.getClose()); calendar.add(field, 2*range); newKline.setTimestamp(calendar.getTimeInMillis()); currentKlineMap.put(period,newKline); // 存储昨日K线 if("day".equals(rangeUnit)){ System.out.println("存储日K线"); kLine.setOpen(kLine.getClose()); kLine.setLow(kLine.getClose()); kLine.setHigh(kLine.getClose()); kLine.setVolume(BigDecimal.ZERO); redisUtils.set("XCT/USDT",kLine); } } @Override public Candlestick getKLine() { return currentKLine; } @Override public void setIsStopKLine(boolean stop) { this.stopKLine = stop; } @Override public boolean isStopKline() { return this.stopKLine; } @Override public Map getCurrentKlineMap() { return currentKlineMap; } @Override public void setRedisUtils(RedisUtils redisUtils) { this.redisUtils = redisUtils; } public void setCurrentKlineMap(Map currentKlineMap) { this.currentKlineMap = currentKlineMap; } }