• 四、RabbitMQ TTL、死信队列以及延迟队列


    TTL

    TTL 全称 Time To Live(存活时间/过期时间)。当消息到达存活时间后,还没有被消费,就会被自动清除。RabbitMQ可以设置两种过期时间:

    • 对消息设置过期时间。
    • 对整个队列(Queue)设置过期时间。

    如何设置

    • 设置队列过期时间使用参数:x-message-ttl,单位:ms(毫秒),会对整个队列消息统一过期。
    • 设置消息过期时间使用参数:expiration,单位:ms(毫秒),当该消息在队列头部时(消费时),会单独判断这一消息是否过期。

    如果两者都设置了过期时间,以时间短的为准。

    在streadway/amqp库提供的API中设置TTL

    设置队列过期时间:

    QueueDeclare函数的最后一个参数是一个amqp.Table类型,它的声明是这样的: type Table map[string]interface{},其实是一个可以用于设置队列属性的map。

    // 设置Queue ttl为5s
    args := amqp.Table{"x-message-ttl": 5000}
    
    q, e := ch.QueueDeclare(
    		name,  //队列名
    		false, 
    		true, 
    		false, 
    		false,
    		args,   //设置Queue ttl为5s
    	)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    设置消息过期时间:

    e = q.channel.Publish(
    		"",    
    		queue, 
    		false, 
    		false, 
    		amqp.Publishing{
    			// 设置当前发送消息的过期时间为3s
    			Expiration: "3000",
    			ReplyTo:    q.Name,
    			Body:       []byte(str),
    })
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    死信队列

    当一个队列中存在死信时,RabbitMQ会把消息发送给DLX(死信交换机),进而被路由到另一个队列中,这个队列就叫做死信队列。

    死信就是指没有被消费者消费成功的消息,一条消息变成死信有三种情况:

    1. 如果给消息队列设置了最大容量x-max-length,队列已经满了,后续再进来的消息会溢出,无法被队列接收就会变成死信。
    2. 消息接收时被拒绝会变成死信,例如调用Reject()函数,并设置requeuefalse
    3. 如果给消息队列设置了消息的过期时间x-message-ttl,或者发送消息时设置了当前消息的过期时间,当消息在队列中的存活时间大于过期时间时,就会变成死信。

    如何将死信发送给DLX

    为队列设置参数即可,将要发送死信的队列配置以下两个参数:

    x-dead-letter-exchange: [DLX的名字]
    x-dead-letter-routing-key: [DLX的routing key]
    
    • 1
    • 2

    下面是死信队列的工作流程:

    在这里插入图片描述

    延迟队列

    延时队列就是用来存放需要在指定时间被处理的元素的队列,通常可以用来处理一些具有过期性操作的业务。

    比如十分钟内未支付则取消订单,原先这个功能我们可以使用定时器来实现,即每隔一段时间去数据库对比未支付订单的当前时间与订单创建时间。但是定时器的时长难以确定,太长会导致订单失效时间出现误差,太短则会增大数据库压力。

    实现

    在RabbitMQ中没有提供延迟队列的功能,但是我们可以使用:TTL+死信队列组合的方式来实现延迟队列的效果。

    下面是实现延迟队列的流程图:

    在这里插入图片描述

    Go实现延迟队列

    创建一个死信交换机

    在这里插入图片描述

    再创建一个死信队列

    在这里插入图片描述

    将死信队列绑定至死信交换机

    在这里插入图片描述

    创建一个正常队列,并指定消息过期后被发往的死信交换机

    在这里插入图片描述
    生产者

    func main() {
    	conn, _ := amqp.Dial("amqp://guest:guest@35.76.111.125:5672/")
    	ch, _ := conn.Channel()
    	body := "This is a delayed message, created at " + time.Now().Format("2006-01-02 15:04:05")
    	fmt.Println(body)
    	// 发送消息到queue.normal队列中
    	ch.Publish("", "queue.normal", false, false, amqp.Publishing{
    
    		Body:       []byte(body),
    		Expiration: "10000", // 设置TTL为10秒
    	})
    
    	defer conn.Close()
    	defer ch.Close()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    消费者

    func main() {
    	conn, _ := amqp.Dial("amqp://guest:guest@35.76.111.125:5672/")
    	ch, _ := conn.Channel()
    
    	//监听queue.dlx队列
    	msgs, _ := ch.Consume(
    		"queue.dlx",
    		"",
    		true,
    		false,
    		false,
    		false,
    		nil,
    	)
    	for d := range msgs {
    		fmt.Printf("receive: %s\n", d.Body) // 收到消息,业务处理
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    流程说明

    生产者生产一条消息,然后指定消息的TTL为10s,接着将消息发给普通队列,消息在普通队列中过期后被发往死信交换机,死信交换机将这条消息路由给延迟队列。消费者一直在监听到延迟队列中的死信后,开始消费。

  • 相关阅读:
    运动蓝牙耳机哪种款式好用、专业的蓝牙运动耳机推荐
    Linux动态库
    【C++】List -- 详解
    数据分析——A/B测试二:优惠券AB测试项目
    transient关键字使用说明
    数据库可视化工具分享 (DBeaver)
    房地产小程序 | 小程序赋能,房地产业务数字化升级
    unordered_map算法
    Prometheus监控之SNMP Exporter介绍和数据展现
    JavaScript中遍历对象的方法
  • 原文地址:https://blog.csdn.net/qq_49723651/article/details/127720469