From 61d5fd4ace2f9b6455dcf13df54a7ba7fa2baf36 Mon Sep 17 00:00:00 2001
From: xiaoyong931011 <15274802129@163.com>
Date: Thu, 27 May 2021 16:25:32 +0800
Subject: [PATCH] Merge branch 'otc' of http://120.27.238.55:7000/r/exchange into otc

---
 src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java                  |   72 +++++++++++++
 src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java                     |   10 ++
 src/main/java/com/xcong/excoin/netty/common/ChannelManager.java                 |    4 
 src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java               |   13 ++
 src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java                  |    6 
 src/main/java/com/xcong/excoin/modules/otc/dao/OtcMsgUserListDao.java           |    2 
 src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java |    5 
 src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java               |    3 
 src/main/java/com/xcong/excoin/common/contants/AppContants.java                 |    4 
 src/main/java/com/xcong/excoin/rabbit/consumer/ChatConsumer.java                |   97 +++++++++++++++++++
 src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java        |   21 +++
 src/main/java/com/xcong/excoin/rabbit/producer/ChatProducer.java                |   33 ++++++
 src/main/resources/mapper/otc/OtcMsgUserListDao.xml                             |    5 +
 13 files changed, 263 insertions(+), 12 deletions(-)

diff --git a/src/main/java/com/xcong/excoin/common/contants/AppContants.java b/src/main/java/com/xcong/excoin/common/contants/AppContants.java
index dea3d1a..1848cfe 100644
--- a/src/main/java/com/xcong/excoin/common/contants/AppContants.java
+++ b/src/main/java/com/xcong/excoin/common/contants/AppContants.java
@@ -96,4 +96,8 @@
     public static final BigDecimal BASE_MIN_AMOUNT = new BigDecimal(100);
 
     public static final String OTC_ORDER_CANCEL_TIMES = "OTC_ORDER_CANCEL_TIMES_";
+
+    public static final String MSG_NOTICE = "MSG_NOTICE_";
+
+    public static final String MSG_CHATTING = "MSG_CHATTING_";
 }
diff --git a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
index 0511ab1..00dcc4d 100644
--- a/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
+++ b/src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
@@ -128,6 +128,9 @@
     public static final String QUEUE_DELAY = "queue.delay";
     public static final String EXCHANGE_DELAY = "exchange.delay";
 
+    public static final String QUEUE_MSG_HISTORY = "queue_msg_history";
+    public static final String ROUTING_KEY_MSG_HISTORY = "routing_key_msg_history";
+
     @Resource
     private ConnectionFactory connectionFactory;
 
@@ -183,6 +186,16 @@
     }
 
     @Bean
+    public Queue msgHistoryQueue() {
+        return new Queue(QUEUE_MSG_HISTORY);
+    }
+
+    @Bean
+    public Binding msgHistoryBinding() {
+        return BindingBuilder.bind(msgHistoryQueue()).to(defaultExchange()).with(ROUTING_KEY_MSG_HISTORY);
+    }
+
+    @Bean
     public Queue testQueue() {
         return new Queue(QUEUE_TEST, true);
     }
diff --git a/src/main/java/com/xcong/excoin/modules/otc/dao/OtcMsgUserListDao.java b/src/main/java/com/xcong/excoin/modules/otc/dao/OtcMsgUserListDao.java
index f902b60..3c268df 100644
--- a/src/main/java/com/xcong/excoin/modules/otc/dao/OtcMsgUserListDao.java
+++ b/src/main/java/com/xcong/excoin/modules/otc/dao/OtcMsgUserListDao.java
@@ -14,4 +14,6 @@
     IPage<MsgListVo> getMsgList(@Param("record")OtcMsgUserListEntity otcMsgUserListEntity, Page<MsgListVo> page);
 
     List<OtcMsgUserListEntity> selectListByMemberIdAndTargetId(@Param("memberId")Long memberId, @Param("targetId")long targetId);
+
+    OtcMsgUserListEntity selectChatListByToAndFrom(@Param("to") Long to, @Param("from") Long from);
 }
