• 【消息队列笔记】chp3-如何确保消息不丢失


    一、检测消息是否丢失

    我们要保证消息的可靠交付,首先就要知道消息是否丢失了。如何做到这一点呢?

    对于IT基础设施比较完善的公司,可以使用分布式链路追踪系统来追踪每一条消息。如果没有这样的系统,可以使用消息的有序性来验证是否有消息丢失。原理很简单,我们在producer端给每个发出的消息附加一个连续递增的序号,然后在consumer端检查这个序号的连续性

    如果消息没有丢失,那么consumer收到的消息序号必然是递增的,如果检测到序号不连续,那就可以确定是消息丢失的,丢失的消息就是缺少的序号。

    大多数消息队列的客户端支持拦截器机制,在producer发送消息之前的拦截器中将序号注入到消息中,在consumer收到消息的拦截器中检测序号的连续性。这样消息检测的代码不会侵入到业务代码,等系统稳定后可以删除这部分逻辑。

    对于Kafka和RocketMQ来说,不保证消息在topic上严格有序,但是保证在队列/分区上是有序的,所以我们发送消息时要指定分区,然后在分区上检测消息序号的连续性。

    其次,consumer数量和分区数量一致比较容易对消息序号进行检测。


    二、确保消息可靠传递

    首先我们要知道什么时候可能会丢失消息,然后才能进行针对性的措施。

    一条消息从生产到消费完成,分为如下三个阶段:

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kkPQitJ6-1668600114767)(E:/Blog/lansg/source/img/image-20221110103120981.png)]

    1.生产阶段

    在该阶段,消息队列通过请求确认机制来保证消息的可靠传递

    当producer给broker发送消息时,broker收到消息后会返回给客户端一个确认响应,表示消息已收到。只要producer收到了这个确认响应,就可以保证消息不会丢失。如果长时间没有收到响应会进行重发消息,如果一直失败则会返回一个异常或者返回值。

    注意:要正确处理返回值或者捕获异常,才能完全保证消息不会丢失!如果是异步发送的话,需要在回调方法进行检查。


    2.存储阶段

    如果对消息的可靠性要求非常高,可以通过配置broker参数来避免因宕机而丢消息

    • 对于单节点的broker,需要配置参数使得将消息写入磁盘后再给producer返回确认响应。例如在RocketMQ中,将刷盘方式flushDiskType配置为SYNC_FLUSH同步刷盘。

    • 对于broker集群,需要配置为:至少将消息发送到两个以上的节点再返回确认响应

    3.消费阶段

    在该阶段,不要在收到消息后就立即发送消费确认,而是应该再执行完所有消费业务逻辑之后,再发送消费确认

    客户端从broker拉取消息后,执行用户消费的业务逻辑,成功后再给broker发送消费确认响应。如果broker没有收到响应,下次拉取时还会拉取同一条消息。

  • 相关阅读:
    Tomcat使用与Servlet
    hive3.1核心源码思路
    基于SSH一些相关的命令
    Azure Data Factory(九)基础知识回顾
    Python面试题_初级版
    Vue、React和小程序中的组件通信:父传子和子传父的应用
    网络拓扑结构
    pandas(综合测试)
    GDPU 数据结构 天码行空8
    TCP通信
  • 原文地址:https://blog.csdn.net/lans_g/article/details/127892201