package com.xcong.excoin.websocket;
|
|
import cn.hutool.core.util.StrUtil;
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSONObject;
|
import com.xcong.excoin.common.contants.AppContants;
|
import com.xcong.excoin.utils.CoinTypeConvert;
|
import com.xcong.excoin.utils.RedisUtils;
|
import com.xcong.excoin.utils.SpringContextHolder;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.collections.CollectionUtils;
|
import org.springframework.stereotype.Component;
|
|
import javax.annotation.Resource;
|
import javax.websocket.*;
|
import javax.websocket.server.PathParam;
|
import javax.websocket.server.ServerEndpoint;
|
import java.util.Collection;
|
import java.util.HashMap;
|
import java.util.Map;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
@Slf4j
|
@ServerEndpoint(value = "/trade/market")
|
@Component
|
public class TradePlateSendWebSocket {
|
@Resource
|
RedisUtils redisUtils;
|
|
/**
|
* 记录当前在线连接数
|
*/
|
private static AtomicInteger onlineCount = new AtomicInteger(0);
|
|
|
private static Map<String, Map<String, Session>> tradeplateClients = new ConcurrentHashMap<>();
|
|
private static Map<String, Map<String, Session>> klineClients = new ConcurrentHashMap<>();
|
|
/**
|
* 连接建立成功调用的方法
|
*/
|
@OnOpen
|
public void onOpen(Session session) {
|
onlineCount.incrementAndGet(); // 在线数加1
|
log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get());
|
}
|
|
/**
|
* 连接关闭调用的方法
|
*/
|
@OnClose
|
public void onClose(Session session) {
|
onlineCount.decrementAndGet(); // 在线数减1
|
// Collection<Map<String, Session>> values = tradeplateClients.values();
|
// if(CollectionUtils.isNotEmpty(values)){
|
// for(Map<String,Session> map : values){
|
// map.remove(session.getId());
|
// }
|
// }
|
log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get());
|
}
|
|
/**
|
* 收到客户端消息后调用的方法
|
*
|
* @param message 客户端发送过来的消息
|
*/
|
@OnMessage
|
public void onMessage(String message, Session session) {
|
log.info("服务端收到客户端[{}]的消息:{}", session.getId(), message);
|
// 订阅方法 {sub: 'market.' + symbol + '.depth.' + this._caculateType(),id: symbol
|
//}
|
JSONObject jsonObject = JSON.parseObject(message);
|
// 盘口的判断
|
if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) {
|
log.info("订阅盘口消息:{}", session.getId());
|
String sub = jsonObject.get("sub").toString();
|
String symbol = sub.split("\\.")[1];
|
symbol = CoinTypeConvert.convert(symbol);
|
if (tradeplateClients.containsKey(symbol)) {
|
tradeplateClients.get(symbol).put(session.getId(), session);
|
} else {
|
Map<String, Session> map = new HashMap<>();
|
map.put(session.getId(), session);
|
tradeplateClients.put(symbol, map);
|
}
|
}
|
// 取消盘口订阅
|
if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) {
|
// `market.${symbol}.kline.${strPeriod}
|
log.info("取消订阅盘口消息:{}", session.getId());
|
String unsub = jsonObject.get("unsub").toString();
|
String[] split = unsub.split("\\.");
|
String symbol = split[1];
|
symbol = CoinTypeConvert.convert(symbol);
|
String key = symbol;
|
if (tradeplateClients.containsKey(key)) {
|
tradeplateClients.get(key).remove(session.getId());
|
}
|
}
|
|
|
// 最新K线订阅
|
|
// 根据消息判断这个用户需要订阅哪种数据
|
// {sub: `market.${symbol}.kline.${strPeriod}`,
|
// symbol: symbol,
|
// period: strPeriod
|
//}
|
// 取消订阅 {unsub: xxx(标识)}
|
if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) {
|
// 订阅
|
log.info("订阅最新K线消息:{}", session.getId());
|
String sub = jsonObject.get("sub").toString();
|
String[] split = sub.split("\\.");
|
String symbol = split[1];
|
symbol = CoinTypeConvert.convert(symbol);
|
String period = split[3];
|
String key = symbol + "-" + period;
|
if (klineClients.containsKey(key)) {
|
// 有这个币种K线
|
Map<String, Session> stringSessionMap = klineClients.get(key);
|
if (!stringSessionMap.containsKey(session.getId())) {
|
stringSessionMap.put(session.getId(), session);
|
}
|
} else {
|
Map<String, Session> stringSessionMap = new HashMap<>();
|
stringSessionMap.put(session.getId(), session);
|
klineClients.put(key, stringSessionMap);
|
}
|
}
|
|
// 取消订阅
|
if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) {
|
// `market.${symbol}.kline.${strPeriod}
|
log.info("取消订阅最新K消息:{}", session.getId());
|
String unsub = jsonObject.get("unsub").toString();
|
String[] split = unsub.split("\\.");
|
String strPeriod = split[3];
|
String symbol = split[1];
|
symbol = CoinTypeConvert.convert(symbol);
|
String key = symbol + "-" + strPeriod;
|
if (klineClients.containsKey(key)) {
|
klineClients.get(key).remove(session.getId());
|
}
|
}
|
|
// 历史K线订阅
|
// {req: "market.nekkusdt.kline.1min", symbol: "nekkusdt", period: "1min"}
|
if (jsonObject.containsKey("req") && jsonObject.get("req").toString().contains("kline")) {
|
String sub = jsonObject.get("req").toString();
|
String[] split = sub.split("\\.");
|
String symbol = split[1];
|
symbol = CoinTypeConvert.convert(symbol);
|
String period = split[3];
|
//String key = symbol+"-"+period;
|
// String key = "KINE_BCH/USDT_1week";
|
String key = "KINE_{}_{}";
|
// 币币k线数据
|
key = StrUtil.format(key, symbol, period);
|
RedisUtils bean = SpringContextHolder.getBean(RedisUtils.class);
|
Object o = bean.get(key);
|
sendMessageHistory(JSON.toJSONString(o), session);
|
}
|
}
|
|
@OnError
|
public void onError(Session session, Throwable error) {
|
log.error("发生错误");
|
error.printStackTrace();
|
}
|
|
/**
|
* 群发消息
|
*
|
* @param message 消息内容
|
*/
|
public void sendMessagePlate(String symbol,String message, Session fromSession) {
|
if (tradeplateClients.containsKey(symbol)) {
|
Map<String, Session> nekk = tradeplateClients.get(symbol);
|
for (Map.Entry<String, Session> sessionEntry : nekk.entrySet()) {
|
Session toSession = sessionEntry.getValue();
|
// 排除掉自己
|
//if (!fromSession.getId().equals(toSession.getId())) {
|
log.info("服务端给客户端[{}]发送盘口消息{}", toSession.getId(), message);
|
boolean open = toSession.isOpen();
|
if (open) {
|
toSession.getAsyncRemote().sendText(message);
|
}
|
|
// }
|
}
|
}
|
|
}
|
|
public void sendMessageHistory(String message, Session toSession) {
|
log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
|
boolean open = toSession.isOpen();
|
if (open) {
|
toSession.getAsyncRemote().sendText(message);
|
}
|
}
|
|
public void sendMessageKline(String symbol, String period, String message, Session fromSession) {
|
String key = symbol + "-" + period;
|
if (klineClients.containsKey(key)) {
|
Map<String, Session> stringSessionMap = klineClients.get(key);
|
for (Map.Entry<String, Session> sessionEntry : stringSessionMap.entrySet()) {
|
Session toSession = sessionEntry.getValue();
|
// 排除掉自己
|
//if (!fromSession.getId().equals(toSession.getId())) {
|
log.info("服务端给客户端[{}]发送消息{}", toSession.getId(), message);
|
boolean open = toSession.isOpen();
|
if (open) {
|
toSession.getAsyncRemote().sendText(message);
|
}
|
// }
|
}
|
}
|
}
|
}
|