diff --git a/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java b/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java
index a5e63a7..68dbd7f 100644
--- a/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java
+++ b/src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java
@@ -10,6 +10,9 @@
 
     private static final long serialVersionUID = 1L;
 
+    /**
+     *  1-鉴权 2-发送消息 3-新消息提醒
+     */
     private Integer type;
 
     private Integer status;
@@ -45,4 +48,11 @@
         res.setStatus(0);
         return res;
     }
+
+    public static ResponseBean ok(Object data) {
+        ResponseBean res = new ResponseBean();
+        res.setStatus(1);
+        res.setData(data);
+        return res;
+    }
 }
diff --git a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
index 04163d6..6e16771 100644
--- a/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
+++ b/src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -56,6 +56,10 @@
 
     public static Channel findWsChannel(Long id){
         ChannelId channelId = MEMBER_CHANNEL.get(id.toString());
+        if (channelId == null) {
+            return null;
+        }
+
         return WEBSOCKET_GROUP.find(channelId);
     }
 
diff --git a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
index 9742e1a..2b4be56 100644
--- a/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
+++ b/src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
@@ -34,7 +34,9 @@
 
         // 判断当前通道用户是否已经登陆
         if (StrUtil.isEmpty(ChannelManager.findWsMemberId(ctx.channel())) && requestBean.getType() != Contans.AUTH_CHECK) {
-            ctx.channel().writeAndFlush(NettyTools.webSocketBytes("123"));
+            ResponseBean res = ResponseBean.fail();
+            res.setType(requestBean.getType());
+            ctx.channel().writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(res)));
             return;
         }
 
@@ -42,7 +44,7 @@
             requestBean.setChannelId(ctx.channel().id().asShortText());
             msgLogic.webSocketMsgLogic(requestBean);
         } catch (Exception e) {
-            log.info("#websocket json error:{}#", e);
+            log.info("#websocket json error:#", e);
             ctx.channel().writeAndFlush(NettyTools.wsSendMsg(ResponseBean.fail()));
         }
     }
diff --git a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
index 7b5cd3d..6a70bc2 100644
--- a/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
+++ b/src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
@@ -78,6 +78,7 @@
                 content = ((TextWebSocketFrame) frame).text();
                 if (content.contains(Contans.HEART_BEAT)) {
                     resetTimes(ctx.channel());
+                    ctx.channel().writeAndFlush(NettyTools.webSocketBytes(Contans.HEART_BEAT));
                 } else {
                     SpringContextHolder.getBean(MsgDispatch.class).webSocketDispatch(ctx, content);
                 }
@@ -90,11 +91,27 @@
 
     @Override
     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-        log.info("[触发器触发]");
