package com.xcong.excoin.websocket; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.huobi.client.model.Candlestick; import com.xcong.excoin.common.contants.AppContants; import com.xcong.excoin.modules.symbols.constants.SymbolsConstats; import com.xcong.excoin.trade.CoinTraderFactory; import com.xcong.excoin.utils.CoinTypeConvert; import com.xcong.excoin.utils.RedisUtils; import com.xcong.excoin.utils.SpringContextHolder; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @Slf4j @ServerEndpoint(value = "/trade/market") @Component public class TradePlateSendWebSocket { /** * 记录当前在线连接数 */ private static AtomicInteger onlineCount = new AtomicInteger(0); private static Map> tradeplateClients = new ConcurrentHashMap<>(); private static Map> klineClients = new ConcurrentHashMap<>(); /** * 连接建立成功调用的方法 */ @OnOpen public void onOpen(Session session) { onlineCount.incrementAndGet(); // 在线数加1 // log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get()); } /** * 连接关闭调用的方法 */ @OnClose public void onClose(Session session) { onlineCount.decrementAndGet(); // 在线数减1 Collection> values = tradeplateClients.values(); if (CollectionUtils.isNotEmpty(values)) { for (Map map : values) { map.remove(session.getId()); } } Collection> klineClientsValues = klineClients.values(); if (CollectionUtils.isNotEmpty(klineClientsValues)) { for (Map map : klineClientsValues) { map.remove(session.getId()); } } //log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get()); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息 */ @OnMessage public void onMessage(String message, Session session) { // 盘口订阅方法 {sub: 'market.btcusdt.depth.10,id: symbol} JSONObject jsonObject = JSON.parseObject(message); // log.info("订阅参数:{}", jsonObject); // 盘口的判断 if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) { String sub = jsonObject.get("sub").toString(); String symbol = sub.split("\\.")[1]; symbol = CoinTypeConvert.convert(symbol); if (tradeplateClients.containsKey(symbol)) { tradeplateClients.get(symbol).put(session.getId(), session); } else { Map map = new HashMap<>(); map.put(session.getId(), session); tradeplateClients.put(symbol, map); } // 发送一次盘口 CoinTraderFactory factory = SpringContextHolder.getBean(CoinTraderFactory.class); // 发送订阅消息 String nekk = factory.getTrader(SymbolsConstats.ROC).sendTradePlateMessage(); SubResultModel subResultModel = new SubResultModel(); subResultModel.setId("griceusdt"); subResultModel.setSubbed(sub); synchronized (session) { try { session.getBasicRemote().sendText(JSONObject.toJSONString(subResultModel)); } catch (IOException e) { e.printStackTrace(); } } synchronized (session) { try { session.getBasicRemote().sendText(nekk); } catch (IOException e) { e.printStackTrace(); } } } // 取消盘口订阅 if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) { // {unsub:'market.btcusdt.kline.1min'} String unsub = jsonObject.get("unsub").toString(); String[] split = unsub.split("\\."); String symbol = split[1]; symbol = CoinTypeConvert.convert(symbol); String key = symbol; if (tradeplateClients.containsKey(key)) { tradeplateClients.get(key).remove(session.getId()); } } // 最新K线订阅 if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) { // 订阅 // {sub: 'market.btcusdt.kline.1min'} String sub = jsonObject.get("sub").toString(); String[] split = sub.split("\\."); String symbol = split[1]; symbol = CoinTypeConvert.convert(symbol); String period = split[3]; if ("60min".equals(period)) { period = "1hour"; } String key = symbol + "-" + period; if (klineClients.containsKey(key)) { // 有这个币种K线 Map stringSessionMap = klineClients.get(key); if (!stringSessionMap.containsKey(session.getId())) { stringSessionMap.put(session.getId(), session); } } else { Map stringSessionMap = new HashMap<>(); stringSessionMap.put(session.getId(), session); klineClients.put(key, stringSessionMap); } // 给他发送最新K线 TODO String newKline = "NEW_KINE_{}"; key = StrUtil.format(newKline, symbol); RedisUtils redisUtils = SpringContextHolder.getBean(RedisUtils.class); Object o = redisUtils.get(key); Map currentKlineMap = (Map) o; String ch = "market.{}.kline.{}"; String chKey = period; if (period.equals("60min")) { chKey = "1hour"; } if(currentKlineMap!=null && currentKlineMap.containsKey(chKey)){ Candlestick value = currentKlineMap.get(chKey); // 转换 NewCandlestick newCandlestick = new NewCandlestick(); String nekkusdt = split[1]; ch = StrUtil.format(ch, nekkusdt, period); newCandlestick.setCh(ch); CandlestickModel model = new CandlestickModel(); model.setVol(value.getVolume()); model.setLow(value.getLow()); model.setOpen(value.getOpen()); model.setHigh(value.getHigh()); model.setCount(value.getCount()); model.setAmount(value.getAmount()); model.setId(value.getTimestamp() / 1000); model.setTimestamp(value.getTimestamp() / 1000); model.setClose(value.getClose()); newCandlestick.setTick(model); sendMessageKlineNow(JSONObject.toJSONString(newCandlestick), session); } } // 取消订阅 if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) { // {unsub:'market.${symbol}.kline.${strPeriod}'} String unsub = jsonObject.get("unsub").toString(); String[] split = unsub.split("\\."); String strPeriod = split[3]; if ("60min".equals(strPeriod)) { strPeriod = "1hour"; } String symbol = split[1]; symbol = CoinTypeConvert.convert(symbol); String key = symbol + "-" + strPeriod; if (klineClients.containsKey(key)) { klineClients.get(key).remove(session.getId()); } } // 历史K线订阅 // {req: "market.nekkusdt.kline.1min", symbol: "nekkusdt", period: "1min"} if (jsonObject.containsKey("req") && jsonObject.get("req").toString().contains("kline")) { String sub = jsonObject.get("req").toString(); String[] split = sub.split("\\."); String symbol = split[1]; symbol = CoinTypeConvert.convert(symbol); String period = split[3]; if ("60min".equals(period)) { period = "1hour"; } //String key = symbol+"-"+period; // String key = "KINE_BCH/USDT_1week"; String key = "KINE_{}_{}"; // 币币k线数据 //key = StrUtil.format(key, symbol, period); key = StrUtil.format(key, "GRICE/USDT", period); RedisUtils bean = SpringContextHolder.getBean(RedisUtils.class); Object o = bean.get(key); List candlestickModels = new ArrayList<>(); CandlestickResult result = new CandlestickResult(); result.setRep(sub); if (o != null) { List list = (List) o; if(list!=null && list.size()>300){ int size = list.size(); list = list.subList(size-300,size); } CandlestickModel model = null; for (Candlestick candlestick : list) { model = new CandlestickModel(); model.setAmount(candlestick.getAmount()); model.setClose(candlestick.getClose()); model.setCount(candlestick.getCount()); model.setHigh(candlestick.getHigh()); model.setId(candlestick.getTimestamp() / 1000); model.setOpen(candlestick.getOpen()); model.setLow(candlestick.getLow()); model.setVol(candlestick.getVolume()); candlestickModels.add(model); } result.setData(candlestickModels); } sendMessageHistory(JSON.toJSONString(result), session); } } @OnError public void onError(Session session, Throwable error) { // log.error("发生错误"); //error.printStackTrace(); } /** * 发送盘口消息 * * @param message 消息内容 */ public void sendMessagePlate(String symbol, String message, Session fromSession) { if (tradeplateClients.containsKey(symbol)) { Map nekk = tradeplateClients.get(symbol); for (Map.Entry sessionEntry : nekk.entrySet()) { Session toSession = sessionEntry.getValue(); // 排除掉自己 boolean open = toSession.isOpen(); if (open) { toSession.getAsyncRemote().sendText(message); } } } } /** * 发送历史K线 * * @param message, toSession * @return void */ public void sendMessageHistory(String message, Session toSession) { boolean open = toSession.isOpen(); if (open) { toSession.getAsyncRemote().sendText(message); } } /** * 发送最新K线 * * @param symbol, period, message, fromSession * @return void */ public void sendMessageKline(String symbol, String period, String message, Session fromSession) { String key = symbol + "-" + period; if (klineClients.containsKey(key)) { Map stringSessionMap = klineClients.get(key); for (Map.Entry sessionEntry : stringSessionMap.entrySet()) { Session toSession = sessionEntry.getValue(); boolean open = toSession.isOpen(); if (open) { toSession.getAsyncRemote().sendText(message); } } } } public void sendMessageKlineNow(String message, Session toSession) { boolean open = toSession.isOpen(); if (open) { toSession.getAsyncRemote().sendText(message); } } }