From c253b555c7905c5136d47cd615ef545fa50cc6ad Mon Sep 17 00:00:00 2001
From: 935090232@qq.com <ak473600000>
Date: Sun, 20 Feb 2022 21:24:16 +0800
Subject: [PATCH] Merge branch 'api_score_meger'

---
 zq-erp/src/main/java/com/matrix/component/asyncmessage/AsyncMessageManager.java |  117 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 117 insertions(+), 0 deletions(-)

diff --git a/zq-erp/src/main/java/com/matrix/component/asyncmessage/AsyncMessageManager.java b/zq-erp/src/main/java/com/matrix/component/asyncmessage/AsyncMessageManager.java
new file mode 100644
index 0000000..a27cf0a
--- /dev/null
+++ b/zq-erp/src/main/java/com/matrix/component/asyncmessage/AsyncMessageManager.java
@@ -0,0 +1,117 @@
+/**
+ * projectName: zq-erp
+ * fileName: MessageManager.java
+ * packageName: com.matrix.component.asyncmessage
+ * date: 2021-10-18 14:01
+ * copyright(c) 2021 http://www.hydee.cn/ Inc. All rights reserved.
+ */
+package com.matrix.component.asyncmessage;
+
+import cn.hutool.core.collection.CollectionUtil;
+import com.matrix.core.tools.LogUtil;
+import com.matrix.core.tools.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.ApplicationArguments;
+import org.springframework.boot.ApplicationRunner;
+import org.springframework.core.Ordered;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * @version: V1.0
+ * @author: JiangYouYao
+ * @className: AsyncMessageManager
+ * @packageName: com.matrix.component.asyncmessage
+ * @description: 异步消息管理者
+ * @data: 2021-10-18 14:01
+ **/
+@Component
+@Order(Ordered.HIGHEST_PRECEDENCE)
+public class AsyncMessageManager implements ApplicationRunner {
+
+    @Autowired
+    private List<MessageHandler> obs;
+
+    private Map<String, List<MessageHandler>> routes;
+
+
+    @Override
+    public void run(ApplicationArguments args) throws Exception {
+        if (CollectionUtil.isNotEmpty(obs)) {
+            routes = obs.stream().collect(Collectors.groupingBy(MessageHandler::getRouteKey));
+            LogUtil.info("异步消息绑定成功,检测到{} 个消费者,共计{}组", obs.size(), routes.size());
+        } else {
+            LogUtil.info("未检测到异步消息处理类");
+        }
+    }
+
+
+
+
+    /**
+     * map 参数的字符串表示,方便快速拼装消息参数
+     * @param routeKey
+     * @param mapStr
+     */
+    public void sendMsg(String routeKey, String mapStr,Object... args){
+
+        if(StringUtils.isBlank(mapStr)){
+            throw new IllegalArgumentException("mapStr格式错误:例如:\\\"orderId=123,price=88\\\",mapStr="+mapStr);
+        }
+        mapStr=String.format(mapStr,args);
+
+        Map<String, Object> param =new HashMap<>();
+        String[] paramStr = mapStr.split(",");
+        Arrays.asList(paramStr).forEach(item->{
+            String[] keyValueArr = item.split("=");
+            param.put(keyValueArr[0],keyValueArr[1]);
+        });
+        sendMsg(routeKey,param);
+    }
+
+    /**
+     * 根据route 发送消息到对应的消费者
+     * 这个方法本质上还是同步执行,没有完成效率上的异步解耦,这里做观察者模式主要是为了扩展业务,减少对第三方消息组件的依赖
+     * 而不是解决性能问题,如果后续需要解决性能问题,在加一个消息队列,然后启动线程从队列中进行消息的消费。
+     * @param routeKey
+     * @param param
+     */
+    public void sendMsg(String routeKey, Map<String, Object> param) {
+
+        if(StringUtils.isBlank(routeKey)){
+            LogUtil.warn("发送异步消息失败:routeKey为空");
+            return;
+        }
+
+        //匹配观察者
+        List<MessageHandler> lisener = new ArrayList<>();
+        for (Map.Entry<String, List<MessageHandler>> routesEntry : routes.entrySet()) {
+            if (Pattern.matches(routesEntry.getKey(), routeKey)) {
+                lisener.addAll(routesEntry.getValue());
+            }
+        }
+
+        //通知观察者
+        if (CollectionUtil.isNotEmpty(lisener)) {
+            LogUtil.info("发送异步消息,routeKey={},匹配观察者{}个",routeKey,lisener.size());
+            for (MessageHandler messageHandler : lisener) {
+                try{
+                    messageHandler.handle(param);
+                }catch (Throwable t){
+                    LogUtil.error("{},处理类执行异常:routeKey={},message={}", messageHandler.getName(),routeKey,t.getMessage());
+                }
+            }
+        }else{
+            LogUtil.warn("未匹配到routeKey={},的消费者",routeKey);
+        }
+
+    }
+
+
+}
\ No newline at end of file

--
Gitblit v1.9.1