| | |
| | | */ |
| | | @OnMessage |
| | | public void onMessage(String message, Session session) { |
| | | // Handles one incoming text message: it is parsed as JSON and its "sub"/"unsub" value is checked for order-book depth topics. |
| | | // Depth subscribe request as built by the client: {sub: 'market.' + symbol + '.depth.' + this._caculateType(), id: symbol |
| | | //} |
| | | // Example depth subscribe message: {sub: 'market.btcusdt.depth.10', id: symbol} |
| | | JSONObject jsonObject = JSON.parseObject(message); |
| | | // Depth (order-book) subscription: the "sub" key is present and its value contains "depth". |
| | | if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) { |
| | | // Disabled debug log: log.info("subscribing to depth messages: {}", session.getId()); |
| | | String sub = jsonObject.get("sub").toString(); |
| | | String symbol = sub.split("\\.")[1]; |
| | | symbol = CoinTypeConvert.convert(symbol); |
| | |
| | | } |
| | | // Depth (order-book) unsubscription: the "unsub" key is present and its value contains "depth". NOTE(review): in this view the extracted symbol is not used afterwards in either branch; confirm no statements are missing. |
| | | if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) { |
| | | // The topic is split on '.', and the second segment is taken as the symbol (cf. 'market.btcusdt.depth.10'). |
| | | // Disabled debug log: log.info("unsubscribing from depth messages: {}", session.getId()); |
| | | // NOTE(review): the example originally given here was {unsub:'market.btcusdt.kline.1min'}, a kline topic that would not pass the "depth" check above; confirm the intended format. |
| | | String unsub = jsonObject.get("unsub").toString(); |
| | | String[] split = unsub.split("\\."); |
| | | String symbol = split[1]; |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | // 最新K线订阅 |
| | | |
| | | // 根据消息判断这个用户需要订阅哪种数据 |
| | | // {sub: `market.${symbol}.kline.${strPeriod}`, |
| | | // symbol: symbol, |
| | | // period: strPeriod |
| | | //} |
| | | // 取消订阅 {unsub: xxx(标识)} |
| | | if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) { |
| | | // 订阅 |
| | | // {sub: 'market.btcusdt.kline.1min'} |
| | | String sub = jsonObject.get("sub").toString(); |
| | | log.info("订阅最新K线消息:{}", sub); |
| | | String[] split = sub.split("\\."); |
| | | String symbol = split[1]; |
| | | symbol = CoinTypeConvert.convert(symbol); |
| | |
| | | period = "1hour"; |
| | | } |
| | | String key = symbol + "-" + period; |
| | | log.info("最新K线key:{}", key); |
| | | if (klineClients.containsKey(key)) { |
| | | // 有这个币种K线 |
| | | Map<String, Session> stringSessionMap = klineClients.get(key); |
| | | if (!stringSessionMap.containsKey(session.getId())) { |
| | | stringSessionMap.put(session.getId(), session); |
| | | log.info("放入最新K线Map:{}", key); |
| | | } |
| | | } else { |
| | | Map<String, Session> stringSessionMap = new HashMap<>(); |
| | | stringSessionMap.put(session.getId(), session); |
| | | klineClients.put(key, stringSessionMap); |
| | | log.info("放入最新K线Map:{}", key); |
| | | } |
| | | } |
| | | |
| | | // 取消订阅 |
| | | if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) { |
| | | // `market.${symbol}.kline.${strPeriod} |
| | | //log.info("取消订阅最新K消息:{}", session.getId()); |
| | | // {unsub:'market.${symbol}.kline.${strPeriod}'} |
| | | String unsub = jsonObject.get("unsub").toString(); |
| | | String[] split = unsub.split("\\."); |
| | | String strPeriod = split[3]; |
| | |
| | | String key = symbol + "-" + strPeriod; |
| | | if (klineClients.containsKey(key)) { |
| | | klineClients.get(key).remove(session.getId()); |
| | | //session.getAsyncRemote().sendText(message); |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * 群发消息 |
| | | * 发送盘口消息 |
| | | * |
| | | * @param message 消息内容 |
| | | */ |
| | |
| | | for (Map.Entry<String, Session> sessionEntry : nekk.entrySet()) { |
| | | Session toSession = sessionEntry.getValue(); |
| | | // 排除掉自己 |
| | | //if (!fromSession.getId().equals(toSession.getId())) { |
| | | // log.info("服务端给客户端[{}]发送盘口消息{}", toSession.getId(), message); |
| | | boolean open = toSession.isOpen(); |
| | | if (open) { |
| | | toSession.getAsyncRemote().sendText(message); |
| | | } |
| | | |
| | | // } |
| | | } |
| | | } |
| | | |
| | | } |
| | | |
| | | /** |
| | | * Sends a historical K-line payload to a single client. |
| | | * |
| | | * <p>The text is handed to the session's asynchronous remote endpoint; nothing is sent when the session is not open. |
| | | * |
| | | * @param message text payload to send |
| | | * @param toSession session of the receiving client |
| | | */ |
| | | public void sendMessageHistory(String message, Session toSession) { |
| | | // Sessions that are no longer open are skipped without sending. |
| | | if (!toSession.isOpen()) { |
| | | return; |
| | | } |
| | | toSession.getAsyncRemote().sendText(message); |
| | | } |
| | | |
| | | /** |
| | | * 发送最新K线 |
| | | * @param symbol, period, message, fromSession |
| | | * @return void |
| | | */ |
| | | public void sendMessageKline(String symbol, String period, String message, Session fromSession) { |
| | | |
| | | String key = symbol + "-" + period; |
| | | //log.info("发送最新K线[{}],数据[{}]",key,message); |
| | | if (klineClients.containsKey(key)) { |
| | | Map<String, Session> stringSessionMap = klineClients.get(key); |
| | | for (Map.Entry<String, Session> sessionEntry : stringSessionMap.entrySet()) { |
| | | Session toSession = sessionEntry.getValue(); |
| | | boolean open = toSession.isOpen(); |
| | | if (open) { |
| | | log.info("服务端给客户端[{}]发送最新K线消息{}", toSession.getId(), message); |
| | | toSession.getAsyncRemote().sendText(message); |
| | | } |
| | | } |