From 6dbdd5b82a5361d90c08246664252e50ac626cbf Mon Sep 17 00:00:00 2001 From: zainali5120 <512061637@qq.com> Date: Wed, 16 Sep 2020 17:40:24 +0800 Subject: [PATCH] 撮合交易代码提交 --- src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java | 128 ++++++++++++++++++++++++++++++------------ 1 files changed, 91 insertions(+), 37 deletions(-) diff --git a/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java index b955f11..fc1ccab 100644 --- a/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java +++ b/src/main/java/com/xcong/excoin/websocket/TradePlateSendWebSocket.java @@ -2,8 +2,11 @@ import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; +import com.huobi.client.model.Candlestick; import com.xcong.excoin.common.contants.AppContants; +import com.xcong.excoin.trade.CoinTraderFactory; import com.xcong.excoin.utils.CoinTypeConvert; import com.xcong.excoin.utils.RedisUtils; import com.xcong.excoin.utils.SpringContextHolder; @@ -15,9 +18,8 @@ import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; -import java.util.Collection; -import java.util.HashMap; -import java.util.Map; +import java.io.IOException; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; @@ -53,12 +55,19 @@ @OnClose public void onClose(Session session) { onlineCount.decrementAndGet(); // 在线数减1 -// Collection<Map<String, Session>> values = tradeplateClients.values(); -// if(CollectionUtils.isNotEmpty(values)){ -// for(Map<String,Session> map : values){ -// map.remove(session.getId()); -// } -// } + Collection<Map<String, Session>> values = tradeplateClients.values(); + if(CollectionUtils.isNotEmpty(values)){ + for(Map<String,Session> map : values){ + map.remove(session.getId()); + } + } + + Collection<Map<String, Session>> klineClientsValues = klineClients.values(); + if(CollectionUtils.isNotEmpty(klineClientsValues)){ + for(Map<String,Session> map : klineClientsValues){ + map.remove(session.getId()); + } + } log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get()); } @@ -69,9 +78,7 @@ */ @OnMessage public void onMessage(String message, Session session) { - log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message); - // 订阅方法 {sub: 'market.' + symbol + '.depth.' + this._caculateType(),id: symbol - //} + // 盘口订阅方法 {sub: 'market.btcusdt.depth.10,id: symbol} JSONObject jsonObject = JSON.parseObject(message); // 盘口的判断 if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) { @@ -85,10 +92,33 @@ map.put(session.getId(), session); tradeplateClients.put(symbol, map); } + // 发送一次盘口 + CoinTraderFactory factory = SpringContextHolder.getBean(CoinTraderFactory.class); + // 发送订阅消息 + String nekk = factory.getTrader("NEKK").sendTradePlateMessage(); + SubResultModel subResultModel = new SubResultModel(); + subResultModel.setId("nekkusdt"); + subResultModel.setSubbed(sub); + synchronized (session){ + try { + session.getBasicRemote().sendText(JSONObject.toJSONString(subResultModel)); + } catch (IOException e) { + e.printStackTrace(); + } + } + synchronized (session){ + try { + session.getBasicRemote().sendText(nekk); + } catch (IOException e) { + e.printStackTrace(); + } + } + + } // 取消盘口订阅 if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) { - // `market.${symbol}.kline.${strPeriod} + // {unsub:'market.btcusdt.kline.1min'} String unsub = jsonObject.get("unsub").toString(); String[] split = unsub.split("\\."); String symbol = split[1]; @@ -99,22 +129,18 @@ } } - // 最新K线订阅 - - // 根据消息判断这个用户需要订阅哪种数据 - // {sub: `market.${symbol}.kline.${strPeriod}`, - // symbol: symbol, - // period: strPeriod - //} - // 取消订阅 {unsub: xxx(标识)} if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) { // 订阅 + // {sub: 'market.btcusdt.kline.1min'} String sub = jsonObject.get("sub").toString(); String[] split = sub.split("\\."); String symbol = split[1]; symbol = CoinTypeConvert.convert(symbol); String period = split[3]; + if("60min".equals(period)){ + period = "1hour"; + } String key = symbol + "-" + period; if (klineClients.containsKey(key)) { // 有这个币种K线 @@ -131,10 +157,13 @@ // 取消订阅 if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) { - // `market.${symbol}.kline.${strPeriod} + // {unsub:'market.${symbol}.kline.${strPeriod}'} String unsub = jsonObject.get("unsub").toString(); String[] split = unsub.split("\\."); String strPeriod = split[3]; + if("60min".equals(strPeriod)){ + strPeriod = "1hour"; + } String symbol = split[1]; symbol = CoinTypeConvert.convert(symbol); String key = symbol + "-" + strPeriod; @@ -151,14 +180,38 @@ String symbol = split[1]; symbol = CoinTypeConvert.convert(symbol); String period = split[3]; + if("60min".equals(period)){ + period = "1hour"; + } //String key = symbol+"-"+period; // String key = "KINE_BCH/USDT_1week"; String key = "KINE_{}_{}"; // 币币k线数据 - key = StrUtil.format(key, symbol, period); + //key = StrUtil.format(key, symbol, period); + key = StrUtil.format(key, "NEKK/USDT", period); RedisUtils bean = SpringContextHolder.getBean(RedisUtils.class); Object o = bean.get(key); - sendMessageHistory(JSON.toJSONString(o), session); + List<CandlestickModel> candlestickModels = new ArrayList<>(); + CandlestickResult result = new CandlestickResult(); + result.setRep(sub); + if(o!=null){ + List<Candlestick> list = (List<Candlestick>)o; + for(Candlestick candlestick : list){ + CandlestickModel model = new CandlestickModel(); + model.setAmount(candlestick.getAmount()); + model.setClose(candlestick.getClose()); + model.setCount(candlestick.getCount()); + model.setHigh(candlestick.getHigh()); + model.setId(candlestick.getTimestamp()/1000); + model.setOpen(candlestick.getOpen()); + model.setLow(candlestick.getLow()); + model.setVol(candlestick.getVolume()); + candlestickModels.add(model); + } + result.setData(candlestickModels); + } + + sendMessageHistory(JSON.toJSONString(result), session); } } @@ -169,51 +222,52 @@ } /** - * 群发消息 + * 发送盘口消息 * * @param message 消息内容 */ - public void sendMessagePlate(String message, Session fromSession) { - if (tradeplateClients.containsKey("nekkusdt")) { - Map<String, Session> nekk = tradeplateClients.get("nekkusdt"); + public void sendMessagePlate(String symbol,String message, Session fromSession) { + if (tradeplateClients.containsKey(symbol)) { + Map<String, Session> nekk = tradeplateClients.get(symbol); for (Map.Entry<String, Session> sessionEntry : nekk.entrySet()) { Session toSession = sessionEntry.getValue(); // 排除掉自己 - //if (!fromSession.getId().equals(toSession.getId())) { - log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message); boolean open = toSession.isOpen(); if (open) { toSession.getAsyncRemote().sendText(message); } - - // } } } } + /** + * 发送历史K线 + * @param message, toSession + * @return void + */ public void sendMessageHistory(String message, Session toSession) { - log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message); boolean open = toSession.isOpen(); if (open) { toSession.getAsyncRemote().sendText(message); } } + /** + * 发送最新K线 + * @param symbol, period, message, fromSession + * @return void + */ public void sendMessageKline(String symbol, String period, String message, Session fromSession) { String key = symbol + "-" + period; if (klineClients.containsKey(key)) { Map<String, Session> stringSessionMap = klineClients.get(key); for (Map.Entry<String, Session> sessionEntry : stringSessionMap.entrySet()) { Session toSession = sessionEntry.getValue(); - // 排除掉自己 - //if (!fromSession.getId().equals(toSession.getId())) { - log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message); boolean open = toSession.isOpen(); if (open) { toSession.getAsyncRemote().sendText(message); } - // } } } } -- Gitblit v1.9.1