package com.xcong.excoin.processor;
|
|
|
import com.alibaba.fastjson.JSON;
|
import com.huobi.client.model.Candlestick;
|
import com.xcong.excoin.trade.ExchangeTrade;
|
import com.xcong.excoin.utils.RedisUtils;
|
import lombok.ToString;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
|
import java.math.BigDecimal;
|
import java.math.RoundingMode;
|
import java.text.DateFormat;
|
import java.text.SimpleDateFormat;
|
import java.util.*;
|
import java.util.concurrent.ConcurrentHashMap;
|
|
/**
|
* 默认交易处理器,产生1mK线信息
|
*/
|
@ToString
|
public class DefaultCoinProcessor implements CoinProcessor {
|
private Logger logger = LoggerFactory.getLogger(DefaultCoinProcessor.class);
|
private String symbol;
|
private String baseCoin;
|
private Candlestick currentKLine;
|
private List<MarketHandler> handlers;
|
private CoinThumb coinThumb;
|
private MarketService service;
|
private RedisUtils redisUtils;
|
//private CoinExchangeRate coinExchangeRate;
|
//是否暂时处理
|
private Boolean isHalt = true;
|
//是否停止K线生成
|
private Boolean stopKLine = false;
|
|
/**
|
* 每个时间段的K线在生成后,生成一个最新的K线
|
*/
|
private Map<String,Candlestick> currentKlineMap = new ConcurrentHashMap<>();
|
|
public DefaultCoinProcessor(String symbol, String baseCoin) {
|
//handlers = new ArrayList<>();
|
createNewKLine();
|
this.baseCoin = baseCoin;
|
this.symbol = symbol;
|
}
|
|
public String getSymbol() {
|
return symbol;
|
}
|
|
@Override
|
public void initializeThumb() {
|
Calendar calendar = Calendar.getInstance();
|
//将秒、微秒字段置为0
|
calendar.set(Calendar.SECOND, 0);
|
calendar.set(Calendar.MILLISECOND, 0);
|
long nowTime = calendar.getTimeInMillis();
|
calendar.set(Calendar.MINUTE, 0);
|
calendar.set(Calendar.HOUR_OF_DAY, 0);
|
long firstTimeOfToday = calendar.getTimeInMillis();
|
String period = "1min";
|
//logger.info("initializeThumb from {} to {}", firstTimeOfToday, nowTime);
|
List<Candlestick> lines = service.findAllKLine(this.symbol, firstTimeOfToday, nowTime, period);
|
coinThumb = new CoinThumb();
|
synchronized (coinThumb) {
|
coinThumb.setSymbol(symbol);
|
for (Candlestick kline : lines) {
|
if (kline.getOpen().compareTo(BigDecimal.ZERO) == 0) {
|
continue;
|
}
|
if (coinThumb.getOpen().compareTo(BigDecimal.ZERO) == 0) {
|
coinThumb.setOpen(kline.getOpen());
|
}
|
if (coinThumb.getHigh().compareTo(kline.getHigh()) < 0) {
|
coinThumb.setHigh(kline.getHigh());
|
}
|
if (kline.getLow().compareTo(BigDecimal.ZERO) > 0 && coinThumb.getLow().compareTo(kline.getLow()) > 0) {
|
coinThumb.setLow(kline.getLow());
|
}
|
if (kline.getClose().compareTo(BigDecimal.ZERO) > 0) {
|
coinThumb.setClose(kline.getClose());
|
}
|
coinThumb.setVolume(coinThumb.getVolume().add(kline.getVolume()));
|
// TODO
|
coinThumb.setTurnover(coinThumb.getTurnover().add(kline.getAmount()));
|
}
|
coinThumb.setChange(coinThumb.getClose().subtract(coinThumb.getOpen()));
|
// 此处计算涨幅并没有以开盘价为标准,而是以最低价
|
if (coinThumb.getLow().compareTo(BigDecimal.ZERO) > 0) {
|
coinThumb.setChg(coinThumb.getChange().divide(coinThumb.getLow(), 4, RoundingMode.UP));
|
}
|
}
|
}
|
|
public void createNewKLine() {
|
currentKLine = new Candlestick();
|
synchronized (currentKLine) {
|
Calendar calendar = Calendar.getInstance();
|
calendar.set(Calendar.SECOND, 0);
|
calendar.set(Calendar.MILLISECOND, 0);
|
//1Min时间要是下一整分钟的
|
calendar.add(Calendar.MINUTE, 1);
|
currentKLine.setTimestamp(calendar.getTimeInMillis());
|
// K线类型
|
//currentKLine.setPeriod("1min");
|
currentKLine.setCount(0);
|
}
|
}
|
|
/**
|
* 00:00:00 时重置CoinThumb
|
*/
|
@Override
|
public void resetThumb() {
|
logger.info("reset coinThumb");
|
synchronized (coinThumb) {
|
coinThumb.setOpen(BigDecimal.ZERO);
|
coinThumb.setHigh(BigDecimal.ZERO);
|
//设置昨收价格
|
coinThumb.setLastDayClose(coinThumb.getClose());
|
//coinThumb.setClose(BigDecimal.ZERO);
|
coinThumb.setLow(BigDecimal.ZERO);
|
coinThumb.setChg(BigDecimal.ZERO);
|
coinThumb.setChange(BigDecimal.ZERO);
|
}
|
}
|
|
// @Override
|
// public void setExchangeRate(CoinExchangeRate coinExchangeRate) {
|
// this.coinExchangeRate = coinExchangeRate;
|
// }
|
|
@Override
|
public void update24HVolume(long time) {
|
if(coinThumb!=null) {
|
synchronized (coinThumb) {
|
Calendar calendar = Calendar.getInstance();
|
calendar.setTimeInMillis(time);
|
calendar.add(Calendar.HOUR_OF_DAY, -24);
|
long timeStart = calendar.getTimeInMillis();
|
// TODO
|
BigDecimal volume = service.findTradeVolume(this.symbol, timeStart, time);
|
coinThumb.setVolume(volume.setScale(4, RoundingMode.DOWN));
|
}
|
}
|
}
|
|
// @Override
|
// public void initializeUsdRate() {
|
// //logger.info("symbol = {} ,baseCoin = {}",this.symbol,this.baseCoin);
|
// BigDecimal baseUsdRate = coinExchangeRate.getUsdRate(baseCoin);
|
// coinThumb.setBaseUsdRate(baseUsdRate);
|
// //logger.info("setBaseUsdRate = ",baseUsdRate);
|
// BigDecimal multiply = coinThumb.getClose().multiply(coinExchangeRate.getUsdRate(baseCoin));
|
// //logger.info("setUsdRate = ",multiply);
|
// coinThumb.setUsdRate(multiply);
|
// }
|
|
|
@Override
|
public void autoGenerate() {
|
DateFormat df = new SimpleDateFormat("HH:mm:ss");
|
//logger.info("auto generate 1min kline in {},data={}", df.format(new Date(currentKLine.getTime())), JSON.toJSONString(currentKLine));
|
if(coinThumb != null) {
|
synchronized (currentKLine) {
|
//没有成交价时存储上一笔成交价
|
if(currentKLine.getOpen()==null){
|
currentKLine.setOpen(BigDecimal.ZERO);
|
}
|
if (currentKLine.getOpen().compareTo(BigDecimal.ZERO) == 0) {
|
currentKLine.setOpen(coinThumb.getClose());
|
currentKLine.setLow(coinThumb.getClose());
|
currentKLine.setHigh(coinThumb.getClose());
|
currentKLine.setClose(coinThumb.getClose());
|
}
|
Calendar calendar = Calendar.getInstance();
|
calendar.set(Calendar.SECOND, 0);
|
calendar.set(Calendar.MILLISECOND, 0);
|
currentKLine.setTimestamp(calendar.getTimeInMillis());
|
handleKLineStorage(currentKLine);
|
createNewKLine();
|
}
|
}
|
}
|
|
@Override
|
public void setIsHalt(boolean status) {
|
this.isHalt = status;
|
}
|
|
@Override
|
public void process(List<ExchangeTrade> trades) {
|
if (!isHalt) {
|
if (trades == null || trades.size() == 0) {
|
return;
|
}
|
synchronized (currentKLine) {
|
for (ExchangeTrade exchangeTrade : trades) {
|
//处理K线
|
processTrade(currentKLine, exchangeTrade);
|
//处理今日概况信息
|
//logger.debug("处理今日概况信息");
|
handleThumb(exchangeTrade);
|
//存储并推送成交信息
|
handleTradeStorage(exchangeTrade);
|
}
|
}
|
}
|
}
|
|
public void processTrade(Candlestick kLine, ExchangeTrade exchangeTrade) {
|
if (kLine.getClose()==null || kLine.getClose().compareTo(BigDecimal.ZERO)==0) {
|
//第一次设置K线值
|
kLine.setOpen(exchangeTrade.getPrice());
|
kLine.setHigh(exchangeTrade.getPrice());
|
kLine.setLow(exchangeTrade.getPrice());
|
kLine.setClose(exchangeTrade.getPrice());
|
} else {
|
kLine.setHigh(exchangeTrade.getPrice().max(kLine.getHigh()));
|
kLine.setLow(exchangeTrade.getPrice().min(kLine.getLow()));
|
kLine.setClose(exchangeTrade.getPrice());
|
}
|
kLine.setCount(kLine.getCount() + 1);
|
if(kLine.getVolume()==null){
|
kLine.setVolume(BigDecimal.ZERO);
|
}
|
kLine.setAmount(kLine.getVolume().add(exchangeTrade.getAmount()));
|
BigDecimal turnover = exchangeTrade.getPrice().multiply(exchangeTrade.getAmount());
|
kLine.setVolume(kLine.getVolume().add(turnover));
|
//kLine.setTimestamp(System.currentTimeMillis());
|
}
|
|
public void handleTradeStorage(ExchangeTrade exchangeTrade) {
|
// for (MarketHandler storage : handlers) {
|
// storage.handleTrade(symbol, exchangeTrade, coinThumb);
|
// }
|
}
|
|
public void handleKLineStorage(Candlestick kLine) {
|
// 存储交易信息 TODO 发送最新的一根K线
|
// for (MarketHandler storage : handlers) {
|
// storage.handleKLine(symbol, kLine);
|
// }
|
}
|
|
public void handleThumb(ExchangeTrade exchangeTrade) {
|
//logger.info("handleThumb symbol = {}", this.symbol);
|
synchronized (coinThumb) {
|
if (coinThumb.getOpen().compareTo(BigDecimal.ZERO) == 0) {
|
//第一笔交易记为开盘价
|
coinThumb.setOpen(exchangeTrade.getPrice());
|
}
|
coinThumb.setHigh(exchangeTrade.getPrice().max(coinThumb.getHigh()));
|
if (coinThumb.getLow().compareTo(BigDecimal.ZERO) == 0) {
|
coinThumb.setLow(exchangeTrade.getPrice());
|
} else {
|
coinThumb.setLow(exchangeTrade.getPrice().min(coinThumb.getLow()));
|
}
|
coinThumb.setClose(exchangeTrade.getPrice());
|
coinThumb.setVolume(coinThumb.getVolume().add(exchangeTrade.getAmount()).setScale(4, RoundingMode.UP));
|
BigDecimal turnover = exchangeTrade.getPrice().multiply(exchangeTrade.getAmount()).setScale(4, RoundingMode.UP);
|
coinThumb.setTurnover(coinThumb.getTurnover().add(turnover));
|
BigDecimal change = coinThumb.getClose().subtract(coinThumb.getOpen());
|
coinThumb.setChange(change);
|
if (coinThumb.getLow().compareTo(BigDecimal.ZERO) > 0) {
|
coinThumb.setChg(change.divide(coinThumb.getLow(), 4, BigDecimal.ROUND_UP));
|
}
|
if ("USDT".equalsIgnoreCase(baseCoin)) {
|
// logger.info("setUsdRate", exchangeTrade.getPrice());
|
coinThumb.setUsdRate(exchangeTrade.getPrice());
|
} else {
|
|
}
|
//coinThumb.setBaseUsdRate(coinExchangeRate.getUsdRate(baseCoin));
|
//coinThumb.setUsdRate(exchangeTrade.getPrice().multiply(coinExchangeRate.getUsdRate(baseCoin)));
|
//logger.info("setUsdRate", exchangeTrade.getPrice().multiply(coinExchangeRate.getUsdRate(baseCoin)));
|
//logger.info("thumb = {}", coinThumb);
|
}
|
}
|
|
@Override
|
public void addHandler(MarketHandler storage) {
|
handlers.add(storage);
|
}
|
|
|
@Override
|
public CoinThumb getThumb() {
|
return coinThumb;
|
}
|
|
@Override
|
public void setMarketService(MarketService service) {
|
this.service = service;
|
}
|
|
@Override
|
public void generateKLine(int range, int field, long time) {
|
Calendar calendar = Calendar.getInstance();
|
calendar.setTimeInMillis(time);
|
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
long endTick = calendar.getTimeInMillis();
|
String endTime = df.format(calendar.getTime());
|
//往前推range个时间单位
|
calendar.add(field, -range);
|
String fromTime = df.format(calendar.getTime());
|
long startTick = calendar.getTimeInMillis();
|
//System.out.println("time range from " + fromTime + " to " + endTime);
|
List<ExchangeTrade> exchangeTrades = service.findTradeByTimeRange(this.symbol, startTick, endTick);
|
|
Candlestick kLine = new Candlestick();
|
kLine.setTimestamp(endTick);
|
kLine.setAmount(BigDecimal.ZERO);
|
kLine.setClose(BigDecimal.valueOf(66));
|
kLine.setLow(BigDecimal.valueOf(66));
|
kLine.setOpen(BigDecimal.valueOf(66));
|
kLine.setVolume(BigDecimal.ZERO);
|
kLine.setHigh(BigDecimal.valueOf(66));
|
String rangeUnit = "";
|
if (field == Calendar.MINUTE) {
|
rangeUnit = "min";
|
} else if (field == Calendar.HOUR_OF_DAY) {
|
rangeUnit = "hour";
|
} else if (field == Calendar.DAY_OF_WEEK) {
|
rangeUnit = "week";
|
} else if (field == Calendar.DAY_OF_YEAR) {
|
rangeUnit = "day";
|
} else if (field == Calendar.MONTH) {
|
rangeUnit = "month";
|
}
|
// kLine.setPeriod(range + rangeUnit);
|
String period = range + rangeUnit;
|
// 处理K线信息
|
for (ExchangeTrade exchangeTrade : exchangeTrades) {
|
processTrade(kLine, exchangeTrade);
|
}
|
// 如果开盘价为0,则设置为前一个价格
|
if(kLine.getOpen().compareTo(BigDecimal.ZERO) == 0) {
|
// 查询前一根K线 TODO
|
String key = "KINE_" + symbol + "/USDT_" + period;
|
Object data = redisUtils.get(key);
|
List list = new ArrayList();
|
if (data != null) {
|
list = (List) data;
|
Candlestick o = (Candlestick)list.get(list.size() - 1);
|
kLine.setOpen(o.getClose());
|
kLine.setClose(o.getClose());
|
kLine.setLow(o.getClose());
|
kLine.setHigh(o.getClose());
|
}else{
|
kLine.setOpen(coinThumb.getClose());
|
kLine.setClose(coinThumb.getClose());
|
kLine.setLow(coinThumb.getClose());
|
kLine.setHigh(coinThumb.getClose());
|
}
|
|
}
|
//logger.info("generate " + range + rangeUnit + " kline in {},data={}", df.format(new Date(kLine.getTimestamp())), JSON.toJSONString(kLine));
|
service.saveKLine(symbol,period, kLine);
|
// 生成一个对应的新K线 后续的交易会更新这个最新K线数据
|
Candlestick newKline = new Candlestick();
|
//kLine.setTimestamp(endTick);
|
newKline.setAmount(BigDecimal.ZERO);
|
newKline.setClose(kLine.getClose());
|
newKline.setLow(kLine.getClose());
|
// 开盘价是上个K线的收盘价
|
newKline.setOpen(kLine.getClose());
|
newKline.setVolume(BigDecimal.ZERO);
|
newKline.setHigh(kLine.getClose());
|
calendar.add(field, 2*range);
|
newKline.setTimestamp(calendar.getTimeInMillis());
|
currentKlineMap.put(period,newKline);
|
|
// 存储昨日K线
|
if("day".equals(rangeUnit)){
|
System.out.println("存储日K线");
|
kLine.setOpen(kLine.getClose());
|
kLine.setLow(kLine.getClose());
|
kLine.setHigh(kLine.getClose());
|
kLine.setVolume(BigDecimal.ZERO);
|
redisUtils.set("BZZ/USDT",kLine);
|
}
|
}
|
|
@Override
|
public Candlestick getKLine() {
|
return currentKLine;
|
}
|
|
@Override
|
public void setIsStopKLine(boolean stop) {
|
this.stopKLine = stop;
|
}
|
|
@Override
|
public boolean isStopKline() {
|
return this.stopKLine;
|
}
|
|
|
@Override
|
public Map<String, Candlestick> getCurrentKlineMap() {
|
return currentKlineMap;
|
}
|
|
@Override
|
public void setRedisUtils(RedisUtils redisUtils) {
|
this.redisUtils = redisUtils;
|
}
|
|
public void setCurrentKlineMap(Map<String, Candlestick> currentKlineMap) {
|
this.currentKlineMap = currentKlineMap;
|
}
|
}
|