From 182c01b109ffb3b4be248c2e128835ace70eeae8 Mon Sep 17 00:00:00 2001
From: Helius <wangdoubleone@gmail.com>
Date: Thu, 27 May 2021 15:53:54 +0800
Subject: [PATCH] add websocket
---
src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java | 3
src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java | 61 +++++++++++
src/main/java/com/xcong/excoin/common/contants/AppContants.java | 4
src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java | 10 ++
src/main/java/com/xcong/excoin/netty/common/ChannelManager.java | 4
src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java | 13 ++
src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java | 6
src/main/java/com/xcong/excoin/rabbit/consumer/ChatConsumer.java | 100 +++++++++++++++++++-
src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java | 21 +++
src/main/java/com/xcong/excoin/rabbit/producer/ChatProducer.java | 30 +++++
src/main/java/com/xcong/excoin/modules/otc/dao/OtcMsgUserListDao.java | 2
src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java | 5
12 files changed, 241 insertions(+), 18 deletions(-)
diff --git a/src/main/java/com/xcong/excoin/common/contants/AppContants.java b/src/main/java/com/xcong/excoin/common/contants/AppContants.java
index dea3d1a..1848cfe 100644
--- a/src/main/java/com/xcong/excoin/common/contants/AppContants.java
+++ b/src/main/java/com/xcong/excoin/common/contants/AppContants.java
@@ -96,4 +96,8 @@
public static final BigDecimal BASE_MIN_AMOUNT = new BigDecimal(100);
public static final String OTC_ORDER_CANCEL_TIMES = "OTC_ORDER_CANCEL_TIMES_";
+
+ public static final String MSG_NOTICE = "MSG_NOTICE_";
+
+ public static final String MSG_CHATTING = "MSG_CHATTING_";
}
diff --git a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
index 0511ab1..00dcc4d 100644
--- a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
+++ b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
@@ -128,6 +128,9 @@
public static final String QUEUE_DELAY = "queue.delay";
public static final String EXCHANGE_DELAY = "exchange.delay";
+ public static final String QUEUE_MSG_HISTORY = "queue_msg_history";
+ public static final String ROUTING_KEY_MSG_HISTORY = "routing_key_msg_history";
+
@Resource
private ConnectionFactory connectionFactory;
@@ -183,6 +186,16 @@
}
@Bean
+ public Queue msgHistoryQueue() {
+ return new Queue(QUEUE_MSG_HISTORY);
+ }
+
+ @Bean
+ public Binding msgHistoryBinding() {
+ return BindingBuilder.bind(msgHistoryQueue()).to(defaultExchange()).with(ROUTING_KEY_MSG_HISTORY);
+ }
+
+ @Bean
public Queue testQueue() {
return new Queue(QUEUE_TEST, true);
}
diff --git a/src/main/java/com/xcong/excoin/modules/otc/dao/OtcMsgUserListDao.java b/src/main/java/com/xcong/excoin/modules/otc/dao/OtcMsgUserListDao.java
index f902b60..3c268df 100644
--- a/src/main/java/com/xcong/excoin/modules/otc/dao/OtcMsgUserListDao.java
+++ b/src/main/java/com/xcong/excoin/modules/otc/dao/OtcMsgUserListDao.java
@@ -14,4 +14,6 @@
IPage<MsgListVo> getMsgList(@Param("record")OtcMsgUserListEntity otcMsgUserListEntity, Page<MsgListVo> page);
List<OtcMsgUserListEntity> selectListByMemberIdAndTargetId(@Param("memberId")Long memberId, @Param("targetId")long targetId);
+
+ OtcMsgUserListEntity selectChatListByToAndFrom(@Param("to") Long to, @Param("from") Long from);
}
diff --git a/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java b/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java
index a5e63a7..68dbd7f 100644
--- a/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java
+++ b/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java
@@ -10,6 +10,9 @@
private static final long serialVersionUID = 1L;
+ /**
+ * 1-鉴权 2-发送消息 3-新消息提醒
+ */
private Integer type;
private Integer status;
@@ -45,4 +48,11 @@
res.setStatus(0);
return res;
}
+
+ public static ResponseBean ok(Object data) {
+ ResponseBean res = new ResponseBean();
+ res.setStatus(1);
+ res.setData(data);
+ return res;
+ }
}
diff --git a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
index 04163d6..6e16771 100644
--- a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
+++ b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -56,6 +56,10 @@
public static Channel findWsChannel(Long id){
ChannelId channelId = MEMBER_CHANNEL.get(id.toString());
+ if (channelId == null) {
+ return null;
+ }
+
return WEBSOCKET_GROUP.find(channelId);
}
diff --git a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
index 9742e1a..2b4be56 100644
--- a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
+++ b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
@@ -34,7 +34,9 @@
// 判断当前通道用户是否已经登陆
if (StrUtil.isEmpty(ChannelManager.findWsMemberId(ctx.channel())) && requestBean.getType() != Contans.AUTH_CHECK) {
- ctx.channel().writeAndFlush(NettyTools.webSocketBytes("123"));
+ ResponseBean res = ResponseBean.fail();
+ res.setType(requestBean.getType());
+ ctx.channel().writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(res)));
return;
}
@@ -42,7 +44,7 @@
requestBean.setChannelId(ctx.channel().id().asShortText());
msgLogic.webSocketMsgLogic(requestBean);
} catch (Exception e) {
- log.info("#websocket json error:{}#", e);
+ log.info("#websocket json error:#", e);
ctx.channel().writeAndFlush(NettyTools.wsSendMsg(ResponseBean.fail()));
}
}
diff --git a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
index 7b5cd3d..6a70bc2 100644
--- a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
+++ b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
@@ -78,6 +78,7 @@
content = ((TextWebSocketFrame) frame).text();
if (content.contains(Contans.HEART_BEAT)) {
resetTimes(ctx.channel());
+ ctx.channel().writeAndFlush(NettyTools.webSocketBytes(Contans.HEART_BEAT));
} else {
SpringContextHolder.getBean(MsgDispatch.class).webSocketDispatch(ctx, content);
}
@@ -90,11 +91,27 @@
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- log.info("[触发器触发]");
+// log.info("[触发器触发]");
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
-
+ Integer times = pingTimes.get(ctx.channel().id().asShortText());
+ if (times == null) {
+ times = 0;
+ }
+ /*读超时*/
+// log.info("===服务端===({}读超时, {})", ctx.channel().id().asShortText(), times);
+ // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连
+ if (times >= MAX_UN_REC_PING_TIMES) {
+ log.info("===服务端===(读超时,关闭chanel)");
+ // 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连
+ ctx.channel().close();
+ } else {
+ // 失败计数器加1
+ times++;
+ pingTimes.remove(ctx.channel().id().asShortText());
+ pingTimes.put(ctx.channel().id().asShortText(), times);
+ }
} else if (event.state() == IdleState.WRITER_IDLE) {
/*写超时*/
ctx.channel().writeAndFlush(NettyTools.webSocketBytes(Contans.HEART_BEAT));
diff --git a/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java b/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java
index c202638..25f3130 100644
--- a/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java
+++ b/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java
@@ -19,9 +19,6 @@
@Component
public class WebSocketServerInitializer extends ChannelInitializer<NioSocketChannel> {
-// @Autowired
-// private WebSocketServerHandler webSocketServerHandler;
-
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline cp = ch.pipeline();
@@ -32,7 +29,7 @@
cp.addLast(new HttpObjectAggregator(65536));
cp.addLast(new ChunkedWriteHandler());
// 心跳
-// ch.pipeline().addLast(new IdleStateHandler(0, 10, 0));
+ ch.pipeline().addLast(new IdleStateHandler(10, 0, 0));
// 自定义业务handler
cp.addLast(new WebSocketServerHandler());
}
diff --git a/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java b/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
index 16b6f52..f1034c8 100644
--- a/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
+++ b/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
@@ -1,11 +1,21 @@
package com.xcong.excoin.netty.logic;
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.crypto.asymmetric.KeyType;
+import cn.hutool.crypto.asymmetric.RSA;
import com.alibaba.fastjson.JSONObject;
+import com.xcong.excoin.common.contants.AppContants;
+import com.xcong.excoin.configurations.properties.SecurityProperties;
+import com.xcong.excoin.netty.bean.ChatRequest;
import com.xcong.excoin.netty.bean.RequestBean;
import com.xcong.excoin.netty.bean.ResponseBean;
import com.xcong.excoin.netty.common.ChannelManager;
import com.xcong.excoin.netty.common.NettyTools;
+import com.xcong.excoin.rabbit.producer.ChatProducer;
+import com.xcong.excoin.utils.RedisUtils;
import io.netty.channel.Channel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -14,19 +24,64 @@
* @email wangdoubleone@gmail.com
* @date 2019-05-09
*/
+@Slf4j
@Component
public class WebSocketLogic {
+ @Autowired
+ private RedisUtils redisUtils;
+ @Autowired
+ private ChatProducer chatProducer;
+ @Autowired
+ private SecurityProperties securityProperties;
+
public void authCheck(RequestBean requestBean) {
Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId());
-
- ChannelManager.addWsChannel(channel, Long.parseLong(requestBean.getData().toString()));
-
ResponseBean responseBean = new ResponseBean();
responseBean.setType(requestBean.getType());
responseBean.setStatus(1);
+
+// String bearerToken = requestBean.getData().toString();
+// String rsaToken = bearerToken.replace(AppContants.TOKEN_START_WITH, "");
+// RSA rsa = new RSA(securityProperties.getPrivateKey(), null);
+// String[] tokens = StrUtil.split(rsa.decryptStr(rsaToken, KeyType.PrivateKey), "_");
+//
+// Long memberId = Long.parseLong(tokens[0]);
+
+
+ Long memberId = Long.parseLong(requestBean.getData().toString());
+
channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(responseBean)));
+ ChannelManager.addWsChannel(channel, memberId);
+ String value = redisUtils.getString(AppContants.MSG_NOTICE + memberId);
+ if (StrUtil.isNotBlank(value)) {
+ responseBean.setType(3);
+ responseBean.setData(Integer.parseInt(value));
+ channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(responseBean)));
+
+ redisUtils.del(AppContants.MSG_NOTICE + memberId);
+ }
}
+ public void sendMsg(RequestBean requestBean) {
+ String chatStr = requestBean.getData().toString();
+ ChatRequest chat = JSONObject.parseObject(chatStr, ChatRequest.class);
+ // 判断是否在线
+ Channel targetChannel = ChannelManager.findWsChannel(Long.parseLong(chat.getTo()));
+ if (targetChannel != null) {
+ targetChannel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(ResponseBean.ok(chat))));
+
+ chatProducer.sendMsgHistory(chat);
+ } else {
+ // 在redis中保存用户未在线时,给该用户发送的消息条数
+ String key = AppContants.MSG_NOTICE + chat.getTo();
+ String value = redisUtils.getString(key);
+ if (StrUtil.isEmpty(value)) {
+ redisUtils.set(key , 1);
+ } else {
+ redisUtils.set(key, Integer.parseInt(value) + 1);
+ }
+ }
+ }
}
diff --git a/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java b/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java
index 875461a..7e76d05 100644
--- a/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java
+++ b/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java
@@ -29,7 +29,8 @@
case Contans.AUTH_CHECK:
webSocketLogic.authCheck(requestBean);
break;
-// case Contans.HOME_SYMBOLS:
+ case Contans.MESSAGE:
+ webSocketLogic.sendMsg(requestBean);
default:
break;
}
diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/ChatConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/ChatConsumer.java
index fccfd6e..4902736 100644
--- a/src/main/java/com/xcong/excoin/rabbit/consumer/ChatConsumer.java
+++ b/src/main/java/com/xcong/excoin/rabbit/consumer/ChatConsumer.java
@@ -1,7 +1,97 @@
-package com.xcong.excoin.rabbit.consumer;/**
-*
-* @author wzy
-* @date 2021-05-27
-**/
+package com.xcong.excoin.rabbit.consumer;
+
+
+import cn.hutool.core.bean.BeanUtil;
+import cn.hutool.core.util.StrUtil;
+import com.alibaba.fastjson.JSONObject;
+import com.xcong.excoin.common.contants.AppContants;
+import com.xcong.excoin.configurations.RabbitMqConfig;
+import com.xcong.excoin.modules.otc.dao.OtcMsgHistoryDao;
+import com.xcong.excoin.modules.otc.dao.OtcMsgUserListDao;
+import com.xcong.excoin.modules.otc.entity.OtcMsgHistoryEntity;
+import com.xcong.excoin.modules.otc.entity.OtcMsgUserListEntity;
+import com.xcong.excoin.netty.bean.ChatRequest;
+import com.xcong.excoin.utils.RedisUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.Date;
+
+@Slf4j
+@Component
+//@ConditionalOnProperty(prefix = "app", name = "rabbit-consumer", havingValue = "true")
public class ChatConsumer {
+
+ @Autowired
+ private OtcMsgUserListDao otcMsgUserListDao;
+
+ @Autowired
+ private OtcMsgHistoryDao otcMsgHistoryDao;
+
+ @Autowired
+ private RedisUtils redisUtils;
+
+ @RabbitListener(queues = RabbitMqConfig.QUEUE_MSG_HISTORY)
+ @Transactional(rollbackFor = Exception.class)
+ public void msgHistoryConsumer(String content) {
+ log.info("收到历史消息处理:{}", content);
+ ChatRequest chat = JSONObject.parseObject(content, ChatRequest.class);
+
+ Long toId = Long.parseLong(chat.getTo());
+ Long fromId = Long.parseLong(chat.getFrom());
+
+ // 发送人是否存在聊天框
+ OtcMsgUserListEntity fromList = otcMsgUserListDao.selectChatListByToAndFrom(Long.parseLong(chat.getTo()), Long.parseLong(chat.getFrom()));
+ if (fromList == null) {
+ OtcMsgUserListEntity from = new OtcMsgUserListEntity();
+ from.setMemberId(Long.parseLong(chat.getFrom()));
+ from.setTargetId(Long.parseLong(chat.getTo()));
+ from.setIsRead(OtcMsgUserListEntity.ISREAD_TWO);
+ from.setLastMsgTime(new Date());
+ otcMsgUserListDao.insert(from);
+ }
+
+ // 收件人是否存在聊天框
+ OtcMsgUserListEntity toList = otcMsgUserListDao.selectChatListByToAndFrom(Long.parseLong(chat.getFrom()), Long.parseLong(chat.getTo()));
+ if (toList == null) {
+ OtcMsgUserListEntity from = new OtcMsgUserListEntity();
+ from.setMemberId(Long.parseLong(chat.getTo()));
+ from.setTargetId(Long.parseLong(chat.getFrom()));
+ from.setIsRead(OtcMsgUserListEntity.ISREAD_ONE);
+ from.setLastMsgTime(new Date());
+ otcMsgUserListDao.insert(from);
+ } else {
+ // 收件人正在聊的用户
+ String value = redisUtils.getString(AppContants.MSG_CHATTING + chat.getTo());
+ if (StrUtil.isNotBlank(value) && value.equals(chat.getFrom())) {
+ toList.setLastMsgTime(new Date());
+ otcMsgUserListDao.updateById(toList);
+ } else {
+ toList.setIsRead(OtcMsgUserListEntity.ISREAD_TWO);
+ toList.setLastMsgTime(new Date());
+ otcMsgUserListDao.updateById(toList);
+ }
+ }
+
+
+ OtcMsgHistoryEntity toHistory = new OtcMsgHistoryEntity();
+ toHistory.setMemberId(toId);
+ toHistory.setFromMemberId(fromId);
+ toHistory.setTargetId(toId);
+ toHistory.setIsSelf(OtcMsgHistoryEntity.ISSELF_TWO);
+ toHistory.setMsgType(chat.getMsgType());
+ toHistory.setMsg(chat.getContent());
+
+ OtcMsgHistoryEntity fromHistory = new OtcMsgHistoryEntity();
+ BeanUtil.copyProperties(toHistory, fromHistory);
+ fromHistory.setIsSelf(OtcMsgHistoryEntity.ISSELF_ONE);
+ fromHistory.setMemberId(fromId);
+
+ otcMsgHistoryDao.insert(fromHistory);
+ otcMsgHistoryDao.insert(toHistory);
+ }
}
diff --git a/src/main/java/com/xcong/excoin/rabbit/producer/ChatProducer.java b/src/main/java/com/xcong/excoin/rabbit/producer/ChatProducer.java
index 6acaee6..d939432 100644
--- a/src/main/java/com/xcong/excoin/rabbit/producer/ChatProducer.java
+++ b/src/main/java/com/xcong/excoin/rabbit/producer/ChatProducer.java
@@ -1,5 +1,33 @@
package com.xcong.excoin.rabbit.producer;
-public class ChatProducer {
+import com.alibaba.fastjson.JSONObject;
+import com.xcong.excoin.configurations.RabbitMqConfig;
+import com.xcong.excoin.netty.bean.ChatRequest;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.UUID;
+
+@Slf4j
+@Component
+public class ChatProducer implements RabbitTemplate.ConfirmCallback {
+
+ @Autowired
+ private RabbitTemplate rabbitTemplate;
+
+ public void sendMsgHistory(ChatRequest chatRequest) {
+ CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
+ log.info("消息持久化消息: {}, {}", chatRequest, correlationData.getId());
+ String str = JSONObject.toJSONString(chatRequest);
+ rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_ONE, RabbitMqConfig.ROUTING_KEY_MSG_HISTORY, str, correlationData);
+ }
+
+ @Override
+ public void confirm(CorrelationData correlationData, boolean b, String s) {
+
+ }
}
--
Gitblit v1.9.1