Helius
2021-05-27 182c01b109ffb3b4be248c2e128835ace70eeae8
add websocket
12 files modified
259 ■■■■■ changed files
src/main/java/com/xcong/excoin/common/contants/AppContants.java 4 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java 13 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/modules/otc/dao/OtcMsgUserListDao.java 2 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java 10 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/common/ChannelManager.java 4 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java 6 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java 21 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java 5 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java 61 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java 3 ●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/consumer/ChatConsumer.java 100 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/rabbit/producer/ChatProducer.java 30 ●●●●● patch | view | raw | blame | history
src/main/java/com/xcong/excoin/common/contants/AppContants.java
@@ -96,4 +96,8 @@
    public static final BigDecimal BASE_MIN_AMOUNT = new BigDecimal(100);
    public static final String OTC_ORDER_CANCEL_TIMES = "OTC_ORDER_CANCEL_TIMES_";
    /**
     * Redis key prefix; suffixed with a member id, the entry holds the number of chat
     * messages sent to that member while no websocket channel was registered for them.
     */
    public static final String MSG_NOTICE = "MSG_NOTICE_";
    /**
     * Redis key prefix; suffixed with a member id, the entry is read as the id of the
     * peer that member is currently chatting with.
     */
    public static final String MSG_CHATTING = "MSG_CHATTING_";
}
src/main/java/com/xcong/excoin/configurations/RabbitMqConfig.java
@@ -128,6 +128,9 @@
    public static final String QUEUE_DELAY = "queue.delay";
    public static final String EXCHANGE_DELAY = "exchange.delay";
    /** Name of the queue that carries chat messages awaiting persistence as history. */
    public static final String QUEUE_MSG_HISTORY = "queue_msg_history";
    /** Routing key used to bind and publish to {@link #QUEUE_MSG_HISTORY}. */
    public static final String ROUTING_KEY_MSG_HISTORY = "routing_key_msg_history";
    @Resource
    private ConnectionFactory connectionFactory;
@@ -183,6 +186,16 @@
    }
    @Bean
    public Queue msgHistoryQueue() {
        return new Queue(QUEUE_MSG_HISTORY);
    }
    @Bean
    public Binding msgHistoryBinding() {
        return BindingBuilder.bind(msgHistoryQueue()).to(defaultExchange()).with(ROUTING_KEY_MSG_HISTORY);
    }
    @Bean
    public Queue testQueue() {
        return new Queue(QUEUE_TEST, true);
    }
src/main/java/com/xcong/excoin/modules/otc/dao/OtcMsgUserListDao.java
@@ -14,4 +14,6 @@
    /**
     * Returns one page of message-list rows.
     * NOTE(review): how {@code record} filters the result is defined in the mapper SQL,
     * which is not visible here — confirm before relying on a specific field.
     *
     * @param otcMsgUserListEntity query criteria, exposed to the mapper as {@code record}
     * @param page                 paging request
     * @return the requested page
     */
    IPage<MsgListVo> getMsgList(@Param("record")OtcMsgUserListEntity otcMsgUserListEntity, Page<MsgListVo> page);
    /**
     * Looks up chat-list entries by member id and target id.
     *
     * @param memberId exposed to the mapper as {@code memberId}
     * @param targetId exposed to the mapper as {@code targetId}
     * @return the matching entries
     */
    List<OtcMsgUserListEntity> selectListByMemberIdAndTargetId(@Param("memberId")Long memberId, @Param("targetId")long targetId);
    /**
     * Looks up a single chat-list entry for a message direction.
     * NOTE(review): the chat consumer passes (recipient, sender) to find the sender's
     * entry and (sender, recipient) to find the recipient's entry, so the row presumably
     * has member id = {@code from} and target id = {@code to} — confirm against the mapper SQL.
     *
     * @param to   exposed to the mapper as {@code to}
     * @param from exposed to the mapper as {@code from}
     * @return the entry; callers treat {@code null} as "no entry yet"
     */
    OtcMsgUserListEntity selectChatListByToAndFrom(@Param("to") Long to, @Param("from") Long from);
}
src/main/java/com/xcong/excoin/netty/bean/ResponseBean.java
@@ -10,6 +10,9 @@
    private static final long serialVersionUID = 1L;
    /**
     *  Message type: 1 - authentication, 2 - send message, 3 - new-message notification
     */
    private Integer type;
    /** Outcome flag: {@code fail()} sets 0, {@code ok(Object)} sets 1. */
    private Integer status;
