From 74ca5bc0f40e3b91464c8972392271d24dd5f066 Mon Sep 17 00:00:00 2001
From: zainali5120 <512061637@qq.com>
Date: Mon, 14 Sep 2020 11:05:48 +0800
Subject: [PATCH] 撮合交易代码提交
---
src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java | 4 +
src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java | 8 +++-
src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java | 11 ++++-
src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java | 12 ++++--
src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml | 18 +++++++++
src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java | 10 ++++-
src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java | 17 ++++----
src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java | 12 +++++-
src/main/java/com/xcong/excoin/trade/CoinTrader.java | 10 ++--
9 files changed, 76 insertions(+), 26 deletions(-)
diff --git a/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java b/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java
index beed401..12f59a0 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java
@@ -15,5 +15,7 @@
OrderCoinsEntity findWalletCoinOrderByOrderNo(@Param("orderNo")String orderNo);
- List<OrderCoinsEntity> selectAllEntrustingCoinOrderList();
+ List<OrderCoinsEntity> selectAllEntrustingCoinOrderList(List<String> list);
+
+ List<OrderCoinsEntity> selectCoinOrderOnTrade(List<String> list);
}
diff --git a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
index 200f4a7..07a9886 100644
--- a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
+++ b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java
@@ -744,7 +744,9 @@
@Override
@Transactional(rollbackFor = Exception.class)
public void dealEntrustCoinOrder() {
- List<OrderCoinsEntity> list = orderCoinsDao.selectAllEntrustingCoinOrderList();
+ List<String> ignoreTypes = new ArrayList<>();
+ ignoreTypes.add("NEKK");
+ List<OrderCoinsEntity> list = orderCoinsDao.selectAllEntrustingCoinOrderList(ignoreTypes);
if (CollUtil.isNotEmpty(list)) {
for (OrderCoinsEntity orderCoinsEntity : list) {
BigDecimal nowPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(orderCoinsEntity.getSymbol() + "/USDT")));
@@ -813,7 +815,9 @@
public void handleOrder(List<ExchangeTrade> trades){
// 处理撮合交易的订单
for(ExchangeTrade exchangeTrade : trades){
-
+ if(exchangeTrade==null){
+ continue;
+ }
BigDecimal amount = exchangeTrade.getAmount();
Long buyOrderId = exchangeTrade.getBuyOrderId();
BigDecimal buyTurnover = exchangeTrade.getBuyTurnover();
diff --git a/src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java b/src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java
index 7add4f8..e61a588 100644
--- a/src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java
+++ b/src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java
@@ -39,10 +39,14 @@
CoinProcessor processor = processorFactory.getProcessor(symbol);
Map<String, Candlestick> currentKlineMap = processor.getCurrentKlineMap();
Collection<Candlestick> values = currentKlineMap.values();
- BigDecimal newPrice = trades.get(trades.size()-1).getPrice();
+ BigDecimal newPrice =null;
for (Candlestick candlestick : values) {
for (ExchangeTrade exchangeTrade : trades) {
+ if(exchangeTrade==null){
+ continue;
+ }
processor.processTrade(candlestick, exchangeTrade);
+ newPrice=exchangeTrade.getPrice();
}
}
@@ -51,6 +55,9 @@
key = StrUtil.format(key, symbolUsdt);
redisUtils.set(key,currentKlineMap);
// 更新最新价
- redisUtils.set(CoinTypeConvert.convertToKey(symbolUsdt), newPrice);
+ if(newPrice!=null){
+ redisUtils.set(CoinTypeConvert.convertToKey(symbolUsdt), newPrice);
+ }
+
}
}
diff --git a/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java b/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java
index 90a3e9c..91a81fd 100644
--- a/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java
+++ b/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java
@@ -14,6 +14,7 @@
import com.xcong.excoin.processor.DefaultCoinProcessor;
import com.xcong.excoin.processor.MarketService;
import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService;
+import com.xcong.excoin.rabbit.producer.ExchangeProducer;
import com.xcong.excoin.trade.CoinTrader;
import com.xcong.excoin.trade.CoinTraderFactory;
import com.xcong.excoin.utils.CoinTypeConvert;
@@ -56,12 +57,15 @@
@Resource
private CoinProcessorFactory processorFactory;
+ @Resource
+ ExchangeProducer exchangeProducer;
+
@PostConstruct
public void initCoinTrade() {
log.info("#=======撮合交易器开启=======#");
String symbol = "NEKK";
CoinTrader newTrader = new CoinTrader(symbol);
- newTrader.setOrderCoinService(coinService);
+ newTrader.setExchangeProducer(exchangeProducer);
//newTrader.setKafkaTemplate(kafkaTemplate);
//newTrader.setBaseCoinScale(coin.getBaseCoinScale());
//newTrader.setCoinScale(coin.getCoinScale());
@@ -70,7 +74,9 @@
// 创建成功以后需要对未处理订单预处理
log.info("======CoinTrader Process: " + symbol + "======");
- List<OrderCoinsEntity> orders = orderCoinsDao.selectAllEntrustingCoinOrderList();
+ List<String> symbolList = new ArrayList<>();
+ symbolList.add(symbol);
+ List<OrderCoinsEntity> orders = orderCoinsDao.selectCoinOrderOnTrade(symbolList);
List<OrderCoinsEntity> tradingOrders = new ArrayList<>();
List<OrderCoinsEntity> completedOrders = new ArrayList<>();
orders.forEach(order -> {
diff --git a/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java b/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
index 110c1fc..946af01 100644
--- a/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
+++ b/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java
@@ -33,9 +33,9 @@
@Resource
private RedisUtils redisUtils;
- @Scheduled(cron = "0/2 * * * * *")
+ @Scheduled(cron = "0/10 * * * * *")
public void tradePlate(){
- redisUtils.set("NEKK_NEW_PRICE",new BigDecimal(Math.random()));
+ redisUtils.set("NEKK_NEW_PRICE",new BigDecimal(Math.random()*20));
Candlestick candlestick = new Candlestick();
candlestick.setOpen(new BigDecimal("10.33"));
candlestick.setHigh(new BigDecimal("15.23"));
@@ -46,7 +46,7 @@
candlestick.setId(1599840000L);
candlestick.setCount(100002);
candlestick.setClose(new BigDecimal("12.2323"));
- redisUtils.set("NEKK/USDT",candlestick);
+ //redisUtils.set("NEKK/USDT",candlestick);
// [[10244.21, 0.000855], [10243.7, 0.008777], [10243.59, 0.14], [10243.37, 0.467663]]
TradePlateModel tradePlateModel = new TradePlateModel();
List<BigDecimal> buy;
@@ -61,11 +61,11 @@
sell.add(new BigDecimal(Math.random()*i*2));
tradePlateModel.getSell().add(sell);
}
-
- plateSendWebSocket.sendMessagePlate(JSON.toJSONString(tradePlateModel),null);
- plateSendWebSocket.sendMessageKline("nekkusdt","1min","{amount: 114419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 311958.06091543}",null);
- plateSendWebSocket.sendMessageKline("nekkusdt","5min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null);
- plateSendWebSocket.sendMessageKline("nekkusdt","15min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null);
+ log.info("准备发送消息");
+ plateSendWebSocket.sendMessagePlate("NEKK/USDT",JSON.toJSONString(tradePlateModel),null);
+ plateSendWebSocket.sendMessageKline("NEKK/USDT","1min","{amount: 114419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 311958.06091543}",null);
+ plateSendWebSocket.sendMessageKline("NEKK/USDT","5min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null);
+ plateSendWebSocket.sendMessageKline("NEKK/USDT","15min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null);
}
/**
@@ -156,4 +156,5 @@
}
});
}
+
}
diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
index 9b03546..9c9793a 100644
--- a/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
+++ b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java
@@ -14,6 +14,7 @@
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -44,8 +45,8 @@
*/
@RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE)
public void tradePlate(String content) {
- log.info("#---->{}#", content);
- tradePlateSendWebSocket.sendMessagePlate(content,null);
+ log.info("#盘口信息消费者---->{}#", content);
+ tradePlateSendWebSocket.sendMessagePlate("NEKK/USDT",content,null);
}
/**
@@ -56,6 +57,13 @@
public void handleTradeExchange(String content) {
log.info("#---->{}#", content);
List<ExchangeTrade> exchangeTrades = JSONObject.parseArray(content, ExchangeTrade.class);
+ // 去掉空的 暂时这样
+ Iterator<ExchangeTrade> iterator = exchangeTrades.iterator();
+ while (iterator.hasNext()){
+ if(iterator.next()==null){
+ iterator.remove();
+ }
+ }
// 处理K线 并更新最新价
handleKlineService.handleExchangeOrderToKline(exchangeTrades);
// 推送最新K线
diff --git a/src/main/java/com/xcong/excoin/trade/CoinTrader.java b/src/main/java/com/xcong/excoin/trade/CoinTrader.java
index 46289af..ca79fe9 100644
--- a/src/main/java/com/xcong/excoin/trade/CoinTrader.java
+++ b/src/main/java/com/xcong/excoin/trade/CoinTrader.java
@@ -17,7 +17,6 @@
public class CoinTrader {
private String symbol;
private ExchangeProducer exchangeProducer;
- private OrderCoinService orderCoinService;
//交易币种的精度
private int coinScale = 4;
//基币的精度
@@ -752,11 +751,12 @@
return count;
}
- public OrderCoinService getOrderCoinService() {
- return orderCoinService;
+
+ public ExchangeProducer getExchangeProducer() {
+ return exchangeProducer;
}
- public void setOrderCoinService(OrderCoinService orderCoinService) {
- this.orderCoinService = orderCoinService;
+ public void setExchangeProducer(ExchangeProducer exchangeProducer) {
+ this.exchangeProducer = exchangeProducer;
}
}
diff --git a/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
index b955f11..bb8bcb3 100644
--- a/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
+++ b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
@@ -75,6 +75,7 @@
JSONObject jsonObject = JSON.parseObject(message);
// 盘口的判断
if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) {
+ log.info("订阅盘口消息:{}", session.getId());
String sub = jsonObject.get("sub").toString();
String symbol = sub.split("\\.")[1];
symbol = CoinTypeConvert.convert(symbol);
@@ -89,6 +90,7 @@
// 取消盘口订阅
if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) {
// `market.${symbol}.kline.${strPeriod}
+ log.info("取消订阅盘口消息:{}", session.getId());
String unsub = jsonObject.get("unsub").toString();
String[] split = unsub.split("\\.");
String symbol = split[1];
@@ -110,6 +112,7 @@
// 取消订阅 {unsub: xxx(标识)}
if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) {
// 订阅
+ log.info("订阅最新K线消息:{}", session.getId());
String sub = jsonObject.get("sub").toString();
String[] split = sub.split("\\.");
String symbol = split[1];
@@ -132,6 +135,7 @@
// 取消订阅
if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) {
// `market.${symbol}.kline.${strPeriod}
+ log.info("取消订阅最新K消息:{}", session.getId());
String unsub = jsonObject.get("unsub").toString();
String[] split = unsub.split("\\.");
String strPeriod = split[3];
@@ -173,14 +177,14 @@
*
* @param message 消息内容
*/
- public void sendMessagePlate(String message, Session fromSession) {
- if (tradeplateClients.containsKey("nekkusdt")) {
- Map<String, Session> nekk = tradeplateClients.get("nekkusdt");
+ public void sendMessagePlate(String symbol,String message, Session fromSession) {
+ if (tradeplateClients.containsKey(symbol)) {
+ Map<String, Session> nekk = tradeplateClients.get(symbol);
for (Map.Entry<String, Session> sessionEntry : nekk.entrySet()) {
Session toSession = sessionEntry.getValue();
// 排除掉自己
//if (!fromSession.getId().equals(toSession.getId())) {
- log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
+ log.info("服务端给客户端[{}]发送盘口消息{}", toSession.getId(), message);
boolean open = toSession.isOpen();
if (open) {
toSession.getAsyncRemote().sendText(message);
diff --git a/src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml b/src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml
index b844a93..aa02fee 100644
--- a/src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml
+++ b/src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml
@@ -25,5 +25,23 @@
select *
from coins_order
where order_status=1
+ <if test="list != null">
+ and symbol not in
+ <foreach collection="list" separator="," item="item" open="(" close=")">
+ #{item}
+ </foreach>
+ </if>
+ </select>
+
+ <select id="selectCoinOrderOnTrade" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsEntity">
+ select *
+ from coins_order
+ where order_status=1
+ <if test="list != null">
+ and symbol in
+ <foreach collection="list" separator="," item="item" open="(" close=")">
+ #{item}
+ </foreach>
+ </if>
</select>
</mapper>
--
Gitblit v1.9.1