package com.matrix.component.rabbitmq; import com.matrix.core.tools.LogUtil; import com.matrix.core.tools.StringUtils; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeoutException; /** * mq消息模板 * * @Author jyy */ public class RabiitMqTemplate { private Connection connection; private Map taskMap = new HashMap<>(); public RabiitMqTemplate(Connection connection) { this.connection = connection; } /** * 申明一个交换机 * * @param exchangeName * @param direct * @throws IOException */ public void exchangeDeclare(String exchangeName, String direct) throws IOException { Channel channel = connection.createChannel(); channel.exchangeDeclare(exchangeName, direct); } /** * 绑定任务队列以及交换机的关系 * * @param taskList * @throws IOException */ public void binding(List taskList) throws IOException { for (MqTask task : taskList) { LogUtil.info("绑定任务{}", task.getRoutingKey()); Channel channel = connection.createChannel(); //申明队列 channel.queueDeclare(task.getQueue(), true, false, false, null); //绑定队列 channel.queueBind(task.getQueue(), task.getExchange(), task.getRoutingKey()); if (task.getHander() != null) { LogUtil.info("消费者不为空{}", task.getHander()); channel.basicConsume(task.getQueue(), task.getAutoAck(), task.getHander(), consumerTag -> { LogUtil.debug("客户端取消"); }); } else { LogUtil.info("消费者未定义,跳过绑定"); } taskMap.put(task.getRoutingKey(), task); } } /** * 根据路由key发送消息到交换机 * * @param routingKey * @param content */ public void sendMsg(String routingKey, String content) { Channel channel = null; try { channel = connection.createChannel(); if (channel != null) { MqTask task = taskMap.get(routingKey); if (task != null) { // 消息内容 if (StringUtils.isNotBlank(content)) { channel.basicPublish(task.getExchange(), routingKey, false, null, content.getBytes()); } else { LogUtil.info("本次发送空消息"); channel.basicPublish(task.getExchange(), routingKey, false, null, null); } //关闭通道和连接 channel.close(); } else { throw new IllegalArgumentException("根据【" + routingKey + "】未获取到绑定的MqTask,请检查路由key是否正确"); } } else { LogUtil.error("消息发送失败,通道获取失败 chnnel={},routingKey={},content={}", channel, routingKey, content); } } catch (IOException | TimeoutException e) { LogUtil.error("通道IO异常", e); } } /** * 发送消息到指定的交换机 * 可以试用与自定义消息和topic广播消息 * * @param routingKey * @param content */ public void sendTopicMsg(String exchangeName, String routingKey, String content) { Channel channel = null; try { channel = connection.createChannel(); if (channel != null) { // 消息内容 if (StringUtils.isNotBlank(content)) { channel.basicPublish(exchangeName, routingKey, false, null, content.getBytes()); } else { LogUtil.info("本次发送空消息"); channel.basicPublish(exchangeName, routingKey, false, null, null); } //关闭通道和连接 channel.close(); } else { LogUtil.error("消息发送失败,通道获取失败 chnnel={},routingKey={},content={}", channel, routingKey, content); } } catch (IOException | TimeoutException e) { LogUtil.error("通道IO异常", e); } } /** * 发送消息到指定的交换机 * 可以适用与自定义消息和topic广播消息 * * @param routingKey * @param content */ public void sendMsg(String exchangeName, String routingKey, String content) { Channel channel = null; try { channel = connection.createChannel(); if (channel != null) { // 消息内容 if (StringUtils.isNotBlank(content)) { channel.basicPublish(exchangeName, routingKey, false, null, content.getBytes()); } else { LogUtil.info("本次发送空消息"); channel.basicPublish(exchangeName, routingKey, false, null, null); } //关闭通道和连接 channel.close(); } else { LogUtil.error("消息发送失败,通道获取失败 chnnel={},routingKey={},content={}", channel, routingKey, content); } } catch (IOException | TimeoutException e) { LogUtil.error("通道IO异常", e); } } public Connection getConnection() { return connection; } public void setConnection(Connection connection) { this.connection = connection; } }