package com.xcong.excoin.websocket; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.common.contants.AppContants; import com.xcong.excoin.utils.CoinTypeConvert; import com.xcong.excoin.utils.RedisUtils; import com.xcong.excoin.utils.SpringContextHolder; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @Slf4j @ServerEndpoint(value = "/trade/market") @Component public class TradePlateSendWebSocket { @Resource RedisUtils redisUtils; /** * 记录当前在线连接数 */ private static AtomicInteger onlineCount = new AtomicInteger(0); private static Map> tradeplateClients = new ConcurrentHashMap<>(); private static Map> klineClients = new ConcurrentHashMap<>(); /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(Session session) { onlineCount.incrementAndGet(); // 在线数加1 log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get()); } /** * 连接关闭调用的方法 */ @OnClose public void onClose(Session session) { onlineCount.decrementAndGet(); // 在线数减1 // Collection> values = tradeplateClients.values(); // if(CollectionUtils.isNotEmpty(values)){ // for(Map map : values){ // map.remove(session.getId()); // } // } log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get()); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息 */ @OnMessage public void onMessage(String message, Session session) { log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message); // 订阅方法 {sub: 'market.' + symbol + '.depth.' + this._caculateType(),id: symbol //} JSONObject jsonObject = JSON.parseObject(message); // 盘口的判断 if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) { log.info("订阅盘口消息:{}", session.getId()); String sub = jsonObject.get("sub").toString(); String symbol = sub.split("\\.")[1]; symbol = CoinTypeConvert.convert(symbol); if (tradeplateClients.containsKey(symbol)) { tradeplateClients.get(symbol).put(session.getId(), session); } else { Map map = new HashMap<>(); map.put(session.getId(), session); tradeplateClients.put(symbol, map); } } // 取消盘口订阅 if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) { // `market.${symbol}.kline.${strPeriod} log.info("取消订阅盘口消息:{}", session.getId()); String unsub = jsonObject.get("unsub").toString(); String[] split = unsub.split("\\."); String symbol = split[1]; symbol = CoinTypeConvert.convert(symbol); String key = symbol; if (tradeplateClients.containsKey(key)) { tradeplateClients.get(key).remove(session.getId()); } } // 最新K线订阅 // 根据消息判断这个用户需要订阅哪种数据 // {sub: `market.${symbol}.kline.${strPeriod}`, //            symbol: symbol, //            period: strPeriod //} // 取消订阅 {unsub: xxx(标识)} if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) { // 订阅 log.info("订阅最新K线消息:{}", session.getId()); String sub = jsonObject.get("sub").toString(); String[] split = sub.split("\\."); String symbol = split[1]; symbol = CoinTypeConvert.convert(symbol); String period = split[3]; String key = symbol + "-" + period; if (klineClients.containsKey(key)) { // 有这个币种K线 Map stringSessionMap = klineClients.get(key); if (!stringSessionMap.containsKey(session.getId())) { stringSessionMap.put(session.getId(), session); } } else { Map stringSessionMap = new HashMap<>(); stringSessionMap.put(session.getId(), session); klineClients.put(key, stringSessionMap); } } // 取消订阅 if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) { // `market.${symbol}.kline.${strPeriod} log.info("取消订阅最新K消息:{}", session.getId()); String unsub = jsonObject.get("unsub").toString(); String[] split = unsub.split("\\."); String strPeriod = split[3]; String symbol = split[1]; symbol = CoinTypeConvert.convert(symbol); String key = symbol + "-" + strPeriod; if (klineClients.containsKey(key)) { klineClients.get(key).remove(session.getId()); } } // 历史K线订阅 // {req: "market.nekkusdt.kline.1min", symbol: "nekkusdt", period: "1min"} if (jsonObject.containsKey("req") && jsonObject.get("req").toString().contains("kline")) { String sub = jsonObject.get("req").toString(); String[] split = sub.split("\\."); String symbol = split[1]; symbol = CoinTypeConvert.convert(symbol); String period = split[3]; //String key = symbol+"-"+period; // String key = "KINE_BCH/USDT_1week"; String key = "KINE_{}_{}"; // 币币k线数据 key = StrUtil.format(key, symbol, period); RedisUtils bean = SpringContextHolder.getBean(RedisUtils.class); Object o = bean.get(key); sendMessageHistory(JSON.toJSONString(o), session); } } @OnError public void onError(Session session, Throwable error) { log.error("发生错误"); error.printStackTrace(); } /** * 群发消息 * * @param message 消息内容 */ public void sendMessagePlate(String symbol,String message, Session fromSession) { if (tradeplateClients.containsKey(symbol)) { Map nekk = tradeplateClients.get(symbol); for (Map.Entry sessionEntry : nekk.entrySet()) { Session toSession = sessionEntry.getValue(); // 排除掉自己 //if (!fromSession.getId().equals(toSession.getId())) { log.info("服务端给客户端[{}]发送盘口消息{}", toSession.getId(), message); boolean open = toSession.isOpen(); if (open) { toSession.getAsyncRemote().sendText(message); } // } } } } public void sendMessageHistory(String message, Session toSession) { log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message); boolean open = toSession.isOpen(); if (open) { toSession.getAsyncRemote().sendText(message); } } public void sendMessageKline(String symbol, String period, String message, Session fromSession) { String key = symbol + "-" + period; if (klineClients.containsKey(key)) { Map stringSessionMap = klineClients.get(key); for (Map.Entry sessionEntry : stringSessionMap.entrySet()) { Session toSession = sessionEntry.getValue(); // 排除掉自己 //if (!fromSession.getId().equals(toSession.getId())) { log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message); boolean open = toSession.isOpen(); if (open) { toSession.getAsyncRemote().sendText(message); } // } } } } }