| | |
| | | |
| | | import cn.hutool.core.util.StrUtil; |
| | | import com.alibaba.fastjson.JSON; |
| | | import com.alibaba.fastjson.JSONArray; |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.huobi.client.model.Candlestick; |
| | | import com.xcong.excoin.common.contants.AppContants; |
| | | import com.xcong.excoin.modules.symbols.constants.SymbolsConstats; |
| | | import com.xcong.excoin.trade.CoinTraderFactory; |
| | | import com.xcong.excoin.utils.CoinTypeConvert; |
| | | import com.xcong.excoin.utils.RedisUtils; |
| | | import com.xcong.excoin.utils.SpringContextHolder; |
| | |
| | | import javax.websocket.*; |
| | | import javax.websocket.server.PathParam; |
| | | import javax.websocket.server.ServerEndpoint; |
| | | import java.util.Collection; |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | import java.io.IOException; |
| | | import java.util.*; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | |
| | |
| | | @ServerEndpoint(value = "/trade/market") |
| | | @Component |
| | | public class TradePlateSendWebSocket { |
| | | @Resource |
| | | RedisUtils redisUtils; |
| | | |
| | | /** |
| | | * 记录当前在线连接数 |
| | |
| | | @OnOpen |
| | | public void onOpen(Session session) { |
| | | onlineCount.incrementAndGet(); // 在线数加1 |
| | | log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get()); |
| | | // log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get()); |
| | | } |
| | | |
| | | /** |
| | |
| | | @OnClose |
| | | public void onClose(Session session) { |
| | | onlineCount.decrementAndGet(); // 在线数减1 |
| | | // Collection<Map<String, Session>> values = tradeplateClients.values(); |
| | | // if(CollectionUtils.isNotEmpty(values)){ |
| | | // for(Map<String,Session> map : values){ |
| | | // map.remove(session.getId()); |
| | | // } |
| | | // } |
| | | log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get()); |
| | | Collection<Map<String, Session>> values = tradeplateClients.values(); |
| | | if (CollectionUtils.isNotEmpty(values)) { |
| | | for (Map<String, Session> map : values) { |
| | | map.remove(session.getId()); |
| | | } |
| | | } |
| | | |
| | | Collection<Map<String, Session>> klineClientsValues = klineClients.values(); |
| | | if (CollectionUtils.isNotEmpty(klineClientsValues)) { |
| | | for (Map<String, Session> map : klineClientsValues) { |
| | | map.remove(session.getId()); |
| | | } |
| | | } |
| | | //log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get()); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | @OnMessage |
| | | public void onMessage(String message, Session session) { |
| | | log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message); |
| | | // 订阅方法 {sub: 'market.' + symbol + '.depth.' + this._caculateType(),id: symbol |
| | | //} |
| | | // 盘口订阅方法 {sub: 'market.btcusdt.depth.10,id: symbol} |
| | | JSONObject jsonObject = JSON.parseObject(message); |
| | | // log.info("订阅参数:{}", jsonObject); |
| | | // 盘口的判断 |
| | | if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) { |
| | | log.info("订阅盘口消息:{}", session.getId()); |
| | | |
| | | String sub = jsonObject.get("sub").toString(); |
| | | String symbol = sub.split("\\.")[1]; |
| | | symbol = CoinTypeConvert.convert(symbol); |
| | |
| | | map.put(session.getId(), session); |
| | | tradeplateClients.put(symbol, map); |
| | | } |
| | | // 发送一次盘口 |
| | | CoinTraderFactory factory = SpringContextHolder.getBean(CoinTraderFactory.class); |
| | | // 发送订阅消息 |
| | | String nekk = factory.getTrader(SymbolsConstats.ROC).sendTradePlateMessage(); |
| | | SubResultModel subResultModel = new SubResultModel(); |
| | | subResultModel.setId("bzzusdt"); |
| | | subResultModel.setSubbed(sub); |
| | | synchronized (session) { |
| | | try { |
| | | session.getBasicRemote().sendText(JSONObject.toJSONString(subResultModel)); |
| | | } catch (IOException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | synchronized (session) { |
| | | try { |
| | | session.getBasicRemote().sendText(nekk); |
| | | } catch (IOException e) { |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | |
| | | |
| | | } |
| | | // 取消盘口订阅 |
| | | if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) { |
| | | // `market.${symbol}.kline.${strPeriod} |
| | | log.info("取消订阅盘口消息:{}", session.getId()); |
| | | // {unsub:'market.btcusdt.kline.1min'} |
| | | String unsub = jsonObject.get("unsub").toString(); |
| | | String[] split = unsub.split("\\."); |
| | | String symbol = split[1]; |
| | |
| | | } |
| | | } |
| | | |
| | | |
| | | // 最新K线订阅 |
| | | |
| | | // 根据消息判断这个用户需要订阅哪种数据 |
| | | // {sub: `market.${symbol}.kline.${strPeriod}`, |
| | | // symbol: symbol, |
| | | // period: strPeriod |
| | | //} |
| | | // 取消订阅 {unsub: xxx(标识)} |
| | | if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) { |
| | | // 订阅 |
| | | log.info("订阅最新K线消息:{}", session.getId()); |
| | | // {sub: 'market.btcusdt.kline.1min'} |
| | | String sub = jsonObject.get("sub").toString(); |
| | | String[] split = sub.split("\\."); |
| | | String symbol = split[1]; |
| | | symbol = CoinTypeConvert.convert(symbol); |
| | | String period = split[3]; |
| | | if ("60min".equals(period)) { |
| | | period = "1hour"; |
| | | } |
| | | String key = symbol + "-" + period; |
| | | if (klineClients.containsKey(key)) { |
| | | // 有这个币种K线 |
| | |
| | | stringSessionMap.put(session.getId(), session); |
| | | klineClients.put(key, stringSessionMap); |
| | | } |
| | | // 给他发送最新K线 TODO |
| | | String newKline = "NEW_KINE_{}"; |
| | | key = StrUtil.format(newKline, symbol); |
| | | RedisUtils redisUtils = SpringContextHolder.getBean(RedisUtils.class); |
| | | Object o = redisUtils.get(key); |
| | | Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick>) o; |
| | | String ch = "market.{}.kline.{}"; |
| | | String chKey = period; |
| | | if (period.equals("60min")) { |
| | | chKey = "1hour"; |
| | | } |
| | | if(currentKlineMap!=null && currentKlineMap.containsKey(chKey)){ |
| | | Candlestick value = currentKlineMap.get(chKey); |
| | | // 转换 |
| | | NewCandlestick newCandlestick = new NewCandlestick(); |
| | | String nekkusdt = split[1]; |
| | | ch = StrUtil.format(ch, nekkusdt, period); |
| | | newCandlestick.setCh(ch); |
| | | CandlestickModel model = new CandlestickModel(); |
| | | model.setVol(value.getVolume()); |
| | | model.setLow(value.getLow()); |
| | | model.setOpen(value.getOpen()); |
| | | model.setHigh(value.getHigh()); |
| | | model.setCount(value.getCount()); |
| | | model.setAmount(value.getAmount()); |
| | | model.setId(value.getTimestamp() / 1000); |
| | | model.setTimestamp(value.getTimestamp() / 1000); |
| | | model.setClose(value.getClose()); |
| | | newCandlestick.setTick(model); |
| | | sendMessageKlineNow(JSONObject.toJSONString(newCandlestick), session); |
| | | } |
| | | } |
| | | |
| | | // 取消订阅 |
| | | if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) { |
| | | // `market.${symbol}.kline.${strPeriod} |
| | | log.info("取消订阅最新K消息:{}", session.getId()); |
| | | // {unsub:'market.${symbol}.kline.${strPeriod}'} |
| | | String unsub = jsonObject.get("unsub").toString(); |
| | | String[] split = unsub.split("\\."); |
| | | String strPeriod = split[3]; |
| | | if ("60min".equals(strPeriod)) { |
| | | strPeriod = "1hour"; |
| | | } |
| | | String symbol = split[1]; |
| | | symbol = CoinTypeConvert.convert(symbol); |
| | | String key = symbol + "-" + strPeriod; |
| | |
| | | String symbol = split[1]; |
| | | symbol = CoinTypeConvert.convert(symbol); |
| | | String period = split[3]; |
| | | if ("60min".equals(period)) { |
| | | period = "1hour"; |
| | | } |
| | | //String key = symbol+"-"+period; |
| | | // String key = "KINE_BCH/USDT_1week"; |
| | | String key = "KINE_{}_{}"; |
| | | // 币币k线数据 |
| | | key = StrUtil.format(key, symbol, period); |
| | | //key = StrUtil.format(key, symbol, period); |
| | | key = StrUtil.format(key, "BZZ/USDT", period); |
| | | RedisUtils bean = SpringContextHolder.getBean(RedisUtils.class); |
| | | Object o = bean.get(key); |
| | | sendMessageHistory(JSON.toJSONString(o), session); |
| | | List<CandlestickModel> candlestickModels = new ArrayList<>(); |
| | | CandlestickResult result = new CandlestickResult(); |
| | | result.setRep(sub); |
| | | if (o != null) { |
| | | List<Candlestick> list = (List<Candlestick>) o; |
| | | |
| | | if(list!=null && list.size()>300){ |
| | | int size = list.size(); |
| | | list = list.subList(size-300,size); |
| | | } |
| | | CandlestickModel model = null; |
| | | for (Candlestick candlestick : list) { |
| | | model = new CandlestickModel(); |
| | | model.setAmount(candlestick.getAmount()); |
| | | model.setClose(candlestick.getClose()); |
| | | model.setCount(candlestick.getCount()); |
| | | model.setHigh(candlestick.getHigh()); |
| | | model.setId(candlestick.getTimestamp() / 1000); |
| | | model.setOpen(candlestick.getOpen()); |
| | | model.setLow(candlestick.getLow()); |
| | | model.setVol(candlestick.getVolume()); |
| | | candlestickModels.add(model); |
| | | } |
| | | result.setData(candlestickModels); |
| | | } |
| | | |
| | | sendMessageHistory(JSON.toJSONString(result), session); |
| | | } |
| | | } |
| | | |
| | | @OnError |
| | | public void onError(Session session, Throwable error) { |
| | | log.error("发生错误"); |
| | | error.printStackTrace(); |
| | | // log.error("发生错误"); |
| | | //error.printStackTrace(); |
| | | } |
| | | |
| | | /** |
| | | * 群发消息 |
| | | * 发送盘口消息 |
| | | * |
| | | * @param message 消息内容 |
| | | */ |
| | | public void sendMessagePlate(String symbol,String message, Session fromSession) { |
| | | public void sendMessagePlate(String symbol, String message, Session fromSession) { |
| | | if (tradeplateClients.containsKey(symbol)) { |
| | | Map<String, Session> nekk = tradeplateClients.get(symbol); |
| | | for (Map.Entry<String, Session> sessionEntry : nekk.entrySet()) { |
| | | Session toSession = sessionEntry.getValue(); |
| | | // 排除掉自己 |
| | | //if (!fromSession.getId().equals(toSession.getId())) { |
| | | log.info("服务端给客户端[{}]发送盘口消息{}", toSession.getId(), message); |
| | | boolean open = toSession.isOpen(); |
| | | if (open) { |
| | | toSession.getAsyncRemote().sendText(message); |
| | | } |
| | | |
| | | // } |
| | | } |
| | | } |
| | | |
| | | } |
| | | |
| | | /** |
| | | * 发送历史K线 |
| | | * |
| | | * @param message, toSession |
| | | * @return void |
| | | */ |
| | | public void sendMessageHistory(String message, Session toSession) { |
| | | log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message); |
| | | boolean open = toSession.isOpen(); |
| | | if (open) { |
| | | toSession.getAsyncRemote().sendText(message); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 发送最新K线 |
| | | * |
| | | * @param symbol, period, message, fromSession |
| | | * @return void |
| | | */ |
| | | public void sendMessageKline(String symbol, String period, String message, Session fromSession) { |
| | | String key = symbol + "-" + period; |
| | | if (klineClients.containsKey(key)) { |
| | | Map<String, Session> stringSessionMap = klineClients.get(key); |
| | | for (Map.Entry<String, Session> sessionEntry : stringSessionMap.entrySet()) { |
| | | Session toSession = sessionEntry.getValue(); |
| | | // 排除掉自己 |
| | | //if (!fromSession.getId().equals(toSession.getId())) { |
| | | log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message); |
| | | boolean open = toSession.isOpen(); |
| | | if (open) { |
| | | toSession.getAsyncRemote().sendText(message); |
| | | } |
| | | // } |
| | | } |
| | | } |
| | | } |
| | | |
| | | public void sendMessageKlineNow(String message, Session toSession) { |
| | | boolean open = toSession.isOpen(); |
| | | if (open) { |
| | | toSession.getAsyncRemote().sendText(message); |
| | | } |
| | | } |
| | | } |