Exchanges(交换机)
Exchange:接收消息,并根据路由键转发消息所绑定的队列.
下图为RabbitMQ的整体工作示意图:
交换机属性:
- Name: 交换机名称
- Type: 交换机类型:direct,topic,fanout,headers
- Durability: 是否需要持久化,true为持久化
- Auto Delete:当最后一个绑定到ExChange的队列删除后,自动删除该Exchange
- Internal:当前Exchange是否用于RabbitMQ内部使用,默认为false
- Arguments:扩展参数,用于扩展AMQP协议自制定化使用
交换机类型
- direct
处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的
匹配。如果一个队列绑定到该交换机上要求路由键 “abc”,则只有被标记为“abc”的消息才被转发,不会转发
abc.def,也不会转发dog.ghi,只会转发abc。 - topic
将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“”匹配不多
不少一个词。因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.” 只会匹配到“abc.def”。 - fanout
不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机
绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最
快的。 - headers
不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组
键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完
全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是Hashtable,键值
对的值可以是任何类型。而fanout,direct,topic 的路由键都需要要字符串形式的。
匹配规则x-match有下列两种类型:
x-match = all :表示所有的键值对都匹配才能接受到消息
x-match = any :表示只要有键值对匹配就能接受到消息
Direct Exchange
所有发送到Direct Exchange的消息被转发到RouteKey中指定的Queue.
以下为Direct模式下Exchange的工作示意图:
注意:Direct模式可以使用RabbitMQ自带的Exchange:default Exchange,所以不需要将Exchange进行任何
绑定(binding)操作,消息传递时,RoutKey必须完全匹配才会被队列接收,否则该消息会被抛弃.
Direct Exchange演示
消费者代码
消费者代码如下
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* @Classname Consumer
* @Description RabbitMQ的Direct模式:消费者
* @Date 2019/9/3 13:37
* @Created by Jiavg
*/
public class Consumer {
public static void main(String[] args) throws Exception{
// 1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 2.通过连接工厂获得连接
Connection connection = connectionFactory.newConnection();
// 3.通过连接创建信道
Channel channel = connection.createChannel();
String exchangeName = "test_Direct_Exchange";
String exchangeType = "direct";
String queueName = "test_Direct_Queue";
String routeKey = "test.direct";
// 4.声明一个交换机
channel.exchangeDeclare(exchangeName,exchangeType,true, false, false, null);
// 5.声明一个队列
channel.queueDeclare(queueName, true, false, false, null);
// 6. 建立一个绑定关系
channel.queueBind(queueName, exchangeName, routeKey);
// 7.创建一个消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 8.设置信道参数:队列,是否自动ACK,Consumer
channel.basicConsume(queueName, true, consumer);
// 9.读取消息
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
byte[] body = delivery.getBody();
String msg = new String(body);
System.out.println(msg);
}
}
}
消费者运行
在运行消费者后,RabbitMQ的管控台变化如下:
Connections视口
Channel视口
Exchanges视口
由上图可以观察到,声明的交换机(“test_Direct_Exchange”)为direct模式.
Queues视口
生产者代码
生产者代码如下
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Classname Producer
* @Description RabbitMQ的Direct模式:生产者
* @Date 2019/9/3 13:27
* @Created by Jiavg
*/
public class Producer {
public static void main(String[] args) throws Exception{
// 1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 2.通过连接工厂获得连接
Connection connection = connectionFactory.newConnection();
// 3.通过连接创建信道
Channel channel = connection.createChannel();
// 4.声明交换机名称及路由键
String exchangeName = "test_Direct_Exchange";
String routeKey = "test.direct";
// 5.发送消息
String msg = "Hello World RabbitMQ 4 Direct Exchange message...";
channel.basicPublish(exchangeName, routeKey, null, msg.getBytes());
// 6.关闭资源
channel.close();
connection.close();
}
}
生产者运行
在运行生产者代码后,RabbitMQ的管控台界面如下图所示:
并且消费者控制台输出如下
可见,在交换机类型为direct模式时,RabbitMQ的交换机根据路由键将接受到的消息路由到相应的队列中,并
被相应的监听消费者消费该消息.
Topic Exchange
所有发送到Topic Exchange的消息被转发到所有关心RouteKey中指定的Topic Queue上
Exchange将RouteKey和某Topic进行模糊匹配,此时队列需要绑定一个Topic
以下为Topic模式下Exchange的工作示意图:
注意:可以使用通配符进行模糊匹配
符号: “#”匹配一个或多个词
符号: “*”匹配不多不少一个词
例如: “log.#”能够匹配到”log.info.oa”
“log.*”只会匹配到”log.error”
Topic Exchange演示
消费者代码
消费者代码如下
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* @Classname Consumer
* @Description RabbitMQ的Topic模式:消费者
* @Date 2019/9/3 16:02
* @Created by Jiavg
*/
public class Consumer {
public static void main(String[] args) throws Exception{
// 1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 2.通过连接工厂获得连接
Connection connection = connectionFactory.newConnection();
// 3.通过连接创建信道
Channel channel = connection.createChannel();
String exchangeName = "test_Topic_Exchange";
String exchangeType = "topic";
String routeKey = "test.*";
String queueName = "test_Topic_Queue";
// 4.声明交换机
channel.exchangeDeclare(exchangeName, exchangeType, true, false,false,null);
// 5.声明队列
channel.queueDeclare(queueName, true, false, false, null);
// 6.创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
// 7.绑定队列到交换机上(利用topic模式的routeKey)
channel.queueBind(queueName, exchangeName, routeKey);
// 8.设置参数
channel.basicConsume(queueName, true, queueingConsumer);
// 8.接收消息
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
byte[] body = delivery.getBody();
String msg = new String(body);
System.out.printf(msg);
}
}
}
消费者运行
在运行消费者后,RabbitMQ的管控台变化如下:
Exchanges视口
由上图可以观察到,声明的交换机(“test_Topic_Exchange”)为topic模式.
生产者代码
生产者代码如下
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Classname Producer
* @Description RabbitMQ的Topic模式:生产者
* @Date 2019/9/3 15:56
* @Created by Jiavg
*/
public class Producer {
public static void main(String[] args) throws Exception{
// 1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 2.通过连接工厂获得连接
Connection connection = connectionFactory.newConnection();
// 3.通过连接创建信道
Channel channel = connection.createChannel();
String msg1 = "Hello World Topic Exchange with routeKey for test.hello";
String msg2 = "Hello World Topic Exchange with routeKey for test.hello.world";
String msg3 = "Hello World Topic Exchange with routeKey for hello.world";
String routeKey1 = "test.hello";
String routeKey2 = "test.hello.world";
String routeKey3 = "hello.world";
String exchangeName = "test_Topic_Exchange";
// 4.发送消息
channel.basicPublish(exchangeName, routeKey1, null, msg1.getBytes());
channel.basicPublish(exchangeName, routeKey2, null, msg2.getBytes());
channel.basicPublish(exchangeName, routeKey3, null, msg3.getBytes());
// 5.关闭资源
channel.close();
connection.close();
}
}
生产者运行
消费者控制台输出如下
可见,在交换机类型为topic模式时,RabbitMQ的交换机根据生产者发送消息的路由键将接受到的消息根据
topic模式的路由键路由到相应的队列中,并被相应的监听消费者消费该消息.(注意通配符的模糊匹配)
Fanout Exchange
- 不处理路由键,只需要简单的将队列绑定到交换机上
- 发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
- Fanout交换机转发消息是最快的
以下为Fanout模式下Exchange的工作示意图:
Fanout Exchange演示
消费者代码
消费者代码如下
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
/**
* @Classname Consumer
* @Description RabbitMQ的Fanout模式:消费者
* @Date 2019/9/3 17:14
* @Created by Jiavg
*/
public class Consumer {
public static void main(String[] args) throws Exception{
// 1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 2.通过连接工厂获得连接
Connection connection = connectionFactory.newConnection();
// 3.通过连接创建信道
Channel channel = connection.createChannel();
String exchangeName = "test_Fanout_Exchange";
String exchangeType = "fanout";
String queueName = "test_Fanout_Queue";
String routeKey = "";
channel.exchangeDeclare(exchangeName, exchangeType, true, false,false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routeKey);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
byte[] body = delivery.getBody();
String msg = new String(body);
System.out.printf(msg);
}
}
}
消费者运行
在运行消费者后,RabbitMQ的管控台变化如下:
Exchanges视口
由上图可以观察到,声明的交换机(“test_Fanout_Queue”)为fanout模式.
生产者代码
生产者代码如下
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Classname Producer
* @Description RabbitMQ的Fanout模式:生产者
* @Date 2019/9/3 22:14
* @Created by Jiavg
*/
public class Producer {
public static void main(String[] args) throws Exception{
// 1.创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 2.通过连接工厂获得连接
Connection connection = connectionFactory.newConnection();
// 3.通过连接创建信道
Channel channel = connection.createChannel();
String exchangeName = "test_Fanout_Exchange";
String routeKey = "";
String msg = "Hello World Fanout Exchange";
channel.basicPublish(exchangeName, routeKey, null, msg.getBytes());
}
}
生产者运行
消费者控制台输出如下
可见,在交换机类型为fanout模式时,RabbitMQ不处理路由键,只需要简单的将队列绑定到交换机上,发送
到交换机的消息都会被转发到与该交换机绑定的所有队列上.