• RabbitMQ(任务模型,交换机(广播,订阅,通配符订阅))


    一.WorkQueues模型

    WorkQueues(任务模式):让多个消费者绑定到一个队列,共同消费队列中的消息

    架构:

    所需场景:

    当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。 此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。

    1.新建队work.queue

    2.生产者模块循环发送消息
    1. @Test
    2. void testWorkQueue() throws InterruptedException {
    3. String queueName = "work.queue";
    4. for (int i = 1; i <= 50; i++) {
    5. String msg = "hello, worker, message_" + i;
    6. rabbitTemplate.convertAndSend(queueName, msg);
    7. Thread.sleep(20);
    8. }
    9. }
    3.消费者模块模拟多个消费者绑定该队列
    1. @RabbitListener(queues = "work.queue")
    2. public void listenWorkQueueMsg1(String msg) throws InterruptedException {
    3. System.out.println("消费者一接收到work.queue的信息" + msg+ LocalDateTime.now());
    4. Thread.sleep(20);
    5. }
    6. @RabbitListener(queues = "work.queue")
    7. public void listenWorkQueueMsg2(String msg) throws InterruptedException {
    8. System.out.println("消费者2接收到work.queue的信息" + msg+LocalDateTime.now());
    9. Thread.sleep(200);
    10. }

    消费者1 sleep了20毫秒,相当于每秒钟处理50个消息

    消费者1 sleep了20毫秒,相当于每秒钟处理50个消息

    4.测试

    测试结果中:尽管给消费者二设置了比消费者一十倍长的休眠时间。但是两个消费者的处理信息个数是相同的。且消费者一很快就处理完消息,而剩下的消息只有消费者二去消费。并没有充分利用每一个消费者的能力。

    可以通过配置消费者模块的yml文件解决该问题

    1. spring:
    2. rabbitmq:
    3. listener:
    4. simple:
    5. prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
    5.二次测试

    6.总结:

    Work模型的使用:

    • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理

    • 通过设置prefetch来控制消费者预取的消息数量

    二.Exchange(交换机)

    Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失。

    交换机的类型有四种:

    • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机

    • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列

    • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符

    • Headers:头匹配,基于MQ的消息头匹配,用的较少。

    三.Fanout交换机(广播)

    架构:

    广播模式发送流程:

    • 1)  可以有多个队列

    • 2)  每个队列都要绑定到Exchange(交换机)

    • 3)  生产者发送的消息,只能发送到交换机

    • 4)  交换机把消息发送给绑定过的所有队列

    • 5)  订阅队列的消费者都能拿到消息

    一.创建两个队列:fanout.queue1,fanout.queue2

    二.创建一个交互机:fanout

    三.将队列绑定至交互机

    四.编写测试代码

    生产者端:

    1. @Test
    2. void testSendFanout() {
    3. String exchangeName = "hmall.fanout";//交换机
    4. String msg = "hello, everyone!";
    5. rabbitTemplate.convertAndSend(exchangeName, null, msg);
    6. }

    消费端:

    1. @RabbitListener(queues = "fanout.queue1")
    2. public void listenFanoutQueue1(String msg) throws InterruptedException {
    3. System.out.println("消费者1 收到了 fanout.queue1的消息:【" + msg +"】");
    4. }
    5. @RabbitListener(queues = "fanout.queue2")
    6. public void listenFanoutQueue2(String msg) throws InterruptedException {
    7. System.out.println("消费者2 收到了 fanout.queue2的消息:【" + msg +"】");
    8. }

    测试结果:

    五.总结

    交换机的作用

    • 接收publisher发送的消息

    • 将消息按照规则路由到与之绑定的队列

    • 不能缓存消息,路由失败,消息丢失

    • FanoutExchange的会将消息路由到每个绑定的队列

    四.Direct交换机(订阅)

    在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

    结构:

    在Direct模型下

    • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

    • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey

    • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

    一.测试逻辑:
    1. 声明一个名为hmall.direct的交换机

    2. 声明队列direct.queue1,绑定hmall.directbindingKeybludred

    3. 声明队列direct.queue2,绑定hmall.directbindingKeyyellowred

    4. consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

    5. 在publisher中编写测试方法,向hmall.direct发送消息

    二.创建两个队列direct.queue1和direct.queue2

    三.创建一个direct类型交换机 hmall.direct并绑定队列

    四.编写测试类

    消费模块:

    1. @RabbitListener(queues = "direct.queue1")
    2. public void listenDirectQueue1(String msg){
    3. System.out.println("消费者1 收到了 direct.queue1的消息:【" + msg +"】");
    4. }
    5. @RabbitListener(queues = "direct.queue2")
    6. public void listenDirectQueue2(String msg){
    7. System.out.println("消费者2 收到了 direct.queue2的消息:【" + msg +"】");
    8. }

    生产模块:

    1. @Test
    2. void testSendDirect() {
    3. String exchangeName = "hmall.direct";
    4. String msg = "蓝色通知,警报解除,哥斯拉是放的气球";
    5. rabbitTemplate.convertAndSend(exchangeName, "blue", msg);
    6. }

    测试结果:

    rabbitTemplate.convertAndSend(exchangeName, "blue", msg)改为rabbitTemplate.convertAndSend(exchangeName, "red", msg)

    测试结果:

    五.总结:

    Direct交换机与Fanout交换机的差异

    • Fanout交换机将消息路由给每一个与之绑定的队列

    • Direct交换机根据RoutingKey判断路由给哪个队列

    • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

    五.Topic交换机

    架构:

    Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。 只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

    BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert

    通配符规则:

    • #:匹配一个或多个词

    • *:匹配不多不少恰好1个词

    举例:

    • item.#:能够匹配item.spu.insert 或者 item.spu

    • item.*:只能匹配item.spu

    一.测试逻辑

    假如此时生产者发送的消息使用的RoutingKey共有四种:

    • china.news代表有中国的新闻消息;

    • china.weather 代表中国的天气消息;

    • USA.news 则代表美国新闻

    • USA.weather 代表美国的天气消息;

    解释:

    • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:

      • china.news

      • china.weather

    • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:

      • china.news

      • USA.news

    二.绑定交互机与队列

    三.编写测试代码

    消费者:

    1. @RabbitListener(queues = "topic.queue1")
    2. public void listenTopicQueue1(String msg){
    3. System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
    4. }
    5. @RabbitListener(queues = "topic.queue2")
    6. public void listenTopicQueue2(String msg){
    7. System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
    8. }

    生产者:

    1. @Test
    2. void testSendTopic() {
    3. String exchangeName = "hmall.topic";
    4. String msg = "今天天气挺不错,我的心情的挺好的";
    5. rabbitTemplate.convertAndSend(exchangeName, "china.weather", msg);
    6. rabbitTemplate.convertAndSend(exchangeName,"USA.news","美国新闻");
    7. rabbitTemplate.convertAndSend(exchangeName,"china.news","中国新闻");
    8. }

    测试结果

    四.总结

    Direct交换机与Topic交换机的差异

    • Topic交换机接收的消息RoutingKey必须是多个单词,以 **.** 分割

    • Topic交换机与队列绑定时的bindingKey可以指定通配符

    • #:代表0个或多个词

    • *:代表1个词

  • 相关阅读:
    PaddleOCR服务化部署
    Redis常用的八种场景
    LeetCode每日一练 —— CM11 链表分割
    计算机网络(第一弹) --- 一篇关于协议的博客
    Roson的Qt之旅 #122 Qt信号与槽的所有connect方法说明
    第一课 概念介绍
    Java+Swing形成GUI图像界面
    【TS】Error: Property ‘click‘ does not exist on type ‘Element‘
    Docker学习路线
    linux apt-get安装Jenkins
  • 原文地址:https://blog.csdn.net/qq_52351978/article/details/136495234