package com.xcong.excoin.websocket;
|
|
import cn.hutool.core.util.StrUtil;
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSONArray;
|
import com.alibaba.fastjson.JSONObject;
|
import com.huobi.client.model.Candlestick;
|
import com.xcong.excoin.common.contants.AppContants;
|
import com.xcong.excoin.modules.symbols.constants.SymbolsConstats;
|
import com.xcong.excoin.trade.CoinTraderFactory;
|
import com.xcong.excoin.utils.CoinTypeConvert;
|
import com.xcong.excoin.utils.RedisUtils;
|
import com.xcong.excoin.utils.SpringContextHolder;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.collections.CollectionUtils;
|
import org.springframework.stereotype.Component;
|
|
import javax.annotation.Resource;
|
import javax.websocket.*;
|
import javax.websocket.server.PathParam;
|
import javax.websocket.server.ServerEndpoint;
|
import java.io.IOException;
|
import java.util.*;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
@Slf4j
|
@ServerEndpoint(value = "/trade/market")
|
@Component
|
public class TradePlateSendWebSocket {
|
|
/**
|
* 记录当前在线连接数
|
*/
|
private static AtomicInteger onlineCount = new AtomicInteger(0);
|
|
|
private static Map<String, Map<String, Session>> tradeplateClients = new ConcurrentHashMap<>();
|
|
private static Map<String, Map<String, Session>> klineClients = new ConcurrentHashMap<>();
|
|
/**
|
* 连接建立成功调用的方法
|
*/
|
@OnOpen
|
public void onOpen(Session session) {
|
onlineCount.incrementAndGet(); // 在线数加1
|
// log.info("有新连接加入:{},当前在线人数为:{}", session.getId(), onlineCount.get());
|
}
|
|
/**
|
* 连接关闭调用的方法
|
*/
|
@OnClose
|
public void onClose(Session session) {
|
onlineCount.decrementAndGet(); // 在线数减1
|
Collection<Map<String, Session>> values = tradeplateClients.values();
|
if (CollectionUtils.isNotEmpty(values)) {
|
for (Map<String, Session> map : values) {
|
map.remove(session.getId());
|
}
|
}
|
|
Collection<Map<String, Session>> klineClientsValues = klineClients.values();
|
if (CollectionUtils.isNotEmpty(klineClientsValues)) {
|
for (Map<String, Session> map : klineClientsValues) {
|
map.remove(session.getId());
|
}
|
}
|
//log.info("有一连接关闭:{},当前在线人数为:{}", session.getId(), onlineCount.get());
|
}
|
|
/**
|
* 收到客户端消息后调用的方法
|
*
|
* @param message 客户端发送过来的消息
|
*/
|
@OnMessage
|
public void onMessage(String message, Session session) {
|
// 盘口订阅方法 {sub: 'market.btcusdt.depth.10,id: symbol}
|
JSONObject jsonObject = JSON.parseObject(message);
|
// log.info("订阅参数:{}", jsonObject);
|
// 盘口的判断
|
if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("depth")) {
|
|
String sub = jsonObject.get("sub").toString();
|
String symbol = sub.split("\\.")[1];
|
symbol = CoinTypeConvert.convert(symbol);
|
if (tradeplateClients.containsKey(symbol)) {
|
tradeplateClients.get(symbol).put(session.getId(), session);
|
} else {
|
Map<String, Session> map = new HashMap<>();
|
map.put(session.getId(), session);
|
tradeplateClients.put(symbol, map);
|
}
|
// 发送一次盘口
|
CoinTraderFactory factory = SpringContextHolder.getBean(CoinTraderFactory.class);
|
// 发送订阅消息
|
String nekk = factory.getTrader(SymbolsConstats.ROC).sendTradePlateMessage();
|
SubResultModel subResultModel = new SubResultModel();
|
subResultModel.setId("griceusdt");
|
subResultModel.setSubbed(sub);
|
synchronized (session) {
|
try {
|
session.getBasicRemote().sendText(JSONObject.toJSONString(subResultModel));
|
} catch (IOException e) {
|
e.printStackTrace();
|
}
|
}
|
synchronized (session) {
|
try {
|
session.getBasicRemote().sendText(nekk);
|
} catch (IOException e) {
|
e.printStackTrace();
|
}
|
}
|
|
|
}
|
// 取消盘口订阅
|
if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("depth")) {
|
// {unsub:'market.btcusdt.kline.1min'}
|
String unsub = jsonObject.get("unsub").toString();
|
String[] split = unsub.split("\\.");
|
String symbol = split[1];
|
symbol = CoinTypeConvert.convert(symbol);
|
String key = symbol;
|
if (tradeplateClients.containsKey(key)) {
|
tradeplateClients.get(key).remove(session.getId());
|
}
|
}
|
|
// 最新K线订阅
|
if (jsonObject.containsKey("sub") && jsonObject.get("sub").toString().contains("kline")) {
|
// 订阅
|
// {sub: 'market.btcusdt.kline.1min'}
|
String sub = jsonObject.get("sub").toString();
|
String[] split = sub.split("\\.");
|
String symbol = split[1];
|
symbol = CoinTypeConvert.convert(symbol);
|
String period = split[3];
|
if ("60min".equals(period)) {
|
period = "1hour";
|
}
|
String key = symbol + "-" + period;
|
if (klineClients.containsKey(key)) {
|
// 有这个币种K线
|
Map<String, Session> stringSessionMap = klineClients.get(key);
|
if (!stringSessionMap.containsKey(session.getId())) {
|
stringSessionMap.put(session.getId(), session);
|
}
|
} else {
|
Map<String, Session> stringSessionMap = new HashMap<>();
|
stringSessionMap.put(session.getId(), session);
|
klineClients.put(key, stringSessionMap);
|
}
|
// 给他发送最新K线 TODO
|
String newKline = "NEW_KINE_{}";
|
key = StrUtil.format(newKline, symbol);
|
RedisUtils redisUtils = SpringContextHolder.getBean(RedisUtils.class);
|
Object o = redisUtils.get(key);
|
Map<String, Candlestick> currentKlineMap = (Map<String, Candlestick>) o;
|
String ch = "market.{}.kline.{}";
|
String chKey = period;
|
if (period.equals("60min")) {
|
chKey = "1hour";
|
}
|
if(currentKlineMap!=null && currentKlineMap.containsKey(chKey)){
|
Candlestick value = currentKlineMap.get(chKey);
|
// 转换
|
NewCandlestick newCandlestick = new NewCandlestick();
|
String nekkusdt = split[1];
|
ch = StrUtil.format(ch, nekkusdt, period);
|
newCandlestick.setCh(ch);
|
CandlestickModel model = new CandlestickModel();
|
model.setVol(value.getVolume());
|
model.setLow(value.getLow());
|
model.setOpen(value.getOpen());
|
model.setHigh(value.getHigh());
|
model.setCount(value.getCount());
|
model.setAmount(value.getAmount());
|
model.setId(value.getTimestamp() / 1000);
|
model.setTimestamp(value.getTimestamp() / 1000);
|
model.setClose(value.getClose());
|
newCandlestick.setTick(model);
|
sendMessageKlineNow(JSONObject.toJSONString(newCandlestick), session);
|
}
|
}
|
|
// 取消订阅
|
if (jsonObject.containsKey("unsub") && jsonObject.get("unsub").toString().contains("kline")) {
|
// {unsub:'market.${symbol}.kline.${strPeriod}'}
|
String unsub = jsonObject.get("unsub").toString();
|
String[] split = unsub.split("\\.");
|
String strPeriod = split[3];
|
if ("60min".equals(strPeriod)) {
|
strPeriod = "1hour";
|
}
|
String symbol = split[1];
|
symbol = CoinTypeConvert.convert(symbol);
|
String key = symbol + "-" + strPeriod;
|
if (klineClients.containsKey(key)) {
|
klineClients.get(key).remove(session.getId());
|
}
|
}
|
|
// 历史K线订阅
|
// {req: "market.nekkusdt.kline.1min", symbol: "nekkusdt", period: "1min"}
|
if (jsonObject.containsKey("req") && jsonObject.get("req").toString().contains("kline")) {
|
String sub = jsonObject.get("req").toString();
|
String[] split = sub.split("\\.");
|
String symbol = split[1];
|
symbol = CoinTypeConvert.convert(symbol);
|
String period = split[3];
|
if ("60min".equals(period)) {
|
period = "1hour";
|
}
|
//String key = symbol+"-"+period;
|
// String key = "KINE_BCH/USDT_1week";
|
String key = "KINE_{}_{}";
|
// 币币k线数据
|
//key = StrUtil.format(key, symbol, period);
|
key = StrUtil.format(key, "GRICE/USDT", period);
|
RedisUtils bean = SpringContextHolder.getBean(RedisUtils.class);
|
Object o = bean.get(key);
|
List<CandlestickModel> candlestickModels = new ArrayList<>();
|
CandlestickResult result = new CandlestickResult();
|
result.setRep(sub);
|
if (o != null) {
|
List<Candlestick> list = (List<Candlestick>) o;
|
|
if(list!=null && list.size()>300){
|
int size = list.size();
|
list = list.subList(size-300,size);
|
}
|
CandlestickModel model = null;
|
for (Candlestick candlestick : list) {
|
model = new CandlestickModel();
|
model.setAmount(candlestick.getAmount());
|
model.setClose(candlestick.getClose());
|
model.setCount(candlestick.getCount());
|
model.setHigh(candlestick.getHigh());
|
model.setId(candlestick.getTimestamp() / 1000);
|
model.setOpen(candlestick.getOpen());
|
model.setLow(candlestick.getLow());
|
model.setVol(candlestick.getVolume());
|
candlestickModels.add(model);
|
}
|
result.setData(candlestickModels);
|
}
|
|
sendMessageHistory(JSON.toJSONString(result), session);
|
}
|
}
|
|
@OnError
|
public void onError(Session session, Throwable error) {
|
// log.error("发生错误");
|
//error.printStackTrace();
|
}
|
|
/**
|
* 发送盘口消息
|
*
|
* @param message 消息内容
|
*/
|
public void sendMessagePlate(String symbol, String message, Session fromSession) {
|
if (tradeplateClients.containsKey(symbol)) {
|
Map<String, Session> nekk = tradeplateClients.get(symbol);
|
for (Map.Entry<String, Session> sessionEntry : nekk.entrySet()) {
|
Session toSession = sessionEntry.getValue();
|
// 排除掉自己
|
boolean open = toSession.isOpen();
|
if (open) {
|
toSession.getAsyncRemote().sendText(message);
|
}
|
}
|
}
|
|
}
|
|
/**
|
* 发送历史K线
|
*
|
* @param message, toSession
|
* @return void
|
*/
|
public void sendMessageHistory(String message, Session toSession) {
|
boolean open = toSession.isOpen();
|
if (open) {
|
toSession.getAsyncRemote().sendText(message);
|
}
|
}
|
|
/**
|
* 发送最新K线
|
*
|
* @param symbol, period, message, fromSession
|
* @return void
|
*/
|
public void sendMessageKline(String symbol, String period, String message, Session fromSession) {
|
String key = symbol + "-" + period;
|
if (klineClients.containsKey(key)) {
|
Map<String, Session> stringSessionMap = klineClients.get(key);
|
for (Map.Entry<String, Session> sessionEntry : stringSessionMap.entrySet()) {
|
Session toSession = sessionEntry.getValue();
|
boolean open = toSession.isOpen();
|
if (open) {
|
toSession.getAsyncRemote().sendText(message);
|
}
|
}
|
}
|
}
|
|
public void sendMessageKlineNow(String message, Session toSession) {
|
boolean open = toSession.isOpen();
|
if (open) {
|
toSession.getAsyncRemote().sendText(message);
|
}
|
}
|
}
|