@@ -45,4 +48,11 @@
        res.setStatus(0);
        return res;
    }
    public static ResponseBean ok(Object data) {
        ResponseBean res = new ResponseBean();
        res.setStatus(1);
        res.setData(data);
        return res;
    }
}
src/main/java/com/xcong/excoin/netty/common/ChannelManager.java
@@ -56,6 +56,10 @@
    public static Channel findWsChannel(Long id){
        ChannelId channelId = MEMBER_CHANNEL.get(id.toString());
        if (channelId == null) {
            return null;
        }
        return WEBSOCKET_GROUP.find(channelId);
    }
src/main/java/com/xcong/excoin/netty/dispatch/MsgDispatch.java
@@ -34,7 +34,9 @@
        // 判断当前通道用户是否已经登陆
        if (StrUtil.isEmpty(ChannelManager.findWsMemberId(ctx.channel())) && requestBean.getType() != Contans.AUTH_CHECK) {
            ctx.channel().writeAndFlush(NettyTools.webSocketBytes("123"));
            ResponseBean res = ResponseBean.fail();
            res.setType(requestBean.getType());
            ctx.channel().writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(res)));
            return;
        }
@@ -42,7 +44,7 @@
            requestBean.setChannelId(ctx.channel().id().asShortText());
            msgLogic.webSocketMsgLogic(requestBean);
        } catch (Exception e) {
            log.info("#websocket json error:{}#", e);
            log.info("#websocket json error:#", e);
            ctx.channel().writeAndFlush(NettyTools.wsSendMsg(ResponseBean.fail()));
        }
    }
src/main/java/com/xcong/excoin/netty/handler/WebSocketServerHandler.java
@@ -78,6 +78,7 @@
                content = ((TextWebSocketFrame) frame).text();
                if (content.contains(Contans.HEART_BEAT)) {
                    resetTimes(ctx.channel());
                    ctx.channel().writeAndFlush(NettyTools.webSocketBytes(Contans.HEART_BEAT));
                } else {
                    SpringContextHolder.getBean(MsgDispatch.class).webSocketDispatch(ctx, content);
                }
@@ -90,11 +91,27 @@
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        log.info("[触发器触发]");
//        log.info("[触发器触发]");
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                Integer times = pingTimes.get(ctx.channel().id().asShortText());
                if (times == null) {
                    times = 0;
                }
                /*读超时*/
