• 消息队列 RocketMQ 消息重复消费问题(原因及解决)


    目录

    1.出现重复消费的原因

    2.解决

    2.1 数据库插入法

    2.2 使用布隆过滤器

    2.2.1 添加hutool的依赖

    2.2.2 测试生产者

    2.2.2 测试消费者


    1.出现重复消费的原因

    1. BROADCASTING(广播) 模式下,所有注册的消费者都会消费,而这些消费者通常是集群部署的一个个微服务,这样就会多台机器重复消费,当然这个是根据需要来选择。
    2. CLUSTERING(负载均衡)模式下,如果一个 topic 被多个 consumerGroup 消费,也会重复消费。
    3. 即使是在 CLUSTERING 模式下,同一个 consumerGroup 下,一个队列只会分配给一个消费者,看起来好像是不会重复消费。但是,有个特殊情况:一个消费者新上线后,同组的所有消费者要重新负载均衡重平衡 reBalance(反之一个消费者掉线后,也一样)。一个队列所对应的新的消费者要获取之前消费的 offset(偏移量,也就是消息消费的点位),此时之前的消费者可能已经消费了一条消息,但是并没有把 offset 提交给 broker,那么新的消费者可能会重新消费一次。虽然 orderly 模式是前一个消费者先解锁,后一个消费者加锁再消费的模式,比起 concurrently 要严格了,但是加锁的线程和提交offset 的线程不是同一个,所以还是会出现极端情况下的重复消费。
    4. 还有在发送批量消息的时候,会被当做一条消息进行处理,那么如果批量消息中有一条业务处理成功,其他失败了,还是会被重新消费一次。

    简单的说:

    1. Consumer 消费完消息并不是实时同步到 Broker 的,而是将 offset 先保存在本地map中,通过定时任务持久化上去。这就导致消息被消费了,但是此时消费者宕机了导致 offset 没提交,下次没提交 offset 的这部分消息会被再次消费
    2. 即使 offset 被提交到了 Broker,在还没来得及持久化的时候 Broker 宕机了,当重启的时候 Broker 会读取consumerOffset.json 中保存的 offset 信息,这就会导致没持久化 offset 的这部分消息会被再次消费

    那么如果在CLUSTERING(负载均衡)模式下,并且在同一个消费者组中,不希望一条消息被重复消费,改怎么办呢?我们可以想到去重操作,找到消息唯一的标识,可以是 msgId 也可以是你自定义的唯一的 key,这样就可以去重了

    2.解决

    我们需要给我们的消费者实现 幂等 ,也就是对同一个消息的处理结果,执行多少次都不变。

    幂等性:多次操作产生的影响均和第一次操作产生的影响相同

    例如:判断 crud 的幂等性

    a. 新增:普通的新增是非幂等,设置了唯一索引的新增是幂等操作

    b. 修改:update goods set stock = 10 where id = 1 幂等

                   update goods set stock = stock - 1 where id = 1 非幂等

    c. 查询:幂等

    d. 删除:幂等

    那么如何给业务实现幂等呢?这个还是需要结合具体的业务的。你可以使用写入 Redis 来保证,因为Redis 的 key 和 value 就是天然支持幂等的。当然还有使用 数据库插入法 ,基于数据库的唯一键来保证重复数据不会被插入多条。

    2.1 数据库插入

    发送方需要给消息带一个唯一标记(根据业务标识)

    模拟业务 数据库的订单操作日志表结构(去重表)

    给订单号添加唯一索引(订单号存的是 key)

    模拟业务,生产者发送了重复的消息

    1. @Test
    2. public void repeatTest() throws Exception {
    3. String key = UUID.randomUUID().toString();
    4. Message<String> msg = MessageBuilder.withPayload("扣减库存 -1").setHeader(RocketMQHeaders.KEYS, key).build();
    5. rocketMQTemplate.syncSend("repeatTopic", msg);
    6. rocketMQTemplate.syncSend("repeatTopic", msg);
    7. }

    消费者

    1. @Component
    2. @RocketMQMessageListener(topic = "repeatTopic",consumerGroup = "repeat-consumer-group")
    3. public class RepeatListener implements RocketMQListener<MessageExt> {
    4. @Autowired
    5. private LogMapper logMapper;
    6. @Override
    7. public void onMessage(MessageExt messageExt) {
    8. // 先拿key
    9. String keys = messageExt.getKeys();
    10. // 插入数据库 因为key做了唯一索引
    11. OrderOperLog orderOperLog = new OrderOperLog();
    12. orderOperLog.setType(1l);
    13. orderOperLog.setOrderSn(keys);
    14. orderOperLog.setUserId("1003");
    15. int insert = logMapper.insert(orderOperLog);
    16. System.out.println(keys);
    17. System.out.println(new String(messageExt.getBody()));
    18. }
    19. }

    在消费第二条的时候抛出唯一索引重复 SQLIntegrityConstraintViolationException

    数据库只插入一条这样的记录

    优化,捕获到异常是 SQLIntegrityConstraintViolationException 时直接将消息签收了,不再进行业务处理,因为之前已经消费了一条同样的消息,这样便可以解决重复消费问题

    2.2 使用布隆过滤器

    • 使用去重方案解决,例如将消息的唯一标识存起来,然后每次消费之前先判断是否存在这个唯一标识,如果存在则不消费,如果不存在则消费,并且消费以后将这个标记保存。
    • 想法很好,但是消息的体量是非常大的,可能在生产环境中会到达上千万甚至上亿条,那么我们该如何选择一个容器来保存所有消息的标识,并且又可以快速的判断是否存在呢?

    我们可以选择布隆过滤器(BloomFilter)

    介绍:

    布隆过滤器(英语:Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。

    布隆过滤器的原理是,当一个元素被加入集合时,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。这就是布隆过滤器的基本思想。

    2.2.1 添加hutool的依赖

    1. <dependency>
    2. <groupId>cn.hutool</groupId>
    3. <artifactId>hutool-all</artifactId>
    4. <version>5.7.11</version>
    5. </dependency>

    2.2.2 测试生产者

    1. public void testRepeatProducer() throws Exception {
    2. // 创建默认的生产者
    3. DefaultMQProducer producer = new DefaultMQProducer("test-group");
    4. // 设置nameServer地址
    5. producer.setNamesrvAddr("localhost:9876");
    6. // 启动实例
    7. producer.start();
    8. // 我们可以使用自定义key当做唯一标识
    9. String keyId = UUID.randomUUID().toString();
    10. System.out.println(keyId);
    11. Message msg = new Message("TopicTest", "tagA", keyId, "我是一个测试消息".getBytes());
    12. SendResult send = producer.send(msg);
    13. System.out.println(send);
    14. // 关闭实例
    15. producer.shutdown();
    16. }

    发送了两条相同的消息

    1. 55d397c9-814f-4931-b0fd-7e142c04759b
    2. SendResult [sendStatus=SEND_OK, msgId=7F00000121C418B4AAC204A76B050000, offsetMsgId=C0A8588200002A9F000000000002C359, messageQueue=MessageQueue [topic=repeatTestTopic, brokerName=broker-a, queueId=1], queueOffset=0]
    3. SendResult [sendStatus=SEND_OK, msgId=7F00000121C418B4AAC204A76B050000, offsetMsgId=C0A8588200002A9F000000000002C43F, messageQueue=MessageQueue [topic=repeatTestTopic, brokerName=broker-a, queueId=2], queueOffset=0]

    2.2.2 测试消费者

    1. /**
    2. * 在boot项目中可以使用@Bean在整个容器中放置一个单利对象
    3. */
    4. public static BitMapBloomFilter bloomFilter = new BitMapBloomFilter(100); // m数组长度
    5. @Test
    6. public void testRepeatConsumer() throws Exception {
    7. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");
    8. consumer.setMessageModel(MessageModel.BROADCASTING);
    9. consumer.setNamesrvAddr(MyConstant.NAME_SRV_ADDR);
    10. consumer.subscribe("repeatTestTopic", "*");
    11. // 注册一个消费监听 MessageListenerConcurrently是并发消费
    12. consumer.registerMessageListener(new MessageListenerConcurrently() {
    13. @Override
    14. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
    15. ConsumeConcurrentlyContext context) {
    16. // 拿到消息的key
    17. MessageExt messageExt = msgs.get(0);
    18. String keys = messageExt.getKeys();
    19. // 判断是否存在布隆过滤器中
    20. if (bloomFilter.contains(keys)) {
    21. // 直接返回了 不往下处理业务
    22. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    23. }
    24. // 这个处理业务,然后放入过滤器中
    25. // do sth...
    26. bloomFilter.add(keys);
    27. System.out.println("keys:" + keys);
    28. System.out.println(new String(messageExt.getBody()));
    29. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    30. }
    31. });
    32. consumer.start();
    33. System.in.read();
    34. }

    业务只处理了一条

    1. keys:55d397c9-814f-4931-b0fd-7e142c04759b
    2. 库存-1

    延迟过了后 重复消息被签收

    解决重复消费问题

  • 相关阅读:
    Kubernetes(K8S)快速搭建typecho个人博客
    Autosar模块介绍:AutosarOS(3)
    HTML期末大作业:DIV简单的篮球网页制作期末作业 篮球明星科比js三级页面
    yolact 环境配置
    零基础Linux_16(基础IO_文件)笔试选择题:文件描述符+ionde和动静态库
    2024届秋招小记
    vue导入数据添加在列表数据中
    Chapter 3 New Optimizers for Deep Learning
    【模板】差分
    人工智能 AI 绘画 AI绘制的图片 ? 简介的版权,以及如何使用图像生成AI 绘画 ?
  • 原文地址:https://blog.csdn.net/m0_65819602/article/details/133973792