From d174d6963d62b3bd176f9e7ba3cf0d7f75a91b69 Mon Sep 17 00:00:00 2001 From: zainali5120 <512061637@qq.com> Date: Wed, 16 Sep 2020 16:03:22 +0800 Subject: [PATCH] 撮合交易代码提交 --- src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java | 109 +++++++++++++++++++++++++++++++++++++++++++----------- 1 files changed, 87 insertions(+), 22 deletions(-) diff --git a/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java index bb8bcb3..15d4168 100644 --- a/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java +++ b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java @@ -2,8 +2,11 @@ import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; +import com.huobi.client.model.Candlestick; import com.xcong.excoin.common.contants.AppContants; +import com.xcong.excoin.trade.CoinTraderFactory; import com.xcong.excoin.utils.CoinTypeConvert; import com.xcong.excoin.utils.RedisUtils; import com.xcong.excoin.utils.SpringContextHolder; @@ -15,9 +18,8 @@ import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; +import java.io.IOException; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -53,12 +55,19 @@ @OnClose public void onClose(Session session) { onlineCount.decrementAndGet(); // 在线数减1 -// Collection<Map<String, Session>> values = tradeplateClients.values(); -// if(CollectionUtils.isNotEmpty(values)){ -// for(Map<String,Session> map : values){ -// map.remove(session.getId()); -// } -// } + Collection<Map<String, Session>> values = tradeplateClients.values(); + if(CollectionUtils.isNotEmpty(values)){ + for(Map<String,Session> map : values){ + map.remove(session.getId()); + } + } + + Collection<Map<String, Session>> klineClientsValues = klineClients.values(); + if(CollectionUtils.isNotEmpty(klineClientsValues)){ + for(Map<String,Session> map : klineClientsValues){ + map.remove(session.getId()); + } + } log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get()); } @@ -69,13 +78,13 @@ */ @OnMessage public void onMessage(String message, Session session) { - log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message); + //log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message); // 订阅方法 {sub: 'market.' + symbol + '.depth.' + this._caculateType(),id: symbol //} JSONObject jsonObject = JSON.parseObject(message); // 盘口的判断 if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) { - log.info("订阅盘口消息:{}", session.getId()); + //log.info("订阅盘口消息:{}", session.getId()); String sub = jsonObject.get("sub").toString(); String symbol = sub.split("\\.")[1]; symbol = CoinTypeConvert.convert(symbol); @@ -86,11 +95,34 @@ map.put(session.getId(), session); tradeplateClients.put(symbol, map); } + // 发送一次盘口 + CoinTraderFactory factory = SpringContextHolder.getBean(CoinTraderFactory.class); + // 发送订阅消息 + String nekk = factory.getTrader("NEKK").sendTradePlateMessage(); + SubResultModel subResultModel = new SubResultModel(); + subResultModel.setId("nekkusdt"); + subResultModel.setSubbed(sub); + synchronized (session){ + try { + session.getBasicRemote().sendText(JSONObject.toJSONString(subResultModel)); + } catch (IOException e) { + e.printStackTrace(); + } + } + synchronized (session){ + try { + session.getBasicRemote().sendText(nekk); + } catch (IOException e) { + e.printStackTrace(); + } + } + + } // 取消盘口订阅 if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) { // `market.${symbol}.kline.${strPeriod} - log.info("取消订阅盘口消息:{}", session.getId()); + //log.info("取消订阅盘口消息:{}", session.getId()); String unsub = jsonObject.get("unsub").toString(); String[] split = unsub.split("\\."); String symbol = split[1]; @@ -112,38 +144,48 @@ // 取消订阅 {unsub: xxx(标识)} if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) { // 订阅 - log.info("订阅最新K线消息:{}", session.getId()); String sub = jsonObject.get("sub").toString(); + log.info("订阅最新K线消息:{}", sub); String[] split = sub.split("\\."); String symbol = split[1]; symbol = CoinTypeConvert.convert(symbol); String period = split[3]; + if("60min".equals(period)){ + period = "1hour"; + } String key = symbol + "-" + period; + log.info("最新K线key:{}", key); if (klineClients.containsKey(key)) { // 有这个币种K线 Map<String, Session> stringSessionMap = klineClients.get(key); if (!stringSessionMap.containsKey(session.getId())) { stringSessionMap.put(session.getId(), session); + log.info("放入最新K线Map:{}", key); } } else { Map<String, Session> stringSessionMap = new HashMap<>(); stringSessionMap.put(session.getId(), session); klineClients.put(key, stringSessionMap); + log.info("放入最新K线Map:{}", key); } } // 取消订阅 if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) { // `market.${symbol}.kline.${strPeriod} - log.info("取消订阅最新K消息:{}", session.getId()); + //log.info("取消订阅最新K消息:{}", session.getId()); String unsub = jsonObject.get("unsub").toString(); String[] split = unsub.split("\\."); String strPeriod = split[3]; + if("60min".equals(strPeriod)){ + strPeriod = "1hour"; + } String symbol = split[1]; symbol = CoinTypeConvert.convert(symbol); String key = symbol + "-" + strPeriod; if (klineClients.containsKey(key)) { klineClients.get(key).remove(session.getId()); + //session.getAsyncRemote().sendText(message); } } @@ -155,14 +197,38 @@ String symbol = split[1]; symbol = CoinTypeConvert.convert(symbol); String period = split[3]; + if("60min".equals(period)){ + period = "1hour"; + } //String key = symbol+"-"+period; // String key = "KINE_BCH/USDT_1week"; String key = "KINE_{}_{}"; // 币币k线数据 - key = StrUtil.format(key, symbol, period); + //key = StrUtil.format(key, symbol, period); + key = StrUtil.format(key, "NEKK/USDT", period); RedisUtils bean = SpringContextHolder.getBean(RedisUtils.class); Object o = bean.get(key); - sendMessageHistory(JSON.toJSONString(o), session); + List<CandlestickModel> candlestickModels = new ArrayList<>(); + CandlestickResult result = new CandlestickResult(); + result.setRep(sub); + if(o!=null){ + List<Candlestick> list = (List<Candlestick>)o; + for(Candlestick candlestick : list){ + CandlestickModel model = new CandlestickModel(); + model.setAmount(candlestick.getAmount()); + model.setClose(candlestick.getClose()); + model.setCount(candlestick.getCount()); + model.setHigh(candlestick.getHigh()); + model.setId(candlestick.getTimestamp()/1000); + model.setOpen(candlestick.getOpen()); + model.setLow(candlestick.getLow()); + model.setVol(candlestick.getVolume()); + candlestickModels.add(model); + } + result.setData(candlestickModels); + } + + sendMessageHistory(JSON.toJSONString(result), session); } } @@ -184,7 +250,7 @@ Session toSession = sessionEntry.getValue(); // 排除掉自己 //if (!fromSession.getId().equals(toSession.getId())) { - log.info("服务端给客户端[{}]发送盘口消息{}", toSession.getId(), message); + // log.info("服务端给客户端[{}]发送盘口消息{}", toSession.getId(), message); boolean open = toSession.isOpen(); if (open) { toSession.getAsyncRemote().sendText(message); @@ -197,7 +263,7 @@ } public void sendMessageHistory(String message, Session toSession) { - log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message); + // log.info("服务端给客户端[{}]发送历史K线", toSession.getId()); boolean open = toSession.isOpen(); if (open) { toSession.getAsyncRemote().sendText(message); @@ -205,19 +271,18 @@ } public void sendMessageKline(String symbol, String period, String message, Session fromSession) { + String key = symbol + "-" + period; + //log.info("发送最新K线[{}],数据[{}]",key,message); if (klineClients.containsKey(key)) { Map<String, Session> stringSessionMap = klineClients.get(key); for (Map.Entry<String, Session> sessionEntry : stringSessionMap.entrySet()) { Session toSession = sessionEntry.getValue(); - // 排除掉自己 - //if (!fromSession.getId().equals(toSession.getId())) { - log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message); boolean open = toSession.isOpen(); if (open) { + log.info("服务端给客户端[{}]发送最新K线消息{}", toSession.getId(), message); toSession.getAsyncRemote().sendText(message); } - // } } } } -- Gitblit v1.9.1