目录
简单的说:
- Consumer 消费完消息并不是实时同步到 Broker 的,而是将 offset 先保存在本地map中,通过定时任务持久化上去。这就导致消息被消费了,但是此时消费者宕机了导致 offset 没提交,下次没提交 offset 的这部分消息会被再次消费
- 即使 offset 被提交到了 Broker,在还没来得及持久化的时候 Broker 宕机了,当重启的时候 Broker 会读取consumerOffset.json 中保存的 offset 信息,这就会导致没持久化 offset 的这部分消息会被再次消费
那么如果在CLUSTERING(负载均衡)模式下,并且在同一个消费者组中,不希望一条消息被重复消费,改怎么办呢?我们可以想到去重操作,找到消息唯一的标识,可以是 msgId 也可以是你自定义的唯一的 key,这样就可以去重了
我们需要给我们的消费者实现 幂等 ,也就是对同一个消息的处理结果,执行多少次都不变。
幂等性:多次操作产生的影响均和第一次操作产生的影响相同
例如:判断 crud 的幂等性
a. 新增:普通的新增是非幂等,设置了唯一索引的新增是幂等操作
b. 修改:update goods set stock = 10 where id = 1 幂等
update goods set stock = stock - 1 where id = 1 非幂等
c. 查询:幂等
d. 删除:幂等
那么如何给业务实现幂等呢?这个还是需要结合具体的业务的。你可以使用写入 Redis 来保证,因为Redis 的 key 和 value 就是天然支持幂等的。当然还有使用 数据库插入法 ,基于数据库的唯一键来保证重复数据不会被插入多条。
发送方需要给消息带一个唯一标记(根据业务标识)
模拟业务 数据库的订单操作日志表结构(去重表)

给订单号添加唯一索引(订单号存的是 key)

模拟业务,生产者发送了重复的消息
- @Test
- public void repeatTest() throws Exception {
- String key = UUID.randomUUID().toString();
- Message<String> msg = MessageBuilder.withPayload("扣减库存 -1").setHeader(RocketMQHeaders.KEYS, key).build();
- rocketMQTemplate.syncSend("repeatTopic", msg);
- rocketMQTemplate.syncSend("repeatTopic", msg);
- }
消费者
- @Component
- @RocketMQMessageListener(topic = "repeatTopic",consumerGroup = "repeat-consumer-group")
- public class RepeatListener implements RocketMQListener<MessageExt> {
- @Autowired
- private LogMapper logMapper;
- @Override
- public void onMessage(MessageExt messageExt) {
- // 先拿key
- String keys = messageExt.getKeys();
- // 插入数据库 因为key做了唯一索引
- OrderOperLog orderOperLog = new OrderOperLog();
- orderOperLog.setType(1l);
- orderOperLog.setOrderSn(keys);
- orderOperLog.setUserId("1003");
- int insert = logMapper.insert(orderOperLog);
- System.out.println(keys);
- System.out.println(new String(messageExt.getBody()));
- }
- }
在消费第二条的时候抛出唯一索引重复 SQLIntegrityConstraintViolationException

数据库只插入一条这样的记录
![]()
优化,捕获到异常是 SQLIntegrityConstraintViolationException 时直接将消息签收了,不再进行业务处理,因为之前已经消费了一条同样的消息,这样便可以解决重复消费问题
我们可以选择布隆过滤器(BloomFilter)
介绍:
布隆过滤器(英语:Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。
布隆过滤器的原理是,当一个元素被加入集合时,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。这就是布隆过滤器的基本思想。
- <dependency>
- <groupId>cn.hutool</groupId>
- <artifactId>hutool-all</artifactId>
- <version>5.7.11</version>
- </dependency>
- public void testRepeatProducer() throws Exception {
- // 创建默认的生产者
- DefaultMQProducer producer = new DefaultMQProducer("test-group");
- // 设置nameServer地址
- producer.setNamesrvAddr("localhost:9876");
- // 启动实例
- producer.start();
- // 我们可以使用自定义key当做唯一标识
- String keyId = UUID.randomUUID().toString();
- System.out.println(keyId);
- Message msg = new Message("TopicTest", "tagA", keyId, "我是一个测试消息".getBytes());
- SendResult send = producer.send(msg);
- System.out.println(send);
- // 关闭实例
- producer.shutdown();
- }
发送了两条相同的消息
- 55d397c9-814f-4931-b0fd-7e142c04759b
- SendResult [sendStatus=SEND_OK, msgId=7F00000121C418B4AAC204A76B050000, offsetMsgId=C0A8588200002A9F000000000002C359, messageQueue=MessageQueue [topic=repeatTestTopic, brokerName=broker-a, queueId=1], queueOffset=0]
- SendResult [sendStatus=SEND_OK, msgId=7F00000121C418B4AAC204A76B050000, offsetMsgId=C0A8588200002A9F000000000002C43F, messageQueue=MessageQueue [topic=repeatTestTopic, brokerName=broker-a, queueId=2], queueOffset=0]
- /**
- * 在boot项目中可以使用@Bean在整个容器中放置一个单利对象
- */
- public static BitMapBloomFilter bloomFilter = new BitMapBloomFilter(100); // m数组长度
-
- @Test
- public void testRepeatConsumer() throws Exception {
-
- DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group");
- consumer.setMessageModel(MessageModel.BROADCASTING);
- consumer.setNamesrvAddr(MyConstant.NAME_SRV_ADDR);
- consumer.subscribe("repeatTestTopic", "*");
- // 注册一个消费监听 MessageListenerConcurrently是并发消费
- consumer.registerMessageListener(new MessageListenerConcurrently() {
- @Override
- public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
- ConsumeConcurrentlyContext context) {
- // 拿到消息的key
- MessageExt messageExt = msgs.get(0);
- String keys = messageExt.getKeys();
- // 判断是否存在布隆过滤器中
- if (bloomFilter.contains(keys)) {
- // 直接返回了 不往下处理业务
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- // 这个处理业务,然后放入过滤器中
- // do sth...
- bloomFilter.add(keys);
- System.out.println("keys:" + keys);
- System.out.println(new String(messageExt.getBody()));
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
- }
- });
- consumer.start();
- System.in.read();
- }
业务只处理了一条
- keys:55d397c9-814f-4931-b0fd-7e142c04759b
- 库存-1
延迟过了后 重复消息被签收

解决重复消费问题