• ActiveMQ如何处理重复消息?如何保证消息的有序性?如何处理消息堆积?


    1. 如何处理重复消息

    我们先来看看能不能避免消息的重复。

    假设我们发送消息,就管发,不管Broker的响应,那么我们发往Broker是不会重复的。

    但是一般情况我们是不允许这样的,这样消息就完全不可靠了,我们的基本需求是消息至少得发到Broker上,那就得等Broker的响应,那么就可能存在Broker已经写入了,当时响应由于网络原因生产者没有收到,然后生产者又重发了一次,此时消息就重复了。

    再看消费者消费的时候,假设我们消费者拿到消息消费了,业务逻辑已经走完了,事务提交了,此时需要更新Consumer offset了,然后这个消费者挂了,另一个消费者顶上,此时Consumer offset还没更新,于是又拿到刚才那条消息,业务又被执行了一遍。于是消息又重复了。

    可以看到正常业务而言消息重复是不可避免的,因此我们只能从另一个角度来解决重复消息的问题。

    关键点就是幂等。既然我们不能防止重复消息的产生,那么我们只能在业务上处理重复消息所带来的影响。

    1.1 幂等处理重复消息

    幂等是数学上的概念,我们就理解为同样的参数多次调用同一个接口和调用一次产生的结果是一致的

    例如这条 SQLupdate t1 set money = 150 where id = 1 and money = 100; 执行多少遍money都是150,这就叫幂等

    因此需要改造业务处理逻辑,使得在重复消息的情况下也不会影响最终的结果。

    可以通过上面我那条 SQL 一样,做了个前置条件判断,即money = 100情况,并且直接修改,更通用的是做个version即版本号控制,对比消息中的版本号和数据库中的版本号

    或者通过数据库的约束例如唯一键,例如insert into update on duplicate key...

    或者记录关键的key,比如处理订单这种,记录订单ID,假如有重复的消息过来,先判断下这个ID是否已经被处理过了,如果没处理再进行下一步。当然也可以用全局唯一ID等等

    真正应用到实际中还是得看具体业务细节

    2. 如何保证消息的有序性

    有序性分:全局有序和部分有序

    2.1 全局有序

    如果要保证消息的全局有序,首先只能由一个生产者往Topic发送消息

    并且一个Topic内部只能有一个队列(分区)。消费者也必须是单线程消费这个队列。这样的消息就是全局有序的!

    不过一般情况下我们都不需要全局有序,即使是同步MySQL Binlog也只需要保证单表消息有序即可

    在这里插入图片描述

    2.2 部分有序

    因此绝大部分的有序需求是部分有序,部分有序我们就可以将Topic内部划分成我们需要的队列数,把消息通过特定的策略发往固定的队列中,然后每个队列对应一个单线程处理的消费者。

    这样即完成了部分有序的需求,又可以通过队列数量的并发来提高消息处理效率。

    在这里插入图片描述
    图中我画了多个生产者,一个生产者也可以,只要同类消息发往指定的队列即可。

    3. 如果处理消息堆积

    消息的堆积往往是因为生产者的生产速度与消费者的消费速度不匹配。有可能是因为消息消费失败反复重试造成的,也有可能就是消费者消费能力弱,渐渐地消息就积压了。

    因此我们需要先定位消费慢的原因,如果是bug则处理 bug ,如果是因为本身消费能力较弱,我们可以优化下消费逻辑,比如之前是一条一条消息消费处理的,这次我们批量处理,比如数据库的插入,一条一条插和批量插效率是不一样的。

    假如逻辑我们已经都优化了,但还是慢,那就得考虑水平扩容了,增加Topic的队列数和消费者数量,注意队列数一定要增加,不然新增加的消费者是没东西消费的。一个Topic中,一个队列只会分配给一个消费者

    当然你消费者内部是单线程还是多线程消费那看具体场景。不过要注意上面提到的消息丢失的问题,如果你是将接受到的消息写入内存队列之后,然后就返回响应给Broker,然后多线程向内存队列消费消息,假设此时消费者宕机了,内存队列里面还未消费的消息也就丢了。

    最后

    上面的几个问题都是我们在使用消息队列的时候经常能遇到的问题,并且也是面试关于消息队列方面的核心考点。大方向上搞明白很关键。

    转载自 : https://mp.weixin.qq.com/s/1r1x-Irbatvzdc90haaecA

  • 相关阅读:
    无分类器指导的Classifier-free Diffusion Models技术
    Cesium冷知识:判断cesium是否使用webgl2
    把大模型装进手机,总共分几步?
    分布式锁以及实现方式三种
    函数中使用sizeof(arr) / sizeof(arr[0])求数组长度不正确的原因
    ffmpeg把RTSP流分段录制成MP4,如果能把ffmpeg.exe改成ffmpeg.dll用,那音视频开发的难度直接就降一个维度啊
    人工智能导论笔记(持续更新)
    prometheus学习2数据类型了解&PromQL
    树莓派简单驱动编写以及代码测试
    JavaScript 编写一个 数值转换函数 万以后简化 例如1000000 展示为 100万 万以下原来数值返回
  • 原文地址:https://blog.csdn.net/weixin_44232093/article/details/126606915