+//        log.info("[触发器触发]");
         if (evt instanceof IdleStateEvent) {
             IdleStateEvent event = (IdleStateEvent) evt;
             if (event.state() == IdleState.READER_IDLE) {
-
+                Integer times = pingTimes.get(ctx.channel().id().asShortText());
+                if (times == null) {
+                    times = 0;
+                }
+                /*读超时*/
+//                log.info("===服务端===({}读超时, {})", ctx.channel().id().asShortText(), times);
+                // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连
+                if (times >= MAX_UN_REC_PING_TIMES) {
+                    log.info("===服务端===(读超时,关闭chanel)");
+                    // 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连
+                    ctx.channel().close();
+                } else {
+                    // 失败计数器加1
+                    times++;
+                    pingTimes.remove(ctx.channel().id().asShortText());
+                    pingTimes.put(ctx.channel().id().asShortText(), times);
+                }
             } else if (event.state() == IdleState.WRITER_IDLE) {
                 /*写超时*/
                 ctx.channel().writeAndFlush(NettyTools.webSocketBytes(Contans.HEART_BEAT));
diff --git a/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java b/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java
index c202638..25f3130 100644
--- a/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java
+++ b/src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java
@@ -19,9 +19,6 @@
 @Component
 public class WebSocketServerInitializer extends ChannelInitializer<NioSocketChannel> {
 
-//    @Autowired
-//    private WebSocketServerHandler webSocketServerHandler;
-
     @Override
     protected void initChannel(NioSocketChannel ch) throws Exception {
         ChannelPipeline cp = ch.pipeline();
@@ -32,7 +29,7 @@
         cp.addLast(new HttpObjectAggregator(65536));
         cp.addLast(new ChunkedWriteHandler());
         // 心跳
-//        ch.pipeline().addLast(new IdleStateHandler(0, 10, 0));
+        ch.pipeline().addLast(new IdleStateHandler(10, 0, 0));
         // 自定义业务handler
         cp.addLast(new WebSocketServerHandler());
     }
diff --git a/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java b/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
index 16b6f52..44751e1 100644
--- a/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
+++ b/src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
@@ -1,11 +1,22 @@
 package com.xcong.excoin.netty.logic;
 
+import cn.hutool.core.util.StrUtil;
+import cn.hutool.crypto.asymmetric.KeyType;
+import cn.hutool.crypto.asymmetric.RSA;
 import com.alibaba.fastjson.JSONObject;
+import com.xcong.excoin.common.contants.AppContants;
+import com.xcong.excoin.configurations.properties.SecurityProperties;
+import com.xcong.excoin.modules.member.entity.MemberEntity;
+import com.xcong.excoin.netty.bean.ChatRequest;
 import com.xcong.excoin.netty.bean.RequestBean;
 import com.xcong.excoin.netty.bean.ResponseBean;
 import com.xcong.excoin.netty.common.ChannelManager;
 import com.xcong.excoin.netty.common.NettyTools;
+import com.xcong.excoin.rabbit.producer.ChatProducer;
+import com.xcong.excoin.utils.RedisUtils;
 import io.netty.channel.Channel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 
@@ -14,19 +25,74 @@
  * @email wangdoubleone@gmail.com
  * @date 2019-05-09
  */
+@Slf4j
 @Component
 public class WebSocketLogic {
 
+    @Autowired
+    private RedisUtils redisUtils;
+    @Autowired
+    private ChatProducer chatProducer;
+    @Autowired
+    private SecurityProperties securityProperties;
+
     public void authCheck(RequestBean requestBean) {
         Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId());
-
-        ChannelManager.addWsChannel(channel, Long.parseLong(requestBean.getData().toString()));
-
         ResponseBean responseBean = new ResponseBean();
         responseBean.setType(requestBean.getType());
         responseBean.setStatus(1);
+
+//        String bearerToken = requestBean.getData().toString();
+//        String rsaToken = bearerToken.replace(AppContants.TOKEN_START_WITH, "");
+//        RSA rsa = new RSA(securityProperties.getPrivateKey(), null);
+//        String[] tokens = StrUtil.split(rsa.decryptStr(rsaToken, KeyType.PrivateKey), "_");
+//
+//        Long memberId = Long.parseLong(tokens[0]);
+
+        String token = requestBean.getData().toString();
+        String redisKey = AppContants.APP_LOGIN_PREFIX + token;
+        String loginStr = redisUtils.getString(redisKey);
+        if (StrUtil.isBlank(loginStr)) {
+            ResponseBean res = ResponseBean.fail();
+            res.setType(requestBean.getType());
+            channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(res)));
+            return;
+        }
+
+        MemberEntity loginUser = JSONObject.parseObject(loginStr, MemberEntity.class);
+        Long memberId = loginUser.getId();
+
         channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(responseBean)));
+        ChannelManager.addWsChannel(channel, memberId);
+        String value = redisUtils.getString(AppContants.MSG_NOTICE + memberId);
+        if (StrUtil.isNotBlank(value)) {
+            responseBean.setType(3);
+            responseBean.setData(Integer.parseInt(value));
+            channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(responseBean)));
+
+            redisUtils.del(AppContants.MSG_NOTICE + memberId);
+        }
     }
 
