RabbitMQ-死信队列

死信队列

  • 利用DLX,当消息在一个队列中变成死信(dead message)之后,他能够被重新publish到另一个Exchange,这
    个Exchange就是DLX.

消息变成死信有以下几种情况

  • 消息被拒绝(basic.reject/basic.nack)并且requeue=false

  • 消息TTL过期

  • 队列到达最大长度

死信队列特点

  • DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个
    队列的属性.

  • 当这个队列中有死信时,RabbitMQ就会自动将这个消息重新发布到设置的这个Exchange上去,进而被路由到另
    一个队列.

  • 可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能.

死信队列设置

  • 首先需要设置死信队列的exchange和queue,然后进行绑定;例:

    • Exchange:dlx.exchange
    • Queue:dlx.queue
    • RoutingKey:#
  • 然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列上加一个参数即可:arguments.put
    (“x-dead-letter-exchange”, “dlx.exchange”);

  • 这样消息在过期、requeue、队列在到达最大长度时,消息就可以直接路由到死信队列!

死信队列实现


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lylgjiavg.step02.limit.MyConsumer;

import java.util.HashMap;
import java.util.Map;

/**
 * @Classname Consumer
 * @Description 死信队列:消费端
 * @Date 2019/9/10 15:48
 * @Created by Jiavg
 */
public class Consumer {

    public static void main(String[] args) throws Exception{

        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        Connection connection = connectionFactory.newConnection();

        Channel channel = connection.createChannel();

        String exchangeName = "test_DLX_Exchange";
        String routingKey = "DLX.save";
        String queueName = "test_DLX_Queue";

        channel.exchangeDeclare(exchangeName, "topic", true, false, null);

        Map<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "dlx.exchange");
        // 这个arguments属性要设置到声明队列上
        channel.queueDeclare(queueName, true,false,false, arguments);
        channel.queueBind(queueName, exchangeName, routingKey);

        // 要进行死信队列的声明
        channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
        channel.queueDeclare("dlx.queue", true, false, false, null);
        channel.queueBind("dlx.queue", "dlx.exchange", "#");

        channel.basicConsume(queueName, true, new MyConsumer(channel));        
    }    
}

   转载规则


《RabbitMQ-死信队列》 Jiavg 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录