在上文中,都没有交换机,生产者直接发送消息到队列。而一旦引入交换机,消息发送的模式会有很大变化:可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化:
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
交换机的类型有四种:
Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。在广播模式下,消息发送流程是这样的:
Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。在广播模式下,消息发送流程是这样的:
我们的计划是这样的:
在publisher服务的SpringAmqpTest类中添加测试方法:
- @Test
- public void testFanoutExchange() {
- // 交换机名称
- String exchangeName = "Rys.fanout";
- // 消息
- String message = "hello, everyone!";
- rabbitTemplate.convertAndSend(exchangeName, "", message);
- }
在consumer服务的SpringRabbitListener中添加两个方法,作为消费者:
- @RabbitListener(queues = "fanout.queue1")
- public void listenFanoutQueue1(String msg) {
- System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
- }
-
- @RabbitListener(queues = "fanout.queue2")
- public void listenFanoutQueue2(String msg) {
- System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
- }
交换机的作用是什么?
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在Direct模型下:
案例需求如图:
在consumer服务的SpringRabbitListener中添加方法:
- @RabbitListener(queues = "direct.queue1")
- public void listenDirectQueue1(String msg) {
- System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
- }
-
- @RabbitListener(queues = "direct.queue2")
- public void listenDirectQueue2(String msg) {
- System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
- }
在publisher服务的SpringAmqpTest类中添加测试方法:
- @Test
- public void testSendDirectExchange() {
- // 交换机名称
- String exchangeName = "Rys.direct";
- // 消息
- String message = "红色警报!!";
- // 发送消息
- rabbitTemplate.convertAndSend(exchangeName, "red", message);
- }
由于使用的red这个key,所以两个消费者都收到了消息:我们再切换为blue这个key:
- @Test
- public void testSendDirectExchange() {
- // 交换机名称
- String exchangeName = "Rys.direct";
- // 消息
- String message = "蓝色预警";
- // 发送消息
- rabbitTemplate.convertAndSend(exchangeName, "blue", message);
- }
你会发现,只有消费者1收到了消息:
描述下Direct交换机与Fanout交换机的差异?
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!
BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert
通配符规则:
举例:
假如此时publisher发送的消息使用的RoutingKey共有四种:
在publisher服务的SpringAmqpTest类中添加测试方法:
@Test public void testTopicExchange() { // 交换机名称 String exchangeName = "RysTopic"; // 消息 String message = "明天下大暴雨!!!"; rabbitTemplate.convertAndSend(exchangeName, "china.news", message); }
@RabbitListener(queues = "topic.queue1") public void listenTopicQueue1(String msg) { System.out.println("消费者1收到了topic.queue1的消息......:【" + msg +"】"); } @RabbitListener(queues = "topic.queue2") public void listenTopicQueue2(String msg) { System.err.println("消费者2收到了topic.queue2的消息......:【" + msg +"】"); }
描述下Direct交换机与Topic交换机的差异?