Confirm消息确认机制
- 消息的确认,是指生产者投递消息后,如果Broker接收到消息,则会给我们生产者一个应答.
- 生产者进行接收应答,用来确认这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保
障.
Confirm确认消息流程解析
确认机制流程图
如何实现Confirm确认消息?
- Step1:在channel上开启确认模式:channel.confirmSelect();
- Step2:在Channel上添加监听:addConfirmListener,监听成功和失败的返回结果,根据具体的结果对消息进
行重新发送、或记录日志等后续处理。
Confirm确认消息代码演示
消费者实现
消费者代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* @Classname Consumer
* @Description Confirm确认机制:消费者
* @Date 2019/9/7 10:48
* @Created by Jiavg
*/
public class Consumer {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_Confirm_Exchange";
String routingKey = "confirm.#";
String queueName = "test_Confirm_Queue";
// 声明交换机、声明队列、绑定队列到交换机
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// 创建消费者、消费队列
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
byte[] body = delivery.getBody();
System.out.printf("消费端:" + new String(body));
}
}
}
消费者运行-管控台变化
新增对应交换机
新增对应队列
队列绑定关系
生产者实现
生产者代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
/**
* @Classname Producer
* @Description Confirm确认机制:生产者
* @Date 2019/9/7 10:42
* @Created by Jiavg
*/
public class Producer {
public static void main(String[] args) throws Exception{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 指定消息投递模式:消息的确认模式
channel.confirmSelect();
String exchangeName = "test_Confirm_Exchange";
String routingKey = "confirm.save";
String msg = "Hello World send Confirm Message";
// 发送消息
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
// 添加一个确认监听
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.printf("-------Ack-------");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.printf("------No Ack-----");
}
});
}
}
生产者运行-控制台变化
消费端接收到监听队列的消息.
生产端的确认监听方法打印出相应信息.
注意事项
Point1
Confirm确认机制是生产者和Broker(RabbitMQ)之间的确认机制,不涉及到消费者是否消费了此条消息.
为了更好的理解,我们进行了如下实验:
紧接着上面的实验:我们关闭消费者端,并重新运行生产者端:
管控台变化
通过上图我们可以看出由于关闭了消费者端,该条信息并未被消费,故存留在RabbitMQ的队列中.
生产者控制台变化
通过上图可以看出,Broker给生产者发送了Ack消息确认.由此可见,只要是Broker正确的接收到消息就会给
生产端发送Ack确认,而不管消费端是否正常消费了此条消息.
Point2
Confirm确认机制是生产者和Broker(RabbitMQ)之间的确认机制,甚至Broker没有将此消息正确路由到对
应的队列时,还是会给生产者端发送Ack确认.
为了更好的理解,我们进行了如下实验:
紧接着上面的实验,我们修改生产者的发送消息的路由键为”unConfirm.save”(此时RabbitMQ中并没有对应
的队列),此时我们运行生产端.
管控台变化
通过上图我们可以看出由于修改了生产者的消息的路由键,此时虽然Message Rates虽然发生改变(Broker接
收到消息),但是并没有将此条消息正确的路由到对应的队列中(Queued Messages的总消息数量没有发生改
变).
生产者控制台变化
通过上图可以看出,Broker给生产者发送了Ack消息确认.由此可见,虽然消息并未正确的路由到队列中,但是
Broker接收到了消息就会给生产者端发送Ack确认.
Confirm确认监听情况说明
什么时候会走 handleNack 方法呢,比如磁盘写满了,MQ出现了一些异常,或者Queue容量到达上限了等等
也有可能两个方法都不走,比如生产端发送消息就失败了,或者Broker端收到消息在返回ack时中途出现了
网络闪断。这种情况就需要定时任务去抓取中间状态的消息进行最大努力尝试次数的补偿重发,从而保障消息投递的可
靠性。