• RabbitMQ 总结


    Rabbitmq

    架构简介

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-n2oJNzGY-1667380578167)(C:\Users\谢顺\AppData\Roaming\Typora\typora-user-images\image-20220611165218458.png)]

    1. 生产者(Publisher):发布消息到 RabbitMQ 中的交换机(Exchange)上。
    2. 交换机(Exchange):和生产者建立连接并接收生产者的消息。
    3. 消费者(Consumer):监听 RabbitMQ 中的 Queue 中的消息。
    4. 队列(Queue):Exchange 将消息分发到指定的 Queue,Queue 和消费者进行交互。
    5. 路由(Routes):交换机转发消息到队列的规则。

    六种消息分发方式和一种消息确认

    1.hello word

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Mvmgk9Xq-1667380578171)(C:\Users\谢顺\AppData\Roaming\Typora\typora-user-images\image-20220611165004200.png)]

    提供一个生产者一个队列以及一个消费者,有一个默认的交换机无需自己写

    默认的直连交换机(DirectExchange),DirectExchange 的路由策略是将消息 队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同的 Queue 上,例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。

    2.Work queues

    提供一个生产者,一个默认的交换机(DirectExchange),一个队列,两个消费者

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-X9u5D4qv-1667380578173)(C:\Users\谢顺\AppData\Roaming\Typora\typora-user-images\image-20220611165658962.png)]

    一个队列对应了多个消费者,默认情况下,由队列对消息进行平均分配,消息会被分到不同的消费者手 中。消费者可以配置各自的并发能力,进而提高消息的消费能力,也可以配置手动 ack,来决定是否要 消费某一条消息。

    @RabbitListener(queues = HelloWorldConfig.HELLO_WORLD_QUEUE_NAME,concurrency = "10")

    我配置了 concurrency 为 10,将会同时存在 10 个子线程去消费消息,即产生10个通道。此时生产者发送 10 条消息,就会一下都被消费掉。

    3.publish/subscrite(订阅模式)

    一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。需要注意的是,如果将消息发送到一个没有队列绑定的 Exchange上面,那么 该消息将会丢失,这是因为在 RabbitMQ 中 Exchange 不具备存储消息的能力,只有队列具备存储消息 的能力

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EAt6OxJR-1667380578176)(C:\Users\谢顺\AppData\Roaming\Typora\typora-user-images\image-20220611170326825.png)]

    有四种交换机可供选择,分别是: Direct, Fanout, Topic, Header

    3.1 Direct(直连)

    DirectExchange 的路由策略是将消息队列绑定到一个 DirectExchange 上,当一条消息到达 DirectExchange 时会被转发到与该条消息 routing key 相同名的 Queue 上,例如消息队列名为 “hello-queue”,则 routingkey 为 “hello-queue” 的消息会被该消息队列接收。

    交换机和多个队列以及路由规则

    package com.qfedu.producer01.config;
    
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class DirectConfig {
        public static final String MY_DIRECT_EXCHANGE_NAME ="my_direct_exchange_name";
        public static final String MY_DIRECT_QUEUE_NAME_01 ="my_direct_queue_name_01";
        public static final String MY_DIRECT_QUEUE_NAME_02 ="my_direct_queue_name_02";
    
        @Bean
        Queue directQueue01(){
        /**参数解释
             * 队列的名字
             * 队列是否持久化
             * 排他性,具有排他性的队列,只能是哪个连接创建的该队列,哪个连接才能操作该队列
             * 是否自动删除:如果没有人监听这个队列,是否自动删除这个队列
             */
            return new Queue(MY_DIRECT_QUEUE_NAME_01, true, false, false);
        }
        @Bean
        Queue directQueue02(){
            return new Queue(MY_DIRECT_QUEUE_NAME_02, true, false, false);
        }
        @Bean
        DirectExchange directExchange(){
            //交换机的名字
            //是否具有持久性,重启后依然有效
            //交换机上没有绑定队列时,是否自动删除该交换机
            return new DirectExchange(MY_DIRECT_EXCHANGE_NAME, true, false);
        }
    //设置交换机和队列关系,路由规则
        @Bean
        Binding directBing01(){
            return BindingBuilder
                    //指定要绑定的队列
                    .bind(directQueue01())
                    //指定交换机
                    .to(directExchange())
                    //就用队列名作为routingKey
                    .with(MY_DIRECT_QUEUE_NAME_01);
        }
    
        @Bean
        Binding directBing02(){
            return BindingBuilder
                    //指定要绑定的队列
                    .bind(directQueue02())
                    //指定交换机
                    .to(directExchange())
                    //就用队列名作为rotingKey
                    .with(MY_DIRECT_QUEUE_NAME_02);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    多个消费者

    package com.qfedu.consumer01.consumer;
    
    import com.qfedu.consumer01.config.DirectConfig;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    @Component
    public class DirectConsumer {
        @RabbitListener(queues=DirectConfig.MY_DIRECT_QUEUE_NAME_02)
        public void handleMsg2(String msg){
            System.out.println("handleMsg2 = " + msg);
        }
        @RabbitListener(queues=DirectConfig.MY_DIRECT_QUEUE_NAME_01)
        public void handleMsg1(String msg){
            System.out.println("handleMsg1 = " + msg);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    生产者

    @Test
    	public void test02() {
    		rabbitTemplate.convertAndSend(DirectConfig.MY_DIRECT_EXCHANGE_NAME,DirectConfig.MY_DIRECT_QUEUE_NAME_01,"hello queue01");
    	}
    	@Test
    	public void test03() {
              rabbitTemplate.convertAndSend(DirectConfig.MY_DIRECT_EXCHANGE_NAME,DirectConfig.MY_DIRECT_QUEUE_NAME_02,"hello queue02");
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    3.2 扇形交换机

    FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用

    3.3 主题交换机

    3.4 header交换机

    处理高并发,用消息发送机制

    Confirm 消息确认机制 确认生产者消息发送到交换机

    发送单个消息 channel.waitForconfirems() 来判断单个消息是否消费成功

    批量Confirm方式,for循环发送多个消息,channel.waitForConfirmsOrDie(); 来确定是否成功, 当你发送的全部消息,有一个失败的时候,就直接全部失败抛出异常IOException

    异步Confirm 方式,发送多个消息,开启异步回调

    channel.addConfirmListener(new ConfirmListener() {
    
        @Override
        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
            System.out.println("消息发送成功,标识:" + deliveryTag + ",是否是批量" + multiple);
        }
    
        @Override
        public void handleNack(long deliveryTag, boolean multiple) throws IOException {
            System.out.println("消息发送失败,标识:" + deliveryTag + ",是否是批量" + multiple);
        }
    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ToFI4Kzs-1667380578178)(Rabbitmq/image-20220701152952383.png)]

    Return 机制 确认交换机消息分发到队列

    Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。

    而且exchange是不能持久化消息的,queue是可以持久化消息。

    采用Return机制来监听消息是否从exchange送到了指定的queue中,没有发送到queue中,就会抛出错误信息

    避免消息重复消费

    重复消费消息的原因是,消费者没有给RabbitMQ一个ack,重复消费消息,会对非幂等性操作造成问题

    解决使用Redis的分布式锁,setnx,如果ack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在(说明之前有人消费过该消息),获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。

    消息重试

    自带重试机制:如果发送方一开始就连 不上 MQ,那么 Spring Boot 中也有相应的重试机制,但是这个重试机制就和 MQ 本身没有关系了,这 是利用 Spring 中的 retry 机制来完成的

    业务重试:业务重试主要是针对消息没有到达交换器的情况。如果消息没有成功到达交换器,此时就会触发消息发送失败回调,在这个回调中,我们就可以做文章了!

    思路:

    1. 首先创建一张表,用来记录发送到中间件上的消息,像下面这样

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JvvsG23s-1667380578180)(Rabbitmq/image-20220701155450480.png)]

      每次发送消息的时候,就往数据库中添加一条记录。这里的字段都很好理解,有三个我额外说下: status:表示消息的状态,有三个取值,0,1,2 分别表示消息发送中、消息发送成功以及消息发 送失败。

      tryTime:表示消息的第一次重试时间(消息发出去之后,在 tryTime 这个时间点还未显示发送成 功,此时就可以开始重试了)。

      count:表示消息重试次数。 其他字段都很好理解,我就不一一啰嗦了。

    2. 在消息发送的时候,我们就往该表中保存一条消息发送记录,并设置状态 status 为 0,tryTime 为 1 分钟之后。

    3. 在 confirm 回调方法中,如果收到消息发送成功的回调,就将该条消息的 status 设置为1(在消 息发送时为消息设置 msgId,在消息发送成功回调时,通过 msgId 来唯一锁定该条消息)。

    4. 另外开启一个定时任务,定时任务每隔 10s 就去数据库中捞一次消息,专门去捞那些 status 为 0 并且已经过了 tryTime 时间记录,把这些消息拎出来后,首先判断其重试次数是否已超过 3 次, 如果超过 3 次,则修改该条消息的 status 为 2,表示这条消息发送失败,并且不再重试。对于重 试次数没有超过 3 次的记录,则重新去发送消息,并且为其 count 的值+1。

    当然这种思路有两个弊端:

    1. 去数据库走一遭,可能拖慢 MQ 的 Qos,不过有的时候我们并不需要 MQ 有很高的 Qos,所以这 个应用时要看具体情况。
    2. 按照上面的思路,可能会出现同一条消息重复发送的情况,不过这都不是事,我们在消息消费时, 解决好幂等性问题就行了。

    消息有效期 TTL(Time-To-Live),消息存活的时间

    1. 在声明队列的时候,我们可以在队列属性中设置消息的有效期,这样所有进入该队列的消息都会有 一个相同的有效期
    2. 在发送消息的时候设置消息的有效期,这样不同的消息就具有不同的有效期

    死信交换机,Dead-Letter-Exchange 即 DLX。

    死信交换机用来接收死信消息(Dead Message)的,那什么是死信消息呢?一般消息变成死信消息有 如下几种情况:

    a.消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false

    b.消息过期

    c.队列达到最大长度

    当消息在一个队列中变成了死信消息后,此时就会被发送到 DLX,绑定 DLX 的消息队列则称为死信队 列

    DLX 本质上也是一个普普通通的交换机,我们可以为任意队列指定 DLX,当该队列中存在死信时, RabbitMQ 就会自动的将这个死信发布到 DLX 上去,进而被路由到另一个绑定了 DLX 的队列上(即死 信队列)。

    延迟队列实现思路

    假如一条消息需要延迟 30 分钟执行,我们就设置这条消息的有效期为 30 分钟,同时为这条消息配置 死信交换机和死信 routing_key ,并且不为这个消息队列设置消费者,那么 30 分钟后,这条消息由 于没有被消费者消费而进入死信队列,此时我们有一个消费者就在“蹲点”这个死信队列,消息一进入死 信队列,就立马被消费了。

  • 相关阅读:
    tsne可视化cnn模型
    SpringBoot2.7.3 动态数据数据源以及多数据源自动配置
    Android自定义View之可拖拽悬浮控件 代码赏析
    Linux服务器安装Redis
    【设计模式】单例模式
    如何调用Metabase开放API
    Linux操作系统
    JVM虚拟机学习笔记之-5. 字节码执行引擎
    GPU服务器环境搭建
    asp.net基于net的冰淇淋订单管理系统-计算机毕业设计
  • 原文地址:https://blog.csdn.net/Xs943/article/details/127655314