/** * projectName: zq-erp * fileName: MessageManager.java * packageName: com.matrix.component.asyncmessage * date: 2021-10-18 14:01 * copyright(c) 2021 http://www.hydee.cn/ Inc. All rights reserved. */ package com.matrix.component.asyncmessage; import cn.hutool.core.collection.CollectionUtil; import com.matrix.core.tools.LogUtil; import com.matrix.core.tools.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.core.Ordered; import org.springframework.core.annotation.Order; import org.springframework.stereotype.Component; import java.util.*; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; import java.util.stream.Collectors; /** * @version: V1.0 * @author: JiangYouYao * @className: AsyncMessageManager * @packageName: com.matrix.component.asyncmessage * @description: 异步消息管理者 * @data: 2021-10-18 14:01 **/ @Component @Order(Ordered.HIGHEST_PRECEDENCE) public class AsyncMessageManager implements ApplicationRunner { @Autowired private List obs; private Map> routes; @Override public void run(ApplicationArguments args) throws Exception { if (CollectionUtil.isNotEmpty(obs)) { routes = obs.stream().collect(Collectors.groupingBy(MessageHandler::getRouteKey)); LogUtil.info("异步消息绑定成功,检测到{} 个消费者,共计{}组", obs.size(), routes.size()); } else { LogUtil.info("未检测到异步消息处理类"); } } /** * map 参数的字符串表示,方便快速拼装消息参数 * @param routeKey * @param mapStr */ public void sendMsg(String routeKey, String mapStr,Object... args){ if(StringUtils.isBlank(mapStr)){ throw new IllegalArgumentException("mapStr格式错误:例如:\\\"orderId=123,price=88\\\",mapStr="+mapStr); } mapStr=String.format(mapStr,args); Map param =new HashMap<>(); String[] paramStr = mapStr.split(","); Arrays.asList(paramStr).forEach(item->{ String[] keyValueArr = item.split("="); param.put(keyValueArr[0],keyValueArr[1]); }); sendMsg(routeKey,param); } /** * 根据route 发送消息到对应的消费者 * 这个方法本质上还是同步执行,没有完成效率上的异步解耦,这里做观察者模式主要是为了扩展业务,减少对第三方消息组件的依赖 * 而不是解决性能问题,如果后续需要解决性能问题,在加一个消息队列,然后启动线程从队列中进行消息的消费。 * @param routeKey * @param param */ public void sendMsg(String routeKey, Map param) { if(StringUtils.isBlank(routeKey)){ LogUtil.warn("发送异步消息失败:routeKey为空"); return; } //匹配观察者 List lisener = new ArrayList<>(); for (Map.Entry> routesEntry : routes.entrySet()) { if (Pattern.matches(routesEntry.getKey(), routeKey)) { lisener.addAll(routesEntry.getValue()); } } //通知观察者 if (CollectionUtil.isNotEmpty(lisener)) { LogUtil.info("发送异步消息,routeKey={},匹配观察者{}个",routeKey,lisener.size()); for (MessageHandler messageHandler : lisener) { try{ messageHandler.handle(param); }catch (Throwable t){ LogUtil.error("{},处理类执行异常:routeKey={},message={}", messageHandler.getName(),routeKey,t.getMessage()); } } }else{ LogUtil.warn("未匹配到routeKey={},的消费者",routeKey); } } }