• Rocketmq消费消息时不丢失不重复


    消息消费不丢失

    手动ACK

    在消费者端,需要确保在消息拉取并消费成功之后再给Broker返回ACK,就可以保证消息不丢失了,如果这个过程中Broker一直没收到ACK,那么就可以重试。所以,在消费者的代码中,一定要在业务逻辑的最后一步return ConsumeConcurrentlyStatus.CONSUME_SUCCESS

    spring boot中 消费消息确认

    @Component
    @Slf4j
    @RocketMQMessageListener(consumerGroup = "${carInInfo.topic}",
            topic = "${carInInfo.topic}", selectorExpression = "*",
            consumeMode = ConsumeMode.ORDERLY)
    public class CarInParkSynThirdMQ implements RocketMQListener<AddCarInParkDTO> {
       
        /**
         * 请不要捕获异常信息,否则无法进行消息重新推送
         *
         * @param addCarInParkDTO
         */
        @Override
        public void onMessage(AddCarInParkDTO addCarInParkDTO) {
            System.out.println("收到消息:" + JSON.toJSONString(addCarInParkDTO));
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    指定consumeMode = ConsumeMode.ORDERLY,实现消息确认,我们看下源码:

    DefaultRocketMQListenerContainer.java这个类,看下其中一个类实现

    public class DefaultMessageListenerOrderly implements MessageListenerOrderly {
     @SuppressWarnings("unchecked")
        @Override
        public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {
            for (MessageExt messageExt : msgs) {
                log.debug("received msg: {}", messageExt);
                try {
                    long now = System.currentTimeMillis();
                    handleMessage(messageExt);
                    long costTime = System.currentTimeMillis() - now;
                    log.info("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
                } catch (Exception e) {
                    log.warn("consume message failed. messageExt:{}", messageExt, e);
                    context.setSuspendCurrentQueueTimeMillis(suspendCurrentQueueTimeMillis);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
     
            return ConsumeOrderlyStatus.SUCCESS;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    只要没有异常出现,那么就会消费成功,有异常出现了就重新进行发送,那这个又是在哪里调用的呢?再看下这个private方法就明白了

     private void initRocketMQPushConsumer() throws MQClientException {
           ......
     
            switch (consumeMode) {
                case ORDERLY:
                    consumer.setMessageListener(new DefaultMessageListenerOrderly());
                    break;
                case CONCURRENTLY:
                    consumer.setMessageListener(new DefaultMessageListenerConcurrently());
                    break;
                default:
                    throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
            }
     
            if (rocketMQListener instanceof RocketMQPushConsumerLifecycleListener) {
                ((RocketMQPushConsumerLifecycleListener) rocketMQListener).prepareStart(consumer);
            } else if (rocketMQReplyListener instanceof RocketMQPushConsumerLifecycleListener) {
                ((RocketMQPushConsumerLifecycleListener) rocketMQReplyListener).prepareStart(consumer);
            }
     
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    消息重试

    对于普通的消息,当消费者消费消息失败后,可以通过设置返回状态达到消息重试的结果。

    如何让消息进行重试

    RocketMQ 提供消费重试的机制。在消息消费失败的时候,RocketMQ 会通过消费重试机制,重新投递该消息给 Consumer ,让 Consumer 有机会重新消费消息,实现消费成功。

    当然,RocketMQ 并不会无限重新投递消息给 Consumer 重新消费,而是在默认情况下,达到 16 次重试次数时,Consumer 还是消费失败时,该消息就会进入到死信队列

    @Component
    @RocketMQMessageListener(consumerGroup = "MyConsumerGroup", topic = "TestTopic")
    public class SpringConsumer implements RocketMQListener<String> , RocketMQPushConsumerLifecycleListener {
    
        @Override
        public void onMessage(String message) {
            System.out.println("Received message : "+ message);
    
        }
    
        @Override
        public void prepareStart(DefaultMQPushConsumer consumer) {
          // 设置最大重试次数
          consumer.setMaxReconsumeTimes(5);
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    一条消息无论重试多少次,这些重试消息的MessageId始终都是一样的。

    每次重试的间隔时间如下:

    死信队列

    当一条消息消费失败,RocketMQ就会自动进行消息重试。而如果消息超过最大重试次数,RocketMQ就会认为这个消息有问题。但是此时,RocketMQ不会立刻将这个有问题的消息丢弃,而会将其发送到这个消费者组对应的一种特殊队列:死信队列。

    死信队列的特征:

    • 一个死信队列对应一个ConsumGroup,而不是对应某个消费者实例。
    • 如果一个ConsumeGroup没有产生死信队列,RocketMQ就不会为其创建相应的死信队列。
    • 一个死信队列包含了这个ConsumeGroup里的所有死信消息,而不区分该消息属于哪个Topic。
    • 死信队列中的消息不会再被消费者正常消费。
    • 死信队列的有效期跟正常消息相同。默认3天,对应broker.conf中的fileReservedTime属性。超过这个最长时间的消息都会被删除,而不管消息是否消费过。

    通常,一条消息进入了死信队列,意味着消息在消费处理的过程中出现了比较严重的错误,并且无法自行恢复。此时,一般需要人工去查看死信队列中的消息,对错误原因进行排查。然后对死信消息进行处理,比如转发到正常的Topic重新进行消费,或者丢弃。

    消息幂等

    在MQ系统中,消息幂等有三种实现语义:

    at most once 最多一次:每条消息最多只会被消费一次

    at least once 至少一次:每条消息至少会被消费一次

    exactly once 刚刚好一次:每条消息都只会确定的消费一次

    这三种语义都有他适用的业务场景。

    at most once是最好保证的。RocketMQ中可以直接用异步发送、sendOneWay等方式就可以保证。

    at least once这个语义,RocketMQ也有同步发送、事务消息等很多方式能够保证。

    而这个exactly once是MQ中最理想也是最难保证的一种语义。RocketMQ只能保证at least once,保证不了exactly once。

    RocketMQ 消息重复的场景

    发送时消息重复

    当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

    投递时消息重复消息

    消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

    负载均衡时消息重复

    包括但不限于网络抖动、Broker 重启以及订阅方应用重启,当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

    消息幂等解决方案

    在RocketMQ中,是无法保证每个消息只被投递一次的,所以要在业务上自行来保证消息消费的幂等性。

    ocketMQ的每条消息都有一个唯一的MessageId,这个参数在多次投递的过程中是不会改变的,所以业务上可以用这个MessageId来作为判断幂等的关键依据。

    但是,这个MessageId是无法保证全局唯一的,也会有冲突的情况。所以在一些对幂等性要求严格的场景,最好是使用业务上唯一的一个标识比较靠谱。例如订单ID。而这个业务标识可以使用Message的Key来进行传递。

    比如我们业务的消息消费逻辑是:插入某张订单表的数据,然后更新库存。

    insert into t_order values .....
    update t_inv set count = count-1 where good_id = 'good123';
    
    • 1
    • 2

    要实现消息的幂等,我们可能会采取这样的方案:

    select * from t_order where order_no = 'order123'
    if(order  != null) {
        return ;//消息重复,直接返回
    }
    
    • 1
    • 2
    • 3
    • 4

    这对于很多情况下,的确能起到不错的效果,但是在并发场景下,还是会有问题。

    还可以通过以下方式处理:

    • 使用数据库的行锁处理
    • 利用分布式锁处理不同服务间的并发。
    • 数据库对唯一值的入库字段设唯一索引。
  • 相关阅读:
    聚力人才一路“向北”,开福区为“强省会”装上强劲引擎
    liteide 找不到 go 路径错误修复
    MTK平台Metadata的加载(1)——Metadata介绍
    git克隆一直报错remote: HTTP Basic: Access denied
    鸿蒙HarmonyOS应用开发初体验
    Postgresql事物快照介绍
    宇宙最強的IDE - Visual Studio 25岁生日快乐
    今日睡眠质量记录80分
    Android常用的延迟执行任务及轮询定时任务的几种方式
    Selenium-元素操作、浏览器操作方法
  • 原文地址:https://blog.csdn.net/qq_30823993/article/details/134518147