消费端自定义监听
通常我们会在代码中编写 while 循环,通过 consumer.nextDelivery() 方法获取下一条消息,然后进行
消费处理。但使用自定义的 Consumer 更加方便,解耦性也更强,这也是实际工作中最常用的方式。
实现消费端自定义监听的两种方式
实现com.rabbitmq.client.Consumer接口
import java.io.IOException;
/**
 * Callback contract through which an application receives notifications and
 * messages from a queue it has subscribed to.
 * Most implementations are expected to extend {@link DefaultConsumer} instead of
 * implementing this interface directly.
 *
 * <p>Every method declared here is invoked on a dispatch thread that is distinct
 * from the {@link Connection}'s own thread, so a {@link Consumer} may call
 * {@link Channel} or {@link Connection} methods without causing a deadlock.
 *
 * <p>The {@link Consumer}s registered on one {@link Channel} are invoked serially
 * on one or more dispatch threads. Long-running work inside a callback delays the
 * dispatch of messages to the other {@link Consumer}s on the same {@link Channel}
 * and should therefore be avoided.
 *
 * @see Channel#basicConsume(String, boolean, String, boolean, boolean, java.util.Map, Consumer)
 * @see Channel#basicCancel
 */
public interface Consumer {

    /**
     * Notifies the consumer that it has been registered through one of the
     * {@link Channel#basicConsume} methods.
     *
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     */
    void handleConsumeOk(String consumerTag);

    /**
     * Notifies the consumer that it has been cancelled through
     * {@link Channel#basicCancel}.
     *
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     */
    void handleCancelOk(String consumerTag);

    /**
     * Notifies the consumer that it has been cancelled for a reason <i>other than</i>
     * a call to {@link Channel#basicCancel}, for example because the queue was deleted.
     * Cancellation caused by {@link Channel#basicCancel} is reported through
     * {@link #handleCancelOk} instead.
     *
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     * @throws IOException if the implementation fails with an I/O error while handling
     *         the cancellation
     */
    void handleCancel(String consumerTag) throws IOException;

    /**
     * Notifies the consumer that either the channel or the underlying connection
     * has been shut down.
     *
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     * @param sig a {@link ShutdownSignalException} describing why the shut down happened
     */
    void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);

    /**
     * Notifies the consumer that a <code><b>basic.recover-ok</b></code> arrived in
     * reply to a <code><b>basic.recover</b></code>. Messages received before this
     * call that have not been <i>ack</i>'ed will be re-delivered; messages received
     * after it will not be.
     *
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     */
    void handleRecoverOk(String consumerTag);

    /**
     * Hands a message to the consumer when a <code><b>basic.deliver</b></code> is
     * received for it.
     *
     * @param consumerTag the <i>consumer tag</i> associated with the consumer
     * @param envelope packaging data for the message
     * @param properties content header data for the message
     * @param body the message body (opaque, client-specific byte array)
     * @throws IOException if the consumer encounters an I/O error while processing the message
     * @see Envelope
     */
    void handleDelivery(
            String consumerTag,
            Envelope envelope,
            AMQP.BasicProperties properties,
            byte[] body) throws IOException;
}
继承com.rabbitmq.client.DefaultConsumer类
import java.io.IOException;
/**
 * Base {@link Consumer} whose callbacks all do nothing, apart from remembering the
 * consumer tag. Subclasses override only the callbacks they care about; most
 * Consumer implementations are expected to extend this class.
 */
public class DefaultConsumer implements Consumer {

    /** The channel this consumer was created for. */
    private final Channel channel;

    /** The tag most recently reported through {@link #handleConsumeOk}. */
    private volatile String consumerTag;

    /**
     * Creates a consumer bound to the given channel.
     *
     * @param channel the channel to which this consumer is attached
     */
    public DefaultConsumer(Channel channel) {
        this.channel = channel;
    }

    /**
     * Remembers the supplied consumer tag, replacing any previously stored one;
     * semantically there should only ever be one.
     *
     * @param consumerTag the consumer tag to record
     * @see Consumer#handleConsumeOk
     */
    public void handleConsumeOk(String consumerTag) {
        this.consumerTag = consumerTag;
    }

    /**
     * Does nothing; see {@link Consumer#handleCancelOk}.
     *
     * @param consumerTag the defined consumer tag (client- or server-generated)
     */
    public void handleCancelOk(String consumerTag) {
        // intentionally empty
    }

    /**
     * Does nothing; see {@link Consumer#handleCancel(String)}.
     *
     * @param consumerTag the defined consumer tag (client- or server-generated)
     * @throws IOException never thrown by this implementation; declared so that
     *         subclasses may throw it
     */
    public void handleCancel(String consumerTag) throws IOException {
        // intentionally empty
    }

