• RabbitMQ第三个实操小案例——发布者/订阅者(Publish/Subscribe)


    RabbitMQ第三个实操小案例——发布者/订阅者(Publish/Subscribe)、广播交换器(FanoutExchange)

    发布者/订阅者 模型如下:
    在这里插入图片描述

    他与前面两个小案例最大的区别就是,他的消息不是阅完即焚的。他允许将同一条消息发送给多个消费者。而实现此操作的原因是加入了我们的交换机(exchange)。

    在发布者和订阅者的模型中,各个组件的功能如下

    • Publisher:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
    • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有以下3种类型:
      • Fanout:广播,将消息交给所有绑定到交换机的队列
      • Direct:定向,把消息交给符合指定routing key 的队列
      • Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
    • Queue:消息队列也与以前一样,接收消息、缓存消息。
    • Consumer:消费者,与以前一样,订阅队列,没有变化

    注意:交换机他只负责消息的转发,并不存储消息,如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!!

    OK,这么解释肯定是不够的,下面我们就来说一下第一种交换机类型——Fanout(广播)在Java中的具体使用方式

    写法一、配置类配置方式

    步骤一、在消费者服务中,利用代码声明队列、交换机,并将两者进行绑定。
    SpringAMQP提供的**交换机(Exchange)、队列(Queue)、绑定(Binding)**的API如下:
    在这里插入图片描述
    要将我们的队列绑定到交换机,我们需要编写我们的配置类如下:

    package com.demo.mq.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class FanoutConfig {
        /**
         *  声明FanoutExchange(广播交换机)
         */
        @Bean
        public FanoutExchange fanoutExchange(){
            //交换机的名称
            return new FanoutExchange("exchange.fanout");
        }
    
        /**
         *  声明第一个队列
         */
        @Bean
        public Queue fanoutQueue1(){
            return new Queue("fanout.queue1");
        }
    
        /**
         *  声明第二个队列
         */
        @Bean
        public Queue fanoutQueue2(){
            return new Queue("fanout.queue2");
        }
    
        /**
         *  绑定 队列1 到 交换机
         */
        @Bean
        public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
            return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
        }
    
        /**
         *  绑定 队列2 到 交换机
         */
        @Bean
        public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
            return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    写完配置类,我们重启一下我们的消费者服务类,然后我们到RabbitMQ上看一下我们的交换机和队列。

    可以看到,确实多了一个交换机叫 exchange.fanout。
    在这里插入图片描述

    我们再看一下队列,可以看到,我们两个队列也都注册成功了。
    在这里插入图片描述
    点击我们刚才新增的交换机,打开它的Bindings,可以看到这个交换机他告诉我们,他的消息是会转发到 fanout.queue1 和 fanout.queue2中:
    在这里插入图片描述

    ok,我们接着往下写:

    **步骤二、在消费者服务中,编写两个消费者方法,分别监听 fanout.queue1 和 fanout.queue2。 **.

    监听的方法,现在应该已经写得滚瓜烂熟了吧,这里就直接贴代码了。
    1、编写的类记得加 @Component 将这个监听的类注册到 Spring容器中。
    2、监听哪个queue,那么就写对应的方法,并在方法上方添加@RabbitListener注解,用queues属性标明要监听的queue即可。(如果有多个,那么用 @RabbitListener(queues = {“queueName1”, “queueName2”})表示即可。

    @Component
    public class SpringRabbitListener {
        @RabbitListener(queues = "fanout.queue1")
        public void listenFanoutQueue1(String msg){
            System.out.println("监听到 fanout.queue1 的消息为:【"+ msg +"】");
        }
    
        @RabbitListener(queues = "fanout.queue2")
        public void listenFanoutQueue2(String msg){
            System.out.println("监听到 fanout.queue2 的消息为:【"+ msg +"】");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    步骤三、在发布者服务中,编写测试方法,向交换机 exchange.fanout 发送消息。

        @Test
        public void testFanoutExchange(){
            //交换机名称
            String exchangeName = "exchange.fanout";
            //消息
            String msg = "Hello,av8d!";
            //发送消息
            rabbitTemplate.convertAndSend(exchangeName, "", msg);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    这里的rabbitTemplate.convertAndSend接受三个参数,分别是

    public void convertAndSend(String exchange,
                               String routingKey,
                               Object object)
    
    • 1
    • 2
    • 3
    1. exchange:交换机的名称
    2. routeKey:routeKey值(还不需要用到,先不管他,给个"")
    3. object:发送的消息

    写完测试方法,我们跑一下我们的测试方法,然后看一下我们消费者的控制台如下:
    在这里插入图片描述
    可以看到,只发布了一条消息,但是通过交换机发布给两个Queue后,我们消费者的两个方法都监听到了我们同一条消息。

    写法二、注解方式(@RabbitListener)

    如果以前尝试了上面的写法,记得把配置类的 @Configuration 注释掉

    //@Configuration
    public class FanoutConfig {
    ...
    }
    
    • 1
    • 2
    • 3
    • 4

    然后把刚才写的两个方法注释掉。

    /**
        @RabbitListener(queues = "fanout.queue1")
        public void listenFanoutQueue1(String msg){
            System.out.println("监听到 fanout.queue1 的消息为:【"+ msg +"】");
        }
    
        @RabbitListener(queues = "fanout.queue2")
        public void listenFanoutQueue2(String msg){
            System.out.println("监听到 fanout.queue2 的消息为:【"+ msg +"】");
        }
    **/
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    接下来,我们就开始写我们使用注解声明队列的方法。

    步骤一、配置我们的RabbitMQ。(只要使用RabbitMQ,都必须要配置)

    spring:
      rabbitmq:
        host: 192.168.83.134
        port: 5672
        virtual-host: /
        username: admin
        password: root
        listener:
          simple:
            prefetch: 1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    步骤二、直接写我们的监听方法。(使用@RabbitListener注解写我们的路由方式、路由名称以及我们的队列名即可)

    @Component
    public class SpringRabbitListener {
    	@RabbitListener(bindings = @QueueBinding(
    	        value = @Queue(name = "fanout.queue1"),
    	        exchange = @Exchange(name = "exchange.fanout", type = ExchangeTypes.FANOUT)
    	))
    	public void listenFanoutQueue1(String msg){
    	    System.out.println("监听到 fanout.queue1 的消息为:【" + msg+"】");
    	}
    	
    	@RabbitListener(bindings = @QueueBinding(
    	        value = @Queue(name = "fanout.queue2"),
    	        exchange = @Exchange(name = "exchange.fanout", type = ExchangeTypes.FANOUT)
    	))
    	public void listenFanoutQueue2(String msg){
    	    System.out.println("监听到 fanout.queue2 的消息为:【" + msg+"】");
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
  • 相关阅读:
    工作流的例子
    一文解开主流开源变更数据捕获技术之Flink CDC的入门使用
    数据结构八大排序Java源码
    ASP.NET Core - 依赖注入(四)
    npm error File “xxx\.node-gyp\18.18.2\include\node\common.gypi“, line 1
    基于单片机的太阳能灯(声控)电路设计(#0221)
    FasterRCNN入门案例水稻图像目标检测新手友好入门案例
    已解决Python配置环境变量失效问题
    【花雕动手做】有趣好玩的音乐可视化系列小项目(15)--横排LED方管灯
    STC89C51基础及项目第10天:LCD显示字符(非标协议外设)
  • 原文地址:https://blog.csdn.net/weixin_44741023/article/details/127824789