• RabbitMQ延迟消息:死信队列 | 延迟插件 | 二合一用法+踩坑手记+最佳使用心得


    🚀 优质资源分享 🚀

    学习路线指引(点击解锁)知识定位人群定位
    🧡 Python实战微信订餐小程序 🧡进阶级本课程是python flask+微信小程序的完美结合,从项目搭建到腾讯云部署上线,打造一个全栈订餐系统。
    💛Python量化交易实战💛入门级手把手带你打造一个易扩展、更安全、效率更高的量化交易系统

    前言

    前段时间写过一篇:

    # RabbitMQ:消息丢失 | 消息重复 | 消息积压的原因+解决方案+网上学不到的使用心得

    很多人加了我好友,说很喜欢这篇文章,也问了我一些问题。

    因为最近工作比较忙,隔了一段时间没写,忙完后专门花时间把RabbitMQ剩下的一个重要技术点通过案例的方式整理出来,就是延迟消息的用法。

    延迟消息含义不解释了,就是字面意思。

    用法一共两种方式,死信队列和延迟插件,两种各有利弊,我会一一陈述并给出最佳用法。

    死信队列方式

    死信队列不要理解成很玄乎的东西,它就是普通队列绑定了死信交换机,而且配置参数还是固定的,无需动脑,作用的话你想象成回收站就好了,被拒绝或超时的消息就往这里边丢,然后还能继续被消费,就这么简单。

    1、原理图解

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iRCzs6rZ-1659200994136)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/4237caf329e440aeaf93544f67148071~tplv-k3u1fbpfcp-watermark.image?)]

    2、引入MQ

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ayr5XjWg-1659200994139)(https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/0836a7113e40483fb1b398306eb3aa10~tplv-k3u1fbpfcp-watermark.image?)]

    3、声明交换机和队列

    声明普通交换机、队列、路由,这里我们声明两个预备延迟的队列,名称分别包含5s和15min,用来区分延迟消息是否达到预期效果。

    我们接下来所有交换机和队列都是以Direct模式来创建的,也就是点对点方式,具体原因后面会讲。

    另外,注意这里注释的延迟交换机、队列,都是为了特别说明,其实还是普通队列,参考上面的原理图解。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-u6BGXthH-1659200994141)(https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/530470928c604f8eb231d5c42a041d01~tplv-k3u1fbpfcp-watermark.image?)]

    创建交换机、队列、绑定关系。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Nztk1XlF-1659200994142)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/c1eb460a242149388021ca2a7d76ebdc~tplv-k3u1fbpfcp-watermark.image?)]

    聪明的小伙伴应该能发现,上面这段代码只有交换机和绑定队列的关系,却没有创建队列。

    没错,接下来就是重点部分,创建队列时,要绑定死信交换机,这样就变成了一个死信队列。

    可以看到,5s和15min的队列绑定的都是同一个死信交换机,只是路由规则、消息过期时间TTL不同。

    这样,在项目启动后,RabbitMQ就会创建出两个具备不同过期时间的死信队列,后面会有截图专门给大家看。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-oTefrKQO-1659200994143)(https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/79d9aab2d2444a99978c0f5509ce05af~tplv-k3u1fbpfcp-watermark.image?)]

    绑定后的效果,在项目启动后RabbitMQ会把交换机和队列都创建出来,在控制台就能看到。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mQzmdokY-1659200994144)(https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/bf29de5dc56e4a1f8108c54621187eda~tplv-k3u1fbpfcp-watermark.image?)]

    普通队列绑定死信交换机和对应的路由规则后,我们接下来就把死信交换机、路由规则、队列创建出来即可,其实和创建普通队列没区别。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0oCXydJd-1659200994145)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/ceb19456630d428a87611f0b363c8aa5~tplv-k3u1fbpfcp-watermark.image?)]

    4、yml配置

    为了演示方便,我们的生产者和消费者是写在同一个项目中的,所以配置文件没有区别。但是在线上环境中,为了解耦生产者和消费者往往是分开的。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-o0YJTc2h-1659200994146)(https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/1a832c8f68734c7395f496069b6fe7e4~tplv-k3u1fbpfcp-watermark.image?)]

    这里可以发现,我们给RabbitMQ开启了消息确认机制,读过开头提过那篇文章的小伙伴应该知道,线上环境我们为了提高性能一般是不打开确认机制的,这里之所以打开,是为了演示消息的投递情况,同时也为了特别讲后面延迟插件会出现的一个问题。

    5、创建生产者

    这里加了诸如消息唯一ID、消息确认机制的写法,单纯为了展示给大家看,实际上你可以不加。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kEfcMJ50-1659200994147)(https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/93a45628c4164285802bdd84fda43394~tplv-k3u1fbpfcp-watermark.image?)]

    6、创建消费者

    这里注意,监听的队列也就是我们前面声明的死信队列,因为过期的消息都通过绑定的死信交换机转发到了里面,如果对过程有疑惑,可以回到开头的图解那里对着图片来看。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8qGc7bFF-1659200994148)(https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/4cd4848876ff4b21a61cd3c2dcfc6aee~tplv-k3u1fbpfcp-watermark.image?)]

    7、测试接口

    分别创建了5s延迟和15min延迟的测试接口

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-zvOWTjC5-1659200994149)(https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/c04ac3d97b8c4794a39ed8a221e17049~tplv-k3u1fbpfcp-watermark.image?)]

    8、效果

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JyHNGAji-1659200994150)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/e2d9f037071d418d80984e5cdd062c36~tplv-k3u1fbpfcp-watermark.image?)]

    延迟插件方式

    插件方式,要比死信队列方式简单得多,只需要安装插件,启动插件功能,然后创建延迟队列即可。

    1、安装插件

    这里给出源码安装方式和docker安装方式,大家根据各自情况自己选。

    1)、源码安装

    下载插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

    本次演示下载的是3.8.0版本插件,它能兼容3.7.x和3.8.x的RabbitMQ

    直通车:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/v3.8.0

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bIg9Eizl-1659200994151)(https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/e6056833b2f24ddc8d797019b5af6352~tplv-k3u1fbpfcp-watermark.image?)]

    提醒一下,要下载和RabbitMQ对应的版本,否则延迟队列不会生效。这里面有些版本是兼容低版本MQ的,可以点击进去具体查看支持的版本。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1NyCiIEb-1659200994153)(https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/1f9a2256c0ba4a0dac205449da6dee92~tplv-k3u1fbpfcp-watermark.image?)]

    将下载好的插件上传到RabbitMQ的plugins下:rabbitmq_server-3.7.24/plugins

    然后开启延迟队列:rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    这样就可以开始愉快的使用啦,具体的操作和图示在下面的docker安装步骤里展示出来,这俩的操作没啥区别。

    2)、docker安装

    首先,还是一样的步骤,把下载好的插件上传到RabbitMQ服务器上,因为是docker方式,所以你还得把它上传到docker容器中。

    复制文件到docker容器中:

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dAGvLYuu-1659200994154)(https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/f9e991ebf07d48a4b56ec421100fe068~tplv-k3u1fbpfcp-watermark.image?)]

    进入容器查看是否上传成功

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gPz37Li7-1659200994155)(https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/7c8d7f0833a4401e80366f376eba0d9b~tplv-k3u1fbpfcp-watermark.image?)]

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vyxqGKBl-1659200994156)(https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/414117c4b2b9426088bd118a6c0b3b2f~tplv-k3u1fbpfcp-watermark.image?)]

    开启延迟插件,开启成功后就是下面的提示效果。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-d2VpctQY-1659200994157)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/5b3236b9afb84f1d8f59f03512f086a3~tplv-k3u1fbpfcp-watermark.image?)]

    重启RabbitMQ,不重启不会生效,如果重启没效果,你可能需要kill进程再启动。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HKIEOLBz-1659200994158)(https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/a3fe22b911204bf98b3600c4b279c48e~tplv-k3u1fbpfcp-watermark.image?)]

    打开控制台界面,如果看到创建交换机的选项中有了x-delayed-message,就表示延迟插件安装和启动成功。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mvfPDVQe-1659200994159)(https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/85d07021858147bf80526ec185309d30~tplv-k3u1fbpfcp-watermark.image?)]

    2、yml配置

    这里我专门画了红框,是因为这个配置在延迟插件中有一个作用,可以加也可以不加,后面的踩坑手记会单独解释。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tiYI0I4W-1659200994160)(https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/e9705ecb3522430192ddba8f58266a55~tplv-k3u1fbpfcp-watermark.image?)]

    3、声明交换机和队列

    这里只需注意一点,交换机创建时要设置为延迟交换机,也就是setDelayed(true)

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3ctHeJbc-1659200994161)(https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/55853d1ab177480e84f6aa7dd9acaf2d~tplv-k3u1fbpfcp-watermark.image?)]

    4、消息处理器工具类

    我们通过MessagePostProcessor这个钩子来设置持久化模式和延迟时间,项目中可能多个地方会用到,所以单独抽取出来通过工具类获取对象。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-T3CNHyqi-1659200994162)(https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/9d57a405d5c54e2aadc6fcdb97f5fc0b~tplv-k3u1fbpfcp-watermark.image?)]

    5、创建生产者

    这个写法是固定的,可以直接复制到自己的项目中使用,延迟时间自己定义。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TZVPzeit-1659200994163)(https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/563f89a8a6e24c9992fc9a8db6cea74b~tplv-k3u1fbpfcp-watermark.image?)]

    6、创建消费者

    直接监听这个延迟队列即可,没有特别的地方。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Usp7U0X4-1659200994164)(https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/bdb2b113c8b148e1bf9fbabe31937161~tplv-k3u1fbpfcp-watermark.image?)]

    7、测试接口

    这里我们测试这个6s延迟的消息是否成功

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ymxy4Grs-1659200994165)(https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/8882e1f4b566435bb612ee2175fcd206~tplv-k3u1fbpfcp-watermark.image?)]

    8、效果

    可以看到,刚好6s后消费了消息,延迟效果没问题。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XNAWW7tU-1659200994166)(https://p9-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/f6addff6c5384f8f9fa4a363afb85aba~tplv-k3u1fbpfcp-watermark.image?)]

    踩坑手记

    1、死信队列坑点

    其实死信队列的坑点比插件要少多了,但是死信队列没有插件那么简单直接好理解。

    1)、原理一定要弄明白,否则你连交换机和队列怎么创建怎么绑定都不知道;

    2)、绑定死信交换机时,x-dead-letter这些参数对应的值一定要写对,尤其是路由,写错了会导致过期消息不进入死信队列,找半天原因都找不到;

    2、延迟插件坑点

    延迟插件的坑点就真的多了,笔者用死信队列方式几乎是1次就成功,插件反倒折腾了我好几个回合。

    1)、延迟插件的版本一定要下载对,和RabbitMQ本身版本对应,最好是点进去看下说明,一般都会告诉你兼容哪个版本,如果下载错了,我只能为你默哀;

    2)、不要下载过高版本的RabbitMQ和插件,你可能会疯掉;

    3)、开启插件后一定要重启RabbitMQ,否则不生效,如果重启也没生效,你可以尝试下kill掉MQ的进程,然后再启动;

    4)、开启消息确认机制后,你会发现延迟插件很特殊的一个地方,就是每次投递消息都会进入returncallback回调中。

    我着重来说明下最后一个坑点,也谈不上是坑点,只是你以后使用过程中很可能会产生疑惑。

    还记得前文中yml文件画一个红框的配置吗,我们把那个参数注释掉看看效果。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SRMutYiI-1659200994167)(https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/5b7cf946d86b43538fcf093777ed3275~tplv-k3u1fbpfcp-watermark.image?)]

    你会发现,一旦开启了消息确认机制,延迟消息每次都会先进入returncallback回调,然后才会投递成功,你们可以自己试一试,每次都会这样。

    原因是什么呢?

    这里就要提到延迟插件的原理了,从前文创建延迟交换机那里就可以看到,是给交换机设置了delayed:true,因为延迟消息实际上是挂载到交换机上,不会马上就通过路由投递出去

    那么我们再来看看,上面这个图中returncallback回调打印出来的返回信息:replyText=NO_ROUTE,很明显了吧,说是没有路由,所以消息确认失败了,因为延迟插件没有马上通过路由投递。

    那又有疑问了,没有马上投递,为什么会进入returncallback回调呢?

    下面这张图是源码中的一段,告诉你了为什么。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AJKd5oYl-1659200994168)(https://p6-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/56c085cbc5a6423f8325c343d9e0887e~tplv-k3u1fbpfcp-watermark.image?)]

    因为有个mandatory参数,如果不配置它的话,它是为null的,当为null的时候,传递的值是this.properties.isPublisherReturns()

    官方对mandatory的解释如下:

    Enable mandatory messages. If a mandatory message cannot be routed to a queue by the server, it will return an unroutable message with a Return method.

    翻译过来:

    开启强制性的消息。如果强制消息不能被服务器路由到队列,它将使用return方法返回一个不可路由的消息。

    所以,mandatory为true表示开启强制投递,为false表示不强制,而且这个值可以为null。

    而我们前面yml文件中开启了消息确认机制:publisher-returns: true

    所以,每次一定会走returncallback回调。

    因此,我们要么不开启消息确认机制,要么就把mandatory设置为false,这样延迟插件的消息就不会每次走一遍回调了

    最佳使用心得

    1)、延迟插件方式更方便,但我建议首选死信队列方式,因为死信队列在RabbitMQ的使用中占比还挺重的,它不仅可以用在延迟消息,还可以用在其他很多地方,另外死信队列是必学的,并且拿来即用免去了安装插件带来的风险;

    2)、死信队列在延迟消息这块其实是有隐形BUG的,它在多个延迟时间场景同时存在的情况下有先后执行顺序的问题,可能出现15min的延迟消息在前面,导致5s的延迟消息要等待15min的执行完了再执行,这是RabbitMQ本身的有序机制导致的,只对队尾消息判定,所以我们在使用时一定要做延迟消息隔离,不同延迟场景要分开处理;

    3)、延迟插件就没有上述死信队列的问题,已经专门处理了复杂场景下的有序问题,所以一个项目在开始之初决定要使用RabbitMQ的时候,不管未来用不用得上,请务必就在安装时顺便也把延迟插件也装上,避免项目中期忽然想用时安装插件必须重启RabbitMQ带来的未知风险;

    4)、大部分情况下,死信队列完全足够,但切记项目中不要频繁使用延迟队列,基本上用到的场景会很少,比如定时关单、定时释放锁资源等等,特定的对延迟时间准确性要求很高的场景才用,其他还是以分布式任务调度为主,延迟队列太多会引起不必要的认知混乱;

    5)、我个人的经验,延迟消息的场景下,交换机最好使用Direct点对点模式,我们公司曾经出现过同事使用Topic模式,自以为路由规则匹配写的没问题,实际上导致规则冲突消息被其他消费者给消费掉了,MQ流转消息本身是静默处理的,所以他找死都找不到原因,直到后来被其他同事偶然发现才解决。所以对于这种单一场景最保险的方式还是Direct,一对一总不会有问题。

    总结

    总结下来就以下几点:

    1)、不管用不用,在安装RabbitMQ时就顺便把延迟插件也装上;

    2)、推荐以死信队列方式为主;

    3)、不要太多地方使用延迟队列;

    4)、交换机模式使用Direct点对点。

    最后,我会把本次案例的代码地址放在评论中,两种实现方式都有,可以直接运行起来,想要学习的可以下载来看看。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ZdH4s5Af-1659200994169)(https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/036ba551c8cf4a18ad6114b1db23b9ed~tplv-k3u1fbpfcp-watermark.image?)]


    原创文章纯手打,觉得有一滴滴帮助就请举手之劳点个推荐吧~

    持续分享工作中的真实经验和心得体会,喜欢的话就点个关注吧~

    • 本文链接: https://blog.csdn.net/fulongyuanjushi/p/16535150.html
    • 关于博主: 评论和私信会在第一时间回复。或者直接私信我。
    • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
    • 声援博主: 如果您觉得文章对您有帮助,可以点击文章右下角**【[推荐](javascript:void(0)😉】**一下。
  • 相关阅读:
    鹅厂3.5亿打水漂,又一款产品黄了
    linux 安装中文字体
    如何用 Python 实现 Excel 任务自动化
    (个人杂记)第九章 串口实验2
    9. 微积分 - 导数
    Nydus | 容器镜像基础
    包装类 or 泛型
    编译原理:编译原理简明教程知识点梳理(应对考试版)
    基于卷积神经网络的苗语孤立词语音识别
    上架即封神!3.6k Star 的开源游戏模拟器,Delta 冲上 App Store 免费榜
  • 原文地址:https://blog.csdn.net/qq_43479892/article/details/126080225