| /** | 
|  * projectName: zq-erp | 
|  * fileName: MessageManager.java | 
|  * packageName: com.matrix.component.asyncmessage | 
|  * date: 2021-10-18 14:01 | 
|  * copyright(c) 2021 http://www.hydee.cn/ Inc. All rights reserved. | 
|  */ | 
| package com.matrix.component.asyncmessage; | 
|   | 
| import cn.hutool.core.collection.CollectionUtil; | 
| import com.matrix.core.tools.LogUtil; | 
| import com.matrix.core.tools.StringUtils; | 
| import org.springframework.beans.factory.annotation.Autowired; | 
| import org.springframework.boot.ApplicationArguments; | 
| import org.springframework.boot.ApplicationRunner; | 
| import org.springframework.core.Ordered; | 
| import org.springframework.core.annotation.Order; | 
| import org.springframework.stereotype.Component; | 
|   | 
| import java.util.*; | 
| import java.util.concurrent.ConcurrentLinkedQueue; | 
| import java.util.concurrent.locks.ReentrantLock; | 
| import java.util.regex.Pattern; | 
| import java.util.stream.Collectors; | 
|   | 
| /** | 
|  * @version: V1.0 | 
|  * @author: JiangYouYao | 
|  * @className: AsyncMessageManager | 
|  * @packageName: com.matrix.component.asyncmessage | 
|  * @description: 异步消息管理者 | 
|  * @data: 2021-10-18 14:01 | 
|  **/ | 
| @Component | 
| @Order(Ordered.HIGHEST_PRECEDENCE) | 
| public class AsyncMessageManager implements ApplicationRunner { | 
|   | 
|     @Autowired | 
|     private List<MessageHandler> obs; | 
|   | 
|     private Map<String, List<MessageHandler>> routes; | 
|   | 
|   | 
|     @Override | 
|     public void run(ApplicationArguments args) throws Exception { | 
|         if (CollectionUtil.isNotEmpty(obs)) { | 
|             routes = obs.stream().collect(Collectors.groupingBy(MessageHandler::getRouteKey)); | 
|             LogUtil.info("异步消息绑定成功,检测到{} 个消费者,共计{}组", obs.size(), routes.size()); | 
|         } else { | 
|             LogUtil.info("未检测到异步消息处理类"); | 
|         } | 
|     } | 
|   | 
|   | 
|   | 
|   | 
|     /** | 
|      * map 参数的字符串表示,方便快速拼装消息参数 | 
|      * @param routeKey | 
|      * @param mapStr | 
|      */ | 
|     public void sendMsg(String routeKey, String mapStr,Object... args){ | 
|   | 
|         if(StringUtils.isBlank(mapStr)){ | 
|             throw new IllegalArgumentException("mapStr格式错误:例如:\\\"orderId=123,price=88\\\",mapStr="+mapStr); | 
|         } | 
|         mapStr=String.format(mapStr,args); | 
|   | 
|         Map<String, Object> param =new HashMap<>(); | 
|         String[] paramStr = mapStr.split(","); | 
|         Arrays.asList(paramStr).forEach(item->{ | 
|             String[] keyValueArr = item.split("="); | 
|             param.put(keyValueArr[0],keyValueArr[1]); | 
|         }); | 
|         sendMsg(routeKey,param); | 
|     } | 
|   | 
|     /** | 
|      * 根据route 发送消息到对应的消费者 | 
|      * 这个方法本质上还是同步执行,没有完成效率上的异步解耦,这里做观察者模式主要是为了扩展业务,减少对第三方消息组件的依赖 | 
|      * 而不是解决性能问题,如果后续需要解决性能问题,在加一个消息队列,然后启动线程从队列中进行消息的消费。 | 
|      * @param routeKey | 
|      * @param param | 
|      */ | 
|     public void sendMsg(String routeKey, Map<String, Object> param) { | 
|   | 
|         if(StringUtils.isBlank(routeKey)){ | 
|             LogUtil.warn("发送异步消息失败:routeKey为空"); | 
|             return; | 
|         } | 
|   | 
|         //匹配观察者 | 
|         List<MessageHandler> lisener = new ArrayList<>(); | 
|         for (Map.Entry<String, List<MessageHandler>> routesEntry : routes.entrySet()) { | 
|             if (Pattern.matches(routesEntry.getKey(), routeKey)) { | 
|                 lisener.addAll(routesEntry.getValue()); | 
|             } | 
|         } | 
|   | 
|         //通知观察者 | 
|         if (CollectionUtil.isNotEmpty(lisener)) { | 
|             LogUtil.info("发送异步消息,routeKey={},匹配观察者{}个",routeKey,lisener.size()); | 
|             for (MessageHandler messageHandler : lisener) { | 
|                 try{ | 
|                     messageHandler.handle(param); | 
|                 }catch (Throwable t){ | 
|                     LogUtil.error("{},处理类执行异常:routeKey={},message={}", messageHandler.getName(),routeKey,t.getMessage()); | 
|                 } | 
|             } | 
|         }else{ | 
|             LogUtil.warn("未匹配到routeKey={},的消费者",routeKey); | 
|         } | 
|   | 
|     } | 
|   | 
|   | 
| } |