RabbitMQ-Return消息机制

RabbitMQ的Return消息机制

Return消息机制的作用

  • Return Listener用于处理一些不可路由的消息.

  • 我们的消息生产者,通过指定一个Exchange和RoutingKey,把消息送达到某一个队列中去,然后我们的消费者
    监听队列,进行消费处理操作.

  • 但是在某些情况下,如果我们发送息的时消候,当前的Exchange不存在或者指定的路由Key路由不到,这个时候
    如果我们需要监听这种不可到达的消息,就需要使用Return Listener.

启用Return消息机制的关键配置

  • 在基础API中有一个关键的配置项:
    Mandatory:如果为true,则监听器会接收到路由不可到达的消息,然后进行后续处理;如果为false,那么
    broker端会自动删除该消息.

Return消息机制流程

Return消息机制流程图如下所示:

Return消息机制代码演示

消费端实现

消费者代码


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * @Classname Consumer
 * @Description Return消息机制:消费者
 * @Date 2019/9/8 10:48
 * @Created by Jiavg
 */
public class Consumer {

    public static void main(String[] args) throws Exception{

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_Return_Exchange";
        String routingKey = "return.#";
        String queueName = "test_Return_Queue";

        // 声明交换机、声明队列、绑定队列到交换机
        channel.exchangeDeclare(exchangeName, "topic", true, false, null);
        channel.queueDeclare(queueName, true, false, false, null);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 创建消费者、消费队列
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
        channel.basicConsume(queueName, true, queueingConsumer);

        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
            byte[] body = delivery.getBody();

            System.out.printf("消费端:" + new String(body));
        }
    }   
}

启动消费端-管控台变化

交换机变化

交换机绑定情况

注意:此交换机与队列绑定的路由Key我们在生产者端时并不遵守此路由key,即让消息无法正确路由,以触发
ReturnListener.

生产端实现

生产者代码


import com.rabbitmq.client.*;
import java.io.IOException;

/**
 * @Classname Producer
 * @Description Return消息机制:生产者
 * @Date 2019/9/8 10:50
 * @Created by Jiavg
 */
public class Producer {

    public static void main(String[] args) throws Exception{

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_Return_Exchange";
        String routingKey = "unReturn.save";
        String msg = "Hello World send Return Message";

        // 添加一个Return监听
        channel.addReturnListener(new ReturnListener() {
            @Override
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, 
                                     AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("----------handle return-----------");
                System.out.println("replyCode:" + replyCode);
                System.out.println("replyText:" + replyText);
                System.out.println("exchange:" + exchange);
                System.out.println("routingKey:" + routingKey);
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
            }
        });

        // 发送消息:第三个参数mandatory:true,默认为false
        channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());               
    }    
}

注意:消息的路由key我们设置为unReturn.save,在相应交换机上并没有符合此路由key的队列,此条消息将
无法正确路由到队列上.

启动生产端-管控台变化

Overview页面变化

由上图可知,虽然RabbitMQ接收到了一条消息(Message rates发生变化),但是并没有被路由到正确队列
中(Queued messages的Total为0).

生产者控制台变化

可见,由于消息无法正确路由到相应的队列中,触发了Return Listener监听器(打印了相应的信息).

注意:要使RabbitMQ回送给生产者无法路由的消息,在生产者端basicPublish(String exchange, String
routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;的
mandatory要设置为true,否则Broker端默认会自动删除该消息.


   转载规则


《RabbitMQ-Return消息机制》 Jiavg 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录