//                log.info("===服务端===({}读超时, {})", ctx.channel().id().asShortText(), times);
                // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连
                if (times >= MAX_UN_REC_PING_TIMES) {
                    log.info("===服务端===(读超时,关闭chanel)");
                    // 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连
                    ctx.channel().close();
                } else {
                    // 失败计数器加1
                    times++;
                    pingTimes.remove(ctx.channel().id().asShortText());
                    pingTimes.put(ctx.channel().id().asShortText(), times);
                }
            } else if (event.state() == IdleState.WRITER_IDLE) {
                /*写超时*/
                ctx.channel().writeAndFlush(NettyTools.webSocketBytes(Contans.HEART_BEAT));
src/main/java/com/xcong/excoin/netty/initalizer/WebSocketServerInitializer.java
@@ -19,9 +19,6 @@
@Component
public class WebSocketServerInitializer extends ChannelInitializer<NioSocketChannel> {
//    @Autowired
//    private WebSocketServerHandler webSocketServerHandler;
    @Override
    protected void initChannel(NioSocketChannel ch) throws Exception {
        ChannelPipeline cp = ch.pipeline();
@@ -32,7 +29,7 @@
        cp.addLast(new HttpObjectAggregator(65536));
        cp.addLast(new ChunkedWriteHandler());
        // 心跳
//        ch.pipeline().addLast(new IdleStateHandler(0, 10, 0));
        ch.pipeline().addLast(new IdleStateHandler(10, 0, 0));
        // 自定义业务handler
        cp.addLast(new WebSocketServerHandler());
    }
src/main/java/com/xcong/excoin/netty/logic/WebSocketLogic.java
@@ -1,11 +1,21 @@
package com.xcong.excoin.netty.logic;
import cn.hutool.core.util.StrUtil;
import cn.hutool.crypto.asymmetric.KeyType;
import cn.hutool.crypto.asymmetric.RSA;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.common.contants.AppContants;
import com.xcong.excoin.configurations.properties.SecurityProperties;
import com.xcong.excoin.netty.bean.ChatRequest;
import com.xcong.excoin.netty.bean.RequestBean;
import com.xcong.excoin.netty.bean.ResponseBean;
import com.xcong.excoin.netty.common.ChannelManager;
import com.xcong.excoin.netty.common.NettyTools;
import com.xcong.excoin.rabbit.producer.ChatProducer;
import com.xcong.excoin.utils.RedisUtils;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -14,19 +24,64 @@
 * @email wangdoubleone@gmail.com
 * @date 2019-05-09
 */
@Slf4j
@Component
public class WebSocketLogic {
    @Autowired
    private RedisUtils redisUtils;
    @Autowired
    private ChatProducer chatProducer;
    @Autowired
    private SecurityProperties securityProperties;
    public void authCheck(RequestBean requestBean) {
        Channel channel = ChannelManager.findWebSocketChannel(requestBean.getChannelId());
        ChannelManager.addWsChannel(channel, Long.parseLong(requestBean.getData().toString()));
        ResponseBean responseBean = new ResponseBean();
        responseBean.setType(requestBean.getType());
        responseBean.setStatus(1);
//        String bearerToken = requestBean.getData().toString();
//        String rsaToken = bearerToken.replace(AppContants.TOKEN_START_WITH, "");
//        RSA rsa = new RSA(securityProperties.getPrivateKey(), null);
//        String[] tokens = StrUtil.split(rsa.decryptStr(rsaToken, KeyType.PrivateKey), "_");
//
//        Long memberId = Long.parseLong(tokens[0]);
        Long memberId = Long.parseLong(requestBean.getData().toString());
        channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(responseBean)));
        ChannelManager.addWsChannel(channel, memberId);
        String value = redisUtils.getString(AppContants.MSG_NOTICE + memberId);
        if (StrUtil.isNotBlank(value)) {
            responseBean.setType(3);
            responseBean.setData(Integer.parseInt(value));
            channel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(responseBean)));
            redisUtils.del(AppContants.MSG_NOTICE + memberId);
        }
    }
    public void sendMsg(RequestBean requestBean) {
        String chatStr = requestBean.getData().toString();
        ChatRequest chat = JSONObject.parseObject(chatStr, ChatRequest.class);
        // 判断是否在线
        Channel targetChannel = ChannelManager.findWsChannel(Long.parseLong(chat.getTo()));
        if (targetChannel != null) {
            targetChannel.writeAndFlush(NettyTools.webSocketBytes(JSONObject.toJSONString(ResponseBean.ok(chat))));
            chatProducer.sendMsgHistory(chat);
        } else {
            // 在redis中保存用户未在线时,给该用户发送的消息条数
            String key = AppContants.MSG_NOTICE + chat.getTo();
            String value = redisUtils.getString(key);
            if (StrUtil.isEmpty(value)) {
                redisUtils.set(key , 1);
            } else {
                redisUtils.set(key, Integer.parseInt(value) + 1);
            }
        }
    }
}
src/main/java/com/xcong/excoin/netty/logic/impl/MsgLogicImpl.java
@@ -29,7 +29,8 @@
            case Contans.AUTH_CHECK:
                webSocketLogic.authCheck(requestBean);
                break;
//            case Contans.HOME_SYMBOLS:
            case Contans.MESSAGE:
                webSocketLogic.sendMsg(requestBean);
            default:
                break;
        }
