码农知识堂 - 1000bd
  •   Python
  •   PHP
  •   JS/TS
  •   JAVA
  •   C/C++
  •   C#
  •   GO
  •   Kotlin
  •   Swift
  • RabbitMQ的confirm机制


    目录

    前言

    一、confirm机制的选择

     二、异步confirm设计。

    第一个问题:我们要支持重试,所以我们必须想一个办法,在发送之前把消息save起来,当监听到ACK后,在把对应的消息remove掉。

    第二个问题:忽略了回调方法的第二个参数,multiple

    第三个问题:没有在关闭Channel前,去检查该channel上是否还存在未ACK的消息。


    前言

    RabbitMQ为了保证消息不丢失,设置了confirm机制,其中confirm有3种方式:

    • 同步confirm:即发送一条消息,同步等待,MQ的ACK。
    • 批量confirm:   即发送一批消息,比如发送100条消息,然后等待MQ的ACK
    • 异步confirm: 通过监听的方式,异步的等待MQ的ack消息。 

    关于RabbitMQ的confirm使用机制我相信大家有了一定的了解,我这边就不在这边过多的介绍,如果还有不太清楚的朋友,可以去朱小厮的博客去学习,我把对应的博客链接放在下面的位置。 

    RabbitMQ之消息确认机制(事务+Confirm)_朱小厮的博客-CSDN博客_rabbitmq生产者确认机制概述在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失https://hiddenpps.blog.csdn.net/article/details/55515234接下来我重点想大家介绍在落地的过程中遇到的一些思考与问题:

    一、confirm机制的选择

    如前言所述:confirm机制有3种方式、

    我们最开始使用的第一种方式:

    主要选择的理由是实现起来特别简单。

    相比于异步confirm的复杂编程,一行代码就能搞定了。

    另外相比于批量confirm,批量confirm的缺点就比较明显了。当发送的一批数据,其中某一条数据发送失败的情况下,就会导致这一批数据都会ACK失败,如果处理不当就很容易产生消息重复。

     但是随着我们线上业务的使用我们发现了如下问题:首先:同步confirm,致命的就是同步。

    我们发现业务系统起了20个线程,去发布消息,某一各时刻,由于网络波动等原因,导致MQ发送失败,业务线程一直同步等待ACK超时(超时时间默认为3S),这样由于MQ服务不稳定的原因,导致业务线程一直阻塞。存在拖垮应用的风险。也频繁的上报发送超时的异常日志报警。

    分析原因:当我遇到这个问题的时候,我是想让运维去看下日志报错的时间段,通过抓包AMQP协议的数据包,分析下为什么业务系统连接MQ会发生超时。来分析下真实的原因,最后运维未能如我所愿的抓到我想看到的信息,导致到现在也没找到原因。

    后面我们便开始着手将同步cofirm机制,更改为异步confrim设计。

     二、异步confirm设计。

    刚开始着手更改的时候,我以为比较简单。但是现在回顾其实也遇到了一些问题,故分享给大家。

    • 首先我们发送的时候,是通过channel进行发送的,由于发送者线程不可控,channel在MQ服务器上过多会造成MQ资源消耗过大。因此,我们在发送的时候,便维护了一个channel的池子。通过通过jdk中的GenericObjectPool做的。
    • 为了保证消息不可丢,我们有补偿机制。需要对发送失败的消息进行重试。

    前面我们提到了我们是通过监听来实现异步的。接下来我们看下实现代码:

    1. channel.confirmSelect();
    2. channel.addConfirmListener(new ConfirmListener() {
    3. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
    4. if (multiple) {
    5. confirmSet.headSet(deliveryTag + 1).clear();
    6. } else {
    7. confirmSet.remove(deliveryTag);
    8. }
    9. }
    10. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
    11. if (multiple) {
    12. confirmSet.headSet(deliveryTag + 1).clear();
    13. } else {
    14. confirmSet.remove(deliveryTag);
    15. }
    16. }
    17. });

    第一个问题:我们要支持重试,所以我们必须想一个办法,在发送之前把消息save起来,当监听到ACK后,在把对应的消息remove掉。

    由于监听器的回调方方handleNack的参数是deliverTag,这个是在当前channel的递增的。而我们发送的channel又是从channelPool中获取的。这样就碰到了我们遇到的第一个问题:我们要如何把保存消息和删除消息。我这边处理方案是:将消息保存到如下结构体中

    1. # channelNumber channel的唯一标识
    2. # deliverTag channel下递增的消息ID
    3. # Message 消息体内容
    4. Map>

    通过这种数据结构将消息保存起来。

    第二个问题:忽略了回调方法的第二个参数,multiple

    在进行压测的时候,发现我们Map中是数据一直会存在未被清除的数据。

    解决这个问题的思路:

    1.ACK丢失了,不是所有的消息都触发了ACK。

    2.我们的remove处理逻辑有问题,导致数据未能被清除干净。

    我一直以为MQ的ACK是一条一条进行ACK的,每发送一条消息都会触发一次ACK。实际上是:当发送速率比较慢的时候,MQ确实会一条一条的进行ACK,当我们大批量的发送并且发送速率很快时,MQ是会批量ACK,比如发送100条消息,进行1次ACK,ACK的deliverTag参数为100,代表序号1到100的数据都已经成功发送到MQ服务器了。

    第三个问题:没有在关闭Channel前,去检查该channel上是否还存在未ACK的消息。

    还是上面第二个问题,当我们发现不是每发一条消息都会ACK后,我们改了代码,在进行压测的时候,发现map中的消息还是清空不了。由于我们使用的是channelPool,channelPool进行销毁对象的时候,直接调用的channel.close.但是我们又开启confirm机制,就存在,当channel上还有消息未ack时,我们强行关闭channel,就导致channel不能进行ack,最后就体现在map中的数据不能清除。我们需要在close之前,调用channel.waitForConfirms()等待当前Channel上所有的消息都进行了ACK。

  • 相关阅读:
    【Qt】QGroundControl入门4:框架QGCApplication
    Fake Maxpooling 二维滑动窗口
    一种基于物理信息极限学习机的PDE求解方法
    http的get与post
    做一个物联网温湿度传感器(一)SHT30传感器介绍
    47-用户和权限管理
    学生个人网页模板 学生个人网页设计作品 简单个人主页成品 个人网页制作 HTML学生个人网站作业设计代做
    二进制安装k8s
    【自然语言处理】自然语言处理NLP概述及应用
    1800亿参数,支持中文,3.5万亿训练数据!开源类ChatGPT模型
  • 原文地址:https://blog.csdn.net/weixin_44399827/article/details/126108161
  • 最新文章
  • 攻防演习之三天拿下官网站群
    数据安全治理学习——前期安全规划和安全管理体系建设
    企业安全 | 企业内一次钓鱼演练准备过程
    内网渗透测试 | Kerberos协议及其部分攻击手法
    0day的产生 | 不懂代码的"代码审计"
    安装scrcpy-client模块av模块异常,环境问题解决方案
    leetcode hot100【LeetCode 279. 完全平方数】java实现
    OpenWrt下安装Mosquitto
    AnatoMask论文汇总
    【AI日记】24.11.01 LangChain、openai api和github copilot
  • 热门文章
  • 十款代码表白小特效 一个比一个浪漫 赶紧收藏起来吧!!!
    奉劝各位学弟学妹们,该打造你的技术影响力了!
    五年了,我在 CSDN 的两个一百万。
    Java俄罗斯方块,老程序员花了一个周末,连接中学年代!
    面试官都震惊,你这网络基础可以啊!
    你真的会用百度吗?我不信 — 那些不为人知的搜索引擎语法
    心情不好的时候,用 Python 画棵樱花树送给自己吧
    通宵一晚做出来的一款类似CS的第一人称射击游戏Demo!原来做游戏也不是很难,连憨憨学妹都学会了!
    13 万字 C 语言从入门到精通保姆级教程2021 年版
    10行代码集2000张美女图,Python爬虫120例,再上征途
Copyright © 2022 侵权请联系2656653265@qq.com    京ICP备2022015340号-1
正则表达式工具 cron表达式工具 密码生成工具

京公网安备 11010502049817号