From 2e2eeeb8291415706f4941e55270a0b23e76bf23 Mon Sep 17 00:00:00 2001 From: xiaoyong931011 <15274802129@163.com> Date: Fri, 04 Mar 2022 15:16:06 +0800 Subject: [PATCH] 20222223 --- src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java | 152 ++++++++++++++++++++++++++++++-------------------- 1 files changed, 92 insertions(+), 60 deletions(-) diff --git a/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java index 15d4168..c676e8c 100644 --- a/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java +++ b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java @@ -6,6 +6,7 @@ import com.alibaba.fastjson.JSONObject; import com.huobi.client.model.Candlestick; import com.xcong.excoin.common.contants.AppContants; +import com.xcong.excoin.modules.symbols.constants.SymbolsConstats; import com.xcong.excoin.trade.CoinTraderFactory; import com.xcong.excoin.utils.CoinTypeConvert; import com.xcong.excoin.utils.RedisUtils; @@ -27,8 +28,6 @@ @ServerEndpoint(value = "/trade/market") @Component public class TradePlateSendWebSocket { - @Resource - RedisUtils redisUtils; /** * 记录当前在线连接数 @@ -46,7 +45,7 @@ @OnOpen public void onOpen(Session session) { onlineCount.incrementAndGet(); // 在线数加1 - log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get()); + // log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get()); } /** @@ -56,19 +55,19 @@ public void onClose(Session session) { onlineCount.decrementAndGet(); // 在线数减1 Collection<Map<String, Session>> values = tradeplateClients.values(); - if(CollectionUtils.isNotEmpty(values)){ - for(Map<String,Session> map : values){ + if (CollectionUtils.isNotEmpty(values)) { + for (Map<String, Session> map : values) { map.remove(session.getId()); } } Collection<Map<String, Session>> klineClientsValues = klineClients.values(); - if(CollectionUtils.isNotEmpty(klineClientsValues)){ - for(Map<String,Session> map : klineClientsValues){ + if (CollectionUtils.isNotEmpty(klineClientsValues)) { + for (Map<String, Session> map : klineClientsValues) { map.remove(session.getId()); } } - log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get()); + //log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get()); } /** @@ -78,13 +77,12 @@ */ @OnMessage public void onMessage(String message, Session session) { - //log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message); - // 订阅方法 {sub: 'market.' + symbol + '.depth.' + this._caculateType(),id: symbol - //} + // 盘口订阅方法 {sub: 'market.btcusdt.depth.10,id: symbol} JSONObject jsonObject = JSON.parseObject(message); +// log.info("订阅参数:{}", jsonObject); // 盘口的判断 if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) { - //log.info("订阅盘口消息:{}", session.getId()); + String sub = jsonObject.get("sub").toString(); String symbol = sub.split("\\.")[1]; symbol = CoinTypeConvert.convert(symbol); @@ -98,31 +96,30 @@ // 发送一次盘口 CoinTraderFactory factory = SpringContextHolder.getBean(CoinTraderFactory.class); // 发送订阅消息 - String nekk = factory.getTrader("NEKK").sendTradePlateMessage(); + String nekk = factory.getTrader(SymbolsConstats.ROC).sendTradePlateMessage(); SubResultModel subResultModel = new SubResultModel(); - subResultModel.setId("nekkusdt"); + subResultModel.setId("griceusdt"); subResultModel.setSubbed(sub); - synchronized (session){ + synchronized (session) { try { session.getBasicRemote().sendText(JSONObject.toJSONString(subResultModel)); } catch (IOException e) { e.printStackTrace(); } } - synchronized (session){ - try { - session.getBasicRemote().sendText(nekk); - } catch (IOException e) { - e.printStackTrace(); - } - } + synchronized (session) { + try { + session.getBasicRemote().sendText(nekk); + } catch (IOException e) { + e.printStackTrace(); + } + } } // 取消盘口订阅 if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) { - // `market.${symbol}.kline.${strPeriod} - //log.info("取消订阅盘口消息:{}", session.getId()); + // {unsub:'market.btcusdt.kline.1min'} String unsub = jsonObject.get("unsub").toString(); String[] split = unsub.split("\\."); String symbol = split[1]; @@ -133,51 +130,70 @@ } } - // 最新K线订阅 - - // 根据消息判断这个用户需要订阅哪种数据 - // {sub: `market.${symbol}.kline.${strPeriod}`, - // symbol: symbol, - // period: strPeriod - //} - // 取消订阅 {unsub: xxx(标识)} if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) { // 订阅 + // {sub: 'market.btcusdt.kline.1min'} String sub = jsonObject.get("sub").toString(); - log.info("订阅最新K线消息:{}", sub); String[] split = sub.split("\\."); String symbol = split[1]; symbol = CoinTypeConvert.convert(symbol); String period = split[3]; - if("60min".equals(period)){ + if ("60min".equals(period)) { period = "1hour"; } String key = symbol + "-" + period; - log.info("最新K线key:{}", key); if (klineClients.containsKey(key)) { // 有这个币种K线 Map<String, Session> stringSessionMap = klineClients.get(key); if (!stringSessionMap.containsKey(session.getId())) { stringSessionMap.put(session.getId(), session); - log.info("放入最新K线Map:{}", key); } } else { Map<String, Session> stringSessionMap = new HashMap<>(); stringSessionMap.put(session.getId(), session); klineClients.put(key, stringSessionMap); - log.info("放入最新K线Map:{}", key); + } + // 给他发送最新K线 TODO + String newKline = "NEW_KINE_{}"; + key = StrUtil.format(newKline, symbol); + RedisUtils redisUtils = SpringContextHolder.getBean(RedisUtils.class); + Object o = redisUtils.get(key); + Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick>) o; + String ch = "market.{}.kline.{}"; + String chKey = period; + if (period.equals("60min")) { + chKey = "1hour"; + } + if(currentKlineMap!=null && currentKlineMap.containsKey(chKey)){ + Candlestick value = currentKlineMap.get(chKey); + // 转换 + NewCandlestick newCandlestick = new NewCandlestick(); + String nekkusdt = split[1]; + ch = StrUtil.format(ch, nekkusdt, period); + newCandlestick.setCh(ch); + CandlestickModel model = new CandlestickModel(); + model.setVol(value.getVolume()); + model.setLow(value.getLow()); + model.setOpen(value.getOpen()); + model.setHigh(value.getHigh()); + model.setCount(value.getCount()); + model.setAmount(value.getAmount()); + model.setId(value.getTimestamp() / 1000); + model.setTimestamp(value.getTimestamp() / 1000); + model.setClose(value.getClose()); + newCandlestick.setTick(model); + sendMessageKlineNow(JSONObject.toJSONString(newCandlestick), session); } } // 取消订阅 if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) { - // `market.${symbol}.kline.${strPeriod} - //log.info("取消订阅最新K消息:{}", session.getId()); + // {unsub:'market.${symbol}.kline.${strPeriod}'} String unsub = jsonObject.get("unsub").toString(); String[] split = unsub.split("\\."); String strPeriod = split[3]; - if("60min".equals(strPeriod)){ + if ("60min".equals(strPeriod)) { strPeriod = "1hour"; } String symbol = split[1]; @@ -185,7 +201,6 @@ String key = symbol + "-" + strPeriod; if (klineClients.containsKey(key)) { klineClients.get(key).remove(session.getId()); - //session.getAsyncRemote().sendText(message); } } @@ -197,7 +212,7 @@ String symbol = split[1]; symbol = CoinTypeConvert.convert(symbol); String period = split[3]; - if("60min".equals(period)){ + if ("60min".equals(period)) { period = "1hour"; } //String key = symbol+"-"+period; @@ -205,21 +220,27 @@ String key = "KINE_{}_{}"; // 币币k线数据 //key = StrUtil.format(key, symbol, period); - key = StrUtil.format(key, "NEKK/USDT", period); + key = StrUtil.format(key, "GRICE/USDT", period); RedisUtils bean = SpringContextHolder.getBean(RedisUtils.class); Object o = bean.get(key); List<CandlestickModel> candlestickModels = new ArrayList<>(); CandlestickResult result = new CandlestickResult(); result.setRep(sub); - if(o!=null){ - List<Candlestick> list = (List<Candlestick>)o; - for(Candlestick candlestick : list){ - CandlestickModel model = new CandlestickModel(); + if (o != null) { + List<Candlestick> list = (List<Candlestick>) o; + + if(list!=null && list.size()>300){ + int size = list.size(); + list = list.subList(size-300,size); + } + CandlestickModel model = null; + for (Candlestick candlestick : list) { + model = new CandlestickModel(); model.setAmount(candlestick.getAmount()); model.setClose(candlestick.getClose()); model.setCount(candlestick.getCount()); model.setHigh(candlestick.getHigh()); - model.setId(candlestick.getTimestamp()/1000); + model.setId(candlestick.getTimestamp() / 1000); model.setOpen(candlestick.getOpen()); model.setLow(candlestick.getLow()); model.setVol(candlestick.getVolume()); @@ -234,56 +255,67 @@ @OnError public void onError(Session session, Throwable error) { - log.error("发生错误"); - error.printStackTrace(); + // log.error("发生错误"); + //error.printStackTrace(); } /** - * 群发消息 + * 发送盘口消息 * * @param message 消息内容 */ - public void sendMessagePlate(String symbol,String message, Session fromSession) { + public void sendMessagePlate(String symbol, String message, Session fromSession) { if (tradeplateClients.containsKey(symbol)) { Map<String, Session> nekk = tradeplateClients.get(symbol); for (Map.Entry<String, Session> sessionEntry : nekk.entrySet()) { Session toSession = sessionEntry.getValue(); // 排除掉自己 - //if (!fromSession.getId().equals(toSession.getId())) { - // log.info("服务端给客户端[{}]发送盘口消息{}", toSession.getId(), message); boolean open = toSession.isOpen(); if (open) { toSession.getAsyncRemote().sendText(message); } - - // } } } } + /** + * 发送历史K线 + * + * @param message, toSession + * @return void + */ public void sendMessageHistory(String message, Session toSession) { - // log.info("服务端给客户端[{}]发送历史K线", toSession.getId()); boolean open = toSession.isOpen(); if (open) { toSession.getAsyncRemote().sendText(message); } } + /** + * 发送最新K线 + * + * @param symbol, period, message, fromSession + * @return void + */ public void sendMessageKline(String symbol, String period, String message, Session fromSession) { - String key = symbol + "-" + period; - //log.info("发送最新K线[{}],数据[{}]",key,message); if (klineClients.containsKey(key)) { Map<String, Session> stringSessionMap = klineClients.get(key); for (Map.Entry<String, Session> sessionEntry : stringSessionMap.entrySet()) { Session toSession = sessionEntry.getValue(); boolean open = toSession.isOpen(); if (open) { - log.info("服务端给客户端[{}]发送最新K线消息{}", toSession.getId(), message); toSession.getAsyncRemote().sendText(message); } } } } + + public void sendMessageKlineNow(String message, Session toSession) { + boolean open = toSession.isOpen(); + if (open) { + toSession.getAsyncRemote().sendText(message); + } + } } -- Gitblit v1.9.1