• RabbitMQ详解及其特性


    最近在准备面试,发现之前学习的RabbitMQ基本都忘了,于是乎,趁着复习准备的机会,顺便做一些RabbitMQ的知识整理工作

    要了解RabbitMQ,首先需要了解什么是MQ

    1.MQ(Message Queue)消息队列

    • 消息队列中间件,是分布式系统中的重要组件
    • 主要解决,异步处理,应用解耦,流量削峰等问题
    • 使用较多的消息队列产品:RabbitMQ,RocketMQ,ActiveMQ,ZeroMQ,Kafka等

    1.1异步处理

    异步处理,想必大家都了解,就是把同步处理的事情变成异步来做,有效降低了处理时间,常见的如注册的时候发送注册邮件,发送注册短信码等
    在这里插入图片描述

    1.2 应用解耦

    • 场景:订单系统需要通知库存系统
    • 如果库存系统异常,则订单调用库存失败,导致下单失败
      原因:订单系统和库存系统耦合度太高
      在这里插入图片描述
    • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户,下单成功;
    • 库存系统:订阅下单的消息,获取下单信息,库存系统根据下单信息,再进行库存操作;
    • 假如:下单的时候,库存系统不能正常运行,也不会影响下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了,实现了订单系统和库存系统的应用解耦;
    • 所以说,消息队列是典型的:生产者消费者模型
    • 生产者不断的向消息队列中生产消息,消费者不断的从队列中获取消息
    • 因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的入侵,这样就实现了生产者和消费者的解耦
    • 此外,如果我们新增新的消费者服务,我们只需要实现消费者的需求,然后修改生产者的消息即可,不用做过多的侵入式编码操作,大大降低了耦合度

    1.3流量削峰

    • 抢购,秒杀等业务,针对高并发的场景
    • 因为流量过大,暴增会导致应用挂掉,为解决这个问题,在前端加入消息队列
      在这里插入图片描述
    • 用户的请求,服务器接收后,首先写入消息队列,如果超过队列的长度,就抛弃,甩一个秒杀结束的页面!
    • 我们要保证的是秒杀时的正常运行,秒杀成功后的业务处理我们完全可以后处理

    2 RabbitMQ背景知识介绍

    2.1 AMQP高级消息队列协议

    • 即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议
    • 基于此协议的客户端可以与消息中间件传递消息
    • 不受产品、开发语言等条件的限制

    2.2 JMS

    • Java Message Server,Java消息服务应用程序接口, 一种规范,和JDBC担任的角色类似
    • 是一个Java平台中关于面向消息中间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信

    2.3 二者的联系和区别

    • JMS是定义了统一接口,统一消息操作AMQP通过协议统一数据交互格式
    • JMS必须是java语言;AMQP只是协议,与语言无关

    2.4 Erlang语言

    • Erlang是一种通用的面向并发的编程语言,它由瑞典电信设备制造商爱立信所辖的CS-Lab开发,目的是创造一种可以应对大规模并发活动的编程语言和运行环境
    • 最初是由爱立信专门为通信应用设计的,比如控制交换机或者变换协议等,因此非常适合构建分布式,实时软并行计算系统
    • Erlang运行时环境是一个虚拟机,有点像Java的虚拟机,这样代码一经编译,同样可以随处运行

    2.5 RabbitMQ的特点

    • Erlang开发,AMQP的最佳搭档,安装部署简单,上手门槛低
    • 企业级消息队列,经过大量实践考验的高可靠,大量成功的应用案例,例如阿里、网易等一线大厂都有使用
    • 有强大的WEB管理页面
    • 强大的社区支持,为技术进步提供动力
    • 支持消息持久化、支持消息确认机制、灵活的任务分发机制等,支持功能非常丰富,可靠性高
    • 集群扩展很容易,并且可以通过增加节点实现成倍的性能提升

    总结:
    如果你希望使用一个可靠性高、功能强大、易于管理的消息队列系统那么就选择RabbitMQ;
    如果你想用一个性能高,但偶尔丢点数据不是很在乎可以使用kafka或zeroMQ,kafka和zeroMQ的性能爆表,绝对可以压RabbitMQ一头!

    3 RabbitMQ各组件功能

    在这里插入图片描述

    • Broker:消息队列服务器实体
    • Virtual Host:虚拟主机
      • 标识一批交换机、消息队列和相关对象,形成的整体
      • 虚拟主机是共享相同的身份认证和加密环境的独立服务器域
      • 每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制
      • vhost是AMQP概念的基础,RabbitMQ默认的vhost是 /,必须在链接时指定
    • Exchange:交换器(路由)
      • 用来接收生产者发送的消息并将这些消息路由给服务器中的队列
    • Queue:消息队列
      • 用来保存消息直到发送给消费者。
      • 消息的容器,也是消息的终点。
      • 一个消息可投入一个或多个队列。
      • 消息一直在队列里面,等待消费者连接到这个队列将其取走。
    • Banding:绑定,用于消息队列和交换机之间的关联。
    • Channel:通道(信道)
      • 多路复用连接中的一条独立的双向数据流通道。
      • 信道是建立在真实的TCP连接内的虚拟链接
      • AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,都是通过信道完成的
      • 对于操作系统来说,建立和销毁TCP连接都是非常昂贵的开销,所以引入了信道的概念,用来复用TCP连接。
    • Connection:网络连接,比如一个TCP连接。
    • Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
    • Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
    • Message:消息
      • 消息是不具名的,它是由消息头和消息体组成。
      • 消息体是不透明的,而消息头则是由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(优先级)、delivery-mode(消息可能需要持久性存储[消息的路由模式])等。

    4 RabbitMQ安装

    RabbitMQ安装

    5 RabblitMQ 入门

    5.1导入依赖

    <dependencies>
        <dependency>
           <groupId>com.rabbitmqgroupId>
           <artifactId>amqp-clientartifactId>
           <version>5.7.3version>
        dependency> 
        <dependency>
           <groupId>org.slf4jgroupId> 
           <artifactId>slf4j-log4j12artifactId> 
           <version>1.7.25version> 
           <scope>compilescope> 
        dependency> 
        <dependency>
           <groupId>org.apache.commonsgroupId> 
           <artifactId>commons-lang3artifactId> 
           <version>3.9version> 
        dependency>
    dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    5.2 日志依赖log4j

    log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
    log4j.appender.stdout.Target=System.out 
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n 
    log4j.appender.file=org.apache.log4j.FileAppender 
    log4j.appender.file.File=rebbitmq.log 
    log4j.appender.file.layout=org.apache.log4j.PatternLayout 
    log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l %m%n 
    log4j.rootLogger=debug, stdout,file
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    5.3 创建连接

    • 先创建好虚拟主机,在RabbitMQ管理界面Virtual Hosts下,创建虚拟主机 /test
    public class ConnectionUtil {
        public static Connection getConnection() throws Exception{
            //1.创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //2.在工厂对象中设置MQ的连接信息(ip,port,vhost,username,password)
            factory.setHost("192.168.204.141");
            factory.setPort(5672);
            factory.setVirtualHost("/test");
            factory.setUsername("test"); factory.setPassword("123123");
            //3.通过工厂获得与MQ的连接
            Connection connection = factory.newConnection();
            return connection;
        }
    
        public static void main(String[] args) throws Exception{
            Connection connection = getConnection();
            System.out.println("connection = " + connection);
            connection.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    6. RabbitMQ模式

    • RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ
    • 在线手册:https://www.rabbitmq.com/getstarted.html
      在这里插入图片描述
      从左到右依次为:简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式
      大致可以分为两类:
    • 1和2属于点对点模式
    • 3、4、5属于发布订阅模式,即1对多模式

    6.1 点对点模式:P2P(point to point)

    • 模式包含三个角色:消息队列(queue),发送者(sender),接收者(receiver)
    • 每个消息发送到一个特定的队列中,接收者从中获得消息
    • 队列中保留这些消息,直到他们被消费或超时
    • 特点:
      • 每个消息只有一个消费者,一旦消费,消息就不在队列中了
      • 发送者和接收者之间没有依赖性,发送者发送完成,不管接收者是否运行,都不会影响消息发送到队列中(我给你发微信,不管你看不看手机,反正我发完了),即松耦合
      • 接收者成功接收消息之后需向对象应答成功(确认),即发送回执ack

    6.2 发布订阅模式 publish(Pub)/subscribe(Sub)

    • pub/sub模式包含三个角色:交换机(exchange),发布者(publisher),订阅者
      (subcriber)
    • 多个发布者将消息发送交换机,系统将这些消息传递给多个订阅者
    • 特点:
      • 每个消息可以有多个订阅者,即一对多的机制
      • 发布者和订阅者之间在时间上有依赖,对于某个交换机的订阅者,必须创建一个订阅后,才能消费发布者的消息
      • 为了消费消息,订阅者必须保持运行状态

    6.3 简单模式

    在这里插入图片描述
    即发送、传递、接收的过程

    • 生产者
    public class Sender {
    
        public static void main(String[] args) throws Exception{
            String msg = "Hello,RabbitMQ!";
    
            // 1.获得连接
            Connection connection = ConnectionUtil.getConnection();
            // 2.在连接中创建通道(信道)
            Channel channel = connection.createChannel();
            // 3.创建消息队列(1,2,3,4,5)
            /*
            参数1:队列的名称
            参数2:队列中的数据是否持久化
            参数3:是否排外(是否支持扩展,当前队列只能自己用,不能给别人用)
            参数4:是否自动删除(当队列的连接数为0时,队列会销毁,不管队列是否还存保存数据)
            参数5:队列参数(没有参数为null)
            */
            channel.queueDeclare("queue1",false,false,false,null);
            // 4.向指定的队列发送消息(1,2,3,4)
            /*
            参数1:交换机名称,当前是简单模式,也就是P2P模式,没有交换机,所以名称为""
            参数2:目标队列的名称
            参数3:设置消息的属性(没有属性则为null)
            参数4:消息的内容(只接收字节数组)
            */
            channel.basicPublish("","queue1",null,msg.getBytes());
            System.out.println("发送:" + msg);
            // 5.释放资源
            channel.close();
            connection.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    启动生产者,即可前往管理端查看队列中的信息,会有一条信息没有处理和确认
    在这里插入图片描述

    • 消费者
    public class Recer {
    
        public static void main(String[] args) throws Exception{
            // 1.获得连接
            Connection connection = ConnectionUtil.getConnection();
            // 2.在连接中创建通道(信道)
            Channel channel = connection.createChannel();
            // 3.从信道中获得消息
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                //交付处理(收件人信息,包裹上的快递标签,协议的配置,消息)
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // body就是从队列中获取的消息
                    String s = new String(body);
                    System.out.println("接收 = " + s);
                }
            };
            // 4.监听队列 true:自动消息确认
            channel.basicConsume("queue1", true,consumer);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    启动消费者,前往管理端查看队列中的信息,所有信息都已经处理和确认,显示0
    在这里插入图片描述

    6.3.1 消息确认机制ACK

    从上面可以看出,消息一旦被消费,消息就会立刻从队列中移除,那么问题来了,RabbitMQ是如何感知到消息被消费了呢?
    解决方案
    基于上面的问题,RabbitMQ提供了一种ACK消息确认机制,当消费者获得消息后,会向RabbitMQ发送回执ACK,告知消息已经被接收
    分类

    • 自动ACK:消息接收后,消费者立刻自动发送ACK,类似于事务的自动提交,常用于消息不太重要的情况,丢失也没有影响
    • 手动ACK:消息接收后,不会发送ACK,需程序手动调用,类似于事务的手动提交,常用于重要的消息,如消费、退款等

    代码

    // false:手动消息确认 
    channel.basicConsume("queue1", false, consumer);
    
    • 1
    • 2

    6.4 工作队列模式

    在这里插入图片描述
    如果生产者生产消息过多,而消费者来不及消费,这时候堆积的消息就会越来越多,那这时候我们该如何做呢?可以考虑增加消费者,让更多的消费者来消费消息便可解决这一情况

    注意:如何解决消息堆积的情况?
    采用工作队列的模式,多个消费者监听同一个队列,接收到消息后,通过线程池,进行异步消费

    6.5 发布订阅模式

    在这里插入图片描述
    发布订阅模式其实本质上就是在工作队列的模式上进行了改造,不同的是,工作队列模式是每个任务都被准确地交付给一个工作者,而发布订阅模式是可以将每个任务交付给binding的每一个工作者,类似于广播的机制,广播给每一个消费者

    流程

    • P生产者发送信息给X路由,X将信息转发给绑定X的队列
    • X队列将信息通过信道发送给消费者,从而进行消费
    • 整个过程,必须先创建路由
      • 路由在生产者程序中创建
      • 因为路由没有存储消息的能力,当生产者将信息发送给路由后,消费者还没有运行,所以没有队列,路由并不知道将信息发送给谁
      • 运行程序的顺序:
        1. MessageSender
        2. MessageReceiver1和MessageReceiver2
        3. MessageSender

    代码

    • 生产者
    public class Sender {
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            // 声明路由(路由名,路由类型) 
            // fanout:不处理路由键(只需要将队列绑定到路由上,发送到路由的消息都会被转发到与该 路由绑定的所有队列上) 
            channel.exchangeDeclare("test_exchange_fanout", "fanout"); 
            String msg = "hello,大家好!"; 
            channel.basicPublish("test_exchange_fanout", "", null, msg.getBytes());
            System.out.println("生产者:" + msg); 
            channel.close();
            connection.close(); 
        } 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 消费者
    public class Recer1 {
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            // 声明队列
            channel.queueDeclare("test_exchange_fanout_queue_1",false,false,false,null);
            // 绑定路由(关注)
            /*
            参数1:队列名
            参数2:交换器名称
            参数3:路由key(暂时无用,""即可)
            */
            channel.queueBind("test_exchange_fanout_queue_1", "test_exchange_fanout", "");
            DefaultConsumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String s = new String(body);
                System.out.println("【消费者】 = " + s);
            }
        };
            // 4.监听队列 true:自动消息确认
            channel.basicConsume("test_exchange_fanout_queue_1", true,consumer); 
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    6.6 路由模式

    在这里插入图片描述
    路由会根据类型进行定向分发消息给不同的队列

    代码实现

    // 声明路由(路由名,路由类型)
    // direct:根据路由键进行定向分发消息
    channel.exchangeDeclare("test_exchange_direct", "direct");
    
    • 1
    • 2
    • 3
    // 绑定路由(如果路由键的类型是添加的话,绑定到这个队列1上)
    channel.queueBind("test_exchange_direct_queue_1", "test_exchange_direct", "insert");
    
    • 1
    • 2

    注意:

    1. 记住运行程序的顺序,先运行一次sender(创建路由器),
    2. 有了路由器之后,在创建两个Recer1和Recer2,进行队列绑定
    3. 再次运行sender,发出消息

    6.7 通配符模式

    在这里插入图片描述

    • 和路由模式90%是一样的。
    • 唯独的区别就是路由键支持模糊匹配
    • 匹配符号
    • *:只能匹配一个词(正好一个词,多一个不行,少一个也不行)
    • #:匹配0个或更多个词

    7.持久化机制

    • 消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何避免消息丢失?
      • 消费者的ACK确认机制,可以防止消费者丢失消息
      • 万一在消费者消费之前,RabbitMQ服务器宕机了,那消息也会丢失
    • 想要将消息持久化,那么 路由和队列都要持久化才可以

    生产者

    // 声明路由(路由名,路由类型,持久化) 
    channel.exchangeDeclare("test_exchange_topic", "topic",true); String msg = "商品降价"; 
    // 发送消息(第三个参数作用是让消息持久化) 
    channel.basicPublish("test_exchange_topic", "product.price",MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
    
    • 1
    • 2
    • 3
    • 4

    消费者

    // 声明队列( 第二个参数为true:支持持久化) 
    channel.queueDeclare("test_exchange_topic_queue_1",true,false,false,null);
    
    • 1
    • 2

    8. 消息成功确认机制

    在实际场景下,有的生产者发送的消息是必须保证成功发送到消息队列中,那么如何保证成功投递呢?

    • 事务机制
    • 发布确认机制

    8.1 事务机制

    • AMQP协议提供的一种保证消息成功投递的方式,通过信道开启 transactional 模式,主要包括以下三个方法:
      • channel.txSelect(): 开启事务
      • channel.txCommit() :提交事务
      • channel.txRollback() :回滚事务

    代码示例

    public class Sender {
        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.exchangeDeclare("test_transaction", "topic");
            channel.txSelect(); // 开启事务
           try {
               channel.basicPublish("test_transaction", "product.price", null, "商品 降价1".getBytes());
               // System.out.println(1/0); // 模拟异常!
               channel.basicPublish("test_transaction", "product.price", null, "商品 降价2".getBytes());
               System.out.println("消息全部发出!");
               channel.txCommit(); // 事务提交 
           }
           catch (Exception e){
               System.out.println("由于系统异常,消息全部撤回!");
               channel.txRollback(); // 事务回滚
               e.printStackTrace();
           }finally {
               channel.close();
               connection.close();
           }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    8.2 Confirm发布确认机制

    RabbitMQ为了保证消息的成功投递,采用通过AMQP协议层面为我们提供事务机制的方案,但是采用事务会大大降低消息的吞吐量。试想一下:10条消息,前9条成功,如果第10条失败,那么9条消息要全部撤销回滚,太浪费性能了。那么有没有其它更好的解放方法呢 ?有,那就是Confirm模式,confirm模式采用补发第10条的措施来完成10条消息的送达

    使用步骤

    • 在spring-rabbitmq-producer.xml中,启动生产者确认机制
     
    <rabbit:connection-factory id="connectionFactory"
    host="192.168.204.141" port="5672" 
    username="test" password="123123"
    virtual-host="/test" publisher-confirms="true" />
    
     
    <rabbit:template id="rabbitTemplate" connection-
    factory="connectionFactory" exchange="spring_topic_exchange" 
    message-converter="jsonMessageConverter" confirm-callback="msgSendConfirmCallback"/>
    
    
    <bean id="msgSendConfirmCallback" class="confirm.MsgSendConfirmCallback"/>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    @Component
    public class MsgSendConfirmCallback implements RabbitTemplate.ConfirmCallback {
        public void confirm(CorrelationData correlationData, boolean b, String s) {
            if (b){
                System.out.println("消息确认成功!!");
            } else {
                System.out.println("消息确认失败。。。");
                // 如果本条消息一定要发送到队列中,例如下订单消息,我们可以采用消息补发
                // 采用递归(固定次数,不可无限)或 redis+定时任务 
                } 
        } 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    生产者

    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml"); 
    RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
    //获得rabbitTemplate的实例后,通过rabbitTemplate.convertAndSend()发送消息
    
    • 1
    • 2
    • 3

    9.消费端限流

    假设一直情况:消费者客户端宕机了,Rabbitmq 服务器积压了成千上万条未处理的消息,修好消费者客户端后,随便打开一个消费者客户端,就会
    出现这样的情况: 巨量的消息瞬间全部喷涌推送过来,但是单个客户端无法同时处理这么多数据,就会被压垮崩溃,再次宕机
    解决方案:

    • 为了应对这种情况,我们应该对消费端进行限流,用于保持消费端的稳定
    • RabbitMQ 提供了一种 Qos (Quality of Service,服务质量)服务质量保证功能,即在非自动确认消息的前提下,如果一定数目的消息未被确认前,不再进行消费新的消息
     
     
     
    <rabbit:listener-container connection-factory="connectionFactory" 
    prefetch="3" acknowledge="manual">
    <rabbit:listener ref="consumerListener" queue- names="test_spring_queue_1" /> 
    rabbit:listener-container>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    10.过期时间TTL

    订单中常用于过期订单,如30分钟自动取消

    • Time To Live:生存时间、还能活多久,单位毫秒,在这个周期内,消息可以被消费者正常消费,超过这个时间,则自动删除(其实是被称为dead message并投入到死信队列,无法消费该消息)
    • RabbitMQ可以对消息和队列设置TTL
      • 通过队列设置,队列中所有消息都有相同的过期时间
      • 对消息单独设置,每条消息的TTL可以不同(更颗粒化)

    10.1设置队列TTL(针对于一类消息)

    spring-rabbitmq-producer.xml

    
    <rabbit:queue name="test_spring_queue_ttl" auto-declare="true"> 
       <rabbit:queue-arguments> 
          <entry key="x-message-ttl" value-type="long" value="5000">entry>      
       rabbit:queue-arguments> 
    rabbit:queue>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在这里插入图片描述

    • 5s之后,消息自动删除
      在这里插入图片描述

    10.2设置消息TTL(针对于单条消息)

    • 设置某条消息的ttl,只需要在生产者创建发送消息时指定即可
    ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml"); 
    RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
    // 创建消息配置对象
    MessageProperties messageProperties = new MessageProperties();
    // 设置消息过期时间 
    messageProperties.setExpiration("6000");
    // 创建消息 
    Message message = new Message("6秒后自动删除".getBytes(), messageProperties);
    // 发送消息 
    rabbitTemplate.convertAndSend("msg.user", message);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    注意:如果同时设置了queue和message的TTL值,则二者中较小的才会起作用

    11. 死信队列

    • DLX(Dead Letter Exchanges)死信交换机/死信邮箱,当消息在队列中由于某些原因没有被及时消费而变成死信(dead message)后,这些消息就会被分发到DLX交换机中,而绑定DLX交换机的队列,称之为:“死信队列”
    • 消息没有被及时消费的原因:
      • 消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false
      • 消息超时未消费
      • 达到最大队列长度
        在这里插入图片描述
    • spring-rabbitmq-producer-dlx.xml
     
    <rabbit:queue name="dlx_queue"/> 
     
    <rabbit:direct-exchange name="dlx_exchange" > 
        <rabbit:bindings> 
            <rabbit:binding key="dlx_ttl" queue="dlx_queue">rabbit:binding> 
            <rabbit:binding key="dlx_max" queue="dlx_queue">rabbit:binding> 
        rabbit:bindings> 
    rabbit:direct-exchange>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    12. 延迟队列

    • 延迟队列:TTL + 死信队列的合体
    • 死信队列只是一种特殊的队列,里面的消息仍然可以消费
    • 在电商开发部分中,都会涉及到延时关闭订单,此时延迟队列正好可以解决这个问题

    使用步骤:

    • 生产者:沿用上面死信队列案例的超时测试,超时时间改为订单关闭时间即可
    • 消费者
      spring-rabbitmq-consumer.xml
     
    <rabbit:listener-container connection-factory="connectionFactory" prefetch="3" acknowledge="manual"> 
        <rabbit:listener ref="consumerListener" queue-names="dlx_queue" /> 
    rabbit:listener-container>
    
    • 1
    • 2
    • 3
    • 4
  • 相关阅读:
    轻量云服务器租用好在哪
    MongoDB - 安装
    TreeBERT:基于树的编程语言预训练模型。
    Spring Boot集成Redisson实现延迟队列
    Spark基础【介绍、入门WordCount案例】
    UE5——动画重定向
    springboot+高校失物招领系统 毕业设计-附源码121441
    蓝桥杯 题库 简单 每日十题 day5
    CommonJS,ES6 Module以及webpack模块打包原理
    Feign负载均衡写法
  • 原文地址:https://blog.csdn.net/mr4569870/article/details/126627682