From d174d6963d62b3bd176f9e7ba3cf0d7f75a91b69 Mon Sep 17 00:00:00 2001
From: zainali5120 <512061637@qq.com>
Date: Wed, 16 Sep 2020 16:03:22 +0800
Subject: [PATCH] 撮合交易代码提交
---
src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java | 79 +++++++
src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java | 11 +
src/main/java/com/xcong/excoin/processor/MarketService.java | 6
src/main/java/com/xcong/excoin/trade/ExchangeTrade.java | 1
src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java | 109 ++++++++--
src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java | 30 +-
src/main/resources/application-test.yml | 13
src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml | 7
src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java | 4
src/main/java/com/xcong/excoin/websocket/CandlestickResult.java | 11 +
src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java | 3
src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java | 5
src/main/java/com/xcong/excoin/websocket/CandlestickModel.java | 18 +
src/main/java/com/xcong/excoin/websocket/NewCandlestick.java | 9
src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java | 22 ++
src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java | 79 ++++---
src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java | 38 +++
src/main/java/com/xcong/excoin/trade/CoinTrader.java | 91 +++++++-
src/main/java/com/xcong/excoin/websocket/SubResultModel.java | 10 +
src/main/resources/application.yml | 2
20 files changed, 438 insertions(+), 110 deletions(-)
diff --git a/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java b/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java
index ca953be..cc06661 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/controller/OrderCoinController.java
@@ -5,6 +5,7 @@
import javax.annotation.Resource;
import javax.validation.Valid;
+import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
@@ -71,7 +72,7 @@
return orderCoinService.submitSalesWalletCoinOrderWithMatch(symbol,type,tradeType,price,amount,submitSalesWalletCoinOrderDto.getEntrustAmount());
}else{
- return orderCoinService.submitSalesWalletCoinOrder(symbol,type,tradeType,price,amount);
+ return orderCoinService.submitSalesWalletCoinOrder(symbol,type,tradeType,price,amount,submitSalesWalletCoinOrderDto.getEntrustAmount());
}
}
diff --git a/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java b/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java
index 8bf291c..779e375 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/service/OrderCoinService.java
@@ -16,7 +16,7 @@
Result enterTransactionPageOfWalletCoin(String symbol);
Result submitSalesWalletCoinOrder(String symbol, Integer type, Integer tradeType, BigDecimal price,
- BigDecimal amount);
+ BigDecimal amount,BigDecimal entrustAmount);
/**
* 需要撮合交易的币种提交买卖单
@@ -51,4 +51,7 @@
public void handleOrder(List<ExchangeTrade> trades);
+ void initOrders(String symbol, Integer type, Integer tradeType, BigDecimal price,
+ BigDecimal amount,BigDecimal entrustAmount);
+
}
diff --git a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
index 07a9886..2ddead0 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
@@ -161,13 +161,18 @@
@Override
@Transactional
- public Result submitSalesWalletCoinOrder(String symbol, Integer type, Integer tradeType, BigDecimal price, BigDecimal amount) {
+ public Result submitSalesWalletCoinOrder(String symbol, Integer type, Integer tradeType, BigDecimal price, BigDecimal amount,BigDecimal entrustAmount) {
+
+
//获取用户ID
Long memberId = LoginUserUtils.getAppLoginUser().getId();
BigDecimal nowPriceinBigDecimal = price;
//查询当前价
BigDecimal nowPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(symbol + "/USDT")));
-
+ if(OrderCoinsEntity.ORDERTYPE_BUY.equals(type) && OrderCoinsEntity.TRADETYPE_MARKETPRICE.equals(tradeType)){
+ amount = entrustAmount.divide(nowPrice,BigDecimal.ROUND_DOWN);
+ }
+ // 处理市价
// 获取交易管理的杠杠倍率,手续费率等信息,由平台进行设置
symbol = symbol.toUpperCase();
MemberWalletCoinEntity walletCoin = memberWalletCoinDao.selectWalletCoinBymIdAndCode(memberId, symbol);
@@ -319,7 +324,7 @@
@Override
public Result submitSalesWalletCoinOrderWithMatch(String symbol, Integer type, Integer tradeType, BigDecimal price, BigDecimal amount, BigDecimal entrustAmount) {
//获取用户ID
- Long memberId = 13L;
+ Long memberId = LoginUserUtils.getAppLoginUser().getId();
BigDecimal nowPriceinBigDecimal = price;
//查询当前价
//BigDecimal nowPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(symbol + "/USDT")));
@@ -818,11 +823,16 @@
if(exchangeTrade==null){
continue;
}
+ // 量
BigDecimal amount = exchangeTrade.getAmount();
+ // 买单ID
Long buyOrderId = exchangeTrade.getBuyOrderId();
+ // 成交金额(usdt)
BigDecimal buyTurnover = exchangeTrade.getBuyTurnover();
int direction = exchangeTrade.getDirection();
+ // 成交价
BigDecimal price = exchangeTrade.getPrice();
+ // 卖单
Long sellOrderId = exchangeTrade.getSellOrderId();
// 买卖单都需要处理
// 买单
@@ -918,4 +928,67 @@
}
}
}
+
+ @Override
+ public void initOrders(String symbol, Integer type, Integer tradeType, BigDecimal price,
+ BigDecimal amount,BigDecimal entrustAmount) {
+ //获取用户ID
+ Long memberId = 10L;
+ BigDecimal nowPriceinBigDecimal = price;
+ //查询当前价
+ //BigDecimal nowPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(symbol + "/USDT")));
+
+ // 获取交易管理的杠杠倍率,手续费率等信息,由平台进行设置
+ symbol = symbol.toUpperCase();
+
+ // 手续费用(手续费=建仓价X数量X手续费率)
+ BigDecimal closingPrice = BigDecimal.ZERO ;
+
+ // BigDecimal totalPayPricCoin = nowPrice.multiply(amount).add(closingPrice);
+ // 首先将单插入到数据库主表(委托表)
+ // 创建订单
+ OrderCoinsEntity order = new OrderCoinsEntity();
+ //根据委托类型生成不同数据
+ // 如果是限价交易直接插入主表数据
+ order.setMemberId(memberId);
+ order.setOrderNo(generateSimpleSerialno(memberId.toString()));
+ order.setOrderType(type);
+ order.setSymbol(symbol);
+ //order.setMarkPrice(nowPrice);
+
+ // 成交量 先设置为0
+ order.setDealCnt(BigDecimal.ZERO);
+ // 成交价
+ //order.setDealPrice(price);
+ // 成交金额
+ order.setDealAmount(BigDecimal.ZERO);
+ order.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DODING);
+ order.setTradeType(tradeType);
+ // 手续费
+ order.setFeeAmount(closingPrice);
+ if(OrderCoinsEntity.TRADETYPE_FIXEDPRICE.equals(tradeType)){
+ // 限价 是需要价格和数量 可以得到成交金额
+ // 下单量
+ order.setEntrustCnt(amount);
+ // 下单价格
+ order.setEntrustPrice(price);
+ order.setEntrustAmount(amount.multiply(price));
+ }else{
+ if(OrderCoinsEntity.ORDERTYPE_BUY.equals(type)){
+ // 市价 只有金额
+ order.setEntrustAmount(entrustAmount);
+ }else{
+ // 下单量
+ order.setEntrustCnt(amount);
+ // 下单价格
+ order.setEntrustPrice(price);
+ order.setEntrustAmount(amount.multiply(price));
+ }
+
+ }
+ orderCoinsDao.insert(order);
+ // 加入到撮合
+ CoinTrader trader = factory.getTrader(symbol);
+ trader.trade(order);
+ }
}
diff --git a/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java b/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java
index ca6dc28..8e1423f 100644
--- a/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java
+++ b/src/main/java/com/xcong/excoin/processor/DefaultCoinProcessor.java
@@ -62,7 +62,7 @@
calendar.set(Calendar.HOUR_OF_DAY, 0);
long firstTimeOfToday = calendar.getTimeInMillis();
String period = "1min";
- logger.info("initializeThumb from {} to {}", firstTimeOfToday, nowTime);
+ //logger.info("initializeThumb from {} to {}", firstTimeOfToday, nowTime);
List<Candlestick> lines = service.findAllKLine(this.symbol, firstTimeOfToday, nowTime, period);
coinThumb = new CoinThumb();
synchronized (coinThumb) {
@@ -202,7 +202,7 @@
//处理K线
processTrade(currentKLine, exchangeTrade);
//处理今日概况信息
- logger.debug("处理今日概况信息");
+ //logger.debug("处理今日概况信息");
handleThumb(exchangeTrade);
//存储并推送成交信息
handleTradeStorage(exchangeTrade);
@@ -212,7 +212,7 @@
}
public void processTrade(Candlestick kLine, ExchangeTrade exchangeTrade) {
- if (kLine.getClose().compareTo(BigDecimal.ZERO) == 0) {
+ if (kLine.getClose()==null || kLine.getClose().compareTo(BigDecimal.ZERO)==0) {
//第一次设置K线值
kLine.setOpen(exchangeTrade.getPrice());
kLine.setHigh(exchangeTrade.getPrice());
@@ -224,15 +224,19 @@
kLine.setClose(exchangeTrade.getPrice());
}
kLine.setCount(kLine.getCount() + 1);
- kLine.setVolume(kLine.getVolume().add(exchangeTrade.getAmount()));
+ if(kLine.getVolume()==null){
+ kLine.setVolume(BigDecimal.ZERO);
+ }
+ kLine.setAmount(kLine.getVolume().add(exchangeTrade.getAmount()));
BigDecimal turnover = exchangeTrade.getPrice().multiply(exchangeTrade.getAmount());
- // kLine.setTurnover(kLine.getTurnover().add(turnover));
+ kLine.setVolume(kLine.getVolume().add(turnover));
+ //kLine.setTimestamp(System.currentTimeMillis());
}
public void handleTradeStorage(ExchangeTrade exchangeTrade) {
- for (MarketHandler storage : handlers) {
- storage.handleTrade(symbol, exchangeTrade, coinThumb);
- }
+// for (MarketHandler storage : handlers) {
+// storage.handleTrade(symbol, exchangeTrade, coinThumb);
+// }
}
public void handleKLineStorage(Candlestick kLine) {
@@ -243,7 +247,7 @@
}
public void handleThumb(ExchangeTrade exchangeTrade) {
- logger.info("handleThumb symbol = {}", this.symbol);
+ //logger.info("handleThumb symbol = {}", this.symbol);
synchronized (coinThumb) {
if (coinThumb.getOpen().compareTo(BigDecimal.ZERO) == 0) {
//第一笔交易记为开盘价
@@ -265,7 +269,7 @@
coinThumb.setChg(change.divide(coinThumb.getLow(), 4, BigDecimal.ROUND_UP));
}
if ("USDT".equalsIgnoreCase(baseCoin)) {
- logger.info("setUsdRate", exchangeTrade.getPrice());
+ // logger.info("setUsdRate", exchangeTrade.getPrice());
coinThumb.setUsdRate(exchangeTrade.getPrice());
} else {
@@ -304,7 +308,7 @@
calendar.add(field, -range);
String fromTime = df.format(calendar.getTime());
long startTick = calendar.getTimeInMillis();
- System.out.println("time range from " + fromTime + " to " + endTime);
+ //System.out.println("time range from " + fromTime + " to " + endTime);
List<ExchangeTrade> exchangeTrades = service.findTradeByTimeRange(this.symbol, startTick, endTick);
Candlestick kLine = new Candlestick();
@@ -340,7 +344,7 @@
kLine.setLow(coinThumb.getClose());
kLine.setHigh(coinThumb.getClose());
}
- logger.info("generate " + range + rangeUnit + " kline in {},data={}", df.format(new Date(kLine.getTimestamp())), JSON.toJSONString(kLine));
+ //logger.info("generate " + range + rangeUnit + " kline in {},data={}", df.format(new Date(kLine.getTimestamp())), JSON.toJSONString(kLine));
service.saveKLine(symbol,period, kLine);
// 生成一个对应的新K线 后续的交易会更新这个最新K线数据
Candlestick newKline = new Candlestick();
@@ -352,6 +356,8 @@
newKline.setOpen(kLine.getClose());
newKline.setVolume(BigDecimal.ZERO);
newKline.setHigh(kLine.getClose());
+ calendar.add(field, 2*range);
+ newKline.setTimestamp(calendar.getTimeInMillis());
currentKlineMap.put(period,newKline);
// 存储昨日K线
diff --git a/src/main/java/com/xcong/excoin/processor/MarketService.java b/src/main/java/com/xcong/excoin/processor/MarketService.java
index 11c6c56..d47b2c9 100644
--- a/src/main/java/com/xcong/excoin/processor/MarketService.java
+++ b/src/main/java/com/xcong/excoin/processor/MarketService.java
@@ -75,20 +75,20 @@
// Query query = new Query(criteria).with(sort);
//
// return mongoTemplate.find(query,ExchangeTrade.class,"exchange_trade_"+symbol);
- return orderCoinDealDao.selectOrderCoinDealByTime(symbol, new Date(timeStart), new Date(timeStart));
+ return orderCoinDealDao.selectOrderCoinDealByTime(symbol, new Date(timeStart), new Date(timeEnd));
// return null;
}
public void saveKLine(String symbol, String period, Candlestick kLine) {
// 先获取
- String key = "KINE_" + symbol + "_" + period;
+ String key = "KINE_" + symbol + "/USDT_" + period;
Object data = redisUtils.get(key);
List list = new ArrayList();
if (data != null) {
list = (List) data;
}
list.add(kLine);
- redisUtils.set("KINE_" + symbol + "_" + period, list);
+ redisUtils.set(key, list);
// mongoTemplate.insert(kLine,"exchange_kline_"+symbol+"_"+kLine.getPeriod());
}
diff --git a/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java b/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java
index 91a81fd..9d4215c 100644
--- a/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java
+++ b/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java
@@ -1,10 +1,12 @@
package com.xcong.excoin.quartz.job;
+import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.huobi.client.SubscriptionClient;
import com.huobi.client.SubscriptionOptions;
import com.huobi.client.model.Candlestick;
import com.huobi.client.model.enums.CandlestickInterval;
+import com.xcong.excoin.modules.coin.dao.OrderCoinDealDao;
import com.xcong.excoin.modules.coin.dao.OrderCoinsDao;
import com.xcong.excoin.modules.coin.entity.OrderCoinsEntity;
import com.xcong.excoin.modules.coin.service.OrderCoinService;
@@ -17,6 +19,7 @@
import com.xcong.excoin.rabbit.producer.ExchangeProducer;
import com.xcong.excoin.trade.CoinTrader;
import com.xcong.excoin.trade.CoinTraderFactory;
+import com.xcong.excoin.trade.ExchangeTrade;
import com.xcong.excoin.utils.CoinTypeConvert;
import com.xcong.excoin.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
@@ -30,6 +33,7 @@
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
/**
* 开启撮合交易
@@ -44,12 +48,14 @@
@Resource
private OrderCoinsDao orderCoinsDao;
+ @Resource
+ private OrderCoinDealDao orderCoinDealDao;
@Resource
private CoinTraderFactory factory;
@Resource
- private OrderCoinService coinService;
+ private RedisUtils redisUtils;
@Resource
private MarketService marketService;
@@ -98,7 +104,21 @@
processor.initializeThumb();
//processor.initializeUsdRate();
processor.setIsHalt(false);
+ List<ExchangeTrade> nekk = orderCoinDealDao.selectOrderCoinDealByTime("NEKK", null, null);
+ processor.process(nekk);
+ String symbolUsdt = symbol;
+ if(!symbol.contains("USDT")){
+ symbolUsdt = symbol+"/USDT";
+ }
+ String key = "NEW_KINE_{}";
+ key = StrUtil.format(key, symbolUsdt);
+ Object o = redisUtils.get(key);
+ if(o!=null){
+ Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick> )o;
+ ((DefaultCoinProcessor) processor).setCurrentKlineMap(currentKlineMap);
+ }
processorFactory.addProcessor(symbol, processor);
+
}
}
diff --git a/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java b/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
index 946af01..5972de9 100644
--- a/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
+++ b/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
@@ -1,10 +1,16 @@
package com.xcong.excoin.quartz.job;
import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
import com.huobi.client.model.Candlestick;
+import com.xcong.excoin.modules.coin.dao.OrderCoinDealDao;
+import com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity;
+import com.xcong.excoin.modules.coin.service.OrderCoinService;
import com.xcong.excoin.processor.CoinProcessorFactory;
import com.xcong.excoin.trade.TradePlateModel;
import com.xcong.excoin.utils.RedisUtils;
+import com.xcong.excoin.websocket.CandlestickModel;
+import com.xcong.excoin.websocket.NewCandlestick;
import com.xcong.excoin.websocket.TradePlateSendWebSocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -13,9 +19,11 @@
import javax.annotation.Resource;
import java.math.BigDecimal;
+import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
+import java.util.Random;
/**
* 生成各时间段的K线信息
@@ -28,45 +36,41 @@
private CoinProcessorFactory processorFactory;
@Resource
- private TradePlateSendWebSocket plateSendWebSocket;
+ private OrderCoinService orderCoinService;
- @Resource
- private RedisUtils redisUtils;
- @Scheduled(cron = "0/10 * * * * *")
- public void tradePlate(){
- redisUtils.set("NEKK_NEW_PRICE",new BigDecimal(Math.random()*20));
- Candlestick candlestick = new Candlestick();
- candlestick.setOpen(new BigDecimal("10.33"));
- candlestick.setHigh(new BigDecimal("15.23"));
- candlestick.setVolume(new BigDecimal("12121.34"));
- candlestick.setLow(new BigDecimal("8.234"));
- candlestick.setAmount(new BigDecimal("1199"));
- candlestick.setTimestamp(1599840000);
- candlestick.setId(1599840000L);
- candlestick.setCount(100002);
- candlestick.setClose(new BigDecimal("12.2323"));
- //redisUtils.set("NEKK/USDT",candlestick);
- // [[10244.21, 0.000855], [10243.7, 0.008777], [10243.59, 0.14], [10243.37, 0.467663]]
- TradePlateModel tradePlateModel = new TradePlateModel();
- List<BigDecimal> buy;
- List<BigDecimal> sell;
- for(int i=0;i<5;i++){
- buy = new ArrayList<>(2);
- buy.add(new BigDecimal(Math.random()*i));
- buy.add(new BigDecimal(Math.random()*i));
- tradePlateModel.getBuy().add(buy);
- sell = new ArrayList<>(2);
- sell.add(new BigDecimal(Math.random()*i*2));
- sell.add(new BigDecimal(Math.random()*i*2));
- tradePlateModel.getSell().add(sell);
+
+ @Scheduled(cron = "0/1 * * * * *")
+ public void test(){
+// for(int i=1;i<=2;i++){
+// OrderCoinsDealEntity detail = new OrderCoinsDealEntity();
+// detail.setMemberId(13L);
+// //detail.setOrderId(111);
+// detail.setOrderNo("tete");
+// detail.setOrderType(1);
+// detail.setTradeType(1);
+// detail.setSymbol("NEKK");
+// detail.setSymbolCnt(new BigDecimal(10));
+// detail.setEntrustPrice(new BigDecimal(10));
+// detail.setDealPrice(new BigDecimal(i*10*Math.random()));
+// detail.setDealAmount(new BigDecimal(50));
+// detail.setFeeAmount(new BigDecimal(1));
+// detail.setOrderStatus(OrderCoinsDealEntity.ORDERSTATUS_DONE);
+// orderCoinDealDao.insert(detail);
+// }
+ Random random = new Random();
+ Integer type = OrderCoinsDealEntity.ORDERTYPE_BUY;
+ Integer tradeType = OrderCoinsDealEntity.TRADETYPE_FIXEDPRICE;
+ int i = random.nextInt(100);
+ if(i==0){
+ i=10;
}
- log.info("准备发送消息");
- plateSendWebSocket.sendMessagePlate("NEKK/USDT",JSON.toJSONString(tradePlateModel),null);
- plateSendWebSocket.sendMessageKline("NEKK/USDT","1min","{amount: 114419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 311958.06091543}",null);
- plateSendWebSocket.sendMessageKline("NEKK/USDT","5min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null);
- plateSendWebSocket.sendMessageKline("NEKK/USDT","15min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null);
+ BigDecimal price = new BigDecimal(i);
+ orderCoinService.initOrders("NEKK",type,tradeType,price,new BigDecimal(2),null);
+ orderCoinService.initOrders("NEKK",OrderCoinsDealEntity.ORDERTYPE_SELL,tradeType,price,new BigDecimal(2),null);
+
}
+
/**
* 每分钟定时器,处理分钟K线
@@ -75,7 +79,7 @@
public void handle5minKLine(){
Calendar calendar = Calendar.getInstance();
- log.debug("分钟K线:{}",calendar.getTime());
+ //log.debug("分钟K线:{}",calendar.getTime());
//将秒、微秒字段置为0
calendar.set(Calendar.SECOND,0);
calendar.set(Calendar.MILLISECOND,0);
@@ -84,9 +88,10 @@
int hour = calendar.get(Calendar.HOUR_OF_DAY);
processorFactory.getProcessorMap().forEach((symbol,processor)->{
if(!processor.isStopKline()) {
- log.debug("生成{}分钟k线:{}",symbol);
+ //log.debug("生成{}分钟k线:{}",symbol);
//生成1分钟K线
processor.autoGenerate();
+ processor.generateKLine(1, Calendar.MINUTE, time);
//更新24H成交量
processor.update24HVolume(time);
if(minute%5 == 0) {
diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
index 9c9793a..df5900f 100644
--- a/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
+++ b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
@@ -7,9 +7,13 @@
import com.xcong.excoin.modules.coin.service.OrderCoinService;
import com.xcong.excoin.modules.exchange.service.HandleKlineService;
import com.xcong.excoin.trade.ExchangeTrade;
+import com.xcong.excoin.utils.CoinTypeConvert;
import com.xcong.excoin.utils.RedisUtils;
+import com.xcong.excoin.websocket.CandlestickModel;
+import com.xcong.excoin.websocket.NewCandlestick;
import com.xcong.excoin.websocket.TradePlateSendWebSocket;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.collections.CollectionUtils;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@@ -45,7 +49,7 @@
*/
@RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE)
public void tradePlate(String content) {
- log.info("#盘口信息消费者---->{}#", content);
+ //log.info("#盘口信息消费者---->{}#", content);
tradePlateSendWebSocket.sendMessagePlate("NEKK/USDT",content,null);
}
@@ -55,7 +59,7 @@
*/
@RabbitListener(queues = RabbitMqConfig.QUEUE_HANDLE_TRADE)
public void handleTradeExchange(String content) {
- log.info("#---->{}#", content);
+ // log.info("#处理订单---->{}#", content);
List<ExchangeTrade> exchangeTrades = JSONObject.parseArray(content, ExchangeTrade.class);
// 去掉空的 暂时这样
Iterator<ExchangeTrade> iterator = exchangeTrades.iterator();
@@ -63,6 +67,9 @@
if(iterator.next()==null){
iterator.remove();
}
+ }
+ if(CollectionUtils.isEmpty(exchangeTrades)){
+ return;
}
// 处理K线 并更新最新价
handleKlineService.handleExchangeOrderToKline(exchangeTrades);
@@ -77,8 +84,33 @@
Object o = redisUtils.get(key);
Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick> )o;
Set<Map.Entry<String, Candlestick>> entries = currentKlineMap.entrySet();
+
+
for(Map.Entry<String, Candlestick> map : entries){
- tradePlateSendWebSocket.sendMessageKline(symbolUsdt,map.getKey(),JSONObject.toJSONString(map.getValue()),null);
+ String ch = "market.{}.kline.{}";
+ Candlestick value = map.getValue();
+ String key1 = map.getKey();
+ String chKey = key1;
+ if(key1.equals("1hour")){
+ chKey = "60min";
+ }
+ // 转换
+ NewCandlestick newCandlestick= new NewCandlestick();
+ String nekkusdt = CoinTypeConvert.convertReverse(symbolUsdt);
+ ch = StrUtil.format(ch, nekkusdt,chKey);
+ newCandlestick.setCh(ch);
+ CandlestickModel model = new CandlestickModel();
+ model.setVol(value.getVolume());
+ model.setLow(value.getLow());
+ model.setOpen(value.getOpen());
+ model.setHigh(value.getHigh());
+ model.setCount(value.getCount());
+ model.setAmount(value.getAmount());
+ model.setId(value.getTimestamp()/1000);
+ model.setTimestamp(value.getTimestamp()/1000);
+ model.setClose(value.getClose());
+ newCandlestick.setTick(model);
+ tradePlateSendWebSocket.sendMessageKline(symbolUsdt,key1,JSONObject.toJSONString(newCandlestick),null);
}
// 处理用户订单
orderCoinService.handleOrder(exchangeTrades);
diff --git a/src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java b/src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java
index 42e67d4..0d08c61 100644
--- a/src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java
+++ b/src/main/java/com/xcong/excoin/rabbit/producer/ExchangeProducer.java
@@ -37,9 +37,9 @@
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- log.info("#----->{}#", correlationData);
+ //log.info("#----->{}#", correlationData);
if (ack) {
- log.info("success");
+ //log.info("success");
} else {
log.info("--->{}", cause);
}
diff --git a/src/main/java/com/xcong/excoin/trade/CoinTrader.java b/src/main/java/com/xcong/excoin/trade/CoinTrader.java
index ca79fe9..fc4a8fc 100644
--- a/src/main/java/com/xcong/excoin/trade/CoinTrader.java
+++ b/src/main/java/com/xcong/excoin/trade/CoinTrader.java
@@ -53,7 +53,7 @@
* 初始化交易线程
*/
public void initialize() {
- logger.info("init CoinTrader for symbol {}", symbol);
+ //logger.info("init CoinTrader for symbol {}", symbol);
//买单队列价格降序排列
buyLimitPriceQueue = new TreeMap<>(Comparator.reverseOrder());
//卖单队列价格升序排列
@@ -107,7 +107,7 @@
if (exchangeOrder.getTradeType() != OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
return;
}
- logger.info("addMarketPriceOrder,orderId = {}", exchangeOrder.getId());
+ //logger.info("addMarketPriceOrder,orderId = {}", exchangeOrder.getId());
LinkedList<OrderCoinsEntity> list = exchangeOrder.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY ? buyMarketQueue : sellMarketQueue;
synchronized (list) {
list.addLast(exchangeOrder);
@@ -136,7 +136,7 @@
}
//logger.info("trade order={}",exchangeOrder);
if (!symbol.equalsIgnoreCase(exchangeOrder.getSymbol())) {
- logger.info("unsupported symbol,coin={},base={}", exchangeOrder.getSymbol(), "USDT");
+ //logger.info("unsupported symbol,coin={},base={}", exchangeOrder.getSymbol(), "USDT");
return;
}
// 如果
@@ -203,7 +203,10 @@
OrderCoinsEntity matchOrder = orderIterator.next();
//处理匹配
ExchangeTrade trade = processMatch(focusedOrder, matchOrder);
- exchangeTrades.add(trade);
+ if(trade!=null){
+ exchangeTrades.add(trade);
+ }
+
//判断匹配单是否完成
if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
//当前匹配的订单完成交易,删除该订单
@@ -304,7 +307,7 @@
if (trade != null) {
exchangeTrades.add(trade);
}
- //判断匹配单是否完成
+ //判断匹配单是否完成 TODO
if (matchOrder.getOrderStatus() == OrderCoinsEntity.ORDERSTATUS_DONE) {
//当前匹配的订单完成交易,删除该订单
orderIterator.remove();
@@ -364,12 +367,12 @@
*/
private BigDecimal adjustMarketOrderTurnover(OrderCoinsEntity order, BigDecimal dealPrice) {
if (order.getOrderType() == OrderCoinsEntity.ORDERTYPE_BUY && order.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
-// BigDecimal leftTurnover = order.getAmount().subtract(order.getTurnover());
-// if(leftTurnover.divide(dealPrice,coinScale,BigDecimal.ROUND_DOWN)
-// .compareTo(BigDecimal.ZERO)==0){
-// order.setTurnover(order.getAmount());
-// return leftTurnover;
-// }
+ BigDecimal leftTurnover = order.getEntrustAmount().subtract(order.getDealAmount());
+ if(leftTurnover.divide(dealPrice,coinScale,BigDecimal.ROUND_DOWN)
+ .compareTo(BigDecimal.ZERO)==0){
+ //order.setDealAmount(order.getEntrustAmount());
+ return leftTurnover;
+ }
}
return BigDecimal.ZERO;
}
@@ -400,7 +403,7 @@
availAmount = calculateTradedAmount(matchOrder, dealPrice);
//计算成交量 取少的
BigDecimal tradedAmount = (availAmount.compareTo(needAmount) >= 0 ? needAmount : availAmount);
- logger.info("dealPrice={},amount={}", dealPrice, tradedAmount);
+ //logger.info("dealPrice={},amount={}", dealPrice, tradedAmount);
//如果成交额为0说明剩余额度无法成交,退出
if (tradedAmount.compareTo(BigDecimal.ZERO) == 0) {
return null;
@@ -415,6 +418,21 @@
focusedOrder.setDealCnt(focusedOrder.getDealCnt().add(tradedAmount));
// 用户单成交金额
focusedOrder.setDealAmount(focusedOrder.getDealAmount().add(turnover));
+
+ // 判断两个单是否完成
+ if(matchOrder.getEntrustAmount()!=null && matchOrder.getEntrustAmount().compareTo(matchOrder.getDealAmount())<=0){
+ matchOrder.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
+ }
+ if(matchOrder.getEntrustCnt()!=null && matchOrder.getEntrustCnt().compareTo(matchOrder.getDealCnt())<=0){
+ matchOrder.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
+ }
+
+ if(focusedOrder.getEntrustAmount()!=null && focusedOrder.getEntrustAmount().compareTo(focusedOrder.getDealAmount())<=0){
+ focusedOrder.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
+ }
+ if(focusedOrder.getEntrustCnt()!=null && focusedOrder.getEntrustCnt().compareTo(focusedOrder.getDealCnt())<=0){
+ focusedOrder.setOrderStatus(OrderCoinsEntity.ORDERSTATUS_DONE);
+ }
//创建成交记录
ExchangeTrade exchangeTrade = new ExchangeTrade();
@@ -471,9 +489,6 @@
//orderCoinService.handleOrder(subTrades);
}
} else {
- trades.forEach(e -> {
- System.out.println(e);
- });
exchangeProducer.sendHandleTrade(JSON.toJSONString(trades));
//orderCoinService.handleOrder(trades);
// kafkaTemplate.send("exchange-trade", JSON.toJSONString(trades));
@@ -551,13 +566,57 @@
}
/**
+ * 发送盘口变化消息
+ *
+ * @param
+ */
+ public String sendTradePlateMessage() {
+ //防止并发引起数组越界,造成盘口倒挂 TODO
+ List<List<BigDecimal>> plate;
+ List<BigDecimal> plateItem;
+ TradePlateModel tradePlateModel = new TradePlateModel();
+ // 转换格式
+ if (buyTradePlate != null && buyTradePlate.getItems() != null) {
+ plate = new ArrayList<>();
+ LinkedList<TradePlateItem> items = buyTradePlate.getItems();
+ for (TradePlateItem item : items) {
+ plateItem = new ArrayList<>(2);
+ BigDecimal price = item.getPrice();
+ BigDecimal amount = item.getAmount();
+ plateItem.add(price);
+ plateItem.add(amount);
+ plate.add(plateItem);
+ }
+ tradePlateModel.setBuy(plate);
+ }
+
+ if (sellTradePlate != null && sellTradePlate.getItems() != null) {
+ plate = new ArrayList<>();
+ LinkedList<TradePlateItem> items = sellTradePlate.getItems();
+ for (TradePlateItem item : items) {
+ plateItem = new ArrayList<>(2);
+ BigDecimal price = item.getPrice();
+ BigDecimal amount = item.getAmount();
+ plateItem.add(price);
+ plateItem.add(amount);
+ plate.add(plateItem);
+ }
+ tradePlateModel.setSell(plate);
+ }
+
+ // 盘口发生变化通知TODO
+ return JSON.toJSONString(tradePlateModel);
+ //exchangeProducer.sendPlateMsg(JSON.toJSONString(tradePlateModel));
+ }
+
+ /**
* 取消委托订单
*
* @param exchangeOrder
* @return
*/
public OrderCoinsEntity cancelOrder(OrderCoinsEntity exchangeOrder) {
- logger.info("cancelOrder,orderId={}", exchangeOrder.getId());
+ //logger.info("cancelOrder,orderId={}", exchangeOrder.getId());
if (exchangeOrder.getTradeType() == OrderCoinsEntity.TRADETYPE_MARKETPRICE) {
//处理市价单
Iterator<OrderCoinsEntity> orderIterator;
diff --git a/src/main/java/com/xcong/excoin/trade/ExchangeTrade.java b/src/main/java/com/xcong/excoin/trade/ExchangeTrade.java
index ea56d87..a73611c 100644
--- a/src/main/java/com/xcong/excoin/trade/ExchangeTrade.java
+++ b/src/main/java/com/xcong/excoin/trade/ExchangeTrade.java
@@ -5,6 +5,7 @@
import java.io.Serializable;
import java.math.BigDecimal;
+import java.util.Date;
/**
* 撮合交易信息
diff --git a/src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java b/src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java
index e9ff270..a32f0f9 100644
--- a/src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java
+++ b/src/main/java/com/xcong/excoin/utils/CoinTypeConvert.java
@@ -29,6 +29,17 @@
}
}
+ public static String convertReverse(String symbol) {
+ switch (symbol) {
+ case "BTC/USDT":
+ return "btcusdt";
+ case "NEKK/USDT":
+ return "nekkusdt";
+ default:
+ return null;
+ }
+ }
+
public static String convertToKey(String symbol) {
switch (symbol) {
case "BTC/USDT":
diff --git a/src/main/java/com/xcong/excoin/websocket/CandlestickModel.java b/src/main/java/com/xcong/excoin/websocket/CandlestickModel.java
new file mode 100644
index 0000000..b1229d1
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/websocket/CandlestickModel.java
@@ -0,0 +1,18 @@
+package com.xcong.excoin.websocket;
+
+import lombok.Data;
+
+import java.math.BigDecimal;
+
+@Data
+public class CandlestickModel {
+ private Long id;
+ private BigDecimal amount;
+ private long count;
+ private BigDecimal open;
+ private BigDecimal close;
+ private BigDecimal low;
+ private BigDecimal high;
+ private BigDecimal vol;
+ private long timestamp;
+}
diff --git a/src/main/java/com/xcong/excoin/websocket/CandlestickResult.java b/src/main/java/com/xcong/excoin/websocket/CandlestickResult.java
new file mode 100644
index 0000000..bd7b605
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/websocket/CandlestickResult.java
@@ -0,0 +1,11 @@
+package com.xcong.excoin.websocket;
+
+import lombok.Data;
+
+import java.util.List;
+
+@Data
+public class CandlestickResult {
+ private String rep;
+ private List<CandlestickModel> data;
+}
diff --git a/src/main/java/com/xcong/excoin/websocket/NewCandlestick.java b/src/main/java/com/xcong/excoin/websocket/NewCandlestick.java
new file mode 100644
index 0000000..04ba10c
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/websocket/NewCandlestick.java
@@ -0,0 +1,9 @@
+package com.xcong.excoin.websocket;
+
+import lombok.Data;
+
+@Data
+public class NewCandlestick {
+ private String ch;
+ private CandlestickModel tick;
+}
diff --git a/src/main/java/com/xcong/excoin/websocket/SubResultModel.java b/src/main/java/com/xcong/excoin/websocket/SubResultModel.java
new file mode 100644
index 0000000..855a3cc
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/websocket/SubResultModel.java
@@ -0,0 +1,10 @@
+package com.xcong.excoin.websocket;
+
+import lombok.Data;
+
+@Data
+public class SubResultModel {
+ private String id;
+ private String status = "ok";
+ private String subbed;
+}
diff --git a/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
index bb8bcb3..15d4168 100644
--- a/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
+++ b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
@@ -2,8 +2,11 @@
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
+import com.huobi.client.model.Candlestick;
import com.xcong.excoin.common.contants.AppContants;
+import com.xcong.excoin.trade.CoinTraderFactory;
import com.xcong.excoin.utils.CoinTypeConvert;
import com.xcong.excoin.utils.RedisUtils;
import com.xcong.excoin.utils.SpringContextHolder;
@@ -15,9 +18,8 @@
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.io.IOException;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -53,12 +55,19 @@
@OnClose
public void onClose(Session session) {
onlineCount.decrementAndGet(); // 在线数减1
-// Collection<Map<String, Session>> values = tradeplateClients.values();
-// if(CollectionUtils.isNotEmpty(values)){
-// for(Map<String,Session> map : values){
-// map.remove(session.getId());
-// }
-// }
+ Collection<Map<String, Session>> values = tradeplateClients.values();
+ if(CollectionUtils.isNotEmpty(values)){
+ for(Map<String,Session> map : values){
+ map.remove(session.getId());
+ }
+ }
+
+ Collection<Map<String, Session>> klineClientsValues = klineClients.values();
+ if(CollectionUtils.isNotEmpty(klineClientsValues)){
+ for(Map<String,Session> map : klineClientsValues){
+ map.remove(session.getId());
+ }
+ }
log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get());
}
@@ -69,13 +78,13 @@
*/
@OnMessage
public void onMessage(String message, Session session) {
- log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
+ //log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
// 订阅方法 {sub: 'market.' + symbol + '.depth.' + this._caculateType(),id: symbol
//}
JSONObject jsonObject = JSON.parseObject(message);
// 盘口的判断
if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) {
- log.info("订阅盘口消息:{}", session.getId());
+ //log.info("订阅盘口消息:{}", session.getId());
String sub = jsonObject.get("sub").toString();
String symbol = sub.split("\\.")[1];
symbol = CoinTypeConvert.convert(symbol);
@@ -86,11 +95,34 @@
map.put(session.getId(), session);
tradeplateClients.put(symbol, map);
}
+ // 发送一次盘口
+ CoinTraderFactory factory = SpringContextHolder.getBean(CoinTraderFactory.class);
+ // 发送订阅消息
+ String nekk = factory.getTrader("NEKK").sendTradePlateMessage();
+ SubResultModel subResultModel = new SubResultModel();
+ subResultModel.setId("nekkusdt");
+ subResultModel.setSubbed(sub);
+ synchronized (session){
+ try {
+ session.getBasicRemote().sendText(JSONObject.toJSONString(subResultModel));
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+ synchronized (session){
+ try {
+ session.getBasicRemote().sendText(nekk);
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ }
+
+
}
// 取消盘口订阅
if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) {
// `market.${symbol}.kline.${strPeriod}
- log.info("取消订阅盘口消息:{}", session.getId());
+ //log.info("取消订阅盘口消息:{}", session.getId());
String unsub = jsonObject.get("unsub").toString();
String[] split = unsub.split("\\.");
String symbol = split[1];
@@ -112,38 +144,48 @@
// 取消订阅 {unsub: xxx(标识)}
if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) {
// 订阅
- log.info("订阅最新K线消息:{}", session.getId());
String sub = jsonObject.get("sub").toString();
+ log.info("订阅最新K线消息:{}", sub);
String[] split = sub.split("\\.");
String symbol = split[1];
symbol = CoinTypeConvert.convert(symbol);
String period = split[3];
+ if("60min".equals(period)){
+ period = "1hour";
+ }
String key = symbol + "-" + period;
+ log.info("最新K线key:{}", key);
if (klineClients.containsKey(key)) {
// 有这个币种K线
Map<String, Session> stringSessionMap = klineClients.get(key);
if (!stringSessionMap.containsKey(session.getId())) {
stringSessionMap.put(session.getId(), session);
+ log.info("放入最新K线Map:{}", key);
}
} else {
Map<String, Session> stringSessionMap = new HashMap<>();
stringSessionMap.put(session.getId(), session);
klineClients.put(key, stringSessionMap);
+ log.info("放入最新K线Map:{}", key);
}
}
// 取消订阅
if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) {
// `market.${symbol}.kline.${strPeriod}
- log.info("取消订阅最新K消息:{}", session.getId());
+ //log.info("取消订阅最新K消息:{}", session.getId());
String unsub = jsonObject.get("unsub").toString();
String[] split = unsub.split("\\.");
String strPeriod = split[3];
+ if("60min".equals(strPeriod)){
+ strPeriod = "1hour";
+ }
String symbol = split[1];
symbol = CoinTypeConvert.convert(symbol);
String key = symbol + "-" + strPeriod;
if (klineClients.containsKey(key)) {
klineClients.get(key).remove(session.getId());
+ //session.getAsyncRemote().sendText(message);
}
}
@@ -155,14 +197,38 @@
String symbol = split[1];
symbol = CoinTypeConvert.convert(symbol);
String period = split[3];
+ if("60min".equals(period)){
+ period = "1hour";
+ }
//String key = symbol+"-"+period;
// String key = "KINE_BCH/USDT_1week";
String key = "KINE_{}_{}";
// 币币k线数据
- key = StrUtil.format(key, symbol, period);
+ //key = StrUtil.format(key, symbol, period);
+ key = StrUtil.format(key, "NEKK/USDT", period);
RedisUtils bean = SpringContextHolder.getBean(RedisUtils.class);
Object o = bean.get(key);
- sendMessageHistory(JSON.toJSONString(o), session);
+ List<CandlestickModel> candlestickModels = new ArrayList<>();
+ CandlestickResult result = new CandlestickResult();
+ result.setRep(sub);
+ if(o!=null){
+ List<Candlestick> list = (List<Candlestick>)o;
+ for(Candlestick candlestick : list){
+ CandlestickModel model = new CandlestickModel();
+ model.setAmount(candlestick.getAmount());
+ model.setClose(candlestick.getClose());
+ model.setCount(candlestick.getCount());
+ model.setHigh(candlestick.getHigh());
+ model.setId(candlestick.getTimestamp()/1000);
+ model.setOpen(candlestick.getOpen());
+ model.setLow(candlestick.getLow());
+ model.setVol(candlestick.getVolume());
+ candlestickModels.add(model);
+ }
+ result.setData(candlestickModels);
+ }
+
+ sendMessageHistory(JSON.toJSONString(result), session);
}
}
@@ -184,7 +250,7 @@
Session toSession = sessionEntry.getValue();
// 排除掉自己
//if (!fromSession.getId().equals(toSession.getId())) {
- log.info("服务端给客户端[{}]发送盘口消息{}", toSession.getId(), message);
+ // log.info("服务端给客户端[{}]发送盘口消息{}", toSession.getId(), message);
boolean open = toSession.isOpen();
if (open) {
toSession.getAsyncRemote().sendText(message);
@@ -197,7 +263,7 @@
}
public void sendMessageHistory(String message, Session toSession) {
- log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
+ // log.info("服务端给客户端[{}]发送历史K线", toSession.getId());
boolean open = toSession.isOpen();
if (open) {
toSession.getAsyncRemote().sendText(message);
@@ -205,19 +271,18 @@
}
public void sendMessageKline(String symbol, String period, String message, Session fromSession) {
+
String key = symbol + "-" + period;
+ //log.info("发送最新K线[{}],数据[{}]",key,message);
if (klineClients.containsKey(key)) {
Map<String, Session> stringSessionMap = klineClients.get(key);
for (Map.Entry<String, Session> sessionEntry : stringSessionMap.entrySet()) {
Session toSession = sessionEntry.getValue();
- // 排除掉自己
- //if (!fromSession.getId().equals(toSession.getId())) {
- log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
boolean open = toSession.isOpen();
if (open) {
+ log.info("服务端给客户端[{}]发送最新K线消息{}", toSession.getId(), message);
toSession.getAsyncRemote().sendText(message);
}
- // }
}
}
}
diff --git a/src/main/resources/application-test.yml b/src/main/resources/application-test.yml
index 45a0feb..a21709a 100644
--- a/src/main/resources/application-test.yml
+++ b/src/main/resources/application-test.yml
@@ -91,16 +91,17 @@
app:
- debug: false
+ debug: true
redis_expire: 3000
kline-update-job: false
- newest-price-update-job: true
+ newest-price-update-job: false
#日线 该任务不能与最新价处于同一个服务器
- day-line: true
- other-job: true
- loop-job: true
+ trade: false
+ day-line: false
+ other-job: false
+ loop-job: false
rabbit-consumer: true
- block-job: true
+ block-job: false
aliyun:
oss:
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index d8cae7a..a2ed99e 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -5,7 +5,7 @@
spring:
profiles:
- active: dev
+ active: test
datasource:
url: jdbc:mysql://rm-bp151tw8er79ig9kb5o.mysql.rds.aliyuncs.com:3306/db_biue?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2b8
username: ctcoin_data
diff --git a/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml b/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml
index 303667f..5277c94 100644
--- a/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml
+++ b/src/main/resources/mapper/walletCoinOrder/OrderCoinDealDao.xml
@@ -20,12 +20,15 @@
deal_amount buyTurnover,
deal_amount sellTurnover,
order_type direction,
- create_time time
+ UNIX_TIMESTAMP(create_time) time
from coins_order_deal
where symbol = #{symbol}
and order_type = 1
and order_status = 3
- and create_time between #{startTime} and #{endTime}
+ <if test="startTime != null and endTime != null">
+ and create_time between #{startTime} and #{endTime}
+ </if>
+
</select>
<select id="selectAllWalletCoinOrderBySymbol"
resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsDealEntity">
--
Gitblit v1.9.1