5 files added
3 files modified
New file |
| | |
| | | package com.xcong.excoin.netty.bean; |
| | | |
| | | import lombok.Data; |
| | | |
| | | import java.io.Serializable; |
| | | |
| | | /** |
| | | * @author wzy |
| | | * @date 2021-02-24 |
| | | **/ |
| | | @Data |
| | | public class SubRequest implements Serializable { |
| | | private static final long serialVersionUID = -5661972007013485438L; |
| | | private String sub; |
| | | |
| | | private String id; |
| | | |
| | | } |
New file |
| | |
| | | package com.xcong.excoin.netty.bean; |
| | | |
| | | import lombok.Data; |
| | | |
| | | import java.io.Serializable; |
| | | |
| | | /** |
| | | * @author wzy |
| | | * @date 2021-02-24 |
| | | **/ |
| | | @Data |
| | | public class SubResponse implements Serializable { |
| | | |
| | | private String id; |
| | | |
| | | private String status; |
| | | |
| | | private String subbed; |
| | | |
| | | private long ts; |
| | | } |
New file |
| | |
| | | package com.xcong.excoin.netty.bean; |
| | | |
| | | import lombok.Data; |
| | | |
| | | import java.io.Serializable; |
| | | |
| | | /** |
| | | * @author wzy |
| | | * @date 2021-02-24 |
| | | **/ |
| | | @Data |
| | | public class UnSubRequest implements Serializable { |
| | | |
| | | private static final long serialVersionUID = -7082425849051404678L; |
| | | private String unsub; |
| | | |
| | | private String id; |
| | | } |
New file |
| | |
| | | package com.xcong.excoin.netty.bean; |
| | | |
| | | import lombok.Data; |
| | | |
| | | import java.io.Serializable; |
| | | |
| | | /** |
| | | * @author wzy |
| | | * @date 2021-02-24 |
| | | **/ |
| | | @Data |
| | | public class UnSubResponse implements Serializable { |
| | | |
| | | private String id; |
| | | |
| | | private String status; |
| | | |
| | | private String subbed; |
| | | |
| | | private long ts; |
| | | } |
| | |
| | | import io.netty.channel.group.DefaultChannelGroup; |
| | | import io.netty.util.concurrent.GlobalEventExecutor; |
| | | |
| | | import javax.validation.constraints.NotBlank; |
| | | import javax.validation.constraints.NotNull; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | |
| | |
| | | public class ChannelManager { |
| | | |
| | | private static final ChannelGroup WEBSOCKET_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); |
| | | |
| | | private static final ConcurrentMap<String, ChannelGroup> DEPTH_MAP = new ConcurrentHashMap<>(); |
| | | |
| | | private static final ConcurrentMap<String, ChannelGroup> TRADE_MAP = new ConcurrentHashMap<>(); |
| | | |
| | | private static final ConcurrentMap<String, ChannelGroup> KLINE_MAP = new ConcurrentHashMap<>(); |
| | | |
| | | // 当前连接到服务器的通道(tcp和websocket) |
| | | private static final ConcurrentMap<String, ChannelId> CHANNEL_MAP = new ConcurrentHashMap<>(); |
| | |
| | | WEBSOCKET_GROUP.writeAndFlush(NettyTools.webSocketBytes(msg)); |
| | | } |
| | | |
| | | public static void putSymbolSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) { |
| | | switch (type) { |
| | | case "kline" : |
| | | ChannelGroup kline = KLINE_MAP.get(symbol); |
| | | if (kline == null) { |
| | | kline = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); |
| | | } |
| | | kline.add(channel); |
| | | KLINE_MAP.put(symbol, kline); |
| | | break; |
| | | case "depth" : |
| | | ChannelGroup depth = DEPTH_MAP.get(symbol); |
| | | if (depth == null) { |
| | | depth = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); |
| | | } |
| | | depth.add(channel); |
| | | DEPTH_MAP.put(symbol, depth); |
| | | break; |
| | | case "trade" : |
| | | ChannelGroup trade = TRADE_MAP.get(symbol); |
| | | if (trade == null) { |
| | | trade = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); |
| | | } |
| | | trade.add(channel); |
| | | TRADE_MAP.put(symbol, trade); |
| | | break; |
| | | default: |
| | | break; |
| | | } |
| | | } |
| | | |
| | | public static void removeSymbolUnSubChannel(@NotBlank String symbol, @NotNull Channel channel, @NotBlank String type) { |
| | | switch (type) { |
| | | case "kline" : |
| | | ChannelGroup kline = KLINE_MAP.get(symbol); |
| | | if (kline == null) { |
| | | return; |
| | | } |
| | | kline.remove(channel); |
| | | KLINE_MAP.put(symbol, kline); |
| | | break; |
| | | case "depth" : |
| | | ChannelGroup depth = DEPTH_MAP.get(symbol); |
| | | if (depth == null) { |
| | | return; |
| | | } |
| | | depth.remove(channel); |
| | | DEPTH_MAP.put(symbol, depth); |
| | | break; |
| | | case "trade" : |
| | | ChannelGroup trade = TRADE_MAP.get(symbol); |
| | | if (trade == null) { |
| | | return; |
| | | } |
| | | trade.remove(channel); |
| | | TRADE_MAP.put(symbol, trade); |
| | | break; |
| | | default: |
| | | break; |
| | | } |
| | | } |
| | | |
| | | public static ChannelGroup getChannelGroup(@NotBlank String symbol, @NotBlank String type) { |
| | | switch (type) { |
| | | case "kline" : |
| | | return KLINE_MAP.get(symbol); |
| | | case "depth" : |
| | | return DEPTH_MAP.get(symbol); |
| | | case "trade" : |
| | | return TRADE_MAP.get(symbol); |
| | | default: |
| | | return new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); |
| | | } |
| | | } |
| | | |
| | | } |
| | |
| | | |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.xcong.excoin.netty.bean.RequestBean; |
| | | import com.xcong.excoin.netty.bean.SubRequest; |
| | | import com.xcong.excoin.netty.bean.SubResponse; |
| | | import com.xcong.excoin.netty.bean.UnSubResponse; |
| | | import com.xcong.excoin.netty.common.ChannelManager; |
| | | import com.xcong.excoin.netty.common.NettyTools; |
| | | import com.xcong.excoin.netty.logic.MsgLogic; |
| | | import io.netty.channel.ChannelHandlerContext; |
| | |
| | | private MsgLogic msgLogic; |
| | | |
| | | public void webSocketDispatch(ChannelHandlerContext ctx, String msg) { |
| | | RequestBean requestBean = null; |
| | | try { |
| | | requestBean = JSONObject.parseObject(msg, RequestBean.class); |
| | | requestBean.setChannelId(ctx.channel().id().asShortText()); |
| | | msgLogic.webSocketMsgLogic(requestBean); |
| | | } catch (Exception e) { |
| | | log.info("#websocket json error:{}#", e); |
| | | ctx.channel().writeAndFlush(NettyTools.webSocketBytes("params error")); |
| | | JSONObject jsonObject = JSONObject.parseObject(msg); |
| | | if (jsonObject.containsKey("sub")) { |
| | | String sub = jsonObject.getString("sub"); |
| | | log.info("{}", sub); |
| | | String[] split = sub.split("\\."); |
| | | if (split.length != 4) { |
| | | ctx.writeAndFlush("error"); |
| | | return; |
| | | } |
| | | |
| | | ChannelManager.putSymbolSubChannel(split[1], ctx.channel(), split[2]); |
| | | SubResponse subResponse = new SubResponse(); |
| | | subResponse.setSubbed(sub); |
| | | subResponse.setId(jsonObject.getString("id")); |
| | | subResponse.setTs(System.currentTimeMillis()); |
| | | subResponse.setStatus("ok"); |
| | | ctx.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(subResponse))); |
| | | } else if (jsonObject.containsKey("unsub")) { |
| | | String sub = jsonObject.getString("unsub"); |
| | | String[] split = sub.split("\\."); |
| | | if (split.length != 4) { |
| | | ctx.writeAndFlush("error"); |
| | | return; |
| | | } |
| | | |
| | | ChannelManager.removeSymbolUnSubChannel(split[1], ctx.channel(), split[2]); |
| | | UnSubResponse resp = new UnSubResponse(); |
| | | resp.setSubbed(sub); |
| | | resp.setId(jsonObject.getString("id")); |
| | | resp.setTs(System.currentTimeMillis()); |
| | | resp.setStatus("ok"); |
| | | ctx.writeAndFlush(resp); |
| | | } |
| | | } |
| | | |
| | |
| | | |
| | | @Override |
| | | public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { |
| | | // LogUtil.info("[websocket服务器收到消息]-->{}, {}", ctx.channel().id(), msg); |
| | | // log.info("[websocket服务器收到消息]-->{}, {}", ctx.channel().id(), msg); |
| | | if (msg instanceof FullHttpRequest) { |
| | | // 以http请求形式接入,但是走的是websocket |
| | | handleHttpRequest(ctx, (FullHttpRequest) msg); |
| | |
| | | @Override |
| | | public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { |
| | | log.info("[触发器触发]"); |
| | | // super.userEventTriggered(ctx, evt); |
| | | if (evt instanceof IdleStateEvent) { |
| | | IdleStateEvent event = (IdleStateEvent) evt; |
| | | if (event.state() == IdleState.READER_IDLE) { |
| | |
| | | times = 0; |
| | | } |
| | | /*读超时*/ |
| | | log.info("===服务端===({}写超时, {})", ctx.channel().id().asShortText(), times); |
| | | // log.info("===服务端===({}写超时, {})", ctx.channel().id().asShortText(), times); |
| | | // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连 |
| | | if (times >= MAX_UN_REC_PING_TIMES) { |
| | | log.info("===服务端===(写超时,关闭chanel)"); |
New file |
| | |
| | | package com.xcong.excoin.quartz.job; |
| | | |
| | | import com.alibaba.fastjson.JSONObject; |
| | | import com.huobi.client.SubscriptionClient; |
| | | import com.huobi.client.SubscriptionOptions; |
| | | import com.huobi.client.model.Candlestick; |
| | | import com.huobi.client.model.enums.CandlestickInterval; |
| | | import com.huobi.client.model.enums.MBPLevelEnums; |
| | | import com.huobi.client.model.event.PriceDepthEvent; |
| | | import com.xcong.excoin.netty.server.WebSocketServer; |
| | | import com.xcong.excoin.utils.CoinTypeConvert; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import javax.annotation.PostConstruct; |
| | | |
| | | /** |
| | | * @author wzy |
| | | * @date 2021-02-23 |
| | | **/ |
| | | @Slf4j |
| | | @Component |
| | | public class KLineDataJob { |
| | | |
| | | @Autowired |
| | | WebSocketServer webSocketServer; |
| | | |
| | | @PostConstruct |
| | | public void data() throws Exception { |
| | | webSocketServer.start(); |
| | | log.info("=================="); |
| | | SubscriptionOptions subscriptionOptions = new SubscriptionOptions(); |
| | | subscriptionOptions.setConnectionDelayOnFailure(5); |
| | | subscriptionOptions.setUri("wss://api.hadax.com/ws"); |
| | | SubscriptionClient subscriptionClient = SubscriptionClient.create("", "", subscriptionOptions); |
| | | |
| | | subscriptionClient.subscribeCandlestickEvent("btcusdt,ethusdt,eosusdt,etcusdt,ltcusdt,bchusdt,xrpusdt", CandlestickInterval.DAY1, (candlestickEvent) -> { |
| | | Candlestick data = candlestickEvent.getData(); |
| | | }); |
| | | |
| | | subscriptionClient.subscribePriceDepthEvent("btcusdt", priceDepthEvent -> { |
| | | // log.info("bids:{}", JSONObject.toJSONString(priceDepthEvent.getData().getBids())); |
| | | // log.info("asks:{}", JSONObject.toJSONString(priceDepthEvent.getData().getAsks())); |
| | | }); |
| | | } |
| | | } |