RabbitMQ-Exchange

Exchanges(交换机)

Exchange:接收消息,并根据路由键转发消息所绑定的队列.
下图为RabbitMQ的整体工作示意图:

交换机属性:

  • Name: 交换机名称
  • Type: 交换机类型:direct,topic,fanout,headers
  • Durability: 是否需要持久化,true为持久化
  • Auto Delete:当最后一个绑定到ExChange的队列删除后,自动删除该Exchange
  • Internal:当前Exchange是否用于RabbitMQ内部使用,默认为false
  • Arguments:扩展参数,用于扩展AMQP协议自制定化使用

交换机类型

  • direct
    处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的
    匹配。如果一个队列绑定到该交换机上要求路由键 “abc”,则只有被标记为“abc”的消息才被转发,不会转发
    abc.def,也不会转发dog.ghi,只会转发abc。
  • topic
    将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“”匹配不多
    不少一个词。因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.” 只会匹配到“abc.def”。
  • fanout
    不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机
    绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最
    快的。
  • headers
    不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组
    键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完
    全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是Hashtable,键值
    对的值可以是任何类型。而fanout,direct,topic 的路由键都需要要字符串形式的。
    匹配规则x-match有下列两种类型:
    x-match = all :表示所有的键值对都匹配才能接受到消息
    x-match = any :表示只要有键值对匹配就能接受到消息

Direct Exchange

所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue.
以下为Direct模式下Exchange的工作示意图:

注意:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何
绑定(binding)操作,消息传递时,RoutKey必须完全匹配才会被队列接收,否则该消息会被抛弃.

Direct Exchange演示

消费者代码

消费者代码如下


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * @Classname Consumer
 * @Description RabbitMQ的Direct模式:消费者
 * @Date 2019/9/3 13:37
 * @Created by Jiavg
 */
public class Consumer {

    public static void main(String[] args) throws Exception{

        // 1.创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // 2.通过连接工厂获得连接
        Connection connection = connectionFactory.newConnection();

        // 3.通过连接创建信道
        Channel channel = connection.createChannel();

        String exchangeName = "test_Direct_Exchange";
        String exchangeType = "direct";
        String queueName = "test_Direct_Queue";
        String routeKey = "test.direct";

        // 4.声明一个交换机
        channel.exchangeDeclare(exchangeName,exchangeType,true, false, false, null);

        // 5.声明一个队列
        channel.queueDeclare(queueName, true, false, false, null);

        // 6. 建立一个绑定关系
        channel.queueBind(queueName, exchangeName, routeKey);

        // 7.创建一个消费者
        QueueingConsumer consumer = new QueueingConsumer(channel);

        // 8.设置信道参数:队列,是否自动ACK,Consumer
        channel.basicConsume(queueName, true, consumer);

        // 9.读取消息
        while (true){
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();

            byte[] body = delivery.getBody();
            String msg = new String(body);
            System.out.println(msg);
        }                        
    }    
}

消费者运行

在运行消费者后,RabbitMQ的管控台变化如下:
Connections视口

Channel视口

Exchanges视口

由上图可以观察到,声明的交换机(“test_Direct_Exchange”)为direct模式.

Queues视口

生产者代码

生产者代码如下


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Classname Producer
 * @Description RabbitMQ的Direct模式:生产者
 * @Date 2019/9/3 13:27
 * @Created by Jiavg
 */
public class Producer {

    public static void main(String[] args) throws Exception{

        // 1.创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // 2.通过连接工厂获得连接
        Connection connection = connectionFactory.newConnection();

        // 3.通过连接创建信道
        Channel channel = connection.createChannel();

        // 4.声明交换机名称及路由键
        String exchangeName = "test_Direct_Exchange";
        String routeKey = "test.direct";

        // 5.发送消息
        String msg = "Hello World RabbitMQ 4 Direct Exchange message...";
        channel.basicPublish(exchangeName, routeKey, null, msg.getBytes());

        // 6.关闭资源
        channel.close();
        connection.close();
    }    
}

生产者运行

在运行生产者代码后,RabbitMQ的管控台界面如下图所示:

并且消费者控制台输出如下

可见,在交换机类型为direct模式时,RabbitMQ的交换机根据路由键将接受到的消息路由到相应的队列中,并
被相应的监听消费者消费该消息.

Topic Exchange

  • 所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定的Topic Queue上

  • Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic
    以下为Topic模式下Exchange的工作示意图:

    注意:可以使用通配符进行模糊匹配
    符号: “#”匹配一个或多个词
    符号: “*”匹配不多不少一个词
    例如: “log.#”能够匹配到”log.info.oa”
    “log.*”只会匹配到”log.error”

Topic Exchange演示

消费者代码

消费者代码如下


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * @Classname Consumer
 * @Description RabbitMQ的Topic模式:消费者
 * @Date 2019/9/3 16:02
 * @Created by Jiavg
 */
public class Consumer {

