• RabbitMQ从入门到入土


    同步与异步

    同步调用

    优势:

    • 时效性强,等到结果后就返回

    问题:

    • 扩展性差

    • 性能下降

    • 级联失败问题

    异步调用

    优势:

    • 耦合度低,扩展性强

    • 无需等待,性能好

    • 故障隔离,下游服务故障不影响上游

    • 缓存消息,削峰填谷

    问题:

    • 不能立刻获得结果,时效性差

    • 不确定下游业务执行是否成功

    • 业务安全依赖于Broker的可靠性

    Broker:(代理,常指一种中间件,用于协调和管理不同组件之间的通信、交互、服务调用

    异步调用基于消息推送的方式,一般包含3个角色。

    • 消息发送者

    • 消息代理:管理、暂存、转发消息

    • 消息接收者

    MQ技术选型

    MQ:消息队列,就是存放消息的队列,也就是异步调用中的Broker。

    RabbitMQ、ActiveMQ、Rocket MQ、Kafka对比

    RabbitMQActiveMQRocketMQKafka
    公司/社区RabbitApache阿里Apache
    开发语言ErlangJavaJavaScala&Java
    协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
    可用性一般
    单机吞吐量一般非常高
    消息延迟微秒级毫秒级毫秒级毫秒以内
    消息可靠性一般一般
    • 追求可用性:Kafka、 RocketMQ 、RabbitMQ

    • 追求可靠性:RabbitMQ、RocketMQ

    • 追求吞吐能力:RocketMQ、Kafka

    • 追求消息低延迟:RabbitMQ、Kafka

    个人认为:如果在项目中使用的是RabbitMQ,面试官问你为啥使用这个可以从以下入手:

    • 可用性高

    • 消息可靠性高

    • 虽然单机吞吐量一般,但是消息延迟低。为什么消息延迟低呢?

      • RabbitMQ之所以被认为能够提供较低的消息延迟,主要归因于以下几个因素:

        1. 高性能的底层实现:RabbitMQ是用Erlang语言编写的,Erlang专为高并发、分布式系统设计,提供了轻量级进程和共享无锁数据结构,这使得RabbitMQ在处理大量并发连接和消息时表现得非常高效。

        2. 零拷贝技术:RabbitMQ利用操作系统提供的零拷贝特性,减少数据在内核空间和用户空间之间的复制次数,从而加快消息传输速度,降低延迟。

        3. 多路复用的TCP连接:通过使用Channel(信道)这一概念,RabbitMQ可以在单个TCP连接上复用多个逻辑连接,减少了网络连接的开销,提升了通信效率。

        4. 可配置的消息优先级:RabbitMQ允许为消息设置优先级,这在某些场景下可以帮助紧急或高优先级的消息更快地被消费,减少它们的等待时间。

        5. 社区支持和成熟度:作为一个成熟的开源项目,RabbitMQ拥有活跃的开发者社区和丰富的文档资源,这意味着它经过了广泛的测试和优化,能够提供稳定的低延迟表现。

    好,现在开始来介绍我们的RabbitMQ了

    RabbitMQ

    认识

    整体架构和核心概念如下:

    • publisher:消息发布者

    • consumer:消息消费者

    • queue:队列,存储消息

    • exchange:交换机:转发消息

    image-20240215142856543

    Java客户端使用步骤

    我们要知道的是RabbitMQ是根据AMQP协议来实现的:

    AMQP:用于应用程序之间传递业务消息的开放标准

    Spring AMQP:基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息

    使用之前你要从官网进行下载RabbitMQ这个中间件,并启动它。

    下载地址:https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.13.3

    官方使用地址:RabbitMQ Tutorials | RabbitMQ

    1. 引入Spring-amqp的依赖

      1. <dependency>
      2.   <groupId>com.rabbitmq</groupId>
      3.   <artifactId>amqp-client</artifactId>
      4.   <version>5.13.0</version> <!-- 或者使用最新版本 -->
      5. </dependency>

      2、配置RabbitMQ服务端信息

      1. spring:
      2. rabbitmq:
      3.   host: 127.0.0.1 #ip
      4.   port: 5672     #端口
      5.   username: guest #账号
      6.   password: guest #密码
      7.   virtualHost:   #链接的虚拟主机
      8.   addresses: 127.0.0.1:5672     #多个以逗号分隔,与host功能一样。
      9.   requestedHeartbeat: 60 #指定心跳超时,单位秒,0为不指定;默认60s
      10.   publisherConfirms: true #发布确认机制是否启用
      11.   #确认消息已发送到交换机(Exchange)
      12.   #publisher-confirm-type参数有三个可选值:
      13.   #SIMPLE:会触发回调方法,相当于单个确认(发一条确认一条)。
      14.   #CORRELATED:消息从生产者发送到交换机后触发回调方法。
      15.   #NONE(默认):关闭发布确认模式。
      16.   #publisher-confirm-type: correlated #发布确认机制是否启用 高版本Springboot使用替换掉publisher-confirms:true
      17.   publisherReturns: true #发布返回是否启用
      18.   connectionTimeout: #链接超时。单位ms。0表示无穷大不超时

      3、利用RabbitTemplate发送消息

      1. import com.rabbitmq.client.Channel;
      2. import com.rabbitmq.client.Connection;
      3. import com.rabbitmq.client.ConnectionFactory;
      4. public class Producer {
      5.    private static final String QUEUE_NAME = "hello";
      6.    public static void main(String[] argv) throws Exception {
      7.        ConnectionFactory factory = new ConnectionFactory();
      8.        factory.setHost("localhost"); // 如果RabbitMQ不在本地,请修改为主机地址
      9.        try (Connection connection = factory.newConnection();
      10.             Channel channel = connection.createChannel()) {
      11.            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
      12.            String message = "Hello World!";
      13.            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
      14.            System.out.println(" [x] Sent '" + message + "'");
      15.       }
      16.   }
      17. }

      4、利用@RabbitListener注解声明要监听的队列,监听消息

      1. import com.rabbitmq.client.*;
      2. import java.io.IOException;
      3. import java.util.concurrent.TimeoutException;
      4. public class Consumer {
      5.    private static final String QUEUE_NAME = "hello";
      6.    public static void main(String[] argv) throws IOException, TimeoutException {
      7.        ConnectionFactory factory = new ConnectionFactory();
      8.        factory.setHost("localhost"); // 如果RabbitMQ不在本地,请修改为主机地址
      9.        Connection connection = factory.newConnection();
      10.        Channel channel = connection.createChannel();
      11.        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
      12.            String message = new String(delivery.getBody(), "UTF-8");
      13.            System.out.println(" [x] Received '" + message + "'");
      14.       };
      15.        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
      16.        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
      17.   }
      18. }

    WorkQueue

    • 一个队列绑定多个消费者,加快消息处理速度。

    • 同一个消息只会被一个消费者处理

    • 通过设置prefetch来控制消费者预取的消息数量,处理完一条再处理下一条,实现能者多劳

    1. spring:
    2. rabbitmq:
    3. listener:
    4. simple:
    5. prefetch: 1 #每次只能获取一条消息,处理完才能获取下一条消息

    Java声明队列和交换机

    SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:

    • Queue:用于声明队列,可以用工厂类QueueBuilder构建

    • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建

    • Binding:用于声明队列和交换机绑 定关系,可以用工厂类BindingBuilder构建

    在consumer中创建一个类,声明队列和交换机:

    1、通过配置实现

    1. package cn.itcast.mq.config;
    2. import org.springframework.amqp.core.Binding;
    3. import org.springframework.amqp.core.BindingBuilder;
    4. import org.springframework.amqp.core.FanoutExchange;
    5. import org.springframework.amqp.core.Queue;
    6. import org.springframework.context.annotation.Bean;
    7. import org.springframework.context.annotation.Configuration;
    8. @Configuration
    9. public class FanoutConfig {
    10.   /**
    11.     * 声明交换机
    12.     * @return Fanout类型交换机
    13.     */
    14.   @Bean
    15.   public FanoutExchange fanoutExchange(){
    16.       return new FanoutExchange("itcast.fanout");
    17.   }
    18.   /**
    19.     *1个队列
    20.     */
    21.   @Bean
    22.   public Queue fanoutQueue1(){
    23.       return new Queue("fanout.queue1");
    24.   }
    25.   /**
    26.     * 绑定队列和交换机
    27.     */
    28.   @Bean
    29.   public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
    30.       return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    31.   }
    32.   /**
    33.     *2个队列
    34.     */
    35.   @Bean
    36.   public Queue fanoutQueue2(){
    37.       return new Queue("fanout.queue2");
    38.   }
    39.   /**
    40.     * 绑定队列和交换机
    41.     */
    42.   @Bean
    43.   public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
    44.       return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    45.   }
    46. }
    47. 2、通过注解实现
    48. @RabbitListener(bindings = @QueueBinding(
    49. value = @Queue(name = "direct.queue1", durable="true"),
    50. exchange = @Exchange(name = "test.direct", type = ExchangeTypes.DIRECT),
    51. key = {"red", "blue"}
    52. ))
    53. public void listenDirectQueue1(String msg) throws Exception{
    54. System.out.println("消费者1收到了 direct.queue1的消息:【" + msg + "】");
    55. }

    交换机

    消息一般都是通过exchange来发送消息的,而不是直接发送到队列中的。

    常用交换机的类型有以下三种:

    • Fanout(广播):将消息发送给所有跟该交换机绑定了的queue(就是每个人都能收到)

    • Direct(定向):消息根据规则路由到指定的queue

    • Topic(话题):根据类别来进行发送消息(类似Direct)

    Fouout交换机

    这里定义了一个生产者发送消息:

    1.    @Test
    2.    public void testSendFanout(){
    3.        String exchangeName = "test.fanout";
    4.        String msg = "hello, everyone!";
    5.        rabbitTemplate.convertAndSend(exchangeName,null,msg);
    6.   }

    下面是两个消费者

    1. package cn.itcast.mq.listener;
    2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    3. import org.springframework.stereotype.Component;
    4. import java.time.LocalTime;
    5. @Component
    6. public class SpringRabbitListener {
    7.    @RabbitListener(queues = "fanout.queue1")
    8.    public void listenFanout1(String msg) throws InterruptedException {
    9.        System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
    10.        Thread.sleep(20);
    11.   }
    12.    @RabbitListener(queues = "fanout.queue2")
    13.    public void listenFanout2(String msg) throws InterruptedException {
    14.        System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
    15.        Thread.sleep(200);
    16.   }
    17. }

    这里是接收到的消息,每个都接收到了。

    1. 消费者2........接收到消息:【hello, everyone!】18:50:53.628336200
    2. 消费者1接收到消息:【hello, everyone!】18:50:53.628336200

    定向交换机

    Direct Exchange会将收到的消息根据规则路由到指定的Queue,因此称为定向路由。

    • 每一个Queue都与Exchange设置一个BindingKey

    • 发布者发送消息时,指定消息的RoutingKey

    • Exchange将消息路由到BindingKey与消息RoutingKey一直的队列

    添加配置类,将交换机和队列进行绑定:

    1. @Configuration
    2. @EnableRabbit
    3. public class RabbitConfig {
    4.    @Bean
    5.    DirectExchange directExchange() {
    6.        return new DirectExchange("test.direct");
    7.   }
    8.    @Bean
    9.    Queue redQueue() {
    10.        return new Queue("redQueue", false); // false 表示队列不是持久化的
    11.   }
    12.    @Bean
    13.    Binding bindingRed(DirectExchange directExchange, Queue redQueue) {
    14.        return BindingBuilder.bind(redQueue()).to(directExchange).with("red"); // 绑定键为"red"
    15.   }
    16. }

    发送端:

    1.   @Test
    2.    public void testSendFanout(){
    3.        String exchangeName = "test.direct";
    4.        String msg = "hello, everyone!";
    5.        rabbitTemplate.convertAndSend(exchangeName,"red",msg);
    6.   } //这个red是以及绑定了的BindingKey

    消费端:

    1. @Service
    2. public class FanoutReceiver {
    3.    @RabbitListener(queues = "redQueue")
    4.    public void receiveMessage(String message) {
    5.        System.out.println("Received from redQueue: " + message);
    6.   }
    7. }

    话题交换机

    TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以 . 分隔。例如:

    China.news (代表中国新闻这个列表)

    Queue与Exchange指定BindingKey时可以使用通配符:

    • #: 代指0个或多个单词

    • *:代指一个单词

    配置类代码:

    1. @Configuration
    2. @EnableRabbit
    3. public class RabbitConfig {
    4.    @Bean
    5.    TopicExchange topicExchange() {
    6.        return new TopicExchange("test.topic");
    7.   }
    8.    @Bean
    9.    Queue chinaNewsQueue() {
    10.        return new Queue("china.news.queue", false);
    11.   }
    12.    @Bean
    13.    Queue chinaSportsQueue() {
    14.        return new Queue("china.sports.queue", false);
    15.   }
    16.    @Bean
    17.    Binding bindingChinaNews(TopicExchange topicExchange, Queue chinaNewsQueue) {
    18.        return BindingBuilder.bind(chinaNewsQueue()).to(topicExchange).with("China.news");
    19.   }
    20.    @Bean
    21.    Binding bindingChinaSports(TopicExchange topicExchange, Queue chinaSportsQueue) {
    22.        return BindingBuilder.bind(chinaSportsQueue()).to(topicExchange).with("China.sports.*"); // 匹配以"China.sports."开头的所有Routing Key
    23.   }
    24. }

    生产者代码:

    1. @Test
    2. public void testSendTopic() {
    3.    String exchangeName = "test.topic";
    4.    String routingKey = "China.news"; // 使用符合Topic规则的Routing Key
    5.    String msg = "Breaking news from China!";
    6.    rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
    7. }

    消费者:

    1. @Service
    2. public class TopicReceiver {
    3.    @RabbitListener(queues = "china.news.queue")
    4.    public void receiveNews(String message) {
    5.        System.out.println("Received news from 'China.news': " + message);
    6.   }
    7.    @RabbitListener(queues = "china.sports.queue")
    8.    public void receiveSports(String message) {
    9.        System.out.println("Received sports update from 'China.sports.*': " + message);
    10.   }
    11. }

    消息转换器

    Spring会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

    • 数据体积过大

    • 有安全漏洞

    • 可读性差

    所以,我们就希望让消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

    1、引入依赖

    1. <dependency>
    2.    <groupId>com.fasterxml.jackson.dataformat</groupId>
    3.    <artifactId>jackson-dataformat-xml</artifactId>
    4.    <version>2.9.10</version>
    5. </dependency>

    2、配置Bean

    1. @Bean
    2. public MessageConverter jsonMessageConverter(){
    3.    return new Jackson2JsonMessageConverter();
    4. }

    可靠性

    发送者的可靠性

    生产者重连

    有时候由于网络波动,出现连接MQ失败情况。

    我们可以通过配置开启连接失败后的重连机制:

    1. spring:
    2. rabbitmq:
    3. connection-timeout: 1s #设置MQ的连接超时时间
    4. template:
    5. retry:
    6. enabled: true #开启超时重试机制
    7. inital-interval: 1000ms #失败后的初始等待时间
    8. multiplier: 1 #失败后下次等待时长的倍数,下次等待时长 = initial-interval * multipler
    9. max-attempts: 3 #最大重试次数

    注意:

    • 当网络不稳定的时候,利用重试机制可以有效的提高消息发送成功率,但是SpringAMQP的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是呗阻塞的,会影响性能。

    • 如果对于业务性能有要求的时候,建议禁用重试机制。如果一定要用,请进行适当的配置。

    生产者确认

    生产者确认的作用:为了让生产者知道消息是否被消费成功。

    RabbitMQ的Publisher Confirm 和Publisher Return两种确认机制:

    • ConfirmCallback:当RabbitMQ成功处理消息时调用

    • ReturnCallback:当消息无法路由到任何队列时调用,例如由于交换器找不到匹配的队列,此时会返回消息及原因。

    开启确认机制后,以下两种情况会返回消息被接收的ACK:

    • 消息被投递到匹配的队列

    • 持久化消息写入磁盘

    注意这一种情况:

    当消息被投递到MQ后,但是路由失败(没有匹配规则的队列),一样会返回ACK

    实现生产者确认
    1. 配置实现:

      1. spring:
      2. rabbitmq:
      3. publisher-confirm-type: correlated
      4. publisher-returns: true
      5. #publisher-confirm-type有3中模式:
      6. #- none:关闭confirm机制
      7. #- simple:同步阻塞等待MQ的回执消息
      8. #- correlated:MQ异步回调方式返回回执消息

      1. import com.rabbitmq.client.*;
      2. import java.io.IOException;
      3. import java.util.HashMap;
      4. import java.util.Map;
      5. import java.util.concurrent.TimeoutException;
      6. import java.util.concurrent.atomic.AtomicInteger;
      7. public class RabbitMQProducerConfirmAndReturn {
      8.    private static final String QUEUE_NAME = "my_queue";
      9.    private static final String EXCHANGE_NAME = "my_exchange";
      10.    private static final String ROUTING_KEY = "my_routing_key";
      11.    public static void main(String[] args) throws IOException, TimeoutException {
      12.        ConnectionFactory factory = new ConnectionFactory();
      13.        factory.setHost("localhost"); // 根据实际情况设置RabbitMQ服务器地址
      14.        Connection connection = factory.newConnection();
      15.        Channel channel = connection.createChannel();
      16.        // 开启发布确认
      17.        channel.confirmSelect();
      18.        AtomicInteger outstandingConfirms = new AtomicInteger(0);
      19.        // 添加ConfirmCallback
      20.        channel.addConfirmListener((deliveryTag, multiple) -> {
      21.            System.out.println("Confirmed delivery for message with tag: " + deliveryTag);
      22.            outstandingConfirms.decrementAndGet();
      23.       }, (deliveryTag, multiple) -> {
      24.            System.out.println("Negative acknowledgement received for message with tag: " + deliveryTag);
      25.            // 这里可以根据需要处理未确认消息的逻辑
      26.       });
      27.        // 添加ReturnCallback
      28.        channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
      29.            System.out.println("Returned message: " + new String(body));
      30.            Map<String, Object> headers = properties.getHeaders();
      31.            if (headers != null && headers.containsKey("messageId")) {
      32.                String messageId = (String) headers.get("messageId");
      33.                System.out.println("Message with ID " + messageId + " was not routed.");
      34.                // 这里可以添加消息未路由的重试逻辑
      35.           }
      36.       });
      37.        try {
      38.            String customMessageId = "msg-id-123";
      39.            sendMessageWithId(channel, EXCHANGE_NAME, ROUTING_KEY, "Hello, World!".getBytes(), customMessageId);
      40.       } finally {
      41.            channel.close();
      42.            connection.close();
      43.       }
      44.   }
      45.    private static void sendMessageWithId(Channel channel, String exchange, String routingKey, byte[] messageBody, String messageId) throws IOException {
      46.        Map<String, Object> headers = new HashMap<>();
      47.        headers.put("messageId", messageId); // 自定义消息ID作为header
      48.        BasicProperties props = new BasicProperties.Builder()
      49.               .deliveryMode(2) // 持久化消息
      50.               .headers(headers)
      51.               .build();
      52.        channel.basicPublish(exchange, routingKey, props, messageBody);
      53.        outstandingConfirms.incrementAndGet(); // 记录待确认的消息数
      54.        System.out.println("Published message with ID " + messageId);
      55.   }
      56. }

    MQ的可靠性

    数据持久化

    默认情况,Rabbit会将收到的消息存在内存中。

    RabbitMQ实现数据持久化(存放在磁盘中)有3个方面:

    • 交换机持久化

      • 声明的时候将 durable属性配置为true

    • 队列持久化

      • 声明的时候将 durable属性配置为true

    • 消息持久化

      • 设置消息属性中的deliveryMode2来标记消息为持久化

    注意事项:

    • 持久化消息并不保证零丢失,因为它们在内存中排队等待写入磁盘时仍有可能因系统崩溃而丢失。

    • 持久化会增加消息发布的延迟,因为消息必须等待被写入磁盘。

    • 队列和交换器的持久化并不会自动持久化其中的消息,消息的持久化需要单独设置。

    • 如果之前声明的队列或交换器是非持久化的,需要先删除原有队列或交换器,然后重新声明为持久化版本,否则会遇到错误。

    • 为了确保消息不丢失,除了持久化之外,还需要考虑消费者确认(Ack)机制,以及可能的死信队列和重试策略。

    Lazy Queue

    惰性队列特征:

    1. 接收到消息直接存入磁盘,非内存(内存中只保留最近的消息,默认2048条)

    2. 消费者要消费消息才会从磁盘读取消息并加载到内存中

    3. 支持数百万条的消息存储

    在3.12版本后,所有的队列都是Lazy Queue模式,无法更改。

    实现方式
    1. Java代码声明中实现

      1. @Bean
      2. public Queue lazyQueue() {
      3. return QueueBuilder
      4. .durable("lazy.queue") //持久化
      5. .lazy() //开启lazy模式
      6. .build();
      7. }

    2. 消费端实现:

      1. @RabbitListener(queuesToDeclare = @Queue(
      2. name = "lazy.queue",
      3. arguments = @Argument(name = "x-queue-mode", value = "lazy")))//lazy开启了消息持久化
      4. public void listenLazyQueue(String msg){
      5. log.info("接收到lazy.queue的消息:{}",msg);
      6. }

    消费者的确认机制

    • 为了确定消费者是否成功处理消息而提出的消费者确认机制。

    • 当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态:

      • ack:成功处理消息,RabbitMQ从队列中删除该消息

      • nack:消息处理失败,RabbitMQ需要再次投递消息

      • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息(消费者无法处理接收到的消息时;消费者在处理消息的过程中发生了异常)

    实现:

    SpringAMQP已经实现了消息确认功能,运行通过配置文件选择ACK处理方式,有3种方式:

    • none:不处理。即消息投递后,直接返回ack,不安全,不建议使用

    • manual :手动模式。需要自己在业务种调用api,发送ack或者reject,存在业务入侵,但是更灵活

    • auto:自动模式。SpringAMQP利用AOP对消息处理逻辑进行环绕增强,当业务正常执行,返回ack,出现异常

      • 业务异常,nack

      • 消息处理或校验异常,reject

    1. spring:
    2. rabbitmq:
    3. listener:
    4. simple:
    5. prefetch: 1
    6. acknowledge-mode: manual

    建议一般采用manual实现。

    失败重试机制

    • 问题:消费者出现异常后,消息会不断的重新入队,发送给消费者,这样无限循环,导致mq消息处理飙升。

    • 解决方案:利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列

    1. spring:
    2. rabbitmq:
    3. listener:
    4. simple:
    5. prefetch: 1
    6. acknowledge-mode: auto
    7. retry:
    8. enable: true
    9. initial-interval: 1000ms #初始失败等待时长
    10. multiplier: 1
    11. max-attempts: 3 #最大重试次数
    12. st
    13. atuless: true #true无状态,false有状态。如果业务中包含事务,这里改成false

    如果重试次数耗尽的时候,如果消息依然失败,还有兜底的策略,可以实现MessageRecoverer接口实现:

    • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息(默认这种方式)

    • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

    • RepublishMessageRecoverer:重试耗尽,将失败消息发送给指定交换机(专门用来处理失败消息的,死信交换机)

    RepublishMessageRecoverer的示例:

    1. 定义接收失败消息的交换机、队列,并绑定关系。此处实现略。

    2. 定义RepublishMessageRecoverer

      1. @ConditionalOnProperty(prefix = "spring.rabbitmq.retry",name = "enable", havingValue = "true")
      2. //可以在配置类上加入这个代码,只有上述条件满足的时候,配置才实现
      3. //这里还有配置交换机、队列、以及进行绑定
      4. @Bean
      5. public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){
      6. return new RepublishMessageRecoverer(rabbitTemplate, " error.direct", "error");
      7. }

    业务幂等性

    幂等:一个数学概念。f(x) = f( f(x) )。意思就是:同一个业务,执行一次或多次,对业务的影响是一致的。(比如说删除和查询)

    唯一消息id

    给每一消息都设置一个唯一id,利用唯一id区分是否重复消息。

    1. 每一条消息都生成一个唯一id,与消息一起投递给消费者

    2. 消费者接收到消息后处理自己的业务,业务完成将消息id保存到数据库

    3. 如果下次收到相同消息,去数据库中判断是否存储,存在则重复消息放弃处理

    1. @Bean
    2. public MessageConverter messageConverter(){
    3.    //1、定义消息转换器
    4.    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    5.    //2、配置启动自动创建消息id,用于识别不同消息,也可以在业务中基于id判断是否重复消息
    6.    jjmc.setCreateMessageIds(true);
    7. }

    这个消息id是保存在header中的

    1. @RabbitListener(queues = "yourQueueName")
    2. public void listen(Message message) {
    3.    MessageProperties messageProperties = message.getMessageProperties();
    4.    String messageId = messageProperties.getMessageId(); // 获取消息ID
    5.    if (messageId != null) {
    6.        System.out.println("Received message with ID: " + messageId);
    7.        // 进行幂等性检查或其他基于ID的处理逻辑
    8.   }
    9.    // 其他消息处理逻辑...
    10. }

    业务判断

    结合实际业务逻辑做判断。

    比如说一个订单业务,我们要防止订单状态修改后不再继续被修改,就可以对订单业务进行判断:

    • 如果订单是未支付状态,才变成已支付状态

    • 如果订单是已支付状态,状态不变

    延迟消息

    生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定消息后才收到消息。

    死信交换机

    死信:当队列中的消息满足下列情况之一后,就成为了死信。(dead letter)

    • 消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false

    • 消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费。

    • 要投递的队列消息堆积满了,最早的消息可能成为死信。

    在队列的dead-letter-exchange属性中指定一个A交换机,则该队列中的死信会投递到A交换机中,该A交换机就是死信交换机Dead Letter Exchange,简称DLX)。

    image-20240611130025919

    插件实现

    官方提供了一个插件实现延迟消息,网址如下:Community Plugins | RabbitMQ

    下载rabbitmq_delayed_message_exchage

    1、声明延迟交换机

    1)方式一

    1. @Bean
    2.    public DirectExchange delayExchange(){
    3.        return ExchangeBuilder
    4.               .directExchange("delay.direct")
    5.               .delayed()  //设置delay的属性为true
    6.               .durable(true)
    7.               .build();
    8.   }

    2)方式二

      
    1.  @RabbitListener(bindings = @QueueBinding(
    2.            value = @Queue(name = "delay.queue", durable = "true"),
    3.            exchange = @Exchange(value = "delay.direct",delayed = "true"), //delayed = true 开启延迟交换机
    4.            key = "delay"
    5.   ))
    6.    public void listenToDelayMessage(String msg){
    7.        log.info("接收到delay.queue的延迟消息:{}",msg);
    8.   }

    2、测试:

    1. @Test
    2.    void testSendDelayMessage(){
    3.        rabbitTemplate.convertAndSend("delay.direct", "delay", "hello", new MessagePostProcessor() {
    4.            @Override
    5.            public Message postProcessMessage(Message message) throws AmqpException {
    6.                //设置消息延迟时间
    7.                message.getMessageProperties().setDelay(1000);
    8.                return message;
    9.           }
    10.       });
    11.        log.info("消息发送成功!");
    12.   }

    消息堆积

    为什么会出现消息堆积这种问题呢?

    RabbitMQ 消息堆积问题通常发生在生产者发送消息的速度远大于消费者处理消息的速度时,这可能导致队列中消息的累积,直至达到存储上限,进而影响系统性能甚至导致消息丢失。

    解决方案

    要生成解决方案的前提,我们首先要先确定对应的消息堆积产生场景,对症下药。

    • 消费者处理消息速度太慢

      • 增加消费者数量

      • 优化消费者性能,优化代码,增加资源

      • 消息预取限制(prefetch在配置文件中的配置),以避免一次处理过多消息导致处理缓慢

    • 队列容量太小

      • 增加队列容量

    • 网络故障,导致消息可能丢失,导致消息在队列中堆积

      • 监控 + 告警,确保网络故障发送时能快速发现并解决问题

      • 持久化 + 高可用:确保消息和队列持久化以避免消息丢失,并使用镜像队列提高可用性

    • 消费者故障

      • 使用死信队列:将无法处理的添加到死信队列中,避免阻塞主队列

      • 容错机制:消费者自动重复和错误处理逻辑

    • 队列配置不当

      • 优化队列配置:检测并优化消息确认模式,队列长度限制和其他相关配置

    • 消息太大了,处理时间较长

      • 消息分片:将大型消息分割成小的消息片段,加速处理

    • 业务逻辑复杂或耗时

      • 优化业务逻辑:简化消费者中的业务逻辑,减少处理每个消息所需的时间

    • 消息产生速度快于消费者速度

      • 限流

      • 负载均衡:消费者间公平分配,避免个别消费者加载

    • 其他配置优化

      • 设置消息优先级,确保优先级高的先处理

      • 配置文件描述符的限制,内存使用限制

    补充

    消息队列的路由模型

    消息队列的路由模型指的是消息从生产者到消费者的传递路径和方式。常见的消息队列路由模型包括:

    • 点对点模型(Point-to-Point):消息被发送到一个队列中,只有一个消费者可以接收并处理该消息。

    • 发布-订阅模型(Publish-Subscribe):消息被发送到一个主题(或交换机)中,多个消费者可以订阅该主题,并且每个消费者都可以收到消息并独立处理。

    • 路由模型(Routing):消息根据特定的路由规则被发送到不同的队列中,消费者根据队列接收并处理消息。

    消息队列的应用经验(使用场景)

    • 异步通信:在系统内部或者不同系统之间进行异步通信,提高系统的响应速度和吞吐量。

    • 任务调度和削峰填谷:通过消息队列进行任务调度,将请求分散到不同的时间段或者不同的处理节点,避免系统在高峰时期负载过重。

    • 分布式事务:在分布式系统中使用消息队列进行事件的发布和订阅,保证系统的一致性和可靠性。

    • 日志收集和数据分析:通过消息队列将日志数据发送到消息队列中进行集中收集和处理,方便进行数据分析和监控。

    消息消费顺序性问题

    很多时候,我们的MQ使用并不需要保证顺序消费,比如订单超时等。

    但有些时候,业务中可能会存在多个消息需要顺序处理的情况,比如在库存更新场景中,减少库存和增加库存的通知必须按照接收顺序处理,以防止库存数量错误或超卖现象。

    那这个时候我们该怎么实现我们消息消费顺序性呢?

    目前的方案是:

    • 单一消费者:一个队列绑定一个消费者,采用单活模式实现顺序消费

    • 分区策略:将不同的消息进行分区(Direct),然后不同的区绑定不同的消息队列,可以加大并发

    • 排序:顺序消息ID+手动排序

    这里是一个单活模式的例子:

    1. /**
    2.     * 创建一个 单活模式的队列
    3.     * @param name
    4.     * @return queue
    5.     */
    6.    private Queue creatQueue(String name) {
    7.        HashMap<String, Object> args = new HashMap<>();
    8.        // x-single-active-consumer 单活模式 队列
    9.        // 表示是否最多只允许一个消费者消费,如果有多个消费者同时绑定,则只会激活第一个,
    10.        // 除非第一个消费者被取消或者死亡,才会自动转到下一个消费者。
    11.        args.put("x-single-active-consumer", true);
    12.        return new Queue(name, true, false, false, args);
    13.   }

  • 相关阅读:
    COI实验室技能——自动切换显示器画面以及实现采集大量实验数据的方法(采集数据可供深度学习训练)
    苹果如何做ASO优化?优化的主要思路有哪些?
    腾讯云~docker onlyoffice7.1.1 word excel ppt在线编辑、在线预览_部署01
    接口设计的18条军规
    Qt创建线程(继承于QThread的方法)
    【Java 基础篇】Java 异常处理指南:解密异常处理的关键技巧
    1156:求π的值
    FineBI 将聚合之后的结果,再求平均值
    大数据算法系列9:字符串匹配问题,海量字符串处理
    Nacos--服务搭建以及SpringBoot整合--方法/实例
  • 原文地址:https://blog.csdn.net/qq_62074445/article/details/139596974