zainali5120
2020-10-08 c24fc100ef9966495dc706e110fc37f13e003448
src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java
@@ -28,8 +28,6 @@
@ServerEndpoint(value = "/trade/market")
@Component
public class TradePlateSendWebSocket {
    @Resource
    RedisUtils redisUtils;
    /**
     * 记录当前在线连接数
@@ -57,15 +55,15 @@
    public void onClose(Session session) {
        onlineCount.decrementAndGet(); // 在线数减1
        Collection<Map<String, Session>> values = tradeplateClients.values();
        if(CollectionUtils.isNotEmpty(values)){
            for(Map<String,Session> map : values){
        if (CollectionUtils.isNotEmpty(values)) {
            for (Map<String, Session> map : values) {
                map.remove(session.getId());
            }
        }
        Collection<Map<String, Session>> klineClientsValues = klineClients.values();
        if(CollectionUtils.isNotEmpty(klineClientsValues)){
            for(Map<String,Session> map : klineClientsValues){
        if (CollectionUtils.isNotEmpty(klineClientsValues)) {
            for (Map<String, Session> map : klineClientsValues) {
                map.remove(session.getId());
            }
        }
@@ -101,20 +99,20 @@
            SubResultModel subResultModel = new SubResultModel();
            subResultModel.setId("rocusdt");
            subResultModel.setSubbed(sub);
            synchronized (session){
            synchronized (session) {
                try {
                    session.getBasicRemote().sendText(JSONObject.toJSONString(subResultModel));
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
           synchronized (session){
               try {
                   session.getBasicRemote().sendText(nekk);
               } catch (IOException e) {
                   e.printStackTrace();
               }
           }
            synchronized (session) {
                try {
                    session.getBasicRemote().sendText(nekk);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
@@ -140,7 +138,7 @@
            String symbol = split[1];
            symbol = CoinTypeConvert.convert(symbol);
            String period = split[3];
            if("60min".equals(period)){
            if ("60min".equals(period)) {
                period = "1hour";
            }
            String key = symbol + "-" + period;
@@ -155,6 +153,37 @@
                stringSessionMap.put(session.getId(), session);
                klineClients.put(key, stringSessionMap);
            }
            // 给他发送最新K线 TODO
            String newKline = "NEW_KINE_{}";
            key = StrUtil.format(newKline, symbol);
            RedisUtils redisUtils = SpringContextHolder.getBean(RedisUtils.class);
            Object o = redisUtils.get(key);
            Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick>) o;
            String ch = "market.{}.kline.{}";
            String chKey = period;
            if (period.equals("60min")) {
                chKey = "1hour";
            }
            if(currentKlineMap!=null && currentKlineMap.containsKey(chKey)){
                Candlestick value = currentKlineMap.get(chKey);
                // 转换
                NewCandlestick newCandlestick = new NewCandlestick();
                String nekkusdt = split[1];
                ch = StrUtil.format(ch, nekkusdt, period);
                newCandlestick.setCh(ch);
                CandlestickModel model = new CandlestickModel();
                model.setVol(value.getVolume());
                model.setLow(value.getLow());
                model.setOpen(value.getOpen());
                model.setHigh(value.getHigh());
                model.setCount(value.getCount());
                model.setAmount(value.getAmount());
                model.setId(value.getTimestamp() / 1000);
                model.setTimestamp(value.getTimestamp() / 1000);
                model.setClose(value.getClose());
                newCandlestick.setTick(model);
                sendMessageKlineNow(JSONObject.toJSONString(newCandlestick), session);
            }
        }
        // 取消订阅
@@ -163,7 +192,7 @@
            String unsub = jsonObject.get("unsub").toString();
            String[] split = unsub.split("\\.");
            String strPeriod = split[3];
            if("60min".equals(strPeriod)){
            if ("60min".equals(strPeriod)) {
                strPeriod = "1hour";
            }
            String symbol = split[1];
@@ -182,7 +211,7 @@
            String symbol = split[1];
            symbol = CoinTypeConvert.convert(symbol);
            String period = split[3];
            if("60min".equals(period)){
            if ("60min".equals(period)) {
                period = "1hour";
            }
            //String key = symbol+"-"+period;
@@ -196,15 +225,15 @@
            List<CandlestickModel> candlestickModels = new ArrayList<>();
            CandlestickResult result = new CandlestickResult();
            result.setRep(sub);
            if(o!=null){
                List<Candlestick> list = (List<Candlestick>)o;
                for(Candlestick candlestick : list){
            if (o != null) {
                List<Candlestick> list = (List<Candlestick>) o;
                for (Candlestick candlestick : list) {
                    CandlestickModel model = new CandlestickModel();
                    model.setAmount(candlestick.getAmount());
                    model.setClose(candlestick.getClose());
                    model.setCount(candlestick.getCount());
                    model.setHigh(candlestick.getHigh());
                    model.setId(candlestick.getTimestamp()/1000);
                    model.setId(candlestick.getTimestamp() / 1000);
                    model.setOpen(candlestick.getOpen());
                    model.setLow(candlestick.getLow());
                    model.setVol(candlestick.getVolume());
@@ -228,7 +257,7 @@
     *
     * @param message 消息内容
     */
    public void sendMessagePlate(String symbol,String message, Session fromSession) {
    public void sendMessagePlate(String symbol, String message, Session fromSession) {
        if (tradeplateClients.containsKey(symbol)) {
            Map<String, Session> nekk = tradeplateClients.get(symbol);
            for (Map.Entry<String, Session> sessionEntry : nekk.entrySet()) {
@@ -245,6 +274,7 @@
    /**
     * 发送历史K线
     *
     * @param message, toSession
     * @return void
     */
@@ -257,6 +287,7 @@
    /**
     * 发送最新K线
     *
     * @param symbol, period, message, fromSession
     * @return void
     */
@@ -273,4 +304,11 @@
            }
        }
    }
    public void sendMessageKlineNow(String message, Session toSession) {
        boolean open = toSession.isOpen();
        if (open) {
            toSession.getAsyncRemote().sendText(message);
        }
    }
}