RabbitMQ的Return消息机制
Return消息机制的作用
Return Listener用于处理一些不可路由的消息.
我们的消息生产者,通过指定一个Exchange和RoutingKey,把消息送达到某一个队列中去,然后我们的消费者
监听队列,进行消费处理操作.但是在某些情况下,如果我们发送息的时消候,当前的Exchange不存在或者指定的路由Key路由不到,这个时候
如果我们需要监听这种不可到达的消息,就需要使用Return Listener.
启用Return消息机制的关键配置
- 在基础API中有一个关键的配置项:
Mandatory:如果为true,则监听器会接收到路由不可到达的消息,然后进行后续处理;如果为false,那么
broker端会自动删除该消息.
Return消息机制流程
Return消息机制流程图如下所示:
Return消息机制代码演示
消费端实现
消费者代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* @Classname Consumer
* @Description Return消息机制:消费者
* @Date 2019/9/8 10:48
* @Created by Jiavg
*/
public class Consumer {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_Return_Exchange";
String routingKey = "return.#";
String queueName = "test_Return_Queue";
// 声明交换机、声明队列、绑定队列到交换机
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// 创建消费者、消费队列
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
byte[] body = delivery.getBody();
System.out.printf("消费端:" + new String(body));
}
}
}
启动消费端-管控台变化
交换机变化
交换机绑定情况
注意:此交换机与队列绑定的路由Key我们在生产者端时并不遵守此路由key,即让消息无法正确路由,以触发
ReturnListener.
生产端实现
生产者代码
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Classname Producer
* @Description Return消息机制:生产者
* @Date 2019/9/8 10:50
* @Created by Jiavg
*/
public class Producer {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_Return_Exchange";
String routingKey = "unReturn.save";
String msg = "Hello World send Return Message";
// 添加一个Return监听
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("----------handle return-----------");
System.out.println("replyCode:" + replyCode);
System.out.println("replyText:" + replyText);
System.out.println("exchange:" + exchange);
System.out.println("routingKey:" + routingKey);
System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
});
// 发送消息:第三个参数mandatory:true,默认为false
channel.basicPublish(exchangeName, routingKey, true, null, msg.getBytes());
}
}
注意:消息的路由key我们设置为unReturn.save,在相应交换机上并没有符合此路由key的队列,此条消息将
无法正确路由到队列上.
启动生产端-管控台变化
Overview页面变化
由上图可知,虽然RabbitMQ接收到了一条消息(Message rates发生变化),但是并没有被路由到正确队列
中(Queued messages的Total为0).
生产者控制台变化
可见,由于消息无法正确路由到相应的队列中,触发了Return Listener监听器(打印了相应的信息).
注意:要使RabbitMQ回送给生产者无法路由的消息,在生产者端basicPublish(String exchange, String
routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;的
mandatory要设置为true,否则Broker端默认会自动删除该消息.