package com.matrix.component.rabbitmq;
|
|
import com.matrix.core.tools.LogUtil;
|
import com.matrix.core.tools.StringUtils;
|
import com.rabbitmq.client.Channel;
|
import com.rabbitmq.client.Connection;
|
|
import java.io.IOException;
|
import java.util.HashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.TimeoutException;
|
|
/**
|
* mq消息模板
|
*
|
* @Author jyy
|
*/
|
public class RabiitMqTemplate {
|
|
private Connection connection;
|
|
private Map<String, MqTask> taskMap = new HashMap<>();
|
|
public RabiitMqTemplate(Connection connection) {
|
this.connection = connection;
|
}
|
|
/**
|
* 申明一个交换机
|
*
|
* @param exchangeName
|
* @param direct
|
* @throws IOException
|
*/
|
public void exchangeDeclare(String exchangeName, String direct) throws IOException {
|
Channel channel = connection.createChannel();
|
channel.exchangeDeclare(exchangeName, direct);
|
}
|
|
/**
|
* 绑定任务队列以及交换机的关系
|
*
|
* @param taskList
|
* @throws IOException
|
*/
|
public void binding(List<MqTask> taskList) throws IOException {
|
for (MqTask task : taskList) {
|
LogUtil.info("绑定任务{}", task.getRoutingKey());
|
Channel channel = connection.createChannel();
|
//申明队列
|
channel.queueDeclare(task.getQueue(), true, false, false, null);
|
//绑定队列
|
channel.queueBind(task.getQueue(), task.getExchange(), task.getRoutingKey());
|
|
if (task.getHander() != null) {
|
LogUtil.info("消费者不为空{}", task.getHander());
|
channel.basicConsume(task.getQueue(), task.getAutoAck(), task.getHander(), consumerTag -> {
|
LogUtil.debug("客户端取消");
|
});
|
|
} else {
|
LogUtil.info("消费者未定义,跳过绑定");
|
}
|
taskMap.put(task.getRoutingKey(), task);
|
|
}
|
|
|
}
|
|
/**
|
* 根据路由key发送消息到交换机
|
*
|
* @param routingKey
|
* @param content
|
*/
|
public void sendMsg(String routingKey, String content) {
|
Channel channel = null;
|
try {
|
channel = connection.createChannel();
|
if (channel != null) {
|
MqTask task = taskMap.get(routingKey);
|
if (task != null) {
|
// 消息内容
|
if (StringUtils.isNotBlank(content)) {
|
channel.basicPublish(task.getExchange(), routingKey, false, null, content.getBytes());
|
} else {
|
LogUtil.info("本次发送空消息");
|
channel.basicPublish(task.getExchange(), routingKey, false, null, null);
|
}
|
//关闭通道和连接
|
channel.close();
|
} else {
|
throw new IllegalArgumentException("根据【" + routingKey + "】未获取到绑定的MqTask,请检查路由key是否正确");
|
}
|
} else {
|
LogUtil.error("消息发送失败,通道获取失败 chnnel={},routingKey={},content={}", channel, routingKey, content);
|
}
|
|
} catch (IOException | TimeoutException e) {
|
LogUtil.error("通道IO异常", e);
|
}
|
}
|
|
/**
|
* 发送消息到指定的交换机
|
* 可以试用与自定义消息和topic广播消息
|
*
|
* @param routingKey
|
* @param content
|
*/
|
public void sendTopicMsg(String exchangeName, String routingKey, String content) {
|
Channel channel = null;
|
try {
|
channel = connection.createChannel();
|
if (channel != null) {
|
|
// 消息内容
|
if (StringUtils.isNotBlank(content)) {
|
channel.basicPublish(exchangeName, routingKey, false, null, content.getBytes());
|
} else {
|
LogUtil.info("本次发送空消息");
|
channel.basicPublish(exchangeName, routingKey, false, null, null);
|
}
|
//关闭通道和连接
|
channel.close();
|
|
} else {
|
LogUtil.error("消息发送失败,通道获取失败 chnnel={},routingKey={},content={}", channel, routingKey, content);
|
}
|
|
} catch (IOException | TimeoutException e) {
|
LogUtil.error("通道IO异常", e);
|
}
|
}
|
|
|
|
/**
|
* 发送消息到指定的交换机
|
* 可以适用与自定义消息和topic广播消息
|
*
|
* @param routingKey
|
* @param content
|
*/
|
public void sendMsg(String exchangeName, String routingKey, String content) {
|
Channel channel = null;
|
try {
|
channel = connection.createChannel();
|
if (channel != null) {
|
|
// 消息内容
|
if (StringUtils.isNotBlank(content)) {
|
channel.basicPublish(exchangeName, routingKey, false, null, content.getBytes());
|
} else {
|
LogUtil.info("本次发送空消息");
|
channel.basicPublish(exchangeName, routingKey, false, null, null);
|
}
|
//关闭通道和连接
|
channel.close();
|
|
} else {
|
LogUtil.error("消息发送失败,通道获取失败 chnnel={},routingKey={},content={}", channel, routingKey, content);
|
}
|
|
} catch (IOException | TimeoutException e) {
|
LogUtil.error("通道IO异常", e);
|
}
|
}
|
|
public Connection getConnection() {
|
return connection;
|
}
|
|
public void setConnection(Connection connection) {
|
this.connection = connection;
|
}
|
}
|