+    public void sendMsg(RequestBean requestBean) {
+        String chatStr = requestBean.getData().toString();
+        ChatRequest chat = JSONObject.parseObject(chatStr, ChatRequest.class);
 
+        // 判断是否在线
+        Channel targetChannel = ChannelManager.findWsChannel(Long.parseLong(chat.getTo()));
+        if (targetChannel != null) {
+            targetChannel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(ResponseBean.ok(chat))));
+
+            chatProducer.sendMsgHistory(chat);
+        } else {
+            // 在redis中保存用户未在线时,给该用户发送的消息条数
+            String key = AppContants.MSG_NOTICE + chat.getTo();
+            String value = redisUtils.getString(key);
+            if (StrUtil.isEmpty(value)) {
+                redisUtils.set(key , 1);
+            } else {
+                redisUtils.set(key, Integer.parseInt(value) + 1);
+            }
+        }
+    }
 }
diff --git a/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java b/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java
index 875461a..7e76d05 100644
--- a/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java
+++ b/src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java
@@ -29,7 +29,8 @@
             case Contans.AUTH_CHECK:
                 webSocketLogic.authCheck(requestBean);
                 break;
-//            case Contans.HOME_SYMBOLS:
+            case Contans.MESSAGE:
+                webSocketLogic.sendMsg(requestBean);
             default:
                 break;
         }
