• MQ 之 RocketMQ 应用及代码实现


    前言

    看视频的时候,到这一章节,弹幕里全是终于熬到这一章了,哈哈,对于大家来说,看理论知识确实很枯燥,不过很多东西必须要有一定的理论知识为我们做支撑,否则就是墙上芦苇、山间竹笋。
    那么,我们就开始RocketMQ的应用,使用代码来实现各项功能吧。

    应用

    1、普通消息 ---- 消息发送的三种方式

    1.1 同步消息发送

    Producer 发送一条消息,Broker 收到后会返回一个结果。我们此处会打印出结果信息,并在dashboard上查看是否有相关消息信息。
    代码:

    		// 创建一个producer,参数pg为Producer Group名称
            DefaultMQProducer producer = new DefaultMQProducer("pg");
            // 指定NameServer地址
            producer.setNamesrvAddr("rocketmq:9876");
            // 设置当发送失败时,重试发送的次数,默认为2次
            producer.setRetryTimesWhenSendFailed(3);
            // 设置发送超时时限,默认3s
            producer.setSendMsgTimeout(5000);
    
            // 开启生产者
            producer.start();
    
            // 生产并发送消息
            for (int i = 0; i < 100; i++){
                byte[] body = ("Hello," + i).getBytes();
                Message message = new Message("firstTopic", "firstTag", body);
                // 设置key
                message.setKeys("key-"+i);
                SendResult result = producer.send(message);
                System.out.println(result);
            }
            
            // 关闭producer
            producer.shutdown();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    打印结果:

    SendResult [sendStatus=SEND_OK, msgId=7F0000013E7418B4AAC25EEAF47D0000, offsetMsgId=C0A83D6500002A9F000000000008C8B6, messageQueue=MessageQueue [topic=firstTopic, brokerName=rocketMQ, queueId=2], queueOffset=0]
    SendResult [sendStatus=SEND_OK, msgId=7F0000013E7418B4AAC25EEAF7560001, offsetMsgId=C0A83D6500002A9F000000000008C970, messageQueue=MessageQueue [topic=firstTopic, brokerName=rocketMQ, queueId=3], queueOffset=0]
    SendResult [sendStatus=SEND_OK, msgId=7F0000013E7418B4AAC25EEAF79F0002, offsetMsgId=C0A83D6500002A9F000000000008CA2A, messageQueue=MessageQueue [topic=firstTopic, brokerName=rocketMQ, queueId=0], queueOffset=0]
    SendResult [sendStatus=SEND_OK, msgId=7F0000013E7418B4AAC25EEAF7E00003, offsetMsgId=C0A83D6500002A9F000000000008CAE4, messageQueue=MessageQueue [topic=firstTopic, brokerName=rocketMQ, queueId=1], queueOffset=0]
    ......
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    dashboard的Topic
    在这里插入图片描述
    dashboard的Message:
    在这里插入图片描述

    1.2 异步消息发送

    异步消息发送,在Producer发送消息后,并不会等待ACK才继续,当收到ACK后会执行onSuccess回调,出错会执行onException回调。

    		DefaultMQProducer producer = new DefaultMQProducer("pg");
            producer.setNamesrvAddr("rocketmq:9876");
            // 设置当发送失败时,重试发送的次数,默认为2次
            producer.setRetryTimesWhenSendAsyncFailed(3);
            // 设置Queue的个数,默认为4
            producer.setDefaultTopicQueueNums(3);
            producer.start();
    
            // 生产并发送消息
            for (int i = 0; i < 100; i++){
                byte[] body = ("Hello," + i).getBytes();
                Message message = new Message("AsyncTopic", "AsyncTag", body);
                message.setKeys("key-"+i);
                // 异步发送
                producer.send(message, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println(sendResult);
                    }
    
                    @Override
                    public void onException(Throwable throwable) {
                        throwable.printStackTrace();
                    }
                });
            }
    
            // 休眠3秒,因为是异步,不休眠就会导致还未发送producer就被关闭,导致程序报错
            TimeUnit.SECONDS.sleep(3);
            producer.shutdown();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    SendResult [sendStatus=SEND_OK, msgId=7F0000012C4018B4AAC25F060205005C, offsetMsgId=C0A83D6500002A9F000000000009A797, messageQueue=MessageQueue [topic=AsyncTopic, brokerName=rocketMQ, queueId=0], queueOffset=31]
    SendResult [sendStatus=SEND_OK, msgId=7F0000012C4018B4AAC25F0602050060, offsetMsgId=C0A83D6500002A9F000000000009A85E, messageQueue=MessageQueue [topic=AsyncTopic, brokerName=rocketMQ, queueId=0], queueOffset=32]
    SendResult [sendStatus=SEND_OK, msgId=7F0000012C4018B4AAC25F0602040043, offsetMsgId=C0A83D6500002A9F000000000009A925, messageQueue=MessageQueue [topic=AsyncTopic, brokerName=rocketMQ, queueId=0], queueOffset=33]
    ......
    
    • 1
    • 2
    • 3
    • 4

    1.3 单向消息发送

    Producer向Broker发送消息后,并不会有返回值。

     		DefaultMQProducer producer = new DefaultMQProducer("pg");
            producer.setNamesrvAddr("rocketmq:9876");
            producer.start();
    
            // 生产并发送消息
            for (int i = 0; i < 100; i++){
                byte[] body = ("Hello," + i).getBytes();
                Message message = new Message("OneWayTopic", "OneWayTag", body);
    
                producer.sendOneway(message);
            }
    
            producer.shutdown();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    1.4 定义消费者

    		// 指定一个pull消费者
            // DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("cg");
            // 指定一个push消费者
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
            consumer.setNamesrvAddr("rocketmq:9876");
            // 指定消费的Topic和Tag
            consumer.subscribe("firstTopic","*");
            // 指定从第一条消息开始消费
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // 注册消息监听器
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                // 一旦Broker中有了订阅的消息就会触发监听器
                // 返回值为当前consumer消费状态
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                   // 消费消息
                    for (int i = 0; i < list.size(); i++) {
                        System.out.println(list.get(i));
                    }
                    // 消费成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            // 开启消费
            consumer.start();
            System.out.println("starting...");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    2、顺序消息

    严格按照消息的发送顺序进行消费的消息。
    根据有序范围的不同,RocketMQ可以严格的保证两种消息的有序性:全局有序分区有序

    2.1 全局有序

    当发送和消费参与的Queue只有一个时所保证的有序是整个Topic中消息的顺序,成为全局有序。
    全局有序比较简单,就是将queue设置为1就好了,就不代码演示。

    2.2 分区有序

    如果有多个Queue参与,其仅可保证在该Queue分区队列上的消息顺序,成为分区有序。
    Queue的选择:
    在定义Producer时,我们可以指定消息队列选择器,它需要实现MessageQueueSelector接口。
    定义选择器时要使用选择key,它可以是消息key,也可以是其他值。要求:唯一性。
    一般的选择算法:让选择key(或其hash值)与该topic所包含的Queue的数量进行取模。

    
    /**
    * Producer,发送顺序消息
    */
    public class Producer {
    
       public static void main(String[] args) throws Exception {
           DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    
           producer.setNamesrvAddr("rocketmq:9876");
    
           producer.start();
    
           String[] tags = new String[]{"TagA", "TagC", "TagD"};
    
           // 订单列表
           List<OrderStep> orderList = new Producer().buildOrders();
    
           Date date = new Date();
           SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
           String dateStr = sdf.format(date);
           for (int i = 0; i < 10; i++) {
               // 加个时间前缀
               String body = dateStr + " Hello RocketMQ " + orderList.get(i);
               Message msg = new Message("TopicTestA", tags[i % tags.length], "KEY" + i, body.getBytes());
    
               SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                   @Override
                   public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                       Long id = (Long) arg;  //根据订单id选择发送queue
                       long index = id % mqs.size();
                       return mqs.get((int) index);
                   }
               }, orderList.get(i).getOrderId());//订单id
    
               System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
                   sendResult.getSendStatus(),
                   sendResult.getMessageQueue().getQueueId(),
                   body));
           }
    
           producer.shutdown();
       }
    
       /**
        * 订单的步骤
        */
       private static class OrderStep {
           private long orderId;
           private String desc;
    
           public long getOrderId() {
               return orderId;
           }
    
           public void setOrderId(long orderId) {
               this.orderId = orderId;
           }
    
           public String getDesc() {
               return desc;
           }
    
           public void setDesc(String desc) {
               this.desc = desc;
           }
    
           @Override
           public String toString() {
               return "OrderStep{" +
                   "orderId=" + orderId +
                   ", desc='" + desc + '\'' +
                   '}';
           }
       }
    
       /**
        * 生成模拟订单数据
        */
       private List<OrderStep> buildOrders() {
           List<OrderStep> orderList = new ArrayList<OrderStep>();
    
           OrderStep orderDemo = new OrderStep();
           orderDemo.setOrderId(15103111039L);
           orderDemo.setDesc("创建");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103111065L);
           orderDemo.setDesc("创建");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103111039L);
           orderDemo.setDesc("付款");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103117235L);
           orderDemo.setDesc("创建");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103111065L);
           orderDemo.setDesc("付款");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103117235L);
           orderDemo.setDesc("付款");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103111065L);
           orderDemo.setDesc("完成");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103111039L);
           orderDemo.setDesc("推送");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103117235L);
           orderDemo.setDesc("完成");
           orderList.add(orderDemo);
    
           orderDemo = new OrderStep();
           orderDemo.setOrderId(15103111039L);
           orderDemo.setDesc("完成");
           orderList.add(orderDemo);
    
           return orderList;
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    		/**
    		* 顺序消息消费,带事务方式(应用可控制Offset什么时候提交)
    		*/
    		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
            consumer.setNamesrvAddr("rocketmq:9876");
            /**
             * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
    * 如果非第一次启动,那么按照上次消费的位置继续消费 */
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTestA", "TagA || TagC || TagD"); consumer.registerMessageListener(new MessageListenerOrderly() { Random random = new Random(); @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); for (MessageExt msg : msgs) { // 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序 System.out.println("consumeThread=" + Thread.currentThread().getName() + ",queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody())); } try { //模拟业务逻辑处理中... TimeUnit.SECONDS.sleep(random.nextInt(5)); } catch (Exception e) { e.printStackTrace(); } return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer Started.");
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    3、延迟消息

    当消息写入Broker后,在指定的时长后才可被消费处理的消息。
    现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18 消息消费失败会进入延时消息队列,消息发送时间与设置的延时等级和重试次数有关,详见代码SendMessageProcessor.java

    private String messageDelayLevel = “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;

    下面是Producer的代码,如果取消message.setDelayTimeLevel(3);这一行代码,则跟普通发送一样了。
    消费和普通的一样,就不列举了。

     		DefaultMQProducer producer = new DefaultMQProducer("ProducerG1");
            producer.setNamesrvAddr("rocketmq:9876");
            producer.start();
    
            for (int i = 0; i < 10; i++) {
                Message message = new Message("TopicA", "TagA", ("Hello-" + i).getBytes());
                // 指定延迟等级为3,即延迟10s消费,有可能存在一定的误差
                message.setDelayTimeLevel(3);
                producer.send(message);
            }
    
            producer.shutdown();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    4、事务消息

    RocketMQ提供了类似X/open XA的分布式事务功能,通过事务消息能达到分布式事务的最终一致,XA是一种分布式事务的解决方案,一种分布式事务处理模式。
    在这里插入图片描述

    5、批量消息

    5.1 批量发送消息

    生产者进行消息发送时可以一次性发送多条消息,这可以大大提高Producer的发送效率。
    注意:

    • 批量发送的消息必须具有相同的Topic
    • 批量发送的消息必须具有相同的刷盘策略
    • 批量发送的消息不能是延时消息或事务消息

    批量发送的大小
    默认情况下,一批发送的消息总大小不能超过4MB字节,如需超过,有两种解决办法:

    • 将批量消息进行拆分,进行多次批量发送
    • 在Producer端和Broker端修改属性
    • 1)Producer端需要在发送之前设置Producer的maxMessageSize属性
    • 2)Broker端需要修改其加载的配置文件r的maxMessageSize属性

    生产者发送的消息
    在这里插入图片描述
    send(message)是将message生成一个字符串进行发送,这个字符串由四个部门组成:Topic、消息body、消息日志、及用于描述消息的一堆key-value键值对。这些属性包括生产者地址、生产时间等信息,最终写入到Broker中消息单元的属性中。

    5.2 批量接收消息

    在这里插入图片描述

    5.3 消息列表分割器

    为了防止消息超过4m,超过了就将消息分割成消息列表

    public class MessageListSplitter implements Iterable<List<Message>>{
        private final int SIZE_LIMIT = 4 * 1024 * 1024;
        private final List<Message> MESSAGES;
        // 要进行批量操作的小集合的起始索引
        private int currentIndex;
    
        public MessageListSplitter(List<Message> messages) {
            MESSAGES = messages;
        }
    
        public boolean hashNext(){
            return currentIndex < MESSAGES.size();
        }
    
        public List<Message> next(){
            int nextIndex = currentIndex;
            int totalSize = 0;
            for(;nextIndex < MESSAGES.size(); nextIndex++){
                Message message = MESSAGES.get(nextIndex);
    
                // 计算当前消息的长度
                int tmpSize = message.getTopic().length() + message.getBody().length;
                Map<String,String> properties = message.getProperties();
                for (Map.Entry<String,String> entry: properties.entrySet()) {
                    tmpSize = entry.getKey().length() + entry.getValue().length();
                }
                tmpSize += 20;
    
                if (tmpSize > SIZE_LIMIT) {
                    if(nextIndex - currentIndex == 0){
                        nextIndex++;
                    }
                    break;
                }
    
                if (tmpSize + totalSize > SIZE_LIMIT) {
                    break;
                } else {
                    totalSize += tmpSize;
                }
    
            }// for - end
    
    
            // subList -> [ )
            List<Message> subList = MESSAGES.subList(currentIndex, nextIndex);
            currentIndex = nextIndex;
            return subList;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    5.4 批量消息发送代码

    		DefaultMQProducer producer = new DefaultMQProducer("pg");
            producer.setNamesrvAddr("rocketmq:9876");
            // 只写此处是不行的,还得修改Broker加载的配置文件MaxMessageSize属性
            // producer.setMaxMessageSize(8 * 1024 * 1024);
            producer.start();
    
            List<Message> messageList = new ArrayList<Message>();
            for (int i = 0; i < messageList.size(); i++) {
                Message message = new Message("BatchTopic","BatchTag", "Batchkey" + i,("Hello" + i).getBytes());
                messageList.add(message);
            }
    
            MessageListSplitter splitter = new MessageListSplitter(messageList);
            while(splitter.hashNext()) {
                try {
                    List<Message> item = splitter.next();
                    producer.send(item);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            } // while - end
    
            producer.shutdown();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    5.5 批量消息消费代码

    // 指定每次批处理多少条消息
    consumer.setConsumeMessageBatchMaxSize(1);
    // 指定可以从Broker拉取的消息数量
    consumer.setPullBatchSize(32);
    
    • 1
    • 2
    • 3
    • 4

    6、消息过滤

    Tag过滤SQL 过滤
    Tag过滤 : 使用 || 分割,例如:TagA || TagC
    SQL过滤可以看成是Tag过滤的升级,可以进行复杂过滤。
    通过message.putUserProperty()事先埋入属性。如message.putUserProperty("age", i + "");
    消费时consumer.subscribe("topicAA", MessageSelector.bySql("age > 2"));通过bySQL进行过滤。

    Broker默认情况下没有开启SQL过滤,如需使用,需要修改配置文件的enablePropertyFilter = true;

    7、消息发送重试机制

    Producer对发送失败的消息进行重新发送的机制称为消息发送重试机制,也称为消息重投机制。
    特点:

    • 生产者发送消息,如果时同步或异步发送方式,发送失败会重试,若是单向发送则没有重试机制。
    • 只有普通消息有重试机制,顺序消息没有。
    • 该机制可以尽可能发送成功、不丢失,但可能造成消息重复。消息重复在RocketMQ中是无法避免的。消息重复在一般情况下是不会发生的,但是当网络波动、信息量大时,消息重复就成为了大概率事件。要避免消息重复消费。解决方案:为消息添加唯一标识,使消费者对消息进行消费判断来避免重复消费。
    • Producer主动重发、consumer负载变化也会导致消息重发。
    • 消息发送重试机制有三种策略:同步发送失败策略、异步发送失败策略、消息刷盘失败策略。

    8、消息消费重试机制

    8.1 顺序消息的消费重试

    当consumer消费消息失败后,为了保证消息的顺序性,其会自动不断的进行消息重试,直到成功消费,重试期间应用会出现消息消费阻塞的情况。因此,此时一定要保证应用能够及时监控并处理消费失败的消息,避免永久阻塞。
    重试的时间间隔默认是1000ms,可以通过consumer.setSuspendCurrentQueueTimeMillis(ms);来修改。ms的取值范围为10-30000ms。

    顺序消息是没有发送重试,而有消费重试的。

    8.2 无序消息的消费重试

    当consumer消费消息失败后,可以通过设置返回状态达到消息重试的效果。
    无序消息只对集群消费模式生效。
    广播消费方式没有消费重试,失败后不再重试,继续消费后续消息。

    8.3 无序消息的消费重试的次数与间隔

    在无序消息集群模式下,每条消息最多重试16次,但每次的时间间隔不一样。会逐渐变长,如下。
    可以通过consumer.setMaxReconsumeTimes();修改次数。若这个次数小于等于16次,则仍然按照时间间隔执行,若大于16次,则都是2小时。
    对于consumer group,若仅修改了一个consumer的消费次数,则会应用到该group的其他consumer实例。
    若修改了多个consumer实例,则采用覆盖策略,最后的修改会覆盖前面的修改。
    在这里插入图片描述

    8.4 无序消息的重试队列

    对于需要重试消费的消息,,并不是consumer在等待相应时间后再去拉取原来的消息进行消费,而是将这些消息放入一个特殊的Topic的队列中,而后进行再次消费,这个队列就是重试队列。Broker为消费者组创建的队列名称为%RETRY%consumerGroup@consumerGroup。

    8.5 无序消息的消费重试配置方式

    无序消息的集群模式下,消息失败后希望进行重试,需要在消息监听器接口的实现中明确进行如下三种之一的配置。

    • 返回ConsumeConcurrentStatus.RECONSUME_LATER
    • 返回null
    • 抛出异常

    8.6 无序消息的消费不重试配置方式

    无序消息的集群模式下,消息失败后不希望进行重试,则捕获异常后返回ConsumeConcurrentStatus.CONSUME_SUCCESS即可。

    9、死信队列

    当重试次数达到最大次数时,仍然无法消费消息,表明消费者在正常情况下无法正确的消费消息,此时就会将该消息发送到一个特殊队列——死信队列。其中的消息称为死信消息。

    特征:

    • 死信队列中的消息不会再被消费者正常消费。
    • 死信存储有效期与正常消息相同,均为3天,3天后会被自动清理。
    • 死信队列就是一个特殊的Topic,名称为%DLQ%consumerGroup@consumerGroup。
    • 如果一个消费者组没有产生死信消息,则不会为其创建死信队列。

    处理:
    当一条消息进入死信队列,就意味着系统中某些地方出现了问题,导致消费者无法正常处理消息。因此,对于私信消息,通常需要开发人员进行特殊处理,最关键的步骤是排查可疑因素,解决可能存在的bug,然后将原来的私信消息进行重新投递消费。

    结束语

    前前后后还是花了很多时间来学习RocketMQ,主要是根据尚硅谷的视频来的,因此,在这几章里,有的地方是直接截取的课件的内容。虽然说,从头到尾跟了一遍,但个人感觉还是不太够,理论知识比较多,实操还是太少了点,因此,我需要再去找点东西练练手,最后再将RocketMQ融入到Spring Boot 或者 Spring Cloud中去。需要多次的练手,融合贯通知识,才能学有所成,学有所用。一起卷起来吧,再会。

    本章参考

    B站尚硅谷视频:https://www.bilibili.com/video/BV1cf4y157sz
    GitHub:https://github.com/apache/rocketmq/blob/master/docs/cn/RocketMQ_Example.md
    GitHub上的比较的新且全,实例也比较好,可以多参考。

  • 相关阅读:
    Service Mesh之Istio基础入门
    Stream
    全球名校AI课程库(31)| MIT麻省理工 · 深度学习导论课程『Introduction to Deep Learning』
    WebStorm 2023:让您更接近理想的开发环境 mac/win版
    L86.linux命令每日一练 -- 第12章 Linux系统常用内置命令(二)
    PostgreSQL、KingBase 数据库 ORDER BY LIMIT 查询缓慢案例
    目标检测YOLO系列从入门到精通技术详解100篇-【目标检测】目标视觉检测
    Mybatis引子—Mybatis简单使用
    win11 无法登录微软账户 终极解决方案
    NDP 协议介绍
  • 原文地址:https://blog.csdn.net/qq_41404112/article/details/126410502