From 61d5fd4ace2f9b6455dcf13df54a7ba7fa2baf36 Mon Sep 17 00:00:00 2001 From: xiaoyong931011 <15274802129@163.com> Date: Thu, 27 May 2021 16:25:32 +0800 Subject: [PATCH] Merge branch 'otc' of http://120.27.238.55:7000/r/exchange into otc --- src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java | 72 +++++++++++++ src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java | 10 ++ src/main/java/com/xcong/excoin/netty/common/ChannelManager.java | 4 src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java | 13 ++ src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java | 6 src/main/java/com/xcong/excoin/modules/otc/dao/OtcMsgUserListDao.java | 2 src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java | 5 src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java | 3 src/main/java/com/xcong/excoin/common/contants/AppContants.java | 4 src/main/java/com/xcong/excoin/rabbit/consumer/ChatConsumer.java | 97 +++++++++++++++++++ src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java | 21 +++ src/main/java/com/xcong/excoin/rabbit/producer/ChatProducer.java | 33 ++++++ src/main/resources/mapper/otc/OtcMsgUserListDao.xml | 5 + 13 files changed, 263 insertions(+), 12 deletions(-) diff --git a/src/main/java/com/xcong/excoin/common/contants/AppContants.java b/src/main/java/com/xcong/excoin/common/contants/AppContants.java index dea3d1a..1848cfe 100644 --- a/src/main/java/com/xcong/excoin/common/contants/AppContants.java +++ b/src/main/java/com/xcong/excoin/common/contants/AppContants.java @@ -96,4 +96,8 @@ public static final BigDecimal BASE_MIN_AMOUNT = new BigDecimal(100); public static final String OTC_ORDER_CANCEL_TIMES = "OTC_ORDER_CANCEL_TIMES_"; + + public static final String MSG_NOTICE = "MSG_NOTICE_"; + + public static final String MSG_CHATTING = "MSG_CHATTING_"; } diff --git a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java index 0511ab1..00dcc4d 100644 --- a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java +++ b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java @@ -128,6 +128,9 @@ public static final String QUEUE_DELAY = "queue.delay"; public static final String EXCHANGE_DELAY = "exchange.delay"; + public static final String QUEUE_MSG_HISTORY = "queue_msg_history"; + public static final String ROUTING_KEY_MSG_HISTORY = "routing_key_msg_history"; + @Resource private ConnectionFactory connectionFactory; @@ -183,6 +186,16 @@ } @Bean + public Queue msgHistoryQueue() { + return new Queue(QUEUE_MSG_HISTORY); + } + + @Bean + public Binding msgHistoryBinding() { + return BindingBuilder.bind(msgHistoryQueue()).to(defaultExchange()).with(ROUTING_KEY_MSG_HISTORY); + } + + @Bean public Queue testQueue() { return new Queue(QUEUE_TEST, true); } diff --git a/src/main/java/com/xcong/excoin/modules/otc/dao/OtcMsgUserListDao.java b/src/main/java/com/xcong/excoin/modules/otc/dao/OtcMsgUserListDao.java index f902b60..3c268df 100644 --- a/src/main/java/com/xcong/excoin/modules/otc/dao/OtcMsgUserListDao.java +++ b/src/main/java/com/xcong/excoin/modules/otc/dao/OtcMsgUserListDao.java @@ -14,4 +14,6 @@ IPage<MsgListVo> getMsgList(@Param("record")OtcMsgUserListEntity otcMsgUserListEntity, Page<MsgListVo> page); List<OtcMsgUserListEntity> selectListByMemberIdAndTargetId(@Param("memberId")Long memberId, @Param("targetId")long targetId); + + OtcMsgUserListEntity selectChatListByToAndFrom(@Param("to") Long to, @Param("from") Long from); } diff --git a/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java b/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java index a5e63a7..68dbd7f 100644 --- a/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java +++ b/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java @@ -10,6 +10,9 @@ private static final long serialVersionUID = 1L; + /** + * 1-鉴权 2-发送消息 3-新消息提醒 + */ private Integer type; private Integer status; @@ -45,4 +48,11 @@ res.setStatus(0); return res; } + + public static ResponseBean ok(Object data) { + ResponseBean res = new ResponseBean(); + res.setStatus(1); + res.setData(data); + return res; + } } diff --git a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java index 04163d6..6e16771 100644 --- a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java +++ b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java @@ -56,6 +56,10 @@ public static Channel findWsChannel(Long id){ ChannelId channelId = MEMBER_CHANNEL.get(id.toString()); + if (channelId == null) { + return null; + } + return WEBSOCKET_GROUP.find(channelId); } diff --git a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java index 9742e1a..2b4be56 100644 --- a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java +++ b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java @@ -34,7 +34,9 @@ // 判断当前通道用户是否已经登陆 if (StrUtil.isEmpty(ChannelManager.findWsMemberId(ctx.channel())) && requestBean.getType() != Contans.AUTH_CHECK) { - ctx.channel().writeAndFlush(NettyTools.webSocketBytes("123")); + ResponseBean res = ResponseBean.fail(); + res.setType(requestBean.getType()); + ctx.channel().writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(res))); return; } @@ -42,7 +44,7 @@ requestBean.setChannelId(ctx.channel().id().asShortText()); msgLogic.webSocketMsgLogic(requestBean); } catch (Exception e) { - log.info("#websocket json error:{}#", e); + log.info("#websocket json error:#", e); ctx.channel().writeAndFlush(NettyTools.wsSendMsg(ResponseBean.fail())); } } diff --git a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java index 7b5cd3d..6a70bc2 100644 --- a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java +++ b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java @@ -78,6 +78,7 @@ content = ((TextWebSocketFrame) frame).text(); if (content.contains(Contans.HEART_BEAT)) { resetTimes(ctx.channel()); + ctx.channel().writeAndFlush(NettyTools.webSocketBytes(Contans.HEART_BEAT)); } else { SpringContextHolder.getBean(MsgDispatch.class).webSocketDispatch(ctx, content); } @@ -90,11 +91,27 @@ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - log.info("[触发器触发]"); +// log.info("[触发器触发]"); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { - + Integer times = pingTimes.get(ctx.channel().id().asShortText()); + if (times == null) { + times = 0; + } + /*读超时*/ +// log.info("===服务端===({}读超时, {})", ctx.channel().id().asShortText(), times); + // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连 + if (times >= MAX_UN_REC_PING_TIMES) { + log.info("===服务端===(读超时,关闭chanel)"); + // 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连 + ctx.channel().close(); + } else { + // 失败计数器加1 + times++; + pingTimes.remove(ctx.channel().id().asShortText()); + pingTimes.put(ctx.channel().id().asShortText(), times); + } } else if (event.state() == IdleState.WRITER_IDLE) { /*写超时*/ ctx.channel().writeAndFlush(NettyTools.webSocketBytes(Contans.HEART_BEAT)); diff --git a/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java b/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java index c202638..25f3130 100644 --- a/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java +++ b/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java @@ -19,9 +19,6 @@ @Component public class WebSocketServerInitializer extends ChannelInitializer<NioSocketChannel> { -// @Autowired -// private WebSocketServerHandler webSocketServerHandler; - @Override protected void initChannel(NioSocketChannel ch) throws Exception { ChannelPipeline cp = ch.pipeline(); @@ -32,7 +29,7 @@ cp.addLast(new HttpObjectAggregator(65536)); cp.addLast(new ChunkedWriteHandler()); // 心跳 -// ch.pipeline().addLast(new IdleStateHandler(0, 10, 0)); + ch.pipeline().addLast(new IdleStateHandler(10, 0, 0)); // 自定义业务handler cp.addLast(new WebSocketServerHandler()); } diff --git a/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java b/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java index 16b6f52..44751e1 100644 --- a/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java +++ b/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java @@ -1,11 +1,22 @@ package com.xcong.excoin.netty.logic; +import cn.hutool.core.util.StrUtil; +import cn.hutool.crypto.asymmetric.KeyType; +import cn.hutool.crypto.asymmetric.RSA; import com.alibaba.fastjson.JSONObject; +import com.xcong.excoin.common.contants.AppContants; +import com.xcong.excoin.configurations.properties.SecurityProperties; +import com.xcong.excoin.modules.member.entity.MemberEntity; +import com.xcong.excoin.netty.bean.ChatRequest; import com.xcong.excoin.netty.bean.RequestBean; import com.xcong.excoin.netty.bean.ResponseBean; import com.xcong.excoin.netty.common.ChannelManager; import com.xcong.excoin.netty.common.NettyTools; +import com.xcong.excoin.rabbit.producer.ChatProducer; +import com.xcong.excoin.utils.RedisUtils; import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -14,19 +25,74 @@ * @email wangdoubleone@gmail.com * @date 2019-05-09 */ +@Slf4j @Component public class WebSocketLogic { + @Autowired + private RedisUtils redisUtils; + @Autowired + private ChatProducer chatProducer; + @Autowired + private SecurityProperties securityProperties; + public void authCheck(RequestBean requestBean) { Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId()); - - ChannelManager.addWsChannel(channel, Long.parseLong(requestBean.getData().toString())); - ResponseBean responseBean = new ResponseBean(); responseBean.setType(requestBean.getType()); responseBean.setStatus(1); + +// String bearerToken = requestBean.getData().toString(); +// String rsaToken = bearerToken.replace(AppContants.TOKEN_START_WITH, ""); +// RSA rsa = new RSA(securityProperties.getPrivateKey(), null); +// String[] tokens = StrUtil.split(rsa.decryptStr(rsaToken, KeyType.PrivateKey), "_"); +// +// Long memberId = Long.parseLong(tokens[0]); + + String token = requestBean.getData().toString(); + String redisKey = AppContants.APP_LOGIN_PREFIX + token; + String loginStr = redisUtils.getString(redisKey); + if (StrUtil.isBlank(loginStr)) { + ResponseBean res = ResponseBean.fail(); + res.setType(requestBean.getType()); + channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(res))); + return; + } + + MemberEntity loginUser = JSONObject.parseObject(loginStr, MemberEntity.class); + Long memberId = loginUser.getId(); + channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(responseBean))); + ChannelManager.addWsChannel(channel, memberId); + String value = redisUtils.getString(AppContants.MSG_NOTICE + memberId); + if (StrUtil.isNotBlank(value)) { + responseBean.setType(3); + responseBean.setData(Integer.parseInt(value)); + channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(responseBean))); + + redisUtils.del(AppContants.MSG_NOTICE + memberId); + } } + public void sendMsg(RequestBean requestBean) { + String chatStr = requestBean.getData().toString(); + ChatRequest chat = JSONObject.parseObject(chatStr, ChatRequest.class); + // 判断是否在线 + Channel targetChannel = ChannelManager.findWsChannel(Long.parseLong(chat.getTo())); + if (targetChannel != null) { + targetChannel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(ResponseBean.ok(chat)))); + + chatProducer.sendMsgHistory(chat); + } else { + // 在redis中保存用户未在线时,给该用户发送的消息条数 + String key = AppContants.MSG_NOTICE + chat.getTo(); + String value = redisUtils.getString(key); + if (StrUtil.isEmpty(value)) { + redisUtils.set(key , 1); + } else { + redisUtils.set(key, Integer.parseInt(value) + 1); + } + } + } } diff --git a/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java b/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java index 875461a..7e76d05 100644 --- a/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java +++ b/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java @@ -29,7 +29,8 @@ case Contans.AUTH_CHECK: webSocketLogic.authCheck(requestBean); break; -// case Contans.HOME_SYMBOLS: + case Contans.MESSAGE: + webSocketLogic.sendMsg(requestBean); default: break; } diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/ChatConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/ChatConsumer.java new file mode 100644 index 0000000..4902736 --- /dev/null +++ b/src/main/java/com/xcong/excoin/rabbit/consumer/ChatConsumer.java @@ -0,0 +1,97 @@ +package com.xcong.excoin.rabbit.consumer; + + +import cn.hutool.core.bean.BeanUtil; +import cn.hutool.core.util.StrUtil; +import com.alibaba.fastjson.JSONObject; +import com.xcong.excoin.common.contants.AppContants; +import com.xcong.excoin.configurations.RabbitMqConfig; +import com.xcong.excoin.modules.otc.dao.OtcMsgHistoryDao; +import com.xcong.excoin.modules.otc.dao.OtcMsgUserListDao; +import com.xcong.excoin.modules.otc.entity.OtcMsgHistoryEntity; +import com.xcong.excoin.modules.otc.entity.OtcMsgUserListEntity; +import com.xcong.excoin.netty.bean.ChatRequest; +import com.xcong.excoin.utils.RedisUtils; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.util.Date; + +@Slf4j +@Component +//@ConditionalOnProperty(prefix = "app", name = "rabbit-consumer", havingValue = "true") +public class ChatConsumer { + + @Autowired + private OtcMsgUserListDao otcMsgUserListDao; + + @Autowired + private OtcMsgHistoryDao otcMsgHistoryDao; + + @Autowired + private RedisUtils redisUtils; + + @RabbitListener(queues = RabbitMqConfig.QUEUE_MSG_HISTORY) + @Transactional(rollbackFor = Exception.class) + public void msgHistoryConsumer(String content) { + log.info("收到历史消息处理:{}", content); + ChatRequest chat = JSONObject.parseObject(content, ChatRequest.class); + + Long toId = Long.parseLong(chat.getTo()); + Long fromId = Long.parseLong(chat.getFrom()); + + // 发送人是否存在聊天框 + OtcMsgUserListEntity fromList = otcMsgUserListDao.selectChatListByToAndFrom(Long.parseLong(chat.getTo()), Long.parseLong(chat.getFrom())); + if (fromList == null) { + OtcMsgUserListEntity from = new OtcMsgUserListEntity(); + from.setMemberId(Long.parseLong(chat.getFrom())); + from.setTargetId(Long.parseLong(chat.getTo())); + from.setIsRead(OtcMsgUserListEntity.ISREAD_TWO); + from.setLastMsgTime(new Date()); + otcMsgUserListDao.insert(from); + } + + // 收件人是否存在聊天框 + OtcMsgUserListEntity toList = otcMsgUserListDao.selectChatListByToAndFrom(Long.parseLong(chat.getFrom()), Long.parseLong(chat.getTo())); + if (toList == null) { + OtcMsgUserListEntity from = new OtcMsgUserListEntity(); + from.setMemberId(Long.parseLong(chat.getTo())); + from.setTargetId(Long.parseLong(chat.getFrom())); + from.setIsRead(OtcMsgUserListEntity.ISREAD_ONE); + from.setLastMsgTime(new Date()); + otcMsgUserListDao.insert(from); + } else { + // 收件人正在聊的用户 + String value = redisUtils.getString(AppContants.MSG_CHATTING + chat.getTo()); + if (StrUtil.isNotBlank(value) && value.equals(chat.getFrom())) { + toList.setLastMsgTime(new Date()); + otcMsgUserListDao.updateById(toList); + } else { + toList.setIsRead(OtcMsgUserListEntity.ISREAD_TWO); + toList.setLastMsgTime(new Date()); + otcMsgUserListDao.updateById(toList); + } + } + + + OtcMsgHistoryEntity toHistory = new OtcMsgHistoryEntity(); + toHistory.setMemberId(toId); + toHistory.setFromMemberId(fromId); + toHistory.setTargetId(toId); + toHistory.setIsSelf(OtcMsgHistoryEntity.ISSELF_TWO); + toHistory.setMsgType(chat.getMsgType()); + toHistory.setMsg(chat.getContent()); + + OtcMsgHistoryEntity fromHistory = new OtcMsgHistoryEntity(); + BeanUtil.copyProperties(toHistory, fromHistory); + fromHistory.setIsSelf(OtcMsgHistoryEntity.ISSELF_ONE); + fromHistory.setMemberId(fromId); + + otcMsgHistoryDao.insert(fromHistory); + otcMsgHistoryDao.insert(toHistory); + } +} diff --git a/src/main/java/com/xcong/excoin/rabbit/producer/ChatProducer.java b/src/main/java/com/xcong/excoin/rabbit/producer/ChatProducer.java new file mode 100644 index 0000000..d939432 --- /dev/null +++ b/src/main/java/com/xcong/excoin/rabbit/producer/ChatProducer.java @@ -0,0 +1,33 @@ +package com.xcong.excoin.rabbit.producer; + + +import com.alibaba.fastjson.JSONObject; +import com.xcong.excoin.configurations.RabbitMqConfig; +import com.xcong.excoin.netty.bean.ChatRequest; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import java.util.UUID; + +@Slf4j +@Component +public class ChatProducer implements RabbitTemplate.ConfirmCallback { + + @Autowired + private RabbitTemplate rabbitTemplate; + + public void sendMsgHistory(ChatRequest chatRequest) { + CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); + log.info("消息持久化消息: {}, {}", chatRequest, correlationData.getId()); + String str = JSONObject.toJSONString(chatRequest); + rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_ONE, RabbitMqConfig.ROUTING_KEY_MSG_HISTORY, str, correlationData); + } + + @Override + public void confirm(CorrelationData correlationData, boolean b, String s) { + + } +} diff --git a/src/main/resources/mapper/otc/OtcMsgUserListDao.xml b/src/main/resources/mapper/otc/OtcMsgUserListDao.xml index 446527f..0be3ee3 100644 --- a/src/main/resources/mapper/otc/OtcMsgUserListDao.xml +++ b/src/main/resources/mapper/otc/OtcMsgUserListDao.xml @@ -28,5 +28,10 @@ order by a.create_time desc </select> + <select id="selectChatListByToAndFrom" resultType="com.xcong.excoin.modules.otc.entity.OtcMsgUserListEntity"> + select * from otc_msg_user_list + where member_id=#{from} and target_id=#{to} + </select> + </mapper> \ No newline at end of file -- Gitblit v1.9.1