src/main/java/com/xcong/excoin/common/contants/AppContants.java
@@ -96,4 +96,8 @@
    // Base minimum amount (100). NOTE(review): what it bounds is not visible in this hunk.
    public static final BigDecimal BASE_MIN_AMOUNT = new BigDecimal(100);

    // Key prefix; presumably followed by a member id to count OTC order cancellations — usage not shown here, confirm.
    public static final String OTC_ORDER_CANCEL_TIMES = "OTC_ORDER_CANCEL_TIMES_";

    // Redis key prefix + member id: WebSocketLogic stores under it the number of chat messages
    // sent to that member while no WebSocket channel was registered for them.
    public static final String MSG_NOTICE = "MSG_NOTICE_";

    // Redis key prefix + member id: ChatConsumer reads it and compares the value with the sender id
    // to decide whether the recipient is currently chatting with that sender.
    // NOTE(review): the writer of this key is not visible here.
    public static final String MSG_CHATTING = "MSG_CHATTING_";
}
src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
@@ -128,6 +128,9 @@
    public static final String QUEUE_DELAY = "queue.delay";
    public static final String EXCHANGE_DELAY = "exchange.delay";

    // Queue name and routing key for chat messages that must be written to the history tables.
    // ChatProducer publishes with this routing key; ChatConsumer listens on this queue.
    public static final String QUEUE_MSG_HISTORY = "queue_msg_history";
    public static final String ROUTING_KEY_MSG_HISTORY = "routing_key_msg_history";

    @Resource
    private ConnectionFactory connectionFactory;
@@ -183,6 +186,16 @@
    }

    /**
     * Declares the chat-history queue.
     *
     * @return a queue named {@link #QUEUE_MSG_HISTORY}
     */
    @Bean
    public Queue msgHistoryQueue() {
        return new Queue(QUEUE_MSG_HISTORY);
    }

    /**
     * Binds the chat-history queue to the exchange returned by {@code defaultExchange()}
     * under {@link #ROUTING_KEY_MSG_HISTORY}.
     * NOTE(review): ChatProducer publishes to {@code EXCHANGE_ONE}; confirm that is the
     * exchange {@code defaultExchange()} declares, otherwise messages will not be routed here.
     *
     * @return the binding between queue and exchange
     */
    @Bean
    public Binding msgHistoryBinding() {
        return BindingBuilder.bind(msgHistoryQueue()).to(defaultExchange()).with(ROUTING_KEY_MSG_HISTORY);
    }

    /**
     * Declares the test queue; the second constructor argument marks it durable.
     *
     * @return a durable queue named {@code QUEUE_TEST}
     */
    @Bean
    public Queue testQueue() {
        return new Queue(QUEUE_TEST, true);
    }
src/main/java/com/xcong/excoin/modules/otc/dao/OtcMsgUserListDao.java
@@ -14,4 +14,6 @@
    /**
     * Pages through chat-list view objects.
     * NOTE(review): the filter applied from {@code record} is defined in the mapper XML, not visible here.
     *
     * @param otcMsgUserListEntity query criteria, exposed to the mapper as {@code record}
     * @param page                 paging request
     * @return one page of {@code MsgListVo}
     */
    IPage<MsgListVo> getMsgList(@Param("record")OtcMsgUserListEntity otcMsgUserListEntity, Page<MsgListVo> page);

    /**
     * Selects chat-list rows by member id and target id.
     *
     * @param memberId exposed to the mapper as {@code memberId}
     * @param targetId exposed to the mapper as {@code targetId} (primitive, so it cannot be null)
     * @return matching rows
     */
    List<OtcMsgUserListEntity> selectListByMemberIdAndTargetId(@Param("memberId")Long memberId, @Param("targetId")long targetId);

    /**
     * Selects a single chat-list row for a (to, from) pair.
     * ChatConsumer treats a {@code null} result as "no row yet" and then inserts one with
     * memberId = {@code from} and targetId = {@code to}.
     * NOTE(review): which column each parameter filters on is defined in the mapper XML — confirm.
     *
     * @param to   exposed to the mapper as {@code to}
     * @param from exposed to the mapper as {@code from}
     * @return the row, or {@code null} when none matches
     */
    OtcMsgUserListEntity selectChatListByToAndFrom(@Param("to") Long to, @Param("from") Long from);
}
src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java
@@ -10,6 +10,9 @@
    private static final long serialVersionUID = 1L;

    /**
     * Message type: 1 - authentication, 2 - send message, 3 - new-message reminder.
     */
    private Integer type;

    // Result flag: the factory methods in this class use 0 for failure and 1 for success.
    private Integer status;
