• RocketMQ 顺序消息解析——图解、源码级解析


    🍊 Java学习:Java从入门到精通总结

    🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

    🍊 绝对不一样的职场干货:大厂最佳实践经验指南


    📆 最近更新:2022年7月30日

    🍊 个人简介:通信工程本硕💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

    🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


    顺序消息

    说到顺序,我们经常会将它和现实里的时间关联起来,即按照时间顺序表示事件的先后关系。比如发生在10:00的消息A就要早于发生在11:00的消息B。

    上面例子之所以成立的原因是他们有相同的参考系,倘若A的时间是北京时间,而B的时间是纽约时间,这个先后顺序就不一定成立了。


    当然除了时间以外,A和B之间的因果关系也可以断定他们的顺序,例如退款一定发生于付款之后。

    综上所述,我们所讲的顺序,实际上的意思是:

    1. 有统一的时间参考下,事件发生的先后关系
    2. 没有统一的时间参考下的happen-before关系

    分布式环境下的顺序

    设想一个分布式的环境:

    1. 同一线程上的事件是有明确顺序关系的,发生的先后顺序就是
    2. 不同线程的事件只能通过因果关系去推断

    在这里插入图片描述
    例如针对上图的两个线程A和B,进程A中的事件有明显的先后顺序(A1 -> A2 -> A3 -> A4),又因为A1给B2发了消息,所以A1一定在B2之前……


    消息中间件中的顺序消息

    RocketMQ支持顺序消息的功能,既有顺序发送又有顺序消费

    而顺序消息又包含了两种类型:

    • 分区顺序:一个Partition内所有的消息按照先进先出的顺序进行发布和消费
    • 全局顺序:一个Topic内所有的消息按照先进先出的顺序进行发布和消费

    对于顺序消费,需要明确哪些来自同一个发送线程的消息在消费时是按照相同的发送顺序来进行消费的。


    在MQ里,顺序在不同的阶段里都需要得到保障:

    1. 发送消息是保证顺序

    在同一个线程内应该采取同步的方式发送;

    1. 消息按照顺序存储

    按照A、B顺序发送的消息,在空间上A也要保存在B之前;

    1. 按照存储的顺序消费消息

    消息A、B到达后,Consumer先消费A后消费B

    在这里插入图片描述
    如上图所示,假设有两个订单A、B,消息的原始数据为a1、b1、b2、a2、a3、b3

    • 发送阶段: A订单的消息要保证a1、a2、a3的顺序,B订单的消息要保证b1、b2、b3的顺序,但是两个订单之间的消息没有先后顺序要求,所以可以由两个线程分别发送

    • 存储阶段: A订单的消息要保证a1、a2、a3的顺序,B订单的消息要保证b1、b2、b3的顺序,但是两个订单之间的消息没有先后顺序要求

    • 消费阶段: 可以由一个线程按照接收到的顺序进行消费,也可以用两个线程分别消费订单A和订单B的数据



    RocketMQ中顺序消息的实现

    在RocketMQ里顺序消息的实现如下图所示:
    在这里插入图片描述
    假设创建订单,订单付款,完成订单是三个必须的顺序消息,通过他们相同的订单ID将其路由到不同的分区中,Consumer消费时一个分区只对应一个线程来消费,从而保障消息的顺序性。



    Producer顺序发送

    Producer要确保消息有序性唯一要做的就是将消息路由到特定的分区,在RocketMQ中,通过MessageQueueSelector来实现分区的选择。

    public interface MessageQueueSelector {
        MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
    }
    
    • 1
    • 2
    • 3
    1. List mqs:消息要发送到的Topic下的所有队列
    2. Message msg:消息对象
    3. Object arg:用户自定义的参数

    例如下面的代码就可以将相同订单ID的消息路由到相同的分区:

    long orderId = order.getOrderId;
    return mqs.get(orderId % mqs.size());
    
    • 1
    • 2

    完整的示例Demo如下:

    public class Main {
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("Group A");
            producer.setNamesrvAddr("localhost");
            producer.start();
    
            for (int i = 0; i < 100; i++) {
                // 流水号
                int orderId = i % 10;
                // 构造消息对象
                Message msg = new Message("Topic A", "TagA", ("Hello RocketMQ " + i).getBytes(StandardCharsets.UTF_8));
                // 发送消息,将相同订单id的消息路由到同一个MQ里
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);
                System.out.println(sendResult);
            }
            producer.shutdown();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25



    Consumer顺序消费

    RocketMQ消费消息有两种形式:拉模式和推模式,分别对应MQPullConsumerMQPushConsumer

    • MQPullConsumer由用户线程控制,主动从服务端获取MQ中的一条消息,所以拿到的消息也是天然有顺序的,Consumer在消费时也要保证自己的消费顺序
      在这里插入图片描述

    • MQPushConsumer由消息中间件主动推送消息给Consumer,由用户注册MessageListener来消费消息


    MQPullConsumer为例,保证消息顺序的流程如下:

    1. PullMessageService以单线程从Broker中拿消息
    2. 拿到消息后将其放入ProcessQueue中(可以看做是消息的缓存)
    3. ConsumeMessageService以多线程的形式尝试获取锁,拿到锁之后再从ProcessQueue中获取消息

    示例代码如下:

    public class Consumer {
    
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Group A");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.subscribe("Topic A", "Tag A");
            consumer.setNamesrvAddr("localhost");
            
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                    System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
            
            consumer.start();
            System.out.println("Consumer Started.");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    如何保证消费消息的顺序?

    1. 消息添加至ProcessQueue的过程是单线程执行的,所以ProcessQueue中的消息也是顺序的
    2. 消费时是从ProcessQueue中读取消息进行消费,并且使用锁进行了并发控制,所以也是有序的

    在这里插入图片描述


    顺序消息带来的缺陷

    聊完了什么是顺序消息以及他们的实现方式,接下来就看一看为了保证消息的一致,系统所引入的一些缺陷。

    1. 当出现热点数据时,可能某些MessageQueue的数据量会很大
    2. 发送顺序消息不能使用分布式系统的容错性,因为针对同一条数据只能被发送到某一个MessageQueue
    3. 某一串顺序消息里,即使有某条消息消费失败,也不能跳过

    一些尝试性的解决方案:
    目前对于热点数据没有什么好的解决方法,只能通过优化路由策略或拆分MessageQueue来将消息尽可能均匀地发送给不同的MessageQueue

    对于同一个MessageQueue,也可以有其副本,这些MessageQueue之间有自己的路由规则。

    对于消费失败的消息,可以提供重试机制来重新消费这条消息,前提是要满足系统的幂等性。

  • 相关阅读:
    数电学习(十、脉冲波形的产生和整形)(二)
    JDBC知识点
    产品经理如何独立从0-1着手甲方项目,或者负责一个产品?
    卡尔曼滤波之二:Python实现
    薪资17K+需要什么水平?98年测试工程师面试实录…
    C# 使用base64编码用于加密和解密
    GMT,UTC,CST,DST,RTC,NTP,SNTP,NITZ: 嵌入式的时间
    电脑数据同步到APP
    Web全栈开发训练营
    如何才能在人力RPO蓝海项目中实现盈利呢?
  • 原文地址:https://blog.csdn.net/HNU_Csee_wjw/article/details/123003527