    public static void main(String[] args) throws Exception{

        // 1.创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // 2.通过连接工厂获得连接
        Connection connection = connectionFactory.newConnection();

        // 3.通过连接创建信道
        Channel channel = connection.createChannel();

        String exchangeName = "test_Topic_Exchange";
        String exchangeType = "topic";
        String routeKey = "test.*";
        String queueName = "test_Topic_Queue";

        //  4.声明交换机 
        channel.exchangeDeclare(exchangeName, exchangeType, true, false,false,null);

        // 5.声明队列
        channel.queueDeclare(queueName, true, false, false, null);

        // 6.创建消费者
        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        // 7.绑定队列到交换机上(利用topic模式的routeKey)
        channel.queueBind(queueName, exchangeName, routeKey);

        // 8.设置参数
        channel.basicConsume(queueName, true, queueingConsumer);

        // 8.接收消息
        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

            byte[] body = delivery.getBody();
            String msg = new String(body);
            System.out.printf(msg);
        }        
    }    
}

消费者运行

在运行消费者后,RabbitMQ的管控台变化如下:

Exchanges视口

由上图可以观察到,声明的交换机(“test_Topic_Exchange”)为topic模式.

生产者代码

生产者代码如下


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Classname Producer
 * @Description RabbitMQ的Topic模式:生产者
 * @Date 2019/9/3 15:56
 * @Created by Jiavg
 */
public class Producer {

    public static void main(String[] args) throws Exception{

        // 1.创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // 2.通过连接工厂获得连接
        Connection connection = connectionFactory.newConnection();

        // 3.通过连接创建信道
        Channel channel = connection.createChannel();

        String msg1 = "Hello World Topic Exchange with routeKey for test.hello";
        String msg2 = "Hello World Topic Exchange with routeKey for test.hello.world";
        String msg3 = "Hello World Topic Exchange with routeKey for hello.world";

        String routeKey1 = "test.hello";
        String routeKey2 = "test.hello.world";
        String routeKey3 = "hello.world";

        String exchangeName = "test_Topic_Exchange";

        // 4.发送消息
        channel.basicPublish(exchangeName, routeKey1, null, msg1.getBytes());
        channel.basicPublish(exchangeName, routeKey2, null, msg2.getBytes());
        channel.basicPublish(exchangeName, routeKey3, null, msg3.getBytes());

        // 5.关闭资源
        channel.close();
        connection.close();        
    }    
}

生产者运行

消费者控制台输出如下

可见,在交换机类型为topic模式时,RabbitMQ的交换机根据生产者发送消息的路由键将接受到的消息根据
topic模式的路由键路由到相应的队列中,并被相应的监听消费者消费该消息.(注意通配符的模糊匹配)

Fanout Exchange

  • 不处理路由键,只需要简单的将队列绑定到交换机上
  • 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
  • Fanout交换机转发消息是最快的
    以下为Fanout模式下Exchange的工作示意图:

Fanout Exchange演示

消费者代码

消费者代码如下


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * @Classname Consumer
 * @Description RabbitMQ的Fanout模式:消费者
 * @Date 2019/9/3 17:14
 * @Created by Jiavg
 */
public class Consumer {

    public static void main(String[] args) throws Exception{

        // 1.创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // 2.通过连接工厂获得连接
        Connection connection = connectionFactory.newConnection();

        // 3.通过连接创建信道
        Channel channel = connection.createChannel();   

        String exchangeName = "test_Fanout_Exchange";
        String exchangeType = "fanout";

        String queueName = "test_Fanout_Queue";
        String routeKey = "";

        channel.exchangeDeclare(exchangeName, exchangeType, true, false,false, null);

        channel.queueDeclare(queueName, true, false, false, null);

        channel.queueBind(queueName, exchangeName, routeKey);

        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

        channel.basicConsume(queueName, true, queueingConsumer);

        while (true) {
            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

            byte[] body = delivery.getBody();
            String msg = new String(body);
            System.out.printf(msg);
        }        
    }    
}

消费者运行

在运行消费者后,RabbitMQ的管控台变化如下:

Exchanges视口

由上图可以观察到,声明的交换机(“test_Fanout_Queue”)为fanout模式.

生产者代码

生产者代码如下


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Classname Producer
 * @Description RabbitMQ的Fanout模式:生产者
 * @Date 2019/9/3 22:14
 * @Created by Jiavg
 */
public class Producer {

    public static void main(String[] args) throws Exception{

        // 1.创建ConnectionFactory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        // 2.通过连接工厂获得连接
        Connection connection = connectionFactory.newConnection();

        // 3.通过连接创建信道
        Channel channel = connection.createChannel();

        String exchangeName = "test_Fanout_Exchange";
        String routeKey = "";

        String msg = "Hello World Fanout Exchange";

        channel.basicPublish(exchangeName, routeKey, null, msg.getBytes());     
    }   
}

生产者运行

消费者控制台输出如下

可见,在交换机类型为fanout模式时,RabbitMQ不处理路由键,只需要简单的将队列绑定到交换机上,发送
到交换机的消息都会被转发到与该交换机绑定的所有队列上.


   转载规则


《RabbitMQ-Exchange》 Jiavg 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录