• 基于RabbitMQ构建延迟队列


    延迟任务在业务中是一个很常见的需求,比如:

    订单下单15分钟之后,用户没有支付,则自动取消订单 用户做了某些操作,5分钟之后发短信提醒用户 诸如此类的场景比比皆是,一种最常见的实现方式,就是开启一个定时任务,然后一直轮询数据库,这种实现方式在数据量小的时候还好,但是数据量一旦过大,这轮询数据库就会给数据库造成很大的压力,此时全面扫表的实现方式就显得不可靠了。

    另外一种实现方式,就是用延迟队列的方式来实现,但是RabbitMQ本身是没有实现延迟队列的,不过可以使用TTL+死信队列的方式来实现延迟队列。

    消息的TTL

    TTL全称Time To Live,即生存时间。消息的TTL也就是消息的生存时间。在RabbitMQ中设置TTL有两种

    第一种是声明队列的时候,在队列的属性中设置TTL,这样该队列中的消息都会有相同的有效期 第二种是发送消息时给消息设置属性,可以为每条消息都设置不同的TTL 如果两者都设置,生存时间取两者最小的那一个。这里我们采用第二种,即为每条消息设置TTL

    死信交换机/死信队列

    一个消息在满足如下的条件的时候,就会变成“死信”,并且能被投递到死信交换机(Dead-Letter-Exchange),最后进入到死信交换机绑定的队列,也称死信队列(Dead-Letter-Queue)

    • 消息被拒绝而且requeue=false
    • 消息的TTL到了,即消息过期
    • 队列排满了,排在前面的消息会被丢弃或者扔到死信路由上

    死信交换机和普通的交换机是没有区别的,只是某一个设置死信交换机的队列中有消息过期了,会自动触发消息的转发,发送到死信交换机中去,再由死信交换机转发到死信队列中。死信队列也是一个普通的队列,并没有什么其它特殊的。

    延迟队列的实现

    接着来看看TTL+死信交换机是如何实现延迟队列的

    alt

    上面的流程就是实现延迟队列的思路,比方说15分钟取消订单,那么用户下单之后,消息的TTL设置为15分钟,当消息在Queue1待的时间到了15分钟,那么就会被转发到Dead-Letter-Exchange,从而转发到Dead-Letter-Queue,最后被消费者消费,实现延迟任务。

    先在RabbitMQ控制台创建一个名为dlx的交换机,作为死信交换机,并绑定上一个dlxQueue队列,作为Dead-Letter-Queue

    alt
    // 生产者.go
    package main

    import (
        "github.com/streadway/amqp"
        "mq/fail"
    )

    func main() {
        conn, err := amqp.Dial("amqp://123:123@localhost:5672")
        fail.OnError(err)
        defer conn.Close()

        ch, err := conn.Channel()
        fail.OnError(err)
        defer ch.Close()

        args := amqp.Table{"x-dead-letter-exchange""dlx"
        q, err := ch.QueueDeclare("test"truefalsefalsefalse, args) // 声明一个test队列,并设置队列的死信交换机为"dlx"

        body := "hello world1"
        for i := 0; i < 10; i++ {
            err = ch.Publish("", q.Name, falsefalse, amqp.Publishing{
                Body:       []byte(body),
                Expiration: "5000", // 设置TTL为5秒
            })
            fail.OnError(err)
        }
    }
    • 1

    启动生产者,可以看到消息被投递到test队列中

    alt

    5秒之后,消息被转发到dlxQueue队列中

    alt

    之后有一个消费者,专门处理这个dlxQueu队列中的消息

    // 消费者.go
    package main

    import (
        "fmt"
        "github.com/streadway/amqp"
        "mq/fail"
    )

    func main() {
        conn, err := amqp.Dial("amqp://123:123@localhost:5672")
        fail.OnError(err)

        c, err := conn.Channel()
        fail.OnError(err)

        msgs, err := c.Consume("dlxQueue"""truefalsefalsefalse, nil) //监听dlxQueue队列
        fail.OnError(err)

        for d := range msgs {
            fmt.Printf("收到信息: %s\n", d.Body) // 收到消息,业务处理
        }
    }
    • 1
    // 5秒之后,打印
    // 收到信息: hello world1
    // 收到信息: hello world1
    // 收到信息: hello world1
    // 收到信息: hello world1
    // 收到信息: hello world1
    // 收到信息: hello world1
    // 收到信息: hello world1
    // 收到信息: hello world1
    // 收到信息: hello world1
    // 收到信息: hello world1
    • 1

    总结 使用TTL+死信交换机实现延迟任务还是非常方便的,除此之外还可以使用相关的插件abbitmq-delayed-message-exchange,来实现延迟队列,也是非常的方便。

    alt

    转自:

    juejin.cn/post/6844904142155022344

    本文由 mdnice 多平台发布

  • 相关阅读:
    【SQL刷题】DAY15----SQL联结表专项练习
    机器学习股票大数据量化分析与预测系统 - python 计算机竞赛
    通过IP地址进行精准定位技术、方法与隐私问题的探讨
    【SpringBoot的自动配置--下篇】架构师如何自定义自己的条件注解与自动配置
    进程环境+进程终止
    使用 Shell 脚本定期检查 MySQL 服务是否正常运行
    物联网专业前景怎么样?
    Vue2 解决computed返回值未能渲染到DOM的问题
    设计模式之【策略模式】
    这就是程序员眼中的函数吗?(一)
  • 原文地址:https://blog.csdn.net/qq_39787367/article/details/126017050