| | |
| | |
|
| | | OrderCoinsEntity findWalletCoinOrderByOrderNo(@Param("orderNo")String orderNo);
|
| | |
|
| | | List<OrderCoinsEntity> selectAllEntrustingCoinOrderList();
|
| | | List<OrderCoinsEntity> selectAllEntrustingCoinOrderList(List<String> list);
|
| | |
|
| | | List<OrderCoinsEntity> selectCoinOrderOnTrade(List<String> list);
|
| | | }
|
| | |
| | | @Override
|
| | | @Transactional(rollbackFor = Exception.class)
|
| | | public void dealEntrustCoinOrder() {
|
| | | List<OrderCoinsEntity> list = orderCoinsDao.selectAllEntrustingCoinOrderList();
|
| | | List<String> ignoreTypes = new ArrayList<>();
|
| | | ignoreTypes.add("NEKK");
|
| | | List<OrderCoinsEntity> list = orderCoinsDao.selectAllEntrustingCoinOrderList(ignoreTypes);
|
| | | if (CollUtil.isNotEmpty(list)) {
|
| | | for (OrderCoinsEntity orderCoinsEntity : list) {
|
| | | BigDecimal nowPrice = new BigDecimal(redisUtils.getString(CoinTypeConvert.convertToKey(orderCoinsEntity.getSymbol() + "/USDT")));
|
| | |
| | | public void handleOrder(List<ExchangeTrade> trades){
|
| | | // 处理撮合交易的订单
|
| | | for(ExchangeTrade exchangeTrade : trades){
|
| | |
|
| | | if(exchangeTrade==null){
|
| | | continue;
|
| | | }
|
| | | BigDecimal amount = exchangeTrade.getAmount();
|
| | | Long buyOrderId = exchangeTrade.getBuyOrderId();
|
| | | BigDecimal buyTurnover = exchangeTrade.getBuyTurnover();
|
| | |
| | | CoinProcessor processor = processorFactory.getProcessor(symbol); |
| | | Map<String, Candlestick> currentKlineMap = processor.getCurrentKlineMap(); |
| | | Collection<Candlestick> values = currentKlineMap.values(); |
| | | BigDecimal newPrice = trades.get(trades.size()-1).getPrice(); |
| | | BigDecimal newPrice =null; |
| | | for (Candlestick candlestick : values) { |
| | | for (ExchangeTrade exchangeTrade : trades) { |
| | | if(exchangeTrade==null){ |
| | | continue; |
| | | } |
| | | processor.processTrade(candlestick, exchangeTrade); |
| | | newPrice=exchangeTrade.getPrice(); |
| | | } |
| | | } |
| | | |
| | |
| | | key = StrUtil.format(key, symbolUsdt); |
| | | redisUtils.set(key,currentKlineMap); |
| | | // 更新最新价 |
| | | if(newPrice!=null){ |
| | | redisUtils.set(CoinTypeConvert.convertToKey(symbolUsdt), newPrice); |
| | | } |
| | | |
| | | } |
| | | } |
| | |
| | | import com.xcong.excoin.processor.DefaultCoinProcessor; |
| | | import com.xcong.excoin.processor.MarketService; |
| | | import com.xcong.excoin.rabbit.pricequeue.WebsocketPriceService; |
| | | import com.xcong.excoin.rabbit.producer.ExchangeProducer; |
| | | import com.xcong.excoin.trade.CoinTrader; |
| | | import com.xcong.excoin.trade.CoinTraderFactory; |
| | | import com.xcong.excoin.utils.CoinTypeConvert; |
| | |
| | | @Resource |
| | | private CoinProcessorFactory processorFactory; |
| | | |
| | | @Resource |
| | | ExchangeProducer exchangeProducer; |
| | | |
| | | @PostConstruct |
| | | public void initCoinTrade() { |
| | | log.info("#=======撮合交易器开启=======#"); |
| | | String symbol = "NEKK"; |
| | | CoinTrader newTrader = new CoinTrader(symbol); |
| | | newTrader.setOrderCoinService(coinService); |
| | | newTrader.setExchangeProducer(exchangeProducer); |
| | | //newTrader.setKafkaTemplate(kafkaTemplate); |
| | | //newTrader.setBaseCoinScale(coin.getBaseCoinScale()); |
| | | //newTrader.setCoinScale(coin.getCoinScale()); |
| | |
| | | |
| | | // 创建成功以后需要对未处理订单预处理 |
| | | log.info("======CoinTrader Process: " + symbol + "======"); |
| | | List<OrderCoinsEntity> orders = orderCoinsDao.selectAllEntrustingCoinOrderList(); |
| | | List<String> symbolList = new ArrayList<>(); |
| | | symbolList.add(symbol); |
| | | List<OrderCoinsEntity> orders = orderCoinsDao.selectCoinOrderOnTrade(symbolList); |
| | | List<OrderCoinsEntity> tradingOrders = new ArrayList<>(); |
| | | List<OrderCoinsEntity> completedOrders = new ArrayList<>(); |
| | | orders.forEach(order -> { |
| | |
| | | @Resource |
| | | private RedisUtils redisUtils; |
| | | |
| | | @Scheduled(cron = "0/2 * * * * *") |
| | | @Scheduled(cron = "0/10 * * * * *") |
| | | public void tradePlate(){ |
| | | redisUtils.set("NEKK_NEW_PRICE",new BigDecimal(Math.random())); |
| | | redisUtils.set("NEKK_NEW_PRICE",new BigDecimal(Math.random()*20)); |
| | | Candlestick candlestick = new Candlestick(); |
| | | candlestick.setOpen(new BigDecimal("10.33")); |
| | | candlestick.setHigh(new BigDecimal("15.23")); |
| | |
| | | candlestick.setId(1599840000L); |
| | | candlestick.setCount(100002); |
| | | candlestick.setClose(new BigDecimal("12.2323")); |
| | | redisUtils.set("NEKK/USDT",candlestick); |
| | | //redisUtils.set("NEKK/USDT",candlestick); |
| | | // [[10244.21, 0.000855], [10243.7, 0.008777], [10243.59, 0.14], [10243.37, 0.467663]] |
| | | TradePlateModel tradePlateModel = new TradePlateModel(); |
| | | List<BigDecimal> buy; |
| | |
| | | sell.add(new BigDecimal(Math.random()*i*2)); |
| | | tradePlateModel.getSell().add(sell); |
| | | } |
| | | |
| | | plateSendWebSocket.sendMessagePlate(JSON.toJSONString(tradePlateModel),null); |
| | | plateSendWebSocket.sendMessageKline("nekkusdt","1min","{amount: 114419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 311958.06091543}",null); |
| | | plateSendWebSocket.sendMessageKline("nekkusdt","5min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null); |
| | | plateSendWebSocket.sendMessageKline("nekkusdt","15min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null); |
| | | log.info("准备发送消息"); |
| | | plateSendWebSocket.sendMessagePlate("NEKK/USDT",JSON.toJSONString(tradePlateModel),null); |
| | | plateSendWebSocket.sendMessageKline("NEKK/USDT","1min","{amount: 114419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 311958.06091543}",null); |
| | | plateSendWebSocket.sendMessageKline("NEKK/USDT","5min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null); |
| | | plateSendWebSocket.sendMessageKline("NEKK/USDT","15min","{amount: 514419.67835656216,close: 2.7261,count: 782,high: 2.7299,id: 1599632100,low: 2.723,open: 2.7288,vol: 911958.06091543}",null); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | }); |
| | | } |
| | | |
| | | } |
| | |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import javax.annotation.Resource; |
| | | import java.util.Iterator; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.Set; |
| | |
| | | */ |
| | | @RabbitListener(queues = RabbitMqConfig.QUEUE_TRADE_PLATE) |
| | | public void tradePlate(String content) { |
| | | log.info("#---->{}#", content); |
| | | tradePlateSendWebSocket.sendMessagePlate(content,null); |
| | | log.info("#盘口信息消费者---->{}#", content); |
| | | tradePlateSendWebSocket.sendMessagePlate("NEKK/USDT",content,null); |
| | | } |
| | | |
| | | /** |
| | |
| | | public void handleTradeExchange(String content) { |
| | | log.info("#---->{}#", content); |
| | | List<ExchangeTrade> exchangeTrades = JSONObject.parseArray(content, ExchangeTrade.class); |
| | | // 去掉空的 暂时这样 |
| | | Iterator<ExchangeTrade> iterator = exchangeTrades.iterator(); |
| | | while (iterator.hasNext()){ |
| | | if(iterator.next()==null){ |
| | | iterator.remove(); |
| | | } |
| | | } |
| | | // 处理K线 并更新最新价 |
| | | handleKlineService.handleExchangeOrderToKline(exchangeTrades); |
| | | // 推送最新K线 |
| | |
| | | public class CoinTrader { |
| | | private String symbol; |
| | | private ExchangeProducer exchangeProducer; |
| | | private OrderCoinService orderCoinService; |
| | | //交易币种的精度 |
| | | private int coinScale = 4; |
| | | //基币的精度 |
| | |
| | | return count; |
| | | } |
| | | |
| | | public OrderCoinService getOrderCoinService() { |
| | | return orderCoinService; |
| | | |
| | | public ExchangeProducer getExchangeProducer() { |
| | | return exchangeProducer; |
| | | } |
| | | |
| | | public void setOrderCoinService(OrderCoinService orderCoinService) { |
| | | this.orderCoinService = orderCoinService; |
| | | public void setExchangeProducer(ExchangeProducer exchangeProducer) { |
| | | this.exchangeProducer = exchangeProducer; |
| | | } |
| | | } |
| | |
| | | JSONObject jsonObject = JSON.parseObject(message); |
| | | // 盘口的判断 |
| | | if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) { |
| | | log.info("订阅盘口消息:{}", session.getId()); |
| | | String sub = jsonObject.get("sub").toString(); |
| | | String symbol = sub.split("\\.")[1]; |
| | | symbol = CoinTypeConvert.convert(symbol); |
| | |
| | | // 取消盘口订阅 |
| | | if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) { |
| | | // `market.${symbol}.kline.${strPeriod} |
| | | log.info("取消订阅盘口消息:{}", session.getId()); |
| | | String unsub = jsonObject.get("unsub").toString(); |
| | | String[] split = unsub.split("\\."); |
| | | String symbol = split[1]; |
| | |
| | | // 取消订阅 {unsub: xxx(标识)} |
| | | if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) { |
| | | // 订阅 |
| | | log.info("订阅最新K线消息:{}", session.getId()); |
| | | String sub = jsonObject.get("sub").toString(); |
| | | String[] split = sub.split("\\."); |
| | | String symbol = split[1]; |
| | |
| | | // 取消订阅 |
| | | if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) { |
| | | // `market.${symbol}.kline.${strPeriod} |
| | | log.info("取消订阅最新K消息:{}", session.getId()); |
| | | String unsub = jsonObject.get("unsub").toString(); |
| | | String[] split = unsub.split("\\."); |
| | | String strPeriod = split[3]; |
| | |
| | | * |
| | | * @param message 消息内容 |
| | | */ |
| | | public void sendMessagePlate(String message, Session fromSession) { |
| | | if (tradeplateClients.containsKey("nekkusdt")) { |
| | | Map<String, Session> nekk = tradeplateClients.get("nekkusdt"); |
| | | public void sendMessagePlate(String symbol,String message, Session fromSession) { |
| | | if (tradeplateClients.containsKey(symbol)) { |
| | | Map<String, Session> nekk = tradeplateClients.get(symbol); |
| | | for (Map.Entry<String, Session> sessionEntry : nekk.entrySet()) { |
| | | Session toSession = sessionEntry.getValue(); |
| | | // 排除掉自己 |
| | | //if (!fromSession.getId().equals(toSession.getId())) { |
| | | log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message); |
| | | log.info("服务端给客户端[{}]发送盘口消息{}", toSession.getId(), message); |
| | | boolean open = toSession.isOpen(); |
| | | if (open) { |
| | | toSession.getAsyncRemote().sendText(message); |
| | |
| | | select *
|
| | | from coins_order
|
| | | where order_status=1
|
| | | <if test="list != null">
|
| | | and symbol not in
|
| | | <foreach collection="list" separator="," item="item" open="(" close=")">
|
| | | #{item}
|
| | | </foreach>
|
| | | </if>
|
| | | </select>
|
| | |
|
| | | <select id="selectCoinOrderOnTrade" resultType="com.xcong.excoin.modules.coin.entity.OrderCoinsEntity">
|
| | | select *
|
| | | from coins_order
|
| | | where order_status=1
|
| | | <if test="list != null">
|
| | | and symbol in
|
| | | <foreach collection="list" separator="," item="item" open="(" close=")">
|
| | | #{item}
|
| | | </foreach>
|
| | | </if>
|
| | | </select>
|
| | | </mapper>
|