New file |
| | |
| | | /** |
| | | * projectName: zq-erp |
| | | * fileName: AsyncMessageManager.java |
| | | * packageName: com.matrix.component.asyncmessage |
| | | * date: 2021-10-18 14:01 |
| | | * copyright(c) 2021 http://www.hydee.cn/ Inc. All rights reserved. |
| | | */ |
| | | package com.matrix.component.asyncmessage; |
| | | |
| | | import cn.hutool.core.collection.CollectionUtil; |
| | | import com.matrix.core.tools.LogUtil; |
| | | import com.matrix.core.tools.StringUtils; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.boot.ApplicationArguments; |
| | | import org.springframework.boot.ApplicationRunner; |
| | | import org.springframework.core.Ordered; |
| | | import org.springframework.core.annotation.Order; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import java.util.*; |
| | | import java.util.concurrent.ConcurrentLinkedQueue; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | import java.util.regex.Pattern; |
| | | import java.util.stream.Collectors; |
| | | |
| | | /** |
| | | * @version: V1.0 |
| | | * @author: JiangYouYao |
| | | * @className: AsyncMessageManager |
| | | * @packageName: com.matrix.component.asyncmessage |
| | | * @description: 异步消息管理者 |
| | | * @data: 2021-10-18 14:01 |
| | | **/ |
| | | @Component |
| | | @Order(Ordered.HIGHEST_PRECEDENCE) |
| | | public class AsyncMessageManager implements ApplicationRunner { |
| | | |
| | | @Autowired |
| | | private List<MessageHandler> obs; |
| | | |
| | | private Map<String, List<MessageHandler>> routes; |
| | | |
| | | |
| | | @Override |
| | | public void run(ApplicationArguments args) throws Exception { |
| | | if (CollectionUtil.isNotEmpty(obs)) { |
| | | routes = obs.stream().collect(Collectors.groupingBy(MessageHandler::getRouteKey)); |
| | | LogUtil.info("异步消息绑定成功,检测到{} 个消费者,共计{}组", obs.size(), routes.size()); |
| | | } else { |
| | | LogUtil.info("未检测到异步消息处理类"); |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | |
| | | /** |
| | | * map 参数的字符串表示,方便快速拼装消息参数 |
| | | * @param routeKey |
| | | * @param mapStr |
| | | */ |
| | | public void sendMsg(String routeKey, String mapStr,Object... args){ |
| | | |
| | | if(StringUtils.isBlank(mapStr)){ |
| | | throw new IllegalArgumentException("mapStr格式错误:例如:\\\"orderId=123,price=88\\\",mapStr="+mapStr); |
| | | } |
| | | mapStr=String.format(mapStr,args); |
| | | |
| | | Map<String, Object> param =new HashMap<>(); |
| | | String[] paramStr = mapStr.split(","); |
| | | Arrays.asList(paramStr).forEach(item->{ |
| | | String[] keyValueArr = item.split("="); |
| | | param.put(keyValueArr[0],keyValueArr[1]); |
| | | }); |
| | | sendMsg(routeKey,param); |
| | | } |
| | | |
| | | /** |
| | | * 根据route 发送消息到对应的消费者 |
| | | * 这个方法本质上还是同步执行,没有完成效率上的异步解耦,这里做观察者模式主要是为了扩展业务,减少对第三方消息组件的依赖 |
| | | * 而不是解决性能问题,如果后续需要解决性能问题,在加一个消息队列,然后启动线程从队列中进行消息的消费。 |
| | | * @param routeKey |
| | | * @param param |
| | | */ |
| | | public void sendMsg(String routeKey, Map<String, Object> param) { |
| | | |
| | | if(StringUtils.isBlank(routeKey)){ |
| | | LogUtil.warn("发送异步消息失败:routeKey为空"); |
| | | return; |
| | | } |
| | | |
| | | //匹配观察者 |
| | | List<MessageHandler> lisener = new ArrayList<>(); |
| | | for (Map.Entry<String, List<MessageHandler>> routesEntry : routes.entrySet()) { |
| | | if (Pattern.matches(routesEntry.getKey(), routeKey)) { |
| | | lisener.addAll(routesEntry.getValue()); |
| | | } |
| | | } |
| | | |
| | | //通知观察者 |
| | | if (CollectionUtil.isNotEmpty(lisener)) { |
| | | LogUtil.info("发送异步消息,routeKey={},匹配观察者{}个",routeKey,lisener.size()); |
| | | for (MessageHandler messageHandler : lisener) { |
| | | try{ |
| | | messageHandler.handle(param); |
| | | }catch (Throwable t){ |
| | | LogUtil.error("{},处理类执行异常:routeKey={},message={}", messageHandler.getName(),routeKey,t.getMessage()); |
| | | } |
| | | } |
| | | }else{ |
| | | LogUtil.warn("未匹配到routeKey={},的消费者",routeKey); |
| | | } |
| | | |
| | | } |
| | | |
| | | |
| | | } |