    /**
     * Does nothing; see {@link Consumer#handleShutdownSignal}.
     *
     * @param consumerTag the consumer tag associated with the consumer
     * @param sig the shutdown signal that was received
     */
    public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) {
        // intentionally empty
    }

    /**
     * Does nothing; see {@link Consumer#handleRecoverOk}.
     *
     * @param consumerTag the consumer tag associated with the consumer
     */
    public void handleRecoverOk(String consumerTag) {
        // intentionally empty
    }

    /**
     * Does nothing; see {@link Consumer#handleDelivery}. Subclasses override this
     * method to process delivered messages.
     *
     * @param consumerTag the consumer tag associated with the consumer
     * @param envelope packaging data for the message
     * @param properties content header data for the message
     * @param body the message body
     * @throws IOException never thrown by this implementation; declared so that
     *         subclasses may throw it
     */
    public void handleDelivery(
            String consumerTag,
            Envelope envelope,
            AMQP.BasicProperties properties,
            byte[] body) throws IOException {
        // intentionally empty
    }

    /**
     * Returns the channel supplied at construction time.
     *
     * @return the channel this consumer is attached to
     */
    public Channel getChannel() {
        return channel;
    }

    /**
     * Returns the consumer tag stored by the last {@link #handleConsumeOk} call.
     *
     * @return the most recently notified consumer tag
     */
    public String getConsumerTag() {
        return consumerTag;
    }
}
消费端自定义监听代码演示
消费端自定义监听
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * Custom consumer that prints each delivered message to standard output and then
 * acknowledges it on the channel it is attached to.
 *
 * <p>Because every delivery is acknowledged explicitly here, this consumer is meant
 * to be registered with automatic acknowledgement turned off.
 *
 * <p>Created by Jiavg on 2019/9/9 16:03.
 */
public class MyConsumer extends DefaultConsumer {

    /**
     * Constructs a new instance and records its association to the passed-in channel.
     * The channel is stored by {@link DefaultConsumer} and read back through
     * {@link #getChannel()}, so no separate copy is kept here.
     *
     * @param channel the channel to which this consumer is attached
     */
    public MyConsumer(Channel channel) {
        super(channel);
    }

    /**
     * Prints the consumer tag and the message text, then acknowledges the delivery.
     *
     * @param consumerTag the consumer tag associated with this consumer
     * @param envelope packaging data for the message; supplies the delivery tag to acknowledge
     * @param properties content header data for the message (not used here)
     * @param body the message body, decoded as UTF-8 text for printing
     * @throws IOException if acknowledging the message fails
     */
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        // Decode with an explicit charset so the output does not depend on the
        // platform default encoding of the machine running the consumer.
        String text = new String(body, java.nio.charset.StandardCharsets.UTF_8);
        System.out.println("consumerTag:" + consumerTag + "--此条消息已处理");
        System.out.println("\t消息内容:" + text);
        // multiple = false: acknowledge only this delivery, not every earlier one.
        getChannel().basicAck(envelope.getDeliveryTag(), false);
    }
}
消费端代码实现
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Consumer side of the custom-listener demo: connects to a local broker, declares
 * the exchange and queue, binds them, and registers a {@code MyConsumer} on the queue.
 *
 * <p>Created by Jiavg on 2019/9/8 10:48.
 */
public class Consumer {

    /**
     * Sets up the topology and starts consuming with a {@code MyConsumer}.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if connecting to the broker or any channel operation fails
     */
    public static void main(String[] args) throws Exception {
        final String exchange = "test_Custom_Exchange";
        final String bindingKey = "Custom.#";
        final String queue = "test_Custom_Queue";

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");

        Connection conn = factory.newConnection();
        Channel ch = conn.createChannel();

        // Declare the exchange, declare the queue, then bind the queue to the exchange.
        ch.exchangeDeclare(exchange, "topic", true, false, null);
        ch.queueDeclare(queue, true, false, false, null);
        ch.queueBind(queue, exchange, bindingKey);

        // Second argument is false, so the registered consumer acknowledges messages itself.
        ch.basicConsume(queue, false, new MyConsumer(ch));
    }
}
生产端代码实现
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Producer side of the custom-listener demo: connects to a local broker, publishes
 * a fixed text message four times to the demo exchange, and then releases the
 * channel and connection.
 *
 * <p>Created by Jiavg on 2019/9/8 10:50.
 */
public class Producer {

    /**
     * Publishes the demo messages and closes all broker resources afterwards.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if connecting, publishing, or closing fails
     */
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        String exchangeName = "test_Custom_Exchange";
        String routingKey = "Custom.save";
        String msg = "Hello World send Custom Message";
        // Encode once, with an explicit charset, instead of relying on the platform
        // default encoding on every loop iteration.
        byte[] payload = msg.getBytes(java.nio.charset.StandardCharsets.UTF_8);

        Connection connection = connectionFactory.newConnection();
        try {
            Channel channel = connection.createChannel();
            try {
                for (int i = 0; i < 4; i++) {
                    channel.basicPublish(exchangeName, routingKey, null, payload);
                }
            } finally {
                // Close the channel even if publishing fails.
                channel.close();
            }
        } finally {
            // Always close the connection so its resources are released once the
            // producer is done, whether or not publishing succeeded.
            connection.close();
        }
    }
}
消费端控制台变换