@@ -45,4 +48,11 @@
        res.setStatus(0);
        return res;
    }

    /**
     * Builds a success response (status 1) carrying the given payload.
     * The {@code type} field is left unset; callers that need it must set it themselves.
     *
     * @param data payload to place in the response
     * @return a new response with status 1 and the payload attached
     */
    public static ResponseBean ok(Object data) {
        ResponseBean res = new ResponseBean();
        res.setStatus(1);
        res.setData(data);
        return res;
    }
}
src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -56,6 +56,10 @@
    /**
     * Looks up the WebSocket channel registered for a member.
     *
     * @param id member id; {@code null} is treated the same as an unregistered member
     * @return the channel found in {@code WEBSOCKET_GROUP} for the member's channel id,
     *         or {@code null} when the id is null, no channel id is mapped to the member,
     *         or the group no longer holds that channel
     */
    public static Channel findWsChannel(Long id) {
        // Guard first: id.toString() would throw NPE, while callers already handle a null result.
        if (id == null) {
            return null;
        }
        ChannelId channelId = MEMBER_CHANNEL.get(id.toString());
        if (channelId == null) {
            return null;
        }
        return WEBSOCKET_GROUP.find(channelId);
    }
src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
@@ -34,7 +34,9 @@ // 判断当前通道用户是否已经登陆 if (StrUtil.isEmpty(ChannelManager.findWsMemberId(ctx.channel())) && requestBean.getType() != Contans.AUTH_CHECK) { ctx.channel().writeAndFlush(NettyTools.webSocketBytes("123")); ResponseBean res = ResponseBean.fail(); res.setType(requestBean.getType()); ctx.channel().writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(res))); return; } @@ -42,7 +44,7 @@ requestBean.setChannelId(ctx.channel().id().asShortText()); msgLogic.webSocketMsgLogic(requestBean); } catch (Exception e) { log.info("#websocket json error:{}#", e); log.info("#websocket json error:#", e); ctx.channel().writeAndFlush(NettyTools.wsSendMsg(ResponseBean.fail())); } } src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
@@ -78,6 +78,7 @@ content = ((TextWebSocketFrame) frame).text(); if (content.contains(Contans.HEART_BEAT)) { resetTimes(ctx.channel()); ctx.channel().writeAndFlush(NettyTools.webSocketBytes(Contans.HEART_BEAT)); } else { SpringContextHolder.getBean(MsgDispatch.class).webSocketDispatch(ctx, content); } @@ -90,11 +91,27 @@ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { log.info("[触发器触发]"); // log.info("[触发器触发]"); if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { Integer times = pingTimes.get(ctx.channel().id().asShortText()); if (times == null) { times = 0; } /*读超时*/ // log.info("===服务端===({}读超时, {})", ctx.channel().id().asShortText(), times); // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连 if (times >= MAX_UN_REC_PING_TIMES) { log.info("===服务端===(读超时,关闭chanel)"); // 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连 ctx.channel().close(); } else { // 失败计数器加1 times++; pingTimes.remove(ctx.channel().id().asShortText()); pingTimes.put(ctx.channel().id().asShortText(), times); } } else if (event.state() == IdleState.WRITER_IDLE) { /*写超时*/ ctx.channel().writeAndFlush(NettyTools.webSocketBytes(Contans.HEART_BEAT)); src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java
@@ -19,9 +19,6 @@ @Component public class WebSocketServerInitializer extends ChannelInitializer<NioSocketChannel> { // @Autowired // private WebSocketServerHandler webSocketServerHandler; @Override protected void initChannel(NioSocketChannel ch) throws Exception { ChannelPipeline cp = ch.pipeline(); @@ -32,7 +29,7 @@ cp.addLast(new HttpObjectAggregator(65536)); cp.addLast(new ChunkedWriteHandler()); // 心跳 // ch.pipeline().addLast(new IdleStateHandler(0, 10, 0)); ch.pipeline().addLast(new IdleStateHandler(10, 0, 0)); // 自定义业务handler cp.addLast(new WebSocketServerHandler()); } src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
@@ -1,11 +1,21 @@ package com.xcong.excoin.netty.logic; import cn.hutool.core.util.StrUtil; import cn.hutool.crypto.asymmetric.KeyType; import cn.hutool.crypto.asymmetric.RSA; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.common.contants.AppContants; import com.xcong.excoin.configurations.properties.SecurityProperties; import com.xcong.excoin.netty.bean.ChatRequest; import com.xcong.excoin.netty.bean.RequestBean; import com.xcong.excoin.netty.bean.ResponseBean; import com.xcong.excoin.netty.common.ChannelManager; import com.xcong.excoin.netty.common.NettyTools; import com.xcong.excoin.rabbit.producer.ChatProducer; import com.xcong.excoin.utils.RedisUtils; import io.netty.channel.Channel; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -14,19 +24,64 @@ * @email wangdoubleone@gmail.com * @date 2019-05-09 */ @Slf4j @Component public class WebSocketLogic { @Autowired private RedisUtils redisUtils; @Autowired private ChatProducer chatProducer; @Autowired private SecurityProperties securityProperties; public void authCheck(RequestBean requestBean) { Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId()); ChannelManager.addWsChannel(channel, Long.parseLong(requestBean.getData().toString())); ResponseBean responseBean = new ResponseBean(); responseBean.setType(requestBean.getType()); responseBean.setStatus(1); // String bearerToken = requestBean.getData().toString(); // String rsaToken = bearerToken.replace(AppContants.TOKEN_START_WITH, ""); // RSA rsa = new RSA(securityProperties.getPrivateKey(), null); // String[] tokens = StrUtil.split(rsa.decryptStr(rsaToken, KeyType.PrivateKey), "_"); // // Long memberId = Long.parseLong(tokens[0]); Long memberId = Long.parseLong(requestBean.getData().toString()); channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(responseBean))); ChannelManager.addWsChannel(channel, 
memberId); String value = redisUtils.getString(AppContants.MSG_NOTICE + memberId); if (StrUtil.isNotBlank(value)) { responseBean.setType(3); responseBean.setData(Integer.parseInt(value)); channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(responseBean))); redisUtils.del(AppContants.MSG_NOTICE + memberId); } } public void sendMsg(RequestBean requestBean) { String chatStr = requestBean.getData().toString(); ChatRequest chat = JSONObject.parseObject(chatStr, ChatRequest.class); // 判断是否在线 Channel targetChannel = ChannelManager.findWsChannel(Long.parseLong(chat.getTo())); if (targetChannel != null) { targetChannel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(ResponseBean.ok(chat)))); chatProducer.sendMsgHistory(chat); } else { // 在redis中保存用户未在线时,给该用户发送的消息条数 String key = AppContants.MSG_NOTICE + chat.getTo(); String value = redisUtils.getString(key); if (StrUtil.isEmpty(value)) { redisUtils.set(key , 1); } else { redisUtils.set(key, Integer.parseInt(value) + 1); } } } } src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java
@@ -29,7 +29,8 @@ case Contans.AUTH_CHECK: webSocketLogic.authCheck(requestBean); break; // case Contans.HOME_SYMBOLS: case Contans.MESSAGE: webSocketLogic.sendMsg(requestBean); default: break; } src/main/java/com/xcong/excoin/rabbit/consumer/ChatConsumer.java
@@ -1,7 +1,97 @@ package com.xcong.excoin.rabbit.consumer;/** * * @author wzy * @date 2021-05-27 **/ package com.xcong.excoin.rabbit.consumer; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.util.StrUtil; import com.alibaba.fastjson.JSONObject; import com.xcong.excoin.common.contants.AppContants; import com.xcong.excoin.configurations.RabbitMqConfig; import com.xcong.excoin.modules.otc.dao.OtcMsgHistoryDao; import com.xcong.excoin.modules.otc.dao.OtcMsgUserListDao; import com.xcong.excoin.modules.otc.entity.OtcMsgHistoryEntity; import com.xcong.excoin.modules.otc.entity.OtcMsgUserListEntity; import com.xcong.excoin.netty.bean.ChatRequest; import com.xcong.excoin.utils.RedisUtils; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import java.util.Date; @Slf4j @Component //@ConditionalOnProperty(prefix = "app", name = "rabbit-consumer", havingValue = "true") public class ChatConsumer { @Autowired private OtcMsgUserListDao otcMsgUserListDao; @Autowired private OtcMsgHistoryDao otcMsgHistoryDao; @Autowired private RedisUtils redisUtils; @RabbitListener(queues = RabbitMqConfig.QUEUE_MSG_HISTORY) @Transactional(rollbackFor = Exception.class) public void msgHistoryConsumer(String content) { log.info("收到历史消息处理:{}", content); ChatRequest chat = JSONObject.parseObject(content, ChatRequest.class); Long toId = Long.parseLong(chat.getTo()); Long fromId = Long.parseLong(chat.getFrom()); // 发送人是否存在聊天框 OtcMsgUserListEntity fromList = otcMsgUserListDao.selectChatListByToAndFrom(Long.parseLong(chat.getTo()), Long.parseLong(chat.getFrom())); if (fromList == null) { OtcMsgUserListEntity from = new OtcMsgUserListEntity(); from.setMemberId(Long.parseLong(chat.getFrom())); 
from.setTargetId(Long.parseLong(chat.getTo())); from.setIsRead(OtcMsgUserListEntity.ISREAD_TWO); from.setLastMsgTime(new Date()); otcMsgUserListDao.insert(from); } // 收件人是否存在聊天框 OtcMsgUserListEntity toList = otcMsgUserListDao.selectChatListByToAndFrom(Long.parseLong(chat.getFrom()), Long.parseLong(chat.getTo())); if (toList == null) { OtcMsgUserListEntity from = new OtcMsgUserListEntity(); from.setMemberId(Long.parseLong(chat.getTo())); from.setTargetId(Long.parseLong(chat.getFrom())); from.setIsRead(OtcMsgUserListEntity.ISREAD_ONE); from.setLastMsgTime(new Date()); otcMsgUserListDao.insert(from); } else { // 收件人正在聊的用户 String value = redisUtils.getString(AppContants.MSG_CHATTING + chat.getTo()); if (StrUtil.isNotBlank(value) && value.equals(chat.getFrom())) { toList.setLastMsgTime(new Date()); otcMsgUserListDao.updateById(toList); } else { toList.setIsRead(OtcMsgUserListEntity.ISREAD_TWO); toList.setLastMsgTime(new Date()); otcMsgUserListDao.updateById(toList); } } OtcMsgHistoryEntity toHistory = new OtcMsgHistoryEntity(); toHistory.setMemberId(toId); toHistory.setFromMemberId(fromId); toHistory.setTargetId(toId); toHistory.setIsSelf(OtcMsgHistoryEntity.ISSELF_TWO); toHistory.setMsgType(chat.getMsgType()); toHistory.setMsg(chat.getContent()); OtcMsgHistoryEntity fromHistory = new OtcMsgHistoryEntity(); BeanUtil.copyProperties(toHistory, fromHistory); fromHistory.setIsSelf(OtcMsgHistoryEntity.ISSELF_ONE); fromHistory.setMemberId(fromId); otcMsgHistoryDao.insert(fromHistory); otcMsgHistoryDao.insert(toHistory); } } src/main/java/com/xcong/excoin/rabbit/producer/ChatProducer.java
@@ -1,5 +1,33 @@
package com.xcong.excoin.rabbit.producer;

