package com.xcong.excoin.websocket;
|
|
import cn.hutool.core.util.StrUtil;
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSONArray;
|
import com.alibaba.fastjson.JSONObject;
|
import com.huobi.client.model.Candlestick;
|
import com.xcong.excoin.common.contants.AppContants;
|
import com.xcong.excoin.trade.CoinTraderFactory;
|
import com.xcong.excoin.utils.CoinTypeConvert;
|
import com.xcong.excoin.utils.RedisUtils;
|
import com.xcong.excoin.utils.SpringContextHolder;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.collections.CollectionUtils;
|
import org.springframework.stereotype.Component;
|
|
import javax.annotation.Resource;
|
import javax.websocket.*;
|
import javax.websocket.server.PathParam;
|
import javax.websocket.server.ServerEndpoint;
|
import java.io.IOException;
|
import java.util.*;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
@Slf4j
|
@ServerEndpoint(value = "/trade/market")
|
@Component
|
public class TradePlateSendWebSocket {
|
@Resource
|
RedisUtils redisUtils;
|
|
/**
|
* 记录当前在线连接数
|
*/
|
private static AtomicInteger onlineCount = new AtomicInteger(0);
|
|
|
private static Map<String, Map<String, Session>> tradeplateClients = new ConcurrentHashMap<>();
|
|
private static Map<String, Map<String, Session>> klineClients = new ConcurrentHashMap<>();
|
|
/**
|
* 连接建立成功调用的方法
|
*/
|
@OnOpen
|
public void onOpen(Session session) {
|
onlineCount.incrementAndGet(); // 在线数加1
|
log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get());
|
}
|
|
/**
|
* 连接关闭调用的方法
|
*/
|
@OnClose
|
public void onClose(Session session) {
|
onlineCount.decrementAndGet(); // 在线数减1
|
Collection<Map<String, Session>> values = tradeplateClients.values();
|
if(CollectionUtils.isNotEmpty(values)){
|
for(Map<String,Session> map : values){
|
map.remove(session.getId());
|
}
|
}
|
|
Collection<Map<String, Session>> klineClientsValues = klineClients.values();
|
if(CollectionUtils.isNotEmpty(klineClientsValues)){
|
for(Map<String,Session> map : klineClientsValues){
|
map.remove(session.getId());
|
}
|
}
|
log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get());
|
}
|
|
/**
|
* 收到客户端消息后调用的方法
|
*
|
* @param message 客户端发送过来的消息
|
*/
|
@OnMessage
|
public void onMessage(String message, Session session) {
|
//log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
|
// 订阅方法 {sub: 'market.' + symbol + '.depth.' + this._caculateType(),id: symbol
|
//}
|
JSONObject jsonObject = JSON.parseObject(message);
|
// 盘口的判断
|
if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) {
|
//log.info("订阅盘口消息:{}", session.getId());
|
String sub = jsonObject.get("sub").toString();
|
String symbol = sub.split("\\.")[1];
|
symbol = CoinTypeConvert.convert(symbol);
|
if (tradeplateClients.containsKey(symbol)) {
|
tradeplateClients.get(symbol).put(session.getId(), session);
|
} else {
|
Map<String, Session> map = new HashMap<>();
|
map.put(session.getId(), session);
|
tradeplateClients.put(symbol, map);
|
}
|
// 发送一次盘口
|
CoinTraderFactory factory = SpringContextHolder.getBean(CoinTraderFactory.class);
|
// 发送订阅消息
|
String nekk = factory.getTrader("NEKK").sendTradePlateMessage();
|
SubResultModel subResultModel = new SubResultModel();
|
subResultModel.setId("nekkusdt");
|
subResultModel.setSubbed(sub);
|
synchronized (session){
|
try {
|
session.getBasicRemote().sendText(JSONObject.toJSONString(subResultModel));
|
} catch (IOException e) {
|
e.printStackTrace();
|
}
|
}
|
synchronized (session){
|
try {
|
session.getBasicRemote().sendText(nekk);
|
} catch (IOException e) {
|
e.printStackTrace();
|
}
|
}
|
|
|
}
|
// 取消盘口订阅
|
if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) {
|
// `market.${symbol}.kline.${strPeriod}
|
//log.info("取消订阅盘口消息:{}", session.getId());
|
String unsub = jsonObject.get("unsub").toString();
|
String[] split = unsub.split("\\.");
|
String symbol = split[1];
|
symbol = CoinTypeConvert.convert(symbol);
|
String key = symbol;
|
if (tradeplateClients.containsKey(key)) {
|
tradeplateClients.get(key).remove(session.getId());
|
}
|
}
|
|
|
// 最新K线订阅
|
|
// 根据消息判断这个用户需要订阅哪种数据
|
// {sub: `market.${symbol}.kline.${strPeriod}`,
|
// symbol: symbol,
|
// period: strPeriod
|
//}
|
// 取消订阅 {unsub: xxx(标识)}
|
if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) {
|
// 订阅
|
String sub = jsonObject.get("sub").toString();
|
log.info("订阅最新K线消息:{}", sub);
|
String[] split = sub.split("\\.");
|
String symbol = split[1];
|
symbol = CoinTypeConvert.convert(symbol);
|
String period = split[3];
|
if("60min".equals(period)){
|
period = "1hour";
|
}
|
String key = symbol + "-" + period;
|
log.info("最新K线key:{}", key);
|
if (klineClients.containsKey(key)) {
|
// 有这个币种K线
|
Map<String, Session> stringSessionMap = klineClients.get(key);
|
if (!stringSessionMap.containsKey(session.getId())) {
|
stringSessionMap.put(session.getId(), session);
|
log.info("放入最新K线Map:{}", key);
|
}
|
} else {
|
Map<String, Session> stringSessionMap = new HashMap<>();
|
stringSessionMap.put(session.getId(), session);
|
klineClients.put(key, stringSessionMap);
|
log.info("放入最新K线Map:{}", key);
|
}
|
}
|
|
// 取消订阅
|
if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) {
|
// `market.${symbol}.kline.${strPeriod}
|
//log.info("取消订阅最新K消息:{}", session.getId());
|
String unsub = jsonObject.get("unsub").toString();
|
String[] split = unsub.split("\\.");
|
String strPeriod = split[3];
|
if("60min".equals(strPeriod)){
|
strPeriod = "1hour";
|
}
|
String symbol = split[1];
|
symbol = CoinTypeConvert.convert(symbol);
|
String key = symbol + "-" + strPeriod;
|
if (klineClients.containsKey(key)) {
|
klineClients.get(key).remove(session.getId());
|
//session.getAsyncRemote().sendText(message);
|
}
|
}
|
|
// 历史K线订阅
|
// {req: "market.nekkusdt.kline.1min", symbol: "nekkusdt", period: "1min"}
|
if (jsonObject.containsKey("req") && jsonObject.get("req").toString().contains("kline")) {
|
String sub = jsonObject.get("req").toString();
|
String[] split = sub.split("\\.");
|
String symbol = split[1];
|
symbol = CoinTypeConvert.convert(symbol);
|
String period = split[3];
|
if("60min".equals(period)){
|
period = "1hour";
|
}
|
//String key = symbol+"-"+period;
|
// String key = "KINE_BCH/USDT_1week";
|
String key = "KINE_{}_{}";
|
// 币币k线数据
|
//key = StrUtil.format(key, symbol, period);
|
key = StrUtil.format(key, "NEKK/USDT", period);
|
RedisUtils bean = SpringContextHolder.getBean(RedisUtils.class);
|
Object o = bean.get(key);
|
List<CandlestickModel> candlestickModels = new ArrayList<>();
|
CandlestickResult result = new CandlestickResult();
|
result.setRep(sub);
|
if(o!=null){
|
List<Candlestick> list = (List<Candlestick>)o;
|
for(Candlestick candlestick : list){
|
CandlestickModel model = new CandlestickModel();
|
model.setAmount(candlestick.getAmount());
|
model.setClose(candlestick.getClose());
|
model.setCount(candlestick.getCount());
|
model.setHigh(candlestick.getHigh());
|
model.setId(candlestick.getTimestamp()/1000);
|
model.setOpen(candlestick.getOpen());
|
model.setLow(candlestick.getLow());
|
model.setVol(candlestick.getVolume());
|
candlestickModels.add(model);
|
}
|
result.setData(candlestickModels);
|
}
|
|
sendMessageHistory(JSON.toJSONString(result), session);
|
}
|
}
|
|
@OnError
|
public void onError(Session session, Throwable error) {
|
log.error("发生错误");
|
error.printStackTrace();
|
}
|
|
/**
|
* 群发消息
|
*
|
* @param message 消息内容
|
*/
|
public void sendMessagePlate(String symbol,String message, Session fromSession) {
|
if (tradeplateClients.containsKey(symbol)) {
|
Map<String, Session> nekk = tradeplateClients.get(symbol);
|
for (Map.Entry<String, Session> sessionEntry : nekk.entrySet()) {
|
Session toSession = sessionEntry.getValue();
|
// 排除掉自己
|
//if (!fromSession.getId().equals(toSession.getId())) {
|
// log.info("服务端给客户端[{}]发送盘口消息{}", toSession.getId(), message);
|
boolean open = toSession.isOpen();
|
if (open) {
|
toSession.getAsyncRemote().sendText(message);
|
}
|
|
// }
|
}
|
}
|
|
}
|
|
public void sendMessageHistory(String message, Session toSession) {
|
// log.info("服务端给客户端[{}]发送历史K线", toSession.getId());
|
boolean open = toSession.isOpen();
|
if (open) {
|
toSession.getAsyncRemote().sendText(message);
|
}
|
}
|
|
public void sendMessageKline(String symbol, String period, String message, Session fromSession) {
|
|
String key = symbol + "-" + period;
|
//log.info("发送最新K线[{}],数据[{}]",key,message);
|
if (klineClients.containsKey(key)) {
|
Map<String, Session> stringSessionMap = klineClients.get(key);
|
for (Map.Entry<String, Session> sessionEntry : stringSessionMap.entrySet()) {
|
Session toSession = sessionEntry.getValue();
|
boolean open = toSession.isOpen();
|
if (open) {
|
log.info("服务端给客户端[{}]发送最新K线消息{}", toSession.getId(), message);
|
toSession.getAsyncRemote().sendText(message);
|
}
|
}
|
}
|
}
|
}
|