|  |  | 
 |  |  | 
 | 
 |  |  |     OrderCoinsEntity findWalletCoinOrderByOrderNo(@Param("orderNo")String orderNo);
 | 
 |  |  | 
 | 
 |  |  |     List<OrderCoinsEntity> selectAllEntrustingCoinOrderList();
 | 
 |  |  |     List<OrderCoinsEntity> selectAllEntrustingCoinOrderList(List<String> list);
 | 
 |  |  | 
 | 
 |  |  |     List<OrderCoinsEntity> selectCoinOrderOnTrade(List<String> list);
 | 
 |  |  | }
 | 
 
 |  |  | 
 |  |  |     @Override
 | 
 |  |  |     @Transactional(rollbackFor = Exception.class)
 | 
 |  |  |     public void dealEntrustCoinOrder() {
 | 
 |  |  |         List<OrderCoinsEntity> list = orderCoinsDao.selectAllEntrustingCoinOrderList();
 | 
 |  |  |         List<String> ignoreTypes = new ArrayList<>();
 | 
 |  |  |         ignoreTypes.add("NEKK");
 | 
 |  |  |         List<OrderCoinsEntity> list = orderCoinsDao.selectAllEntrustingCoinOrderList(ignoreTypes);
 | 
 |  |  |         if (CollUtil.isNotEmpty(list)) {
 | 
 |  |  |             for (OrderCoinsEntity orderCoinsEntity : list) {
 | 
 |  |  |                 BigDecimal nowPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(orderCoinsEntity.getSymbol() + "/USDT")));
 | 
 |  |  | 
 |  |  |     public void handleOrder(List<ExchangeTrade> trades){
 | 
 |  |  |         // 处理撮合交易的订单
 | 
 |  |  |         for(ExchangeTrade exchangeTrade : trades){
 | 
 |  |  | 
 | 
 |  |  |             if(exchangeTrade==null){
 | 
 |  |  |                 continue;
 | 
 |  |  |             }
 | 
 |  |  |             BigDecimal amount = exchangeTrade.getAmount();
 | 
 |  |  |             Long buyOrderId = exchangeTrade.getBuyOrderId();
 | 
 |  |  |             BigDecimal buyTurnover = exchangeTrade.getBuyTurnover();
 | 
 
 |  |  | 
 |  |  |         CoinProcessor processor = processorFactory.getProcessor(symbol); | 
 |  |  |         Map<String, Candlestick> currentKlineMap = processor.getCurrentKlineMap(); | 
 |  |  |         Collection<Candlestick> values = currentKlineMap.values(); | 
 |  |  |         BigDecimal newPrice = trades.get(trades.size()-1).getPrice(); | 
 |  |  |         BigDecimal newPrice =null; | 
 |  |  |         for (Candlestick candlestick : values) { | 
 |  |  |             for (ExchangeTrade exchangeTrade : trades) { | 
 |  |  |                 if(exchangeTrade==null){ | 
 |  |  |                     continue; | 
 |  |  |                 } | 
 |  |  |                 processor.processTrade(candlestick, exchangeTrade); | 
 |  |  |                 newPrice=exchangeTrade.getPrice(); | 
 |  |  |             } | 
 |  |  |         } | 
 |  |  |  | 
 |  |  | 
 |  |  |         key = StrUtil.format(key, symbolUsdt); | 
 |  |  |         redisUtils.set(key,currentKlineMap); | 
 |  |  |         // 更新最新价 | 
 |  |  |         redisUtils.set(CoinTypeConvert.convertToKey(symbolUsdt), newPrice); | 
 |  |  |         if(newPrice!=null){ | 
 |  |  |             redisUtils.set(CoinTypeConvert.convertToKey(symbolUsdt), newPrice); | 
 |  |  |         } | 
 |  |  |  | 
 |  |  |     } | 
 |  |  | } | 
 
 |  |  | 
 |  |  | import com.xcong.excoin.processor.DefaultCoinProcessor; | 
 |  |  | import com.xcong.excoin.processor.MarketService; | 
 |  |  | import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService; | 
 |  |  | import com.xcong.excoin.rabbit.producer.ExchangeProducer; | 
 |  |  | import com.xcong.excoin.trade.CoinTrader; | 
 |  |  | import com.xcong.excoin.trade.CoinTraderFactory; | 
 |  |  | import com.xcong.excoin.utils.CoinTypeConvert; | 
 |  |  | 
 |  |  |     @Resource | 
 |  |  |     private CoinProcessorFactory processorFactory; | 
 |  |  |  | 
 |  |  |     @Resource | 
 |  |  |     ExchangeProducer exchangeProducer; | 
 |  |  |  | 
 |  |  |     @PostConstruct | 
 |  |  |     public void initCoinTrade() { | 
 |  |  |         log.info("#=======撮合交易器开启=======#"); | 
 |  |  |         String symbol = "NEKK"; | 
 |  |  |         CoinTrader newTrader = new CoinTrader(symbol); | 
 |  |  |         newTrader.setOrderCoinService(coinService); | 
 |  |  |         newTrader.setExchangeProducer(exchangeProducer); | 
 |  |  |         //newTrader.setKafkaTemplate(kafkaTemplate); | 
 |  |  |         //newTrader.setBaseCoinScale(coin.getBaseCoinScale()); | 
 |  |  |         //newTrader.setCoinScale(coin.getCoinScale()); | 
 |  |  | 
 |  |  |  | 
 |  |  |         // 创建成功以后需要对未处理订单预处理 | 
 |  |  |         log.info("======CoinTrader Process: " + symbol + "======"); | 
 |  |  |         List<OrderCoinsEntity> orders = orderCoinsDao.selectAllEntrustingCoinOrderList(); | 
 |  |  |         List<String> symbolList = new ArrayList<>(); | 
 |  |  |         symbolList.add(symbol); | 
 |  |  |         List<OrderCoinsEntity> orders = orderCoinsDao.selectCoinOrderOnTrade(symbolList); | 
 |  |  |         List<OrderCoinsEntity> tradingOrders = new ArrayList<>(); | 
 |  |  |         List<OrderCoinsEntity> completedOrders = new ArrayList<>(); | 
 |  |  |         orders.forEach(order -> { | 
 
 |  |  | 
 |  |  |     @Resource | 
 |  |  |     private RedisUtils redisUtils; | 
 |  |  |  | 
 |  |  |     @Scheduled(cron = "0/2 * * * * *") | 
 |  |  |     @Scheduled(cron = "0/10 * * * * *") | 
 |  |  |     public void tradePlate(){ | 
 |  |  |         redisUtils.set("NEKK_NEW_PRICE",new BigDecimal(Math.random())); | 
 |  |  |         redisUtils.set("NEKK_NEW_PRICE",new BigDecimal(Math.random()*20)); | 
 |  |  |         Candlestick candlestick = new Candlestick(); | 
 |  |  |         candlestick.setOpen(new BigDecimal("10.33")); | 
 |  |  |         candlestick.setHigh(new BigDecimal("15.23")); | 
 |  |  | 
 |  |  |         candlestick.setId(1599840000L); | 
 |  |  |         candlestick.setCount(100002); | 
 |  |  |         candlestick.setClose(new BigDecimal("12.2323")); | 
 |  |  |         redisUtils.set("NEKK/USDT",candlestick); | 
 |  |  |         //redisUtils.set("NEKK/USDT",candlestick); | 
 |  |  |         // [[10244.21, 0.000855], [10243.7, 0.008777], [10243.59, 0.14], [10243.37, 0.467663]] | 
 |  |  |         TradePlateModel tradePlateModel = new TradePlateModel(); | 
 |  |  |         List<BigDecimal> buy; | 
 |  |  | 
 |  |  |             sell.add(new BigDecimal(Math.random()*i*2)); | 
 |  |  |             tradePlateModel.getSell().add(sell); | 
 |  |  |         } | 
 |  |  |  | 
 |  |  |         plateSendWebSocket.sendMessagePlate(JSON.toJSONString(tradePlateModel),null); | 
 |  |  |         plateSendWebSocket.sendMessageKline("nekkusdt","1min","{amount: 114419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 311958.06091543}",null); | 
 |  |  |         plateSendWebSocket.sendMessageKline("nekkusdt","5min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null); | 
 |  |  |         plateSendWebSocket.sendMessageKline("nekkusdt","15min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null); | 
 |  |  |         log.info("准备发送消息"); | 
 |  |  |         plateSendWebSocket.sendMessagePlate("NEKK/USDT",JSON.toJSONString(tradePlateModel),null); | 
 |  |  |         plateSendWebSocket.sendMessageKline("NEKK/USDT","1min","{amount: 114419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 311958.06091543}",null); | 
 |  |  |         plateSendWebSocket.sendMessageKline("NEKK/USDT","5min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null); | 
 |  |  |         plateSendWebSocket.sendMessageKline("NEKK/USDT","15min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null); | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     /** | 
 |  |  | 
 |  |  |             } | 
 |  |  |         }); | 
 |  |  |     } | 
 |  |  |  | 
 |  |  | } | 
 
 |  |  | 
 |  |  | import org.springframework.stereotype.Component; | 
 |  |  |  | 
 |  |  | import javax.annotation.Resource; | 
 |  |  | import java.util.Iterator; | 
 |  |  | import java.util.List; | 
 |  |  | import java.util.Map; | 
 |  |  | import java.util.Set; | 
 |  |  | 
 |  |  |      */ | 
 |  |  |     @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE) | 
 |  |  |     public void tradePlate(String content) { | 
 |  |  |         log.info("#---->{}#", content); | 
 |  |  |         tradePlateSendWebSocket.sendMessagePlate(content,null); | 
 |  |  |         log.info("#盘口信息消费者---->{}#", content); | 
 |  |  |         tradePlateSendWebSocket.sendMessagePlate("NEKK/USDT",content,null); | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     /** | 
 |  |  | 
 |  |  |     public void handleTradeExchange(String content) { | 
 |  |  |         log.info("#---->{}#", content); | 
 |  |  |         List<ExchangeTrade> exchangeTrades = JSONObject.parseArray(content, ExchangeTrade.class); | 
 |  |  |         // 去掉空的  暂时这样 | 
 |  |  |         Iterator<ExchangeTrade> iterator = exchangeTrades.iterator(); | 
 |  |  |         while (iterator.hasNext()){ | 
 |  |  |             if(iterator.next()==null){ | 
 |  |  |                 iterator.remove(); | 
 |  |  |             } | 
 |  |  |         } | 
 |  |  |         // 处理K线 并更新最新价 | 
 |  |  |         handleKlineService.handleExchangeOrderToKline(exchangeTrades); | 
 |  |  |         // 推送最新K线 | 
 
 |  |  | 
 |  |  | public class CoinTrader { | 
 |  |  |     private String symbol; | 
 |  |  |     private ExchangeProducer exchangeProducer; | 
 |  |  |     private OrderCoinService orderCoinService; | 
 |  |  |     //交易币种的精度 | 
 |  |  |     private int coinScale = 4; | 
 |  |  |     //基币的精度 | 
 |  |  | 
 |  |  |         return count; | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     public OrderCoinService getOrderCoinService() { | 
 |  |  |         return orderCoinService; | 
 |  |  |  | 
 |  |  |     public ExchangeProducer getExchangeProducer() { | 
 |  |  |         return exchangeProducer; | 
 |  |  |     } | 
 |  |  |  | 
 |  |  |     public void setOrderCoinService(OrderCoinService orderCoinService) { | 
 |  |  |         this.orderCoinService = orderCoinService; | 
 |  |  |     public void setExchangeProducer(ExchangeProducer exchangeProducer) { | 
 |  |  |         this.exchangeProducer = exchangeProducer; | 
 |  |  |     } | 
 |  |  | } | 
 
 |  |  | 
 |  |  |         JSONObject jsonObject = JSON.parseObject(message); | 
 |  |  |         // 盘口的判断 | 
 |  |  |         if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) { | 
 |  |  |             log.info("订阅盘口消息:{}", session.getId()); | 
 |  |  |             String sub = jsonObject.get("sub").toString(); | 
 |  |  |             String symbol = sub.split("\\.")[1]; | 
 |  |  |             symbol = CoinTypeConvert.convert(symbol); | 
 |  |  | 
 |  |  |         // 取消盘口订阅 | 
 |  |  |         if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) { | 
 |  |  |             // `market.${symbol}.kline.${strPeriod} | 
 |  |  |             log.info("取消订阅盘口消息:{}", session.getId()); | 
 |  |  |             String unsub = jsonObject.get("unsub").toString(); | 
 |  |  |             String[] split = unsub.split("\\."); | 
 |  |  |             String symbol = split[1]; | 
 |  |  | 
 |  |  |         // 取消订阅 {unsub: xxx(标识)} | 
 |  |  |         if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) { | 
 |  |  |             // 订阅 | 
 |  |  |             log.info("订阅最新K线消息:{}", session.getId()); | 
 |  |  |             String sub = jsonObject.get("sub").toString(); | 
 |  |  |             String[] split = sub.split("\\."); | 
 |  |  |             String symbol = split[1]; | 
 |  |  | 
 |  |  |         // 取消订阅 | 
 |  |  |         if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) { | 
 |  |  |             // `market.${symbol}.kline.${strPeriod} | 
 |  |  |             log.info("取消订阅最新K消息:{}", session.getId()); | 
 |  |  |             String unsub = jsonObject.get("unsub").toString(); | 
 |  |  |             String[] split = unsub.split("\\."); | 
 |  |  |             String strPeriod = split[3]; | 
 |  |  | 
 |  |  |      * | 
 |  |  |      * @param message 消息内容 | 
 |  |  |      */ | 
 |  |  |     public void sendMessagePlate(String message, Session fromSession) { | 
 |  |  |         if (tradeplateClients.containsKey("nekkusdt")) { | 
 |  |  |             Map<String, Session> nekk = tradeplateClients.get("nekkusdt"); | 
 |  |  |     public void sendMessagePlate(String symbol,String message, Session fromSession) { | 
 |  |  |         if (tradeplateClients.containsKey(symbol)) { | 
 |  |  |             Map<String, Session> nekk = tradeplateClients.get(symbol); | 
 |  |  |             for (Map.Entry<String, Session> sessionEntry : nekk.entrySet()) { | 
 |  |  |                 Session toSession = sessionEntry.getValue(); | 
 |  |  |                 // 排除掉自己 | 
 |  |  |                 //if (!fromSession.getId().equals(toSession.getId())) { | 
 |  |  |                 log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message); | 
 |  |  |                 log.info("服务端给客户端[{}]发送盘口消息{}", toSession.getId(), message); | 
 |  |  |                 boolean open = toSession.isOpen(); | 
 |  |  |                 if (open) { | 
 |  |  |                     toSession.getAsyncRemote().sendText(message); | 
 
 |  |  | 
 |  |  |         select *
 | 
 |  |  |         from coins_order
 | 
 |  |  |         where order_status=1
 | 
 |  |  |         <if test="list != null">
 | 
 |  |  |             and symbol not in
 | 
 |  |  |             <foreach collection="list" separator="," item="item" open="(" close=")">
 | 
 |  |  |                 #{item}
 | 
 |  |  |             </foreach>
 | 
 |  |  |         </if>
 | 
 |  |  |     </select>
 | 
 |  |  | 
 | 
 |  |  |     <select id="selectCoinOrderOnTrade" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsEntity">
 | 
 |  |  |         select *
 | 
 |  |  |         from coins_order
 | 
 |  |  |         where order_status=1
 | 
 |  |  |         <if test="list != null">
 | 
 |  |  |             and symbol in
 | 
 |  |  |             <foreach collection="list" separator="," item="item" open="(" close=")">
 | 
 |  |  |                 #{item}
 | 
 |  |  |             </foreach>
 | 
 |  |  |         </if>
 | 
 |  |  |     </select>
 | 
 |  |  | </mapper>
 |