From c253b555c7905c5136d47cd615ef545fa50cc6ad Mon Sep 17 00:00:00 2001 From: 935090232@qq.com <ak473600000> Date: Sun, 20 Feb 2022 21:24:16 +0800 Subject: [PATCH] Merge branch 'api_score_meger' --- zq-erp/src/main/java/com/matrix/component/asyncmessage/AsyncMessageManager.java | 117 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 117 insertions(+), 0 deletions(-) diff --git a/zq-erp/src/main/java/com/matrix/component/asyncmessage/AsyncMessageManager.java b/zq-erp/src/main/java/com/matrix/component/asyncmessage/AsyncMessageManager.java new file mode 100644 index 0000000..a27cf0a --- /dev/null +++ b/zq-erp/src/main/java/com/matrix/component/asyncmessage/AsyncMessageManager.java @@ -0,0 +1,117 @@ +/** + * projectName: zq-erp + * fileName: MessageManager.java + * packageName: com.matrix.component.asyncmessage + * date: 2021-10-18 14:01 + * copyright(c) 2021 http://www.hydee.cn/ Inc. All rights reserved. + */ +package com.matrix.component.asyncmessage; + +import cn.hutool.core.collection.CollectionUtil; +import com.matrix.core.tools.LogUtil; +import com.matrix.core.tools.StringUtils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationArguments; +import org.springframework.boot.ApplicationRunner; +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.locks.ReentrantLock; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** + * @version: V1.0 + * @author: JiangYouYao + * @className: AsyncMessageManager + * @packageName: com.matrix.component.asyncmessage + * @description: 异步消息管理者 + * @data: 2021-10-18 14:01 + **/ +@Component +@Order(Ordered.HIGHEST_PRECEDENCE) +public class AsyncMessageManager implements ApplicationRunner { + + @Autowired + private List<MessageHandler> obs; + + private Map<String, List<MessageHandler>> routes; + + + @Override + public void run(ApplicationArguments args) throws Exception { + if (CollectionUtil.isNotEmpty(obs)) { + routes = obs.stream().collect(Collectors.groupingBy(MessageHandler::getRouteKey)); + LogUtil.info("异步消息绑定成功,检测到{} 个消费者,共计{}组", obs.size(), routes.size()); + } else { + LogUtil.info("未检测到异步消息处理类"); + } + } + + + + + /** + * map 参数的字符串表示,方便快速拼装消息参数 + * @param routeKey + * @param mapStr + */ + public void sendMsg(String routeKey, String mapStr,Object... args){ + + if(StringUtils.isBlank(mapStr)){ + throw new IllegalArgumentException("mapStr格式错误:例如:\\\"orderId=123,price=88\\\",mapStr="+mapStr); + } + mapStr=String.format(mapStr,args); + + Map<String, Object> param =new HashMap<>(); + String[] paramStr = mapStr.split(","); + Arrays.asList(paramStr).forEach(item->{ + String[] keyValueArr = item.split("="); + param.put(keyValueArr[0],keyValueArr[1]); + }); + sendMsg(routeKey,param); + } + + /** + * 根据route 发送消息到对应的消费者 + * 这个方法本质上还是同步执行,没有完成效率上的异步解耦,这里做观察者模式主要是为了扩展业务,减少对第三方消息组件的依赖 + * 而不是解决性能问题,如果后续需要解决性能问题,在加一个消息队列,然后启动线程从队列中进行消息的消费。 + * @param routeKey + * @param param + */ + public void sendMsg(String routeKey, Map<String, Object> param) { + + if(StringUtils.isBlank(routeKey)){ + LogUtil.warn("发送异步消息失败:routeKey为空"); + return; + } + + //匹配观察者 + List<MessageHandler> lisener = new ArrayList<>(); + for (Map.Entry<String, List<MessageHandler>> routesEntry : routes.entrySet()) { + if (Pattern.matches(routesEntry.getKey(), routeKey)) { + lisener.addAll(routesEntry.getValue()); + } + } + + //通知观察者 + if (CollectionUtil.isNotEmpty(lisener)) { + LogUtil.info("发送异步消息,routeKey={},匹配观察者{}个",routeKey,lisener.size()); + for (MessageHandler messageHandler : lisener) { + try{ + messageHandler.handle(param); + }catch (Throwable t){ + LogUtil.error("{},处理类执行异常:routeKey={},message={}", messageHandler.getName(),routeKey,t.getMessage()); + } + } + }else{ + LogUtil.warn("未匹配到routeKey={},的消费者",routeKey); + } + + } + + +} \ No newline at end of file -- Gitblit v1.9.1