| package com.matrix.component.rabbitmq; | 
|   | 
import com.matrix.core.tools.LogUtil;
import com.matrix.core.tools.StringUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
|   | 
| /** | 
|  * mq消息模板 | 
|  * | 
|  * @Author jyy | 
|  */ | 
| public class RabiitMqTemplate { | 
|   | 
|     private Connection connection; | 
|   | 
|     private Map<String, MqTask> taskMap = new HashMap<>(); | 
|   | 
|     public RabiitMqTemplate(Connection connection) { | 
|         this.connection = connection; | 
|     } | 
|   | 
|     /** | 
|      * 申明一个交换机 | 
|      * | 
|      * @param exchangeName | 
|      * @param direct | 
|      * @throws IOException | 
|      */ | 
|     public void exchangeDeclare(String exchangeName, String direct) throws IOException { | 
|         Channel channel = connection.createChannel(); | 
|         channel.exchangeDeclare(exchangeName, direct); | 
|     } | 
|   | 
|     /** | 
|      * 绑定任务队列以及交换机的关系 | 
|      * | 
|      * @param taskList | 
|      * @throws IOException | 
|      */ | 
|     public void binding(List<MqTask> taskList) throws IOException { | 
|         for (MqTask task : taskList) { | 
|             LogUtil.info("绑定任务{}", task.getRoutingKey()); | 
|             Channel channel = connection.createChannel(); | 
|             //申明队列 | 
|             channel.queueDeclare(task.getQueue(), true, false, false, null); | 
|             //绑定队列 | 
|             channel.queueBind(task.getQueue(), task.getExchange(), task.getRoutingKey()); | 
|   | 
|             if (task.getHander() != null) { | 
|                 LogUtil.info("消费者不为空{}", task.getHander()); | 
|                 channel.basicConsume(task.getQueue(), task.getAutoAck(), task.getHander(), consumerTag -> { | 
|                     LogUtil.debug("客户端取消"); | 
|                 }); | 
|   | 
|             } else { | 
|                 LogUtil.info("消费者未定义,跳过绑定"); | 
|             } | 
|             taskMap.put(task.getRoutingKey(), task); | 
|   | 
|         } | 
|   | 
|   | 
|     } | 
|   | 
|     /** | 
|      * 根据路由key发送消息到交换机 | 
|      * | 
|      * @param routingKey | 
|      * @param content | 
|      */ | 
|     public void sendMsg(String routingKey, String content) { | 
|         Channel channel = null; | 
|         try { | 
|             channel = connection.createChannel(); | 
|             if (channel != null) { | 
|                 MqTask task = taskMap.get(routingKey); | 
|                 if (task != null) { | 
|                     // 消息内容 | 
|                     if (StringUtils.isNotBlank(content)) { | 
|                         LogUtil.info("本次发送消息task.getExchange()="+task.getExchange()+";routingKey="+routingKey+";content="+content); | 
|                         channel.basicPublish(task.getExchange(), routingKey, false, null, content.getBytes()); | 
|                     } else { | 
|                         LogUtil.info("本次发送空消息"); | 
|                         channel.basicPublish(task.getExchange(), routingKey, false, null, null); | 
|                     } | 
|                     //关闭通道和连接 | 
|                     channel.close(); | 
|                 } else { | 
|                     throw new IllegalArgumentException("根据【" + routingKey + "】未获取到绑定的MqTask,请检查路由key是否正确"); | 
|                 } | 
|             } else { | 
|                 LogUtil.error("消息发送失败,通道获取失败 chnnel={},routingKey={},content={}", channel, routingKey, content); | 
|             } | 
|   | 
|         } catch (IOException | TimeoutException e) { | 
|             LogUtil.error("通道IO异常", e); | 
|         } | 
|     } | 
|   | 
|     /** | 
|      * 发送消息到指定的交换机 | 
|      * 可以试用与自定义消息和topic广播消息 | 
|      * | 
|      * @param routingKey | 
|      * @param content | 
|      */ | 
|     public void sendTopicMsg(String exchangeName, String routingKey, String content) { | 
|         Channel channel = null; | 
|         try { | 
|             channel = connection.createChannel(); | 
|             if (channel != null) { | 
|   | 
|                     // 消息内容 | 
|                     if (StringUtils.isNotBlank(content)) { | 
|                         channel.basicPublish(exchangeName, routingKey, false, null, content.getBytes()); | 
|                     } else { | 
|                         LogUtil.info("本次发送空消息"); | 
|                         channel.basicPublish(exchangeName, routingKey, false, null, null); | 
|                     } | 
|                     //关闭通道和连接 | 
|                     channel.close(); | 
|   | 
|             } else { | 
|                 LogUtil.error("消息发送失败,通道获取失败 chnnel={},routingKey={},content={}", channel, routingKey, content); | 
|             } | 
|   | 
|         } catch (IOException | TimeoutException e) { | 
|             LogUtil.error("通道IO异常", e); | 
|         } | 
|     } | 
|   | 
|   | 
|   | 
|     /** | 
|      * 发送消息到指定的交换机 | 
|      * 可以适用与自定义消息和topic广播消息 | 
|      * | 
|      * @param routingKey | 
|      * @param content | 
|      */ | 
|     public void sendMsg(String exchangeName, String routingKey, String content) { | 
|         Channel channel = null; | 
|         try { | 
|             channel = connection.createChannel(); | 
|             if (channel != null) { | 
|   | 
|                 // 消息内容 | 
|                 if (StringUtils.isNotBlank(content)) { | 
|                     channel.basicPublish(exchangeName, routingKey, false, null, content.getBytes()); | 
|                 } else { | 
|                     LogUtil.info("本次发送空消息"); | 
|                     channel.basicPublish(exchangeName, routingKey, false, null, null); | 
|                 } | 
|                 //关闭通道和连接 | 
|                 channel.close(); | 
|   | 
|             } else { | 
|                 LogUtil.error("消息发送失败,通道获取失败 chnnel={},routingKey={},content={}", channel, routingKey, content); | 
|             } | 
|   | 
|         } catch (IOException | TimeoutException e) { | 
|             LogUtil.error("通道IO异常", e); | 
|         } | 
|     } | 
|   | 
|     public Connection getConnection() { | 
|         return connection; | 
|     } | 
|   | 
|     public void setConnection(Connection connection) { | 
|         this.connection = connection; | 
|     } | 
| } |