src/main/java/com/xcong/excoin/rabbit/consumer/ChatConsumer.java
@@ -1,7 +1,97 @@
package com.xcong.excoin.rabbit.consumer;/**
*
* @author wzy
* @date 2021-05-27
**/
package com.xcong.excoin.rabbit.consumer;
import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.common.contants.AppContants;
import com.xcong.excoin.configurations.RabbitMqConfig;
import com.xcong.excoin.modules.otc.dao.OtcMsgHistoryDao;
import com.xcong.excoin.modules.otc.dao.OtcMsgUserListDao;
import com.xcong.excoin.modules.otc.entity.OtcMsgHistoryEntity;
import com.xcong.excoin.modules.otc.entity.OtcMsgUserListEntity;
import com.xcong.excoin.netty.bean.ChatRequest;
import com.xcong.excoin.utils.RedisUtils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.util.Date;
@Slf4j
@Component
//@ConditionalOnProperty(prefix = "app", name = "rabbit-consumer", havingValue = "true")
public class ChatConsumer {
    @Autowired
    private OtcMsgUserListDao otcMsgUserListDao;
    @Autowired
    private OtcMsgHistoryDao otcMsgHistoryDao;
    @Autowired
    private RedisUtils redisUtils;
    @RabbitListener(queues = RabbitMqConfig.QUEUE_MSG_HISTORY)
    @Transactional(rollbackFor = Exception.class)
    public void msgHistoryConsumer(String content) {
        log.info("收到历史消息处理:{}", content);
        ChatRequest chat = JSONObject.parseObject(content, ChatRequest.class);
        Long toId = Long.parseLong(chat.getTo());
        Long fromId = Long.parseLong(chat.getFrom());
        // 发送人是否存在聊天框
        OtcMsgUserListEntity fromList = otcMsgUserListDao.selectChatListByToAndFrom(Long.parseLong(chat.getTo()), Long.parseLong(chat.getFrom()));
        if (fromList == null) {
            OtcMsgUserListEntity from = new OtcMsgUserListEntity();
            from.setMemberId(Long.parseLong(chat.getFrom()));
            from.setTargetId(Long.parseLong(chat.getTo()));
            from.setIsRead(OtcMsgUserListEntity.ISREAD_TWO);
            from.setLastMsgTime(new Date());
            otcMsgUserListDao.insert(from);
        }
        // 收件人是否存在聊天框
        OtcMsgUserListEntity toList = otcMsgUserListDao.selectChatListByToAndFrom(Long.parseLong(chat.getFrom()), Long.parseLong(chat.getTo()));
        if (toList == null) {
            OtcMsgUserListEntity from = new OtcMsgUserListEntity();
            from.setMemberId(Long.parseLong(chat.getTo()));
            from.setTargetId(Long.parseLong(chat.getFrom()));
            from.setIsRead(OtcMsgUserListEntity.ISREAD_ONE);
            from.setLastMsgTime(new Date());
            otcMsgUserListDao.insert(from);
        } else {
            // 收件人正在聊的用户
            String value = redisUtils.getString(AppContants.MSG_CHATTING + chat.getTo());
            if (StrUtil.isNotBlank(value) && value.equals(chat.getFrom())) {
                toList.setLastMsgTime(new Date());
                otcMsgUserListDao.updateById(toList);
            } else {
                toList.setIsRead(OtcMsgUserListEntity.ISREAD_TWO);
                toList.setLastMsgTime(new Date());
                otcMsgUserListDao.updateById(toList);
            }
        }
        OtcMsgHistoryEntity toHistory = new OtcMsgHistoryEntity();
        toHistory.setMemberId(toId);
        toHistory.setFromMemberId(fromId);
        toHistory.setTargetId(toId);
        toHistory.setIsSelf(OtcMsgHistoryEntity.ISSELF_TWO);
        toHistory.setMsgType(chat.getMsgType());
        toHistory.setMsg(chat.getContent());
        OtcMsgHistoryEntity fromHistory = new OtcMsgHistoryEntity();
        BeanUtil.copyProperties(toHistory, fromHistory);
        fromHistory.setIsSelf(OtcMsgHistoryEntity.ISSELF_ONE);
        fromHistory.setMemberId(fromId);
        otcMsgHistoryDao.insert(fromHistory);
        otcMsgHistoryDao.insert(toHistory);
    }
}
src/main/java/com/xcong/excoin/rabbit/producer/ChatProducer.java
@@ -1,5 +1,33 @@
package com.xcong.excoin.rabbit.producer;
public class ChatProducer {
import com.alibaba.fastjson.JSONObject;
import com.xcong.excoin.configurations.RabbitMqConfig;
import com.xcong.excoin.netty.bean.ChatRequest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Slf4j
@Component
public class ChatProducer implements RabbitTemplate.ConfirmCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void sendMsgHistory(ChatRequest chatRequest) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        log.info("消息持久化消息: {}, {}", chatRequest, correlationData.getId());
        String str = JSONObject.toJSONString(chatRequest);
        rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_ONE, RabbitMqConfig.ROUTING_KEY_MSG_HISTORY, str, correlationData);
    }
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
    }
}