RabbitMQ快速入门

RabbitMQ入门

RabbitMQ 中 Connection 和 Channel

我们知道无论是生产者还是消费者,都需要和 RabbitMQ Broker 建立连接,这个连接就是一条 TCP 连接,也就是 Connection。

一旦 TCP 连接建立起来,客户端紧接着可以创建一个 AMQP 信道(Channel),每个信道都会被指派一个唯一的 ID。

信道是建立在 Connection 之上的虚拟连接,RabbitMQ 处理的每条 AMQP 指令都是通过信道完成的。

我们完全可以使用 Connection 就能完成信道的工作,为什么还要引入信道呢?

试想这样一个场景,一个应用程序中有很多个线程需要从 RabbitMQ 中消费消息,或者生产消息,那么必然需要建立很多个 Connection,也就是多个 TCP 连接。然而对于操作系统而言,建立和销毁 TCP 连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。

RabbitMQ 采用类似 NIO(Non-blocking I/O)的做法,选择 TCP 连接复用,不仅可以减少性能开销,同时也便于管理。

每个线程把持一个信道,所以信道复用了 Connection 的 TCP 连接。同时 RabbitMQ 可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的
流量不是很大时,复用单一的 Connection 可以在产生性能瓶颈的情况下有效地节省 TCP 连接资源。但是信道本身的流量很大时,这时候多个信道复用一个
Connection 就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多个 Connection,将这些信道均摊到这些 Connection 中,至于这些相关
的调优策略需要根据业务自身的实际情况进行调节。

信道在 AMQP 中是一个很重要的概念,大多数操作都是在信道这个层面展开的。
比如 channel.exchangeDeclare、channel.queueDeclare、channel.basicPublish、channel.basicConsume 等方法。
RabbitMQ 相关的 API 与 AMQP 紧密相连,比如 channel.basicPublish 对应 AMQP 的 Basic.Publish 命令。

名词解释:
NIO,也称非阻塞 I/O,包含三大核心部分:Channel(信道)、Buffer(缓冲区)和 Selector(选择器)。
NIO 基于 Channel 和 Buffer 进行操作,数据总是从信道读取数据到缓冲区中,或者从缓冲区写入到信道中。
Selector 用于监听多个信道的时间(比如连接打开,数据到达等)。因此,单线程可以监听多个数据的信道。

创建Java项目

在pom文件中引入RabbitMQ依赖项:

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>3.6.5</version>
</dependency>

创建消费者

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * @Classname Consumer
 * @Description RabbitMQ消费者
 * @Date 2019/9/2 11:02
 * @Created by Jiavg
 */
public class Consumer {

    public static void main(String[] args) throws Exception {
        // 1.创建一个ConnectionFactory,并进行配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        // 2.通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        // 3.通过connection创建一个Channel
        Channel channel = connection.createChannel();

        // 4.声明(创建)一个队列
        String queueName = "test001";
        // queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments)
        // param queue the name of the queue
        //  @param durable true if we are declaring a durable queue (the queue will survive a server restart)
        //  @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
        //  @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
        //  @param arguments other properties (construction arguments) for the queue
        channel.queueDeclare(queueName, true, false,false,null);

        // 5.创建消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        // 6.设置Channel
        channel.basicConsume(queueName, true, queueingConsumer);

        // 7.获取消息
        while (true){
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

            byte[] body = delivery.getBody();
            String msg = new String(body);
            System.out.println("消费者:" + msg);
        }
    }
}

运行消费者类

在运行消费者类Consumer后,RabbitMQ的浏览器管控台信息状况如下图所示:

运行消费者类之前:

由上图可以看到:

  • 在全局参数(Global counts)中:连接数量(Connections)为0,信道(Channels)数量为0,队列(Queues)数量为0,以及消费者(Consumers)数量为0

    运行消费者之后

    由上图可以看到:

  • 在全局参数(Global counts)中:连接数量(Connections)为1,信道(Channels)数量为1,队列(Queues)数
    量为1,以及消费者(Consumers)数量为1.

可以了解到:

  • channel.queueDeclare(queueName, true, false,false,null);
    此行代码创建了一个名字为”test001”(即queueName变量值)的队列,且与RabbitMQ保持一个连接,此连接内包含一个信道.

  • QueueingConsumer queueingConsumer = new QueueingConsumer(channel);channel.basicConsume(queueName, true, queueingConsumer);
    这两行代码创建一个消费者,并把消费者绑定到已创建的队列上.

创建生产者类

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Classname Producer
 * @Description RabbitMQ生产者
 * @Date 2019/9/2 10:54
 * @Created by Jiavg
 */
public class Producer {

    public static void main(String[] args) throws Exception { 

        // 1.创建一个ConnectionFactory,并进行配置
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");

        // 2.通过连接工厂创建连接
        Connection connection = connectionFactory.newConnection();

        // 3.通过connection创建一个Channel
        Channel channel = connection.createChannel();

        for (int i = 0; i < 5; i++){
            String message = "Hello RabbitMQ!";
            // 4.通过Channel发送数据
            // basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            channel.basicPublish("", "test001", null, message.getBytes());
        }

        // 5.关闭相关连接
        channel.close();
        connection.close();;

    }

}

运行生产者类

在运行生产者类Producer后,RabbitMQ的浏览器管控台信息状况如下图所示:

运行生产者类之前:

由上图可以看到:

  • 消息队列(Queued messages)为空,消息消费速率(Message rates)也为0

    运行生产者之后

    且消费者控制台显示:

    由上图可以看到:

  • 消息队列(Queued messages)为空,消息消费速率(Message rates)迅速上升并下降

  • 消费者接收到生产者发送的信息

以上便是RabbitMQ快速入门.


   转载规则


《RabbitMQ快速入门》 Jiavg 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录