import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.configurations.RabbitMqConfig;
import com.xcong.excoin.netty.bean.ChatRequest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * Publishes chat messages to RabbitMQ so that a consumer can write them to the history tables.
 */
@Slf4j
@Component
public class ChatProducer implements RabbitTemplate.ConfirmCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Serializes the chat message to JSON and publishes it with a fresh correlation id.
     *
     * <p>NOTE(review): the message goes to {@code EXCHANGE_ONE}; the history queue is bound to
     * the exchange returned by {@code RabbitMqConfig.defaultExchange()} — confirm they match.
     *
     * @param chatRequest message to persist
     */
    public void sendMsgHistory(ChatRequest chatRequest) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        log.info("消息持久化消息: {}, {}", chatRequest, correlationData.getId());
        String str = JSONObject.toJSONString(chatRequest);
        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_ONE, RabbitMqConfig.ROUTING_KEY_MSG_HISTORY, str, correlationData);
    }

    /**
     * Publisher-confirm callback: logs messages the broker did not acknowledge instead of
     * dropping the result silently.
     *
     * <p>NOTE(review): this class does not register itself on the {@link RabbitTemplate};
     * the callback only fires if it is registered elsewhere — confirm.
     *
     * @param correlationData correlation data supplied when sending; may be {@code null}
     * @param b               {@code true} for ack, {@code false} for nack
     * @param s               failure cause reported by the broker, if any
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        if (b) {
            return;
        }
        String id = correlationData == null ? null : correlationData.getId();
        log.error("chat history message was not confirmed by the broker, id={}, cause={}", id, s);
    }
}