diff --git a/src/main/java/com/xcong/excoin/rabbit/consumer/ChatConsumer.java b/src/main/java/com/xcong/excoin/rabbit/consumer/ChatConsumer.java
new file mode 100644
index 0000000..4902736
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/rabbit/consumer/ChatConsumer.java
@@ -0,0 +1,97 @@
+package com.xcong.excoin.rabbit.consumer;
+
+
+import cn.hutool.core.bean.BeanUtil;
+import cn.hutool.core.util.StrUtil;
+import com.alibaba.fastjson.JSONObject;
+import com.xcong.excoin.common.contants.AppContants;
+import com.xcong.excoin.configurations.RabbitMqConfig;
+import com.xcong.excoin.modules.otc.dao.OtcMsgHistoryDao;
+import com.xcong.excoin.modules.otc.dao.OtcMsgUserListDao;
+import com.xcong.excoin.modules.otc.entity.OtcMsgHistoryEntity;
+import com.xcong.excoin.modules.otc.entity.OtcMsgUserListEntity;
+import com.xcong.excoin.netty.bean.ChatRequest;
+import com.xcong.excoin.utils.RedisUtils;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.Date;
+
+@Slf4j
+@Component
+//@ConditionalOnProperty(prefix = "app", name = "rabbit-consumer", havingValue = "true")
+public class ChatConsumer {
+
+    @Autowired
+    private OtcMsgUserListDao otcMsgUserListDao;
+
+    @Autowired
+    private OtcMsgHistoryDao otcMsgHistoryDao;
+
+    @Autowired
+    private RedisUtils redisUtils;
+
+    @RabbitListener(queues = RabbitMqConfig.QUEUE_MSG_HISTORY)
+    @Transactional(rollbackFor = Exception.class)
+    public void msgHistoryConsumer(String content) {
+        log.info("收到历史消息处理:{}", content);
+        ChatRequest chat = JSONObject.parseObject(content, ChatRequest.class);
+
+        Long toId = Long.parseLong(chat.getTo());
+        Long fromId = Long.parseLong(chat.getFrom());
+
+        // 发送人是否存在聊天框
+        OtcMsgUserListEntity fromList = otcMsgUserListDao.selectChatListByToAndFrom(Long.parseLong(chat.getTo()), Long.parseLong(chat.getFrom()));
+        if (fromList == null) {
+            OtcMsgUserListEntity from = new OtcMsgUserListEntity();
+            from.setMemberId(Long.parseLong(chat.getFrom()));
+            from.setTargetId(Long.parseLong(chat.getTo()));
+            from.setIsRead(OtcMsgUserListEntity.ISREAD_TWO);
+            from.setLastMsgTime(new Date());
+            otcMsgUserListDao.insert(from);
+        }
+
+        // 收件人是否存在聊天框
+        OtcMsgUserListEntity toList = otcMsgUserListDao.selectChatListByToAndFrom(Long.parseLong(chat.getFrom()), Long.parseLong(chat.getTo()));
+        if (toList == null) {
+            OtcMsgUserListEntity from = new OtcMsgUserListEntity();
+            from.setMemberId(Long.parseLong(chat.getTo()));
+            from.setTargetId(Long.parseLong(chat.getFrom()));
+            from.setIsRead(OtcMsgUserListEntity.ISREAD_ONE);
+            from.setLastMsgTime(new Date());
+            otcMsgUserListDao.insert(from);
+        } else {
+            // 收件人正在聊的用户
+            String value = redisUtils.getString(AppContants.MSG_CHATTING + chat.getTo());
+            if (StrUtil.isNotBlank(value) && value.equals(chat.getFrom())) {
+                toList.setLastMsgTime(new Date());
+                otcMsgUserListDao.updateById(toList);
+            } else {
+                toList.setIsRead(OtcMsgUserListEntity.ISREAD_TWO);
+                toList.setLastMsgTime(new Date());
+                otcMsgUserListDao.updateById(toList);
+            }
+        }
+
+
+        OtcMsgHistoryEntity toHistory = new OtcMsgHistoryEntity();
+        toHistory.setMemberId(toId);
+        toHistory.setFromMemberId(fromId);
+        toHistory.setTargetId(toId);
+        toHistory.setIsSelf(OtcMsgHistoryEntity.ISSELF_TWO);
+        toHistory.setMsgType(chat.getMsgType());
+        toHistory.setMsg(chat.getContent());
+
+        OtcMsgHistoryEntity fromHistory = new OtcMsgHistoryEntity();
+        BeanUtil.copyProperties(toHistory, fromHistory);
+        fromHistory.setIsSelf(OtcMsgHistoryEntity.ISSELF_ONE);
+        fromHistory.setMemberId(fromId);
+
+        otcMsgHistoryDao.insert(fromHistory);
+        otcMsgHistoryDao.insert(toHistory);
+    }
+}
diff --git a/src/main/java/com/xcong/excoin/rabbit/producer/ChatProducer.java b/src/main/java/com/xcong/excoin/rabbit/producer/ChatProducer.java
new file mode 100644
index 0000000..d939432
--- /dev/null
+++ b/src/main/java/com/xcong/excoin/rabbit/producer/ChatProducer.java
@@ -0,0 +1,33 @@
+package com.xcong.excoin.rabbit.producer;
+
+
+import com.alibaba.fastjson.JSONObject;
+import com.xcong.excoin.configurations.RabbitMqConfig;
+import com.xcong.excoin.netty.bean.ChatRequest;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.connection.CorrelationData;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.UUID;
+
+@Slf4j
+@Component
+public class ChatProducer implements RabbitTemplate.ConfirmCallback {
+
+    @Autowired
+    private RabbitTemplate rabbitTemplate;
+
+    public void sendMsgHistory(ChatRequest chatRequest) {
+        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
+        log.info("消息持久化消息: {}, {}", chatRequest, correlationData.getId());
+        String str = JSONObject.toJSONString(chatRequest);
+        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_ONE, RabbitMqConfig.ROUTING_KEY_MSG_HISTORY, str, correlationData);
+    }
+
+    @Override
+    public void confirm(CorrelationData correlationData, boolean b, String s) {
+
+    }
+}
diff --git a/src/main/resources/mapper/otc/OtcMsgUserListDao.xml b/src/main/resources/mapper/otc/OtcMsgUserListDao.xml
index 446527f..0be3ee3 100644
--- a/src/main/resources/mapper/otc/OtcMsgUserListDao.xml
+++ b/src/main/resources/mapper/otc/OtcMsgUserListDao.xml
@@ -28,5 +28,10 @@
         order by a.create_time desc
     </select>
 
+    <select id="selectChatListByToAndFrom" resultType="com.xcong.excoin.modules.otc.entity.OtcMsgUserListEntity">
+        select * from otc_msg_user_list
+        where member_id=#{from} and target_id=#{to}
+    </select>
+
 
 </mapper>
\ No newline at end of file

--
Gitblit v1.9.1