死信队列
- 利用DLX,当消息在一个队列中变成死信(dead message)之后,他能够被重新publish到另一个Exchange,这
个Exchange就是DLX.
消息变成死信有以下几种情况
消息被拒绝(basic.reject/basic.nack)并且requeue=false
消息TTL过期
队列到达最大长度
死信队列特点
DLX也是一个正常的Exchange,和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个
队列的属性.当这个队列中有死信时,RabbitMQ就会自动将这个消息重新发布到设置的这个Exchange上去,进而被路由到另
一个队列.可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能.
死信队列设置
首先需要设置死信队列的exchange和queue,然后进行绑定;例:
- Exchange:dlx.exchange
- Queue:dlx.queue
- RoutingKey:#
然后我们进行正常声明交换机、队列、绑定,只不过我们需要在队列上加一个参数即可:arguments.put
(“x-dead-letter-exchange”, “dlx.exchange”);这样消息在过期、requeue、队列在到达最大长度时,消息就可以直接路由到死信队列!
死信队列实现
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lylgjiavg.step02.limit.MyConsumer;
import java.util.HashMap;
import java.util.Map;
/**
* @Classname Consumer
* @Description 死信队列:消费端
* @Date 2019/9/10 15:48
* @Created by Jiavg
*/
public class Consumer {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_DLX_Exchange";
String routingKey = "DLX.save";
String queueName = "test_DLX_Queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "dlx.exchange");
// 这个arguments属性要设置到声明队列上
channel.queueDeclare(queueName, true,false,false, arguments);
channel.queueBind(queueName, exchangeName, routingKey);
// 要进行死信队列的声明
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}