From 74ca5bc0f40e3b91464c8972392271d24dd5f066 Mon Sep 17 00:00:00 2001 From: zainali5120 <512061637@qq.com> Date: Mon, 14 Sep 2020 11:05:48 +0800 Subject: [PATCH] 撮合交易代码提交 --- src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java | 4 + src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java | 8 +++- src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java | 11 ++++- src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java | 12 ++++-- src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml | 18 +++++++++ src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java | 10 ++++- src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java | 17 ++++---- src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java | 12 +++++- src/main/java/com/xcong/excoin/trade/CoinTrader.java | 10 ++-- 9 files changed, 76 insertions(+), 26 deletions(-) diff --git a/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java b/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java index beed401..12f59a0 100644 --- a/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java +++ b/src/main/java/com/xcong/excoin/modules/coin/dao/OrderCoinsDao.java @@ -15,5 +15,7 @@ OrderCoinsEntity findWalletCoinOrderByOrderNo(@Param("orderNo")String orderNo); - List<OrderCoinsEntity> selectAllEntrustingCoinOrderList(); + List<OrderCoinsEntity> selectAllEntrustingCoinOrderList(List<String> list); + + List<OrderCoinsEntity> selectCoinOrderOnTrade(List<String> list); } diff --git a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java index 200f4a7..07a9886 100644 --- a/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java +++ b/src/main/java/com/xcong/excoin/modules/coin/service/impl/OrderCoinServiceImpl.java @@ -744,7 +744,9 @@ @Override @Transactional(rollbackFor = Exception.class) public void dealEntrustCoinOrder() { - List<OrderCoinsEntity> list = orderCoinsDao.selectAllEntrustingCoinOrderList(); + List<String> ignoreTypes = new ArrayList<>(); + ignoreTypes.add("NEKK"); + List<OrderCoinsEntity> list = orderCoinsDao.selectAllEntrustingCoinOrderList(ignoreTypes); if (CollUtil.isNotEmpty(list)) { for (OrderCoinsEntity orderCoinsEntity : list) { BigDecimal nowPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(orderCoinsEntity.getSymbol() + "/USDT"))); @@ -813,7 +815,9 @@ public void handleOrder(List<ExchangeTrade> trades){ // 处理撮合交易的订单 for(ExchangeTrade exchangeTrade : trades){ - + if(exchangeTrade==null){ + continue; + } BigDecimal amount = exchangeTrade.getAmount(); Long buyOrderId = exchangeTrade.getBuyOrderId(); BigDecimal buyTurnover = exchangeTrade.getBuyTurnover(); diff --git a/src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java b/src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java index 7add4f8..e61a588 100644 --- a/src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java +++ b/src/main/java/com/xcong/excoin/modules/exchange/service/impl/HandleKlineServiceImpl.java @@ -39,10 +39,14 @@ CoinProcessor processor = processorFactory.getProcessor(symbol); Map<String, Candlestick> currentKlineMap = processor.getCurrentKlineMap(); Collection<Candlestick> values = currentKlineMap.values(); - BigDecimal newPrice = trades.get(trades.size()-1).getPrice(); + BigDecimal newPrice =null; for (Candlestick candlestick : values) { for (ExchangeTrade exchangeTrade : trades) { + if(exchangeTrade==null){ + continue; + } processor.processTrade(candlestick, exchangeTrade); + newPrice=exchangeTrade.getPrice(); } } @@ -51,6 +55,9 @@ key = StrUtil.format(key, symbolUsdt); redisUtils.set(key,currentKlineMap); // 更新最新价 - redisUtils.set(CoinTypeConvert.convertToKey(symbolUsdt), newPrice); + if(newPrice!=null){ + redisUtils.set(CoinTypeConvert.convertToKey(symbolUsdt), newPrice); + } + } } diff --git a/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java b/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java index 90a3e9c..91a81fd 100644 --- a/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java +++ b/src/main/java/com/xcong/excoin/quartz/job/CoinTradeInitJob.java @@ -14,6 +14,7 @@ import com.xcong.excoin.processor.DefaultCoinProcessor; import com.xcong.excoin.processor.MarketService; import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService; +import com.xcong.excoin.rabbit.producer.ExchangeProducer; import com.xcong.excoin.trade.CoinTrader; import com.xcong.excoin.trade.CoinTraderFactory; import com.xcong.excoin.utils.CoinTypeConvert; @@ -56,12 +57,15 @@ @Resource private CoinProcessorFactory processorFactory; + @Resource + ExchangeProducer exchangeProducer; + @PostConstruct public void initCoinTrade() { log.info("#=======撮合交易器开启=======#"); String symbol = "NEKK"; CoinTrader newTrader = new CoinTrader(symbol); - newTrader.setOrderCoinService(coinService); + newTrader.setExchangeProducer(exchangeProducer); //newTrader.setKafkaTemplate(kafkaTemplate); //newTrader.setBaseCoinScale(coin.getBaseCoinScale()); //newTrader.setCoinScale(coin.getCoinScale()); @@ -70,7 +74,9 @@ // 创建成功以后需要对未处理订单预处理 log.info("======CoinTrader Process: " + symbol + "======"); - List<OrderCoinsEntity> orders = orderCoinsDao.selectAllEntrustingCoinOrderList(); + List<String> symbolList = new ArrayList<>(); + symbolList.add(symbol); + List<OrderCoinsEntity> orders = orderCoinsDao.selectCoinOrderOnTrade(symbolList); List<OrderCoinsEntity> tradingOrders = new ArrayList<>(); List<OrderCoinsEntity> completedOrders = new ArrayList<>(); orders.forEach(order -> { diff --git a/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java b/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java index 110c1fc..946af01 100644 --- a/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java +++ b/src/main/java/com/xcong/excoin/quartz/job/KLineGeneratorJob.java @@ -33,9 +33,9 @@ @Resource private RedisUtils redisUtils; - @Scheduled(cron = "0/2 * * * * *") + @Scheduled(cron = "0/10 * * * * *") public void tradePlate(){ - redisUtils.set("NEKK_NEW_PRICE",new BigDecimal(Math.random())); + redisUtils.set("NEKK_NEW_PRICE",new BigDecimal(Math.random()*20)); Candlestick candlestick = new Candlestick(); candlestick.setOpen(new BigDecimal("10.33")); candlestick.setHigh(new BigDecimal("15.23")); @@ -46,7 +46,7 @@ candlestick.setId(1599840000L); candlestick.setCount(100002); candlestick.setClose(new BigDecimal("12.2323")); - redisUtils.set("NEKK/USDT",candlestick); + //redisUtils.set("NEKK/USDT",candlestick); // [[10244.21, 0.000855], [10243.7, 0.008777], [10243.59, 0.14], [10243.37, 0.467663]] TradePlateModel tradePlateModel = new TradePlateModel(); List<BigDecimal> buy; @@ -61,11 +61,11 @@ sell.add(new BigDecimal(Math.random()*i*2)); tradePlateModel.getSell().add(sell); } - - plateSendWebSocket.sendMessagePlate(JSON.toJSONString(tradePlateModel),null); - plateSendWebSocket.sendMessageKline("nekkusdt","1min","{amount: 114419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 311958.06091543}",null); - plateSendWebSocket.sendMessageKline("nekkusdt","5min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null); - plateSendWebSocket.sendMessageKline("nekkusdt","15min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null); + log.info("准备发送消息"); + plateSendWebSocket.sendMessagePlate("NEKK/USDT",JSON.toJSONString(tradePlateModel),null); + plateSendWebSocket.sendMessageKline("NEKK/USDT","1min","{amount: 114419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 311958.06091543}",null); + plateSendWebSocket.sendMessageKline("NEKK/USDT","5min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null); + plateSendWebSocket.sendMessageKline("NEKK/USDT","15min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null); } /** @@ -156,4 +156,5 @@ } }); } + } diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java index 9b03546..9c9793a 100644 --- a/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java +++ b/src/main/java/com/xcong/excoin/rabbit/consumer/ExchangeConsumer.java @@ -14,6 +14,7 @@ import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -44,8 +45,8 @@ */ @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE) public void tradePlate(String content) { - log.info("#---->{}#", content); - tradePlateSendWebSocket.sendMessagePlate(content,null); + log.info("#盘口信息消费者---->{}#", content); + tradePlateSendWebSocket.sendMessagePlate("NEKK/USDT",content,null); } /** @@ -56,6 +57,13 @@ public void handleTradeExchange(String content) { log.info("#---->{}#", content); List<ExchangeTrade> exchangeTrades = JSONObject.parseArray(content, ExchangeTrade.class); + // 去掉空的 暂时这样 + Iterator<ExchangeTrade> iterator = exchangeTrades.iterator(); + while (iterator.hasNext()){ + if(iterator.next()==null){ + iterator.remove(); + } + } // 处理K线 并更新最新价 handleKlineService.handleExchangeOrderToKline(exchangeTrades); // 推送最新K线 diff --git a/src/main/java/com/xcong/excoin/trade/CoinTrader.java b/src/main/java/com/xcong/excoin/trade/CoinTrader.java index 46289af..ca79fe9 100644 --- a/src/main/java/com/xcong/excoin/trade/CoinTrader.java +++ b/src/main/java/com/xcong/excoin/trade/CoinTrader.java @@ -17,7 +17,6 @@ public class CoinTrader { private String symbol; private ExchangeProducer exchangeProducer; - private OrderCoinService orderCoinService; //交易币种的精度 private int coinScale = 4; //基币的精度 @@ -752,11 +751,12 @@ return count; } - public OrderCoinService getOrderCoinService() { - return orderCoinService; + + public ExchangeProducer getExchangeProducer() { + return exchangeProducer; } - public void setOrderCoinService(OrderCoinService orderCoinService) { - this.orderCoinService = orderCoinService; + public void setExchangeProducer(ExchangeProducer exchangeProducer) { + this.exchangeProducer = exchangeProducer; } } diff --git a/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java index b955f11..bb8bcb3 100644 --- a/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java +++ b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java @@ -75,6 +75,7 @@ JSONObject jsonObject = JSON.parseObject(message); // 盘口的判断 if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) { + log.info("订阅盘口消息:{}", session.getId()); String sub = jsonObject.get("sub").toString(); String symbol = sub.split("\\.")[1]; symbol = CoinTypeConvert.convert(symbol); @@ -89,6 +90,7 @@ // 取消盘口订阅 if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) { // `market.${symbol}.kline.${strPeriod} + log.info("取消订阅盘口消息:{}", session.getId()); String unsub = jsonObject.get("unsub").toString(); String[] split = unsub.split("\\."); String symbol = split[1]; @@ -110,6 +112,7 @@ // 取消订阅 {unsub: xxx(标识)} if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) { // 订阅 + log.info("订阅最新K线消息:{}", session.getId()); String sub = jsonObject.get("sub").toString(); String[] split = sub.split("\\."); String symbol = split[1]; @@ -132,6 +135,7 @@ // 取消订阅 if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) { // `market.${symbol}.kline.${strPeriod} + log.info("取消订阅最新K消息:{}", session.getId()); String unsub = jsonObject.get("unsub").toString(); String[] split = unsub.split("\\."); String strPeriod = split[3]; @@ -173,14 +177,14 @@ * * @param message 消息内容 */ - public void sendMessagePlate(String message, Session fromSession) { - if (tradeplateClients.containsKey("nekkusdt")) { - Map<String, Session> nekk = tradeplateClients.get("nekkusdt"); + public void sendMessagePlate(String symbol,String message, Session fromSession) { + if (tradeplateClients.containsKey(symbol)) { + Map<String, Session> nekk = tradeplateClients.get(symbol); for (Map.Entry<String, Session> sessionEntry : nekk.entrySet()) { Session toSession = sessionEntry.getValue(); // 排除掉自己 //if (!fromSession.getId().equals(toSession.getId())) { - log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message); + log.info("服务端给客户端[{}]发送盘口消息{}", toSession.getId(), message); boolean open = toSession.isOpen(); if (open) { toSession.getAsyncRemote().sendText(message); diff --git a/src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml b/src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml index b844a93..aa02fee 100644 --- a/src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml +++ b/src/main/resources/mapper/walletCoinOrder/OrderCoinsDao.xml @@ -25,5 +25,23 @@ select * from coins_order where order_status=1 + <if test="list != null"> + and symbol not in + <foreach collection="list" separator="," item="item" open="(" close=")"> + #{item} + </foreach> + </if> + </select> + + <select id="selectCoinOrderOnTrade" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsEntity"> + select * + from coins_order + where order_status=1 + <if test="list != null"> + and symbol in + <foreach collection="list" separator="," item="item" open="(" close=")"> + #{item} + </foreach> + </if> </select> </mapper> -- Gitblit v1.9.1