fix
Helius
2021-11-08 4c2b61c51a5c2eec4c7e07945847517ddfb6914b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/**
 * projectName: zq-erp
 * fileName: MessageManager.java
 * packageName: com.matrix.component.asyncmessage
 * date: 2021-10-18 14:01
 * copyright(c) 2021 http://www.hydee.cn/ Inc. All rights reserved.
 */
package com.matrix.component.asyncmessage;
 
import cn.hutool.core.collection.CollectionUtil;
import com.matrix.core.tools.LogUtil;
import com.matrix.core.tools.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
 
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
 
/**
 * @version: V1.0
 * @author: JiangYouYao
 * @className: AsyncMessageManager
 * @packageName: com.matrix.component.asyncmessage
 * @description: 异步消息管理者
 * @data: 2021-10-18 14:01
 **/
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class AsyncMessageManager implements ApplicationRunner {
 
    @Autowired
    private List<MessageHandler> obs;
 
    private Map<String, List<MessageHandler>> routes;
 
 
    @Override
    public void run(ApplicationArguments args) throws Exception {
        if (CollectionUtil.isNotEmpty(obs)) {
            routes = obs.stream().collect(Collectors.groupingBy(MessageHandler::getRouteKey));
            LogUtil.info("异步消息绑定成功,检测到{} 个消费者,共计{}组", obs.size(), routes.size());
        } else {
            LogUtil.info("未检测到异步消息处理类");
        }
    }
 
 
 
 
    /**
     * map 参数的字符串表示,方便快速拼装消息参数
     * @param routeKey
     * @param mapStr
     */
    public void sendMsg(String routeKey, String mapStr,Object... args){
 
        if(StringUtils.isBlank(mapStr)){
            throw new IllegalArgumentException("mapStr格式错误:例如:\\\"orderId=123,price=88\\\",mapStr="+mapStr);
        }
        mapStr=String.format(mapStr,args);
 
        Map<String, Object> param =new HashMap<>();
        String[] paramStr = mapStr.split(",");
        Arrays.asList(paramStr).forEach(item->{
            String[] keyValueArr = item.split("=");
            param.put(keyValueArr[0],keyValueArr[1]);
        });
        sendMsg(routeKey,param);
    }
 
    /**
     * 根据route 发送消息到对应的消费者
     * 这个方法本质上还是同步执行,没有完成效率上的异步解耦,这里做观察者模式主要是为了扩展业务,减少对第三方消息组件的依赖
     * 而不是解决性能问题,如果后续需要解决性能问题,在加一个消息队列,然后启动线程从队列中进行消息的消费。
     * @param routeKey
     * @param param
     */
    public void sendMsg(String routeKey, Map<String, Object> param) {
 
        if(StringUtils.isBlank(routeKey)){
            LogUtil.warn("发送异步消息失败:routeKey为空");
            return;
        }
 
        //匹配观察者
        List<MessageHandler> lisener = new ArrayList<>();
        for (Map.Entry<String, List<MessageHandler>> routesEntry : routes.entrySet()) {
            if (Pattern.matches(routesEntry.getKey(), routeKey)) {
                lisener.addAll(routesEntry.getValue());
            }
        }
 
        //通知观察者
        if (CollectionUtil.isNotEmpty(lisener)) {
            LogUtil.info("发送异步消息,routeKey={},匹配观察者{}个",routeKey,lisener.size());
            for (MessageHandler messageHandler : lisener) {
                try{
                    messageHandler.handle(param);
                }catch (Throwable t){
                    LogUtil.error("{},处理类执行异常:routeKey={},message={}", messageHandler.getName(),routeKey,t.getMessage());
                }
            }
        }else{
            LogUtil.warn("未匹配到routeKey={},的消费者",routeKey);
        }
 
    }
 
 
}