• 消息队列-RabbitMQ


    一、消息队列

    1.1 什么是消息队列?

    • 消息(Message)是指在应用之间传送的数据,它既可以是简单的文本字符串,也可以是复杂的嵌入对象。
    • 消息队列(Message Queue)简称MQ,它是一种应用间的通信方式(中间件),消息发送后可以立即返回,有消息系统来确保信息的可靠传递,消息发布者和使用者都不需要知道对方的存在,发布者只管发布,而使用者只管取消息进行使用。
    • 示意图

    1.2 三大使用场景

    1.2.1 场景一: 异步处理

    • 此处以用户注册为例,分为以下三个步骤:
    • 假设我们采用同步模式(如图):用户注册后,每个步骤都会花费50ms,总共加起来150ms。但发送邮件和短信这两步操作是没有强先后顺序要求的,并且用户也基本不会关心多久会发送邮件和短信,所以这种耗时等待也是没有必要的。
    • 我们采用第二种异步模式:用户成功注册后,我们将注册成功的消息保存至消息队列服务中,直接给用户返回,此时总共花费55ms。之后再让对应的后台服务去消息队列中取消息,完成发短信和邮件的动作,这样就带来了更好的用户体验

    1.2.2 场景二: 应用解耦

    • 此处以订单系统和库存系统为例:
    • 假设订单系统依赖于库存系统,而库存系统接口经常发生变化,若采用耦合的方式,一旦库存系统升级,则订单系统必须修改它的代码,重新部署,这样会变得非常麻烦。
    • 而加入消息队列作为中间件后,订单系统无需关心库存系统接口是什么样的,我们只需要往消息队列中写消息就行了,让库存系统自己去取消息进行分析操作。并且以后无论什么系统,想知道订单系统成功之后做什么,只需要再去订阅消息队列中订单的消息即可。故再也没有这种直接调用的关系,完成了应用解耦。

    1.2.3 场景三: 流量削峰

    • 对于一些秒杀业务来说,瞬间流量是非常大的,甚至能达到百万级。若每一笔秒杀业务要完成下单等一些列流程操作,会给后台造成非常大的压力,任务会一直阻塞,导致服务器资源不断被消耗,最终宕机。
    • 而我们使用消息队列作为中间件,将请求放进来后存到消息队列,直接完成响应。然后让各自的业务去订阅消息队列中的秒杀请求,接下来进行挨个处理,这么做的好处就是永远都不会导致机器的资源被耗尽,导致宕机。

    二、RabbitMQ概述

    RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。

    2.1 两个重要概念

    • 消息代理(message broker)和目的地(destination):当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。

    • 两种形式的目的地:

      • 队列(queue):点对点消息通信(point-to-point) 。
      • 主题(topic):发布(publish)/ 订阅(subscribe)消息通信。
      • 只要是消息中间件,一定会有这两种模式

    2.2 点对点模式(队列式)

    • 消息发送者发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列

    • 消息只有唯一的发送者和接受者,但并不是说只能有一个接收者,队列可以允许很多人同时监听消息。但该模式将消息放到队列后,最终只会交给一个对象,谁先抢到就是谁的

    2.3 发布订阅式

    • 发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息

    2.4 消息代理规范

    • JMS(Java Message Service)JAVA消息服务:基于JVM消息代理的规范。ActiveMQHornetMQJMS实现。

    2.5 消息队列协议

    • AMQP(Advanced Message Queuing Protocol):高级消息队列协议,也是一个消息代理的规范,兼容JMSRabbitMQAMQP的实现。

    2.6 规范与协议对比

    JMS(Java Message Service)AMQP(Advanced Message Queuing Protocol)
    定义Java api网络线级协议
    跨语言
    跨平台
    Model提供两种消息模型:
    (1)、Peer-2-Peer
    (2)、Pub/sub
    提供了五种消息模型:
    (1)、direct exchange
    (2)、fanout exchange
    (3)、topic change
    (4)、headers exchange
    (5)、system exchange
    本质来讲,后四种和JMS的pub/sub模型没有太大差别,
    仅是在路由机制上做了更详细的划分;
    支持消息类多种消息类型:
    TextMessage
    MapMessage
    BytesMessage
    StreamMessage
    ObjectMessage
    Message (只有消息头和属性)
    byte[]
    当实际应用时,有复杂的消息,可以将消息序列化后发
    送。
    综合评价JMS 定义了JAVA API层面的标准;在java体系中,
    多个client均可以通过JMS进行交互,不需要应用修
    改代码,但是其对跨平台的支持较差;
    AMQP定义了wire-level层的协议标准;天然具有跨平
    台、跨语言特性。

    2.7 RabbitMQ工作流程

    • 示意图

    • 流程说明
      • 无论是发布者(Publisher)想要发消息(Message),还是消费者(Consumer)要接消息(Message),它们都必须RabbitMQ 建立一条连接(Connection),且一个客户端只会建立一条连接
      • 所有的收发数据都需要在连接(Connection)里面开辟信道(Channel)进行收发,想要收发的都是消息(Message),所以我们要构造一个消息(Message)。
      • 消息有头有体:相当于是对参数的一些设置、命令,就是消息的真正内容,而消息里面最重要的一个就是路由键(routing-key)
      • 我们将消息指定好路由键(即要发给谁)以后,消息(Message)先来到消息代理(Broker)指定的一个虚拟主机(Virtual Host)里边, 由虚拟主机(Virtual Host)指定交换机(Exchange)。
      • 由指定的交换机(Exchange)收到消息以后,它根据我们指定的路由键(routing-key),通过交换机跟其它队列(Queue)的绑定关系,将这个消息放到指定队列(Queue)。
      • 然后消费者(Consumer)就会监听这个队列,队列里面的内容就会被消费者(Consumer)通过信道实时拿到。
    • 建立长连接的好处:一旦消费者(Consumer)出现了问题,宕机或者各种,连接中断了,RabbitMQ就会实时的感知有消费者下线,消息没办法派发,它就会再次存储到队列(Queue)中,不会造成大面积的消息丢失
    • 名词释义
      • Publisher(发布者):是一个向交换器发布消息的客户端应用程序。
      • Message(消息):由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
      • Exchange(交换机): 用来接收发布者发送的消息,并将这些消息路由给服务器中的队列。Exchange有4种类型:direct(默认)、fanout、 topic、headers,不同类型的Exchange转发消息的策略有所区别。
      • Binding(绑定):用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和 Queue 的绑定可以是多对多的关系。
      • Queue(消息队列):用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
      • Connection(网络连接):比如一个TCP连接。
      • Channel(信道):多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内的虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
      • Consumer(消费者):表示一个从消息队列中取得消息的客户端应用程序。
      • Virtual Host(虚拟主机):表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhostAMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost/
      • Broker:表示消息队列服务器实体

    三、Docker下安装RabbitMQ

    # 安装
    $ docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
    
    # 设置自启动
    $ docker update rabbitmq --restart=always
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 端口映射说明

      • 4369, 25672 (Erlang发现&集群端口)。
      • 5672, 5671 (AMQP端口)。
      • 15672 (web管理后台端口)。
      • 61613, 61614 (STOMP协议端口)。
      • 1883, 8883 (MQTT协议端口)。
    • 访问管理后台(可视化操作界面):

    # 浏览器访问
    ip=http://yourIp:15672/
    
    # 用户名密码
    Username=admin
    Password=admin
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    四、Exchange(交换机概念)

    Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型

    • direct(直接)
    • fanout(广播)
    • topic(主题)
    • headers :headers 匹配 AMQP 消息的 header 而不是路由键,headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类。
    • direct、header 是一致的,它们都是JMS中说的点对点通信方式实现,而fanout、topic则是发布订阅的一些实现。

    4.1 Direct Exchange(直接交换机)

    • 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配,这是一个完整的匹配。
    • 如果一个队列绑定到该交换机上要求路由键 “abc”,则只有被标记为“abc”的消息才被转发,不会转发abc.def,也不会转发dog.ghi,只会转发abc。
    • 示意图

    4.2 Fanout Exchange(广播类型交换机)

    • 不处理路由键。只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。
    • Fanout交换机转发消息是最快的。
    • 示意图

    4.3 Topic Exchange(主题类型交换机)

    • 路由键和某模式进行匹配。虽然它也是广播模式,比如它绑定了几个交换机,但是它可以指定某些交换机来发送消息,其余没指定的,则不会收到消息,所以它是部分广播,主要是根据路由键匹配将消息发个队列,这就是主题-发布订阅模式。
    • 它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开,# 表示匹配 0 个或多个单词,* 表示匹配一个单词。
    • 备注
      • 当一个队列以”#”作为绑定键时,它将接收所有消息,而不管路由键如何,类似于fanout型交换器。
      • 当特殊字符”*”、”#”没有用到绑定时,topic型交换器就好比direct型交换器了。
    • 举个例子(如图):
      • usa.# 为例,所有 usa 开头的路由键会进入这个队列。
      • #.news ,则是所有以 news 结尾的路由键会进入这个队列。
    • 示意图

    五、消息确认机制

    5.1 可靠抵达

    背景说明

    • 分布式系统中,比如现在有很多微服务,微服务连接上消息队列服务器,其它微服务可能还要监听这些消息。
    • 但是可能会因为服务器抖动(宕机,MQ 的宕机、资源耗尽等情况),以及无论是发消息的发布者、还是收消息的消费者,它们的卡顿、宕机等各种问题,都会导致消息的丢失。(比如发送者发消息的时候,给弄丢了 ,看起来消息是发出去了,MQ网络抖动没接到, 或者MQ接到了,但是它消费消息的时候,因为网络抖动又没拿到,等等各种问题…)
    • 所以在分布式系统里面,一些关键环节,我们需要保证消息一定不能丢失,比如:订单消息发出去之后,该算库存的、该算积分的、该算优惠的等等 ,这些消息千万不能丢,因为这都是涉及经济上的问题。
    • 所以,想要保证不丢失,也就是可靠抵达,无论是发消息,可靠的抵达MQ,还是收消息,MQ的消息可靠抵达到我们的消费端,我们一定要保证消息可靠抵达,包括如果出现错误,我们也应该知道哪些消息丢失了
    • 以前我们要做这种事情,可以使用事务消息,比如我们在发消息的时候,我们发消息的客户端首先会跟 MQ 建立一个连接,会在通道里面发消息,可以将通道设置成事务模式,这样发消息,只有整个消息发送过去MQ消费成功给我们有完全的响应以后,我们才算消息成功,但是使用事务消息,会使性能下降的很严重
    • 所以为了保证在高并发期间能很快速的,确认哪些消息成功、哪些消息失败,我们引入了消息确认机制

    5.2 消息准确送达的流程

    • 示意图

    • 分析:首先发布者准备一个消息,消息只要投递给 MQ 服务器,服务器收到以后,消息该怎么存怎么存,该投给哪投给哪,所以 Broker 首先会将消息交给 Exhchange,再有 Exchange 送达给 Queue,所以整个发送消息的过程,牵扯到两个:
      • P端到B端的过程。
      • E端到Q端的过程。
    • 流程说明
      • 确认回调:叫 confirmCallback,就是P端给B端 发送消息的过程,Broker 一旦收到了消息,就会回调我们的方法 confirmCallback,这是第一个回调时机,这个时机就可以知道哪些消息到达服务器了。
      • 但是服务器收到消息以后,要使用 Exchange 交换机,最终投递给 Queue,但是投递给队列这个过程可能也会失败,比如我们指定的路由键有问题,或者我们队列正在使用的过程中,被其它的一些客户端删除等操作,可能都会投递失败,投递失败就会调用 returnCallback
      • 提示:上述这两种回调都是针对的发送端。
      • 接下来就由我们消费端进行消费了,但是消费端引用消费,会引入 ack 机制(消息确认机制),这个机制能保证, 让 Broker 知道哪些消息都被消费者正确的拿到了,如果消费者正确接到,这个消息就要从队列里面删除,如果没有正确接到,可能就需要重新投递消息。
    • 总结:整个可靠抵达,分为两端处理,第一种是发送端的两种确认模式,第二个是消费端ack机制

    5.3 如何保证消息的可靠送达

    5.3.1 发送端-确认回调ConfimCallback

    # 确认消息已发送到交换机
    spring.rabbitmq.publisher-confirm-type=correlated
    
    • 1
    • 2

    5.3.2 发送端-退回回调ReturnCallback

    # 开启发送端消息抵达队列的确认,此处必须设置为true,否则消息消息路由失败也无法触发Return回调
    spring.rabbitmq.publisher-returns=true
    # 只要消息抵达了队列,以异步发送优先回调这个ReturnCallback
    spring.rabbitmq.template.mandatory=true
    
    • 1
    • 2
    • 3
    • 4

    5.3.3 消费端-Ack消息确认机制

    • 保证每个消息被正确消费,此时Broker才可以删除这个消息,消费端默认自动确认的,只要消息接收到,客户端会自动确认,服务端就会移除这个消息。

    • 存在问题:消费端收到很多消息,自动回复给服务器ack,只有一个消息处理成功,消费端突然宕机了,结果MQ中剩下的消息全部丢失

    • 解决方式:消费端如果无法确定此消息是否被处理完成,可以手动确认消息,即处理一个确认一个,未确认的消息不会被删除。

    • 手动确认模式:只要我们没有明确告诉MQ收到消息。没有 Ack,消息就一直是 Unacked 状态,即使 Consumer 宕机,消息也不会丢失,会重新变为 Ready,等待下次有新的 Consumer 连接进来时,再发给新的 Consumer。消费者获取到消息,成功处理,可以回复 AckBroker

      • ack() 用于肯定确认,Broker 将移除此消息。
      • nack() 用于否定确认,可以指定 Broker 是否丢弃此消息,可以批量。
      • reject() 用于否定确认,同上,但不能批量。

    六、Spring Boot整合RabbitMQ

    6.1 依赖引入

            <!--RabbitMQ-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    6.2 添加配置

    # 默认服务端口号为5672,而15672为可视化操作界面端口
    spring.rabbitmq.host=yourIp
    spring.rabbitmq.port=5672
    
    # 配置虚拟机
    spring.rabbitmq.virtual-host=/
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    6.3 启动类开启注解

    @EnableRabbit
    
    • 1

    6.4 基本功能测试

    @Slf4j
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class ApplicationTests {
    
        /**
         * amqp管理组件
         */
    
        @Autowired
        private AmqpAdmin amqpAdmin;
    
        /**
         * 消息发送处理组件
         */
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
        
        /**
         * 1. 创建交换机
         */
    
        @Test
        public void createExchange() {
    
            Exchange directExchange = new DirectExchange("jan-exchange", true, false);
            amqpAdmin.declareExchange(directExchange);
            log.info("Exchange[{}]创建成功:", "jan-exchange");
        }
        
        /**
         * 2. 创建队列
         */
    
        @Test
        public void createQueue() {
            Queue queue = new Queue("jan-queue", true, false, false);
            amqpAdmin.declareQueue(queue);
            log.info("Queue[{}]创建成功:", "jan-queue");
        }
        
        /**
         * 3. 交换机与队列绑定
         */
    
        @Test
        public void createBinding() {
    
            Binding binding = new Binding("jan-queue",
                    Binding.DestinationType.QUEUE,
                    "jan-exchange",
                    "hello.jan",
                    null);
            amqpAdmin.declareBinding(binding);
            log.info("Binding[{}]创建成功:", "jan-binding");
    
        }
    
        /**
         * 4. 发送消息测试
         * 如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口
         * 发送的对象类型的消息,可以是一个json
         */
    
        @Test
        public void sendMessageTest() {
            OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
            reasonEntity.setId(1L);
            reasonEntity.setCreateTime(new Date());
            reasonEntity.setName("jan");
            reasonEntity.setStatus(1);
            reasonEntity.setSort(2);
            rabbitTemplate.convertAndSend("jan-exchange", "hello.jan",
                    reasonEntity, new CorrelationData(UUID.randomUUID().toString()));
            log.info("消息发送完成:{}", reasonEntity);
        }
    
        /**
         * 5. 创建延时队列
         */
    
        @Test
        public void createDelayQueue() {
            HashMap<String, Object> arguments = new HashMap<>();
            arguments.put("x-dead-letter-exchange", "delay-exchange");
            arguments.put("x-dead-letter-routing-key", "delay.key");
            // 消息过期时间 1分钟(单位毫秒)
            arguments.put("x-message-ttl", 60000);
            Queue queue = new Queue("delay-queue", true, false, false, arguments);
            amqpAdmin.declareQueue(queue);
            log.info("Queue[{}]创建成功:", "delay-queue");
        }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95

    七、延时队列

    7.1 解锁库存场景

    解锁库存场景:当订单操作失败(超时未支付 / 被取消等)后,我们需要将库存进行解锁并回滚

    7.2 方案一:设置定时任务

    • 处理思路
      • 每隔30分钟轮巡扫描数据库查看订单状态,若订单不存在或者被取消,则将改库存进行解锁。
    • 存在问题
      • 定时任务一定程度上消耗系统内存
      • 隔一段时间就要做一个全盘扫描,增加了数据库压力
      • 时间差问题(主要问题):①.假设9:00刚执行了定时任务,而订单是在9:01下的;②.当9:30再次执行定时任务时,发现该订单还不满足30分钟未支付的关闭订单条件(距离下订单时隔29分钟),结果定时任务刚一结束,这个订单就相当于超时了;③.而当到10:00定时任务再次执行时,该笔订单才会执行库存解锁逻辑(相比预期执行时间晚了29分钟)。这就是定时任务可能会出现的时效性问题。
    • 示意图

    • 总结:基于上述这些考虑,我们就不会在这个场景下采用定时任务。我们采用 MQ 的延时队列(RabbitMQ的消息TTL与死信Exchange结合)。

    7.3 方案二:设置延时队列

    • 处理思路

      • 锁成功的消息发给消息队列,并让其暂存一段时间(比如30分钟)。
      • 30分钟后订单不支付或中途被直接取消,订单状态都算操作失败。
      • 订单是保存30分钟之后,再对其进行彻底检查,这个检查是需要时间的,我们需要确保所有订单都处理完了,再对库存进行操作,所以设置为40分钟。
      • 40分钟后将消息发给解锁库存服务,库存服务检查订单状态若订单不存在或者被取消,则将该库存解锁
    • 解决问题

      • 首要目的是解决事务的最终一致性
      • 不占用系统的任何资源,只是多搭建一台 MQ 服务器。
      • 解决定时任务的大面积时效性问题,延时队列可能时效性差那么一秒、五秒乃至于一分钟,但是都不可能差上二十分钟。

    7.4 TTL与死信路由

    7.4.1 TTL(Time To Live)

    消息的TTL就是消息的存活时间。

    • RabbitMQ可以对队列消息分别设置TTL

    • 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信

    • 如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息TTL,因为它才是实现延迟任务的关键。可以通过设置消息的expiration字段或者x- message-ttl属性来设置时间,两者是一样的效果。

    7.4.2 死信路由(Dead Letter Exchanges(DLX))

    • 一个消息如果满足以下条件,那它就会成为死信

      • Consumer拒收了,并且reject方法的参数里requeuefalse。也就是说不会被再次放在队列里,被其他消费者使用。 ( basic.reject/ basic.nack ) requeue=false
      • 消息的TTL到了,消息过期了。
      • 假设队列的长度限制满了,排在前面的消息就会被丢弃或者扔到死信路由上。
    • Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

    • 手动ack与异常消息统一放在一个队列处理建议的两种方式

      • catch异常后,手动发送到指定队列,然后使用channelRabbitMQ确认消息已消费。
      • Queue绑定延时队列,使用nackrequque=false)确认消息消费失败。
    • 总结:我们既可以控制消息在一段时间后变成死信(TTL),又可以控制变成死信的消息被路由到某一个指定的交换机(死信路由),二者结合就可以实现一个延时队列。

    7.5 延时队列实现

    7.5.1 设置队列过期时间

    • 示意图

    • 首先,发布者将消息通过路由键发送给指定的普通交换机
    • 接着,交换机会按照路由键把它交给一个延时队列(特殊设置):
      • x-message-ttl消息的存活时间,它以毫秒为单位,相当于300秒,也就是5分钟以后消息过期 。
      • x-dead-letter-exchange死信交换机,就是死信路由,意思就是告诉服务器当前这个队列里边的消息死了,别乱丢,需要转给隔壁的死信路由 。
      • x-dead-letter-routing-key:将消息转给死信队列的路由键
    • 最后,死信消息通过路由键进入指定队列(即死信队列),只要有消费者监听了这个队列就会收到过期消息。

    7.5.2 设置消息过期时间

    • 示意图

    • 首先,为消息单独设置一个过期时间
    • 接着,当消息过期后,会经过交换机进入延时队列。由于消息存到延时队列以后,没有人会永远去监听里边的内容。所以这个消息就会一直呆在延时队列里边。服务器就会来检查这个消息,结果发现它是五分钟以后过期的,所以五分钟以后服务器就会把第一个消息拿出来,经过死信路由到我们指定的队列
    • 最后,消费者收到的消息也都是五分钟以后的过期消息。

    7.5.3 两种方式推荐

    • 如果给消息设置过期时间的话:
      • 假设给MQ发送三条消息,并分别指定过期时间为5分钟、1分钟、1秒钟;
      • 一般情况下,1秒钟过期的会优先出列;
      • 但是Rabbit MQ 采用的是惰性检查机制,也就是懒检查:服务器来拿第一条消息,发现5分钟后过期,于是就放回了该消息,5分钟后将该消息路由到死信队列,最终被消费者拿到。此时又先后来拿后面两条过期设置为1分钟、1秒钟的消息发现都早已经过期,于是就将后面两条消息加入死信队列,但是后两条消息处理已经是五分钟以后了。
    • 结论推荐使用队列设置过期时间。给整个队列设置一个过期时间,这样队列里边所有的消息都是这个过期时间,我们服务器直接批量全部拿出来,往后放就行了。

    八、消息丢失、重复、积压问题

    8.1 解决消息丢失

    8.1.1 场景一:消息发送出去,由于网络问题没有抵达服务器

    • 做好容错方法(try-catch),发送消息可能会网络失败,失败后要有重试机制。可记录到数据库,如果消息没有发送成功,定期去数据库扫描未成功的消息进行定期重发
    • 做好日志记录,每个消息状态是否都被服务器收到都应该记录,可以创建一张关于消息的数据表,存到数据库里。
    CREATE TABLE `mq_message` (
    `message_id` char(32) NOT NULL,
    `content` text,
    `to_exchane` varchar(255) DEFAULT NULL,
    `routing_key` varchar(255) DEFAULT NULL,
    `class_type` varchar(255) DEFAULT NULL,
    `message_status` int(1) DEFAULT '0' COMMENT '0-新建 1-已发送 2-错误抵达 3-已抵达',
    `create_time` datetime DEFAULT NULL,
    `update_time` datetime DEFAULT NULL,
    PRIMARY KEY (`message_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    8.1.2 场景二:消息抵达Broker之后,Broker要将消息写入磁盘(持久化)时宕机

    • publisher必须加入确认回调机制,确认成功的消息,修改数据库消息状态。

    8.1.3 场景三:自动ACK的状态下。消费者收到消息,但没来得及消息然后宕机

    • 一定开启手动ACK,消费成功再移除,失败或者没来得及处理就noAck重新入队

    8.2 解决消息重复

    • 出现场景
      • 消息消费成功,事务已经提交,ack时,机器宕机。导致没有ack成功,Broker的消息重新由unack变为ready,并发送给其他消费。
      • 消息消费失败,由于重试机制,自动又将消息发送出去。
      • 成功消费,ack时宕机,消息由unack变为readyBroker又重新发送。
    • 解决方式
      • 消费者的业务消费接口应该设计为幂等性的。比如扣库存有工作单的状态。
      • 使用防重表Redis/Mysql),发送消息每一个都有业务的唯一标识,处理过就不用处理。
      • RabbitMQ的每一个消息都有redelivered字段,可以获取是否是被重新投递过来的,而不是第一次投递过来的。

    8.3 解决消息积压

    • 出现场景
      • 消费者宕机积压。
      • 消费者消费能力不足积压。
      • 发送者发送流量太大。
    • 解决方式
      • 上线更多的消费者,进行正常消费。
      • 上线专门的队列消费服务,将消息先批量取出来,记录数据库,离线慢慢处理

    九、常见消息队列对比

    十、结束语


    “-------怕什么真理无穷,进一寸有一寸的欢喜。”

    微信公众号搜索:饺子泡牛奶

  • 相关阅读:
    lv7 嵌入式开发-网络编程开发 04 IP地址与端口号
    LCA几种算法
    git-secret:在 Git 存储库中加密和存储密钥(下)
    Java---数据库---数据库约束,设计,多表,事务
    几款实用的照片变漫画免费软件,千万别错过
    小程序拼团业务,并发支付导致重复给用户返利bug
    Linux Centos系统 磁盘分区和文件系统管理 (深入理解)
    Spring源码分析(二):底层架构核心概念解析
    图神经网络之预训练大模型结合:ERNIESage在链接预测任务应用
    【开源打印组件】vue-plugin-hiprint初体验
  • 原文地址:https://blog.csdn.net/weixin_48776531/article/details/125568569