package com.matrix.component.rabbitmq; 
 | 
  
 | 
import com.matrix.core.tools.LogUtil; 
 | 
import com.matrix.core.tools.StringUtils; 
 | 
import com.rabbitmq.client.Channel; 
 | 
import com.rabbitmq.client.Connection; 
 | 
  
 | 
import java.io.IOException; 
 | 
import java.util.HashMap; 
 | 
import java.util.List; 
 | 
import java.util.Map; 
 | 
import java.util.concurrent.TimeoutException; 
 | 
  
 | 
/** 
 | 
 * mq消息模板 
 | 
 * 
 | 
 * @Author jyy 
 | 
 */ 
 | 
public class RabiitMqTemplate { 
 | 
  
 | 
    private Connection connection; 
 | 
  
 | 
    private Map<String, MqTask> taskMap = new HashMap<>(); 
 | 
  
 | 
    public RabiitMqTemplate(Connection connection) { 
 | 
        this.connection = connection; 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 申明一个交换机 
 | 
     * 
 | 
     * @param exchangeName 
 | 
     * @param direct 
 | 
     * @throws IOException 
 | 
     */ 
 | 
    public void exchangeDeclare(String exchangeName, String direct) throws IOException { 
 | 
        Channel channel = connection.createChannel(); 
 | 
        channel.exchangeDeclare(exchangeName, direct); 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 绑定任务队列以及交换机的关系 
 | 
     * 
 | 
     * @param taskList 
 | 
     * @throws IOException 
 | 
     */ 
 | 
    public void binding(List<MqTask> taskList) throws IOException { 
 | 
        for (MqTask task : taskList) { 
 | 
            LogUtil.info("绑定任务{}", task.getRoutingKey()); 
 | 
            Channel channel = connection.createChannel(); 
 | 
            //申明队列 
 | 
            channel.queueDeclare(task.getQueue(), true, false, false, null); 
 | 
            //绑定队列 
 | 
            channel.queueBind(task.getQueue(), task.getExchange(), task.getRoutingKey()); 
 | 
  
 | 
            if (task.getHander() != null) { 
 | 
                LogUtil.info("消费者不为空{}", task.getHander()); 
 | 
                channel.basicConsume(task.getQueue(), task.getAutoAck(), task.getHander(), consumerTag -> { 
 | 
                    LogUtil.debug("客户端取消"); 
 | 
                }); 
 | 
  
 | 
            } else { 
 | 
                LogUtil.info("消费者未定义,跳过绑定"); 
 | 
            } 
 | 
            taskMap.put(task.getRoutingKey(), task); 
 | 
  
 | 
        } 
 | 
  
 | 
  
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 根据路由key发送消息到交换机 
 | 
     * 
 | 
     * @param routingKey 
 | 
     * @param content 
 | 
     */ 
 | 
    public void sendMsg(String routingKey, String content) { 
 | 
        Channel channel = null; 
 | 
        try { 
 | 
            channel = connection.createChannel(); 
 | 
            if (channel != null) { 
 | 
                MqTask task = taskMap.get(routingKey); 
 | 
                if (task != null) { 
 | 
                    // 消息内容 
 | 
                    if (StringUtils.isNotBlank(content)) { 
 | 
                        LogUtil.info("本次发送消息task.getExchange()="+task.getExchange()+";routingKey="+routingKey+";content="+content); 
 | 
                        channel.basicPublish(task.getExchange(), routingKey, false, null, content.getBytes()); 
 | 
                    } else { 
 | 
                        LogUtil.info("本次发送空消息"); 
 | 
                        channel.basicPublish(task.getExchange(), routingKey, false, null, null); 
 | 
                    } 
 | 
                    //关闭通道和连接 
 | 
                    channel.close(); 
 | 
                } else { 
 | 
                    throw new IllegalArgumentException("根据【" + routingKey + "】未获取到绑定的MqTask,请检查路由key是否正确"); 
 | 
                } 
 | 
            } else { 
 | 
                LogUtil.error("消息发送失败,通道获取失败 chnnel={},routingKey={},content={}", channel, routingKey, content); 
 | 
            } 
 | 
  
 | 
        } catch (IOException | TimeoutException e) { 
 | 
            LogUtil.error("通道IO异常", e); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    /** 
 | 
     * 发送消息到指定的交换机 
 | 
     * 可以试用与自定义消息和topic广播消息 
 | 
     * 
 | 
     * @param routingKey 
 | 
     * @param content 
 | 
     */ 
 | 
    public void sendTopicMsg(String exchangeName, String routingKey, String content) { 
 | 
        Channel channel = null; 
 | 
        try { 
 | 
            channel = connection.createChannel(); 
 | 
            if (channel != null) { 
 | 
  
 | 
                    // 消息内容 
 | 
                    if (StringUtils.isNotBlank(content)) { 
 | 
                        channel.basicPublish(exchangeName, routingKey, false, null, content.getBytes()); 
 | 
                    } else { 
 | 
                        LogUtil.info("本次发送空消息"); 
 | 
                        channel.basicPublish(exchangeName, routingKey, false, null, null); 
 | 
                    } 
 | 
                    //关闭通道和连接 
 | 
                    channel.close(); 
 | 
  
 | 
            } else { 
 | 
                LogUtil.error("消息发送失败,通道获取失败 chnnel={},routingKey={},content={}", channel, routingKey, content); 
 | 
            } 
 | 
  
 | 
        } catch (IOException | TimeoutException e) { 
 | 
            LogUtil.error("通道IO异常", e); 
 | 
        } 
 | 
    } 
 | 
  
 | 
  
 | 
  
 | 
    /** 
 | 
     * 发送消息到指定的交换机 
 | 
     * 可以适用与自定义消息和topic广播消息 
 | 
     * 
 | 
     * @param routingKey 
 | 
     * @param content 
 | 
     */ 
 | 
    public void sendMsg(String exchangeName, String routingKey, String content) { 
 | 
        Channel channel = null; 
 | 
        try { 
 | 
            channel = connection.createChannel(); 
 | 
            if (channel != null) { 
 | 
  
 | 
                // 消息内容 
 | 
                if (StringUtils.isNotBlank(content)) { 
 | 
                    channel.basicPublish(exchangeName, routingKey, false, null, content.getBytes()); 
 | 
                } else { 
 | 
                    LogUtil.info("本次发送空消息"); 
 | 
                    channel.basicPublish(exchangeName, routingKey, false, null, null); 
 | 
                } 
 | 
                //关闭通道和连接 
 | 
                channel.close(); 
 | 
  
 | 
            } else { 
 | 
                LogUtil.error("消息发送失败,通道获取失败 chnnel={},routingKey={},content={}", channel, routingKey, content); 
 | 
            } 
 | 
  
 | 
        } catch (IOException | TimeoutException e) { 
 | 
            LogUtil.error("通道IO异常", e); 
 | 
        } 
 | 
    } 
 | 
  
 | 
    public Connection getConnection() { 
 | 
        return connection; 
 | 
    } 
 | 
  
 | 
    public void setConnection(Connection connection) { 
 | 
        this.connection = connection; 
 | 
    } 
 | 
} 
 |