• RocketMQ—RocketMQ消费重试和死信消息


    RocketMQ—RocketMQ消费重试和死信消息

    消费重试

    生产者重试

    设置重试的代码如下

    // 失败的情况重发3次
    producer.setRetryTimesWhenSendFailed(3);
    // 消息在1S内没有发送成功,就会重试
    producer.send(msg, 1000);
    

    一般情况下,我们不会在生产者方进行重试。

    消费者重试

    消费者在消费消息的过程中,下方三种情况会进行重试:

    • 业务报错了
    • 返回null 返回
    • 返回RECONSUME_LATER

    代码如下:

    /**
         * 重试的时间间隔
         * 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
         * 默认重试16次
         * --------------
         * 重试的次数一般 5次
         * @throws Exception
         */
    @Test
    public void retryConsumer() throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group");
        consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
        consumer.subscribe("retryTopic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
                MessageExt messageExt = msgs.get(0);
                System.out.println(new Date());
                System.out.println(messageExt.getReconsumeTimes());
                System.out.println(new String(messageExt.getBody()));
                // 业务报错了 返回null 返回 RECONSUME_LATER 都会重试
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
        consumer.start();
        System.in.read();
    }
    

    消息默认重试16次:

    能否自定义重试次数

    设置重试次数的代码如下:

    // 设定重试次数
    consumer.setMaxReconsumeTimes(2);
    

    消息的构成如下:

    消息构成

    如果使用了上述代码,就会为消息头设置重试次数。

    死信消息

    如果消息重试了最大次数还是失败怎么办

    最大次数:如果没有设置最大次数,默认情况下,并发模式是16次,顺序模式是int的最大值。

    如果重试了最大次数还是失败,就会变成死信消息,会被放进一个死信主题中去,这个死信主题的名字是有规律的,这个主题是

    %DLQ%消费者组的名称

    当消息处理失败的时候该如何正确的处理

    1. 可以监听死信消息,给管理员发送邮件或者短信通知,但是如果有多个死信消息,就要写多个监听器;
    2. 可以手动判断重试次数,如果大于某个次数,就记录下来,就不重试了,发送邮件或者短信通知。
    try {
        handleDb();
    } catch (Exception e) {
        // 重试
        int reconsumeTimes = messageExt.getReconsumeTimes();
        if (reconsumeTimes >= MAX_TIMES) {
            // 不要重试了
            System.out.println("记录到特别的位置 文件 mysql 通知人工处理");
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    
  • 相关阅读:
    新学期,新FLAG
    万界星空科技云MES系统生产全流程追溯功能介绍
    【iOS】Fastlane一键打包上传到TestFlight、蒲公英
    RR有幻读问题吗?MVCC能否解决幻读?
    前端面试官:你能实现js数组原型上的方法吗?
    旅游业为什么要选择VR全景,VR全景在景区旅游上有哪些应用
    进程与线程的记忆方法
    Redis-cli 查询中文数据乱码问题
    每日练习------实现双色球的彩票功能。规则:从36个红球中随机选择不重复的6个数,从15个篮球中随机选择1个组成一注彩票。可以选择买多注。
    C语言编程用递归法求
  • 原文地址:https://www.cnblogs.com/nicaicai/p/18018420