| | |
| | | |
| | | import cn.hutool.core.util.StrUtil; |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.alibaba.fastjson.JSONArray; |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.huobi.client.model.Candlestick; |
| | | import com.xcong.excoin.common.contants.AppContants; |
| | | import com.xcong.excoin.trade.CoinTraderFactory; |
| | | import com.xcong.excoin.utils.CoinTypeConvert; |
| | | import com.xcong.excoin.utils.RedisUtils; |
| | | import com.xcong.excoin.utils.SpringContextHolder; |
| | |
| | | import javax.websocket.*; |
| | | import javax.websocket.server.PathParam; |
| | | import javax.websocket.server.ServerEndpoint; |
| | | import java.util.Collection; |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | import java.io.IOException; |
| | | import java.util.*; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | |
| | |
| | | @OnClose |
| | | public void onClose(Session session) { |
| | | onlineCount.decrementAndGet(); // 在线数减1 |
| | | // Collection<Map<String, Session>> values = tradeplateClients.values(); |
| | | // if(CollectionUtils.isNotEmpty(values)){ |
| | | // for(Map<String,Session> map : values){ |
| | | // map.remove(session.getId()); |
| | | // } |
| | | // } |
| | | Collection<Map<String, Session>> values = tradeplateClients.values(); |
| | | if(CollectionUtils.isNotEmpty(values)){ |
| | | for(Map<String,Session> map : values){ |
| | | map.remove(session.getId()); |
| | | } |
| | | } |
| | | |
| | | Collection<Map<String, Session>> klineClientsValues = klineClients.values(); |
| | | if(CollectionUtils.isNotEmpty(klineClientsValues)){ |
| | | for(Map<String,Session> map : klineClientsValues){ |
| | | map.remove(session.getId()); |
| | | } |
| | | } |
| | | log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get()); |
| | | } |
| | | |
| | |
| | | */ |
| | | @OnMessage |
| | | public void onMessage(String message, Session session) { |
| | | log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message); |
| | | //log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message); |
| | | // 订阅方法 {sub: 'market.' + symbol + '.depth.' + this._caculateType(),id: symbol |
| | | //} |
| | | JSONObject jsonObject = JSON.parseObject(message); |
| | | // 盘口的判断 |
| | | if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) { |
| | | //log.info("订阅盘口消息:{}", session.getId()); |
| | | String sub = jsonObject.get("sub").toString(); |
| | | String symbol = sub.split("\\.")[1]; |
| | | symbol = CoinTypeConvert.convert(symbol); |
| | |
| | | map.put(session.getId(), session); |
| | | tradeplateClients.put(symbol, map); |
| | | } |
| | | // 发送一次盘口 |
| | | CoinTraderFactory factory = SpringContextHolder.getBean(CoinTraderFactory.class); |
| | | // 发送订阅消息 |
| | | String nekk = factory.getTrader("NEKK").sendTradePlateMessage(); |
| | | SubResultModel subResultModel = new SubResultModel(); |
| | | subResultModel.setId("nekkusdt"); |
| | | subResultModel.setSubbed(sub); |
| | | synchronized (session){ |
| | | try { |
| | | session.getBasicRemote().sendText(JSONObject.toJSONString(subResultModel)); |
| | | } catch (IOException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | synchronized (session){ |
| | | try { |
| | | session.getBasicRemote().sendText(nekk); |
| | | } catch (IOException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | |
| | | |
| | | } |
| | | // 取消盘口订阅 |
| | | if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) { |
| | | // `market.${symbol}.kline.${strPeriod} |
| | | //log.info("取消订阅盘口消息:{}", session.getId()); |
| | | String unsub = jsonObject.get("unsub").toString(); |
| | | String[] split = unsub.split("\\."); |
| | | String symbol = split[1]; |
| | |
| | | if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) { |
| | | // 订阅 |
| | | String sub = jsonObject.get("sub").toString(); |
| | | log.info("订阅最新K线消息:{}", sub); |
| | | String[] split = sub.split("\\."); |
| | | String symbol = split[1]; |
| | | symbol = CoinTypeConvert.convert(symbol); |
| | | String period = split[3]; |
| | | if("60min".equals(period)){ |
| | | period = "1hour"; |
| | | } |
| | | String key = symbol + "-" + period; |
| | | log.info("最新K线key:{}", key); |
| | | if (klineClients.containsKey(key)) { |
| | | // 有这个币种K线 |
| | | Map<String, Session> stringSessionMap = klineClients.get(key); |
| | | if (!stringSessionMap.containsKey(session.getId())) { |
| | | stringSessionMap.put(session.getId(), session); |
| | | log.info("放入最新K线Map:{}", key); |
| | | } |
| | | } else { |
| | | Map<String, Session> stringSessionMap = new HashMap<>(); |
| | | stringSessionMap.put(session.getId(), session); |
| | | klineClients.put(key, stringSessionMap); |
| | | log.info("放入最新K线Map:{}", key); |
| | | } |
| | | } |
| | | |
| | | // 取消订阅 |
| | | if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) { |
| | | // `market.${symbol}.kline.${strPeriod} |
| | | //log.info("取消订阅最新K消息:{}", session.getId()); |
| | | String unsub = jsonObject.get("unsub").toString(); |
| | | String[] split = unsub.split("\\."); |
| | | String strPeriod = split[3]; |
| | | if("60min".equals(strPeriod)){ |
| | | strPeriod = "1hour"; |
| | | } |
| | | String symbol = split[1]; |
| | | symbol = CoinTypeConvert.convert(symbol); |
| | | String key = symbol + "-" + strPeriod; |
| | | if (klineClients.containsKey(key)) { |
| | | klineClients.get(key).remove(session.getId()); |
| | | //session.getAsyncRemote().sendText(message); |
| | | } |
| | | } |
| | | |
| | |
| | | String symbol = split[1]; |
| | | symbol = CoinTypeConvert.convert(symbol); |
| | | String period = split[3]; |
| | | if("60min".equals(period)){ |
| | | period = "1hour"; |
| | | } |
| | | //String key = symbol+"-"+period; |
| | | // String key = "KINE_BCH/USDT_1week"; |
| | | String key = "KINE_{}_{}"; |
| | | // 币币k线数据 |
| | | key = StrUtil.format(key, symbol, period); |
| | | //key = StrUtil.format(key, symbol, period); |
| | | key = StrUtil.format(key, "NEKK/USDT", period); |
| | | RedisUtils bean = SpringContextHolder.getBean(RedisUtils.class); |
| | | Object o = bean.get(key); |
| | | sendMessageHistory(JSON.toJSONString(o), session); |
| | | List<CandlestickModel> candlestickModels = new ArrayList<>(); |
| | | CandlestickResult result = new CandlestickResult(); |
| | | result.setRep(sub); |
| | | if(o!=null){ |
| | | List<Candlestick> list = (List<Candlestick>)o; |
| | | for(Candlestick candlestick : list){ |
| | | CandlestickModel model = new CandlestickModel(); |
| | | model.setAmount(candlestick.getAmount()); |
| | | model.setClose(candlestick.getClose()); |
| | | model.setCount(candlestick.getCount()); |
| | | model.setHigh(candlestick.getHigh()); |
| | | model.setId(candlestick.getTimestamp()/1000); |
| | | model.setOpen(candlestick.getOpen()); |
| | | model.setLow(candlestick.getLow()); |
| | | model.setVol(candlestick.getVolume()); |
| | | candlestickModels.add(model); |
| | | } |
| | | result.setData(candlestickModels); |
| | | } |
| | | |
| | | sendMessageHistory(JSON.toJSONString(result), session); |
| | | } |
| | | } |
| | | |
| | |
| | | * |
| | | * @param message 消息内容 |
| | | */ |
| | | public void sendMessagePlate(String message, Session fromSession) { |
| | | if (tradeplateClients.containsKey("nekkusdt")) { |
| | | Map<String, Session> nekk = tradeplateClients.get("nekkusdt"); |
| | | public void sendMessagePlate(String symbol,String message, Session fromSession) { |
| | | if (tradeplateClients.containsKey(symbol)) { |
| | | Map<String, Session> nekk = tradeplateClients.get(symbol); |
| | | for (Map.Entry<String, Session> sessionEntry : nekk.entrySet()) { |
| | | Session toSession = sessionEntry.getValue(); |
| | | // 排除掉自己 |
| | | //if (!fromSession.getId().equals(toSession.getId())) { |
| | | log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message); |
| | | // log.info("服务端给客户端[{}]发送盘口消息{}", toSession.getId(), message); |
| | | boolean open = toSession.isOpen(); |
| | | if (open) { |
| | | toSession.getAsyncRemote().sendText(message); |
| | |
| | | } |
| | | |
| | | public void sendMessageHistory(String message, Session toSession) { |
| | | log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message); |
| | | // log.info("服务端给客户端[{}]发送历史K线", toSession.getId()); |
| | | boolean open = toSession.isOpen(); |
| | | if (open) { |
| | | toSession.getAsyncRemote().sendText(message); |
| | |
| | | } |
| | | |
| | | public void sendMessageKline(String symbol, String period, String message, Session fromSession) { |
| | | |
| | | String key = symbol + "-" + period; |
| | | //log.info("发送最新K线[{}],数据[{}]",key,message); |
| | | if (klineClients.containsKey(key)) { |
| | | Map<String, Session> stringSessionMap = klineClients.get(key); |
| | | for (Map.Entry<String, Session> sessionEntry : stringSessionMap.entrySet()) { |
| | | Session toSession = sessionEntry.getValue(); |
| | | // 排除掉自己 |
| | | //if (!fromSession.getId().equals(toSession.getId())) { |
| | | log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message); |
| | | boolean open = toSession.isOpen(); |
| | | if (open) { |
| | | log.info("服务端给客户端[{}]发送最新K线消息{}", toSession.getId(), message); |
| | | toSession.getAsyncRemote().sendText(message); |
| | | } |
| | | // } |
| | | } |
| | | } |
| | | } |