/**
 * projectName: zq-erp
 * fileName: AsyncMessageManager.java
 * packageName: com.matrix.component.asyncmessage
 * date: 2021-10-18 14:01
 * copyright(c) 2021 http://www.hydee.cn/ Inc. All rights reserved.
 */
|
package com.matrix.component.asyncmessage;
|
|
import cn.hutool.core.collection.CollectionUtil;
|
import com.matrix.core.tools.LogUtil;
|
import com.matrix.core.tools.StringUtils;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.boot.ApplicationArguments;
|
import org.springframework.boot.ApplicationRunner;
|
import org.springframework.core.Ordered;
|
import org.springframework.core.annotation.Order;
|
import org.springframework.stereotype.Component;
|
|
import java.util.*;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.regex.Pattern;
|
import java.util.stream.Collectors;
|
|
/**
|
* @version: V1.0
|
* @author: JiangYouYao
|
* @className: AsyncMessageManager
|
* @packageName: com.matrix.component.asyncmessage
|
* @description: 异步消息管理者
|
* @data: 2021-10-18 14:01
|
**/
|
@Component
|
@Order(Ordered.HIGHEST_PRECEDENCE)
|
public class AsyncMessageManager implements ApplicationRunner {
|
|
@Autowired
|
private List<MessageHandler> obs;
|
|
private Map<String, List<MessageHandler>> routes;
|
|
|
@Override
|
public void run(ApplicationArguments args) throws Exception {
|
if (CollectionUtil.isNotEmpty(obs)) {
|
routes = obs.stream().collect(Collectors.groupingBy(MessageHandler::getRouteKey));
|
LogUtil.info("异步消息绑定成功,检测到{} 个消费者,共计{}组", obs.size(), routes.size());
|
} else {
|
LogUtil.info("未检测到异步消息处理类");
|
}
|
}
|
|
|
|
|
/**
|
* map 参数的字符串表示,方便快速拼装消息参数
|
* @param routeKey
|
* @param mapStr
|
*/
|
public void sendMsg(String routeKey, String mapStr,Object... args){
|
|
if(StringUtils.isBlank(mapStr)){
|
throw new IllegalArgumentException("mapStr格式错误:例如:\\\"orderId=123,price=88\\\",mapStr="+mapStr);
|
}
|
mapStr=String.format(mapStr,args);
|
|
Map<String, Object> param =new HashMap<>();
|
String[] paramStr = mapStr.split(",");
|
Arrays.asList(paramStr).forEach(item->{
|
String[] keyValueArr = item.split("=");
|
param.put(keyValueArr[0],keyValueArr[1]);
|
});
|
sendMsg(routeKey,param);
|
}
|
|
/**
|
* 根据route 发送消息到对应的消费者
|
* 这个方法本质上还是同步执行,没有完成效率上的异步解耦,这里做观察者模式主要是为了扩展业务,减少对第三方消息组件的依赖
|
* 而不是解决性能问题,如果后续需要解决性能问题,在加一个消息队列,然后启动线程从队列中进行消息的消费。
|
* @param routeKey
|
* @param param
|
*/
|
public void sendMsg(String routeKey, Map<String, Object> param) {
|
|
if(StringUtils.isBlank(routeKey)){
|
LogUtil.warn("发送异步消息失败:routeKey为空");
|
return;
|
}
|
|
//匹配观察者
|
List<MessageHandler> lisener = new ArrayList<>();
|
for (Map.Entry<String, List<MessageHandler>> routesEntry : routes.entrySet()) {
|
if (Pattern.matches(routesEntry.getKey(), routeKey)) {
|
lisener.addAll(routesEntry.getValue());
|
}
|
}
|
|
//通知观察者
|
if (CollectionUtil.isNotEmpty(lisener)) {
|
LogUtil.info("发送异步消息,routeKey={},匹配观察者{}个",routeKey,lisener.size());
|
for (MessageHandler messageHandler : lisener) {
|
try{
|
messageHandler.handle(param);
|
}catch (Throwable t){
|
LogUtil.error("{},处理类执行异常:routeKey={},message={}", messageHandler.getName(),routeKey,t.getMessage());
|
}
|
}
|
}else{
|
LogUtil.warn("未匹配到routeKey={},的消费者",routeKey);
|
}
|
|
}
|
|
|
}
|