• 解析 RocketMQ 业务消息 - “顺序消息”


    引言

    Apache RocketMQ 诞生至今,历经十余年大规模业务稳定性打磨,服务了阿里集团内部业务以及阿里云数以万计的企业客户。作为金融级可靠的业务消息方案,RocketMQ 从创建之初就一直专注于业务集成领域的异步通信能力构建。本篇将继续业务消息集成的场景,从功能原理、应用案例、最佳实践以及实战等角度介绍 RocketMQ 的顺序消息功能。

    简介

    顺序消息是消息队列 RocketMQ 版提供的一种对消息发送和消费顺序有严格要求的消息。对于一个指定的 Topic,同一 MessageGroup 的消息按照严格的先进先出(FIFO)原则进行发布和消费,即先发布的消息先消费,后发布的消息后消费,服务端严格按照发送顺序进行存储、消费。同一 MessageGroup 的消息保证顺序,不同 MessageGroup 之间的消息顺序不做要求,因此需做到两点,发送的顺序性和消费的顺序性。

    功能原理

    在这里首先抛出一个问题,在日常的接触中,许多 RocketMQ 使用者会认为,既然顺序消息能在普通消息的基础上实现顺序,看起来就是普通消息的加强版,那么为什么不全部都使用顺序消息呢?接下来就会围绕这个问题,对比普通消息和顺序消息进行阐述。

    顺序发送

    在分布式环境下,保证消息的全局顺序性是十分困难的,例如两个 RocketMQ Producer A 与 Producer B,它们在没有沟通的情况下各自向 RocketMQ 服务端发送消息 a 和消息 b,由于分布式系统的限制,我们无法保证 a 和 b 的顺序。因此业界消息系统通常保证的是分区的顺序性,即保证带有同一属性的消息的顺序,我们将该属性称之为 MessageGroup。如图所示,ProducerA 发送了 MessageGroup 属性为 A 的两条消息 A1,A2 和 MessageGroup 属性为 B 的 B1,B2,而 ProducerB 发送了 MessageGroup 属性为 C 的两条属性 C1,C2。

    同时,对于同一 MessageGroup,为了保证其发送顺序的先后性,比较简单的做法是构造一个单线程的场景,即不同的 MessageGroup 由不同的 Producer 负责,并且对于每一个 Producer 而言,顺序消息是同步发送的。同步发送的好处是显而易见的,在客户端得到上一条消息的发送结果后再发送下一条,即能准确保证发送顺序,若使用异步发送或多线程则很难保证这一点。

    因此可以看到,虽然在底层原理上,顺序消息发送和普通消息发送并无二异,但是为了保证顺序消息的发送顺序性,同步发送的方式相比较普通消息,实际上降低了消息的最大吞吐。

    顺序消费

    与顺序消息不同的是,普通消息的消费实际上没有任何限制,消费者拉取的消息是被异步、并发消费的,而顺序消息,需要保证对于同一个 MessageGroup,同一时刻只有一个客户端在消费消息,并且在该条消息被确认消费完成之前(或者进入死信队列),消费者无法消费同一 MessageGroup 的下一条消息,否则消费的顺序性将得不到保证。因此这里存在着一个消费瓶颈,该瓶颈取决于用户自身的业务处理逻辑。极端情况下当某一 MessageGroup 的消息过多时,就可能导致消费堆积。当然也需要明确的是,这里的语境都指的是同一 MessageGroup,不同 MessageGroup 的消息之间并不存在顺序性的关联,是可以进行并发消费的。因此全文中提到的顺序实际上是一种偏序。

    小结

    无论对于发送还是消费,我们通过 MessageGroup 的方式将消息分组,即并发的基本单元是 MessageGroup,不同的 MessageGroup 可以并发的发送和消费,从而一定程度具备了可拓展性,支持多队列存储、水平拆分、并发消费,且不受影响。回顾普通消息,站在顺序消息的视角,可以认为普通消息的并发基本单元是单条消息,即每条消息均拥有不同的 MessageGroup。

    我们回到开头那个问题:

    既然顺序消息能在普通消息的基础上实现顺序,看起来就是普通消息的加强版,那么为什么不全部都使用顺序消息呢?

    现在大家对于这个问题可能有一个基本的印象了,消息的顺序性当然很好,但是为了实现顺序性也是有代价的。

    下述是一个表格,简要对比了顺序消息和普通消息。

    最佳实践

    合理设置 MessageGroup

    MessageGroup 会有很多错误的选择,以某电商平台为例,某电商平台将商家 ID 作为 MessageGroup,因为部分规模较大的商家会产出较多订单,由于下游消费能力的限制,因此这部分商家所对应的订单就发生了严重的堆积。正确的做法应当是将订单号作为 MessageGroup,而且站在背后的业务逻辑上来说,同一订单才有顺序性的要求。即选择 MessageGroup 的最佳实践是:MessageGroup 生命周期最好较为短暂,且不同 MessageGroup 的数量应当尽量相同且均匀。

    同步发送和发送重试

    如之前章节所述,需使用同步发送和发送重试来保证发送的顺序性。

    消费幂等

    消息传输链路在异常场景下会有少量重复,业务消费是需要做消费幂等,避免重复处理带来的风险。

    应用案例

    • 用户注册需要发送验证码,以用户 ID 作为 MessageGroup,那么同一个用户发送的消息都会按照发布的先后顺序来消费。
    • 电商的订单创建,以订单 ID 作为 MessageGroup,那么同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息都会按照发布的先后顺序来消费。

    实战

    发送

    可以看到,该发送案例设置了 MessageGroup 并且使用了同步发送,发送的代码如下:

    1. public class ProducerFifoMessageExample {
    2. private static final Logger LOGGER = LoggerFactory.getLogger(ProducerFifoMessageExample.class);
    3. private ProducerFifoMessageExample() {
    4. }
    5. public static void main(String[] args) throws ClientException, IOException {
    6. final ClientServiceProvider provider = ClientServiceProvider.loadService();
    7. // Credential provider is optional for client configuration.
    8. String accessKey = "yourAccessKey";
    9. String secretKey = "yourSecretKey";
    10. SessionCredentialsProvider sessionCredentialsProvider =
    11. new StaticSessionCredentialsProvider(accessKey, secretKey);
    12. String endpoints = "foobar.com:8080";
    13. ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
    14. .setEndpoints(endpoints)
    15. .setCredentialProvider(sessionCredentialsProvider)
    16. .build();
    17. String topic = "yourFifoTopic";
    18. final Producer producer = provider.newProducerBuilder()
    19. .setClientConfiguration(clientConfiguration)
    20. // Set the topic name(s), which is optional. It makes producer could prefetch the topic route before
    21. // message publishing.
    22. .setTopics(topic)
    23. // May throw {@link ClientException} if the producer is not initialized.
    24. .build();
    25. // Define your message body.
    26. byte[] body = "This is a FIFO message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);
    27. String tag = "yourMessageTagA";
    28. final Message message = provider.newMessageBuilder()
    29. // Set topic for the current message.
    30. .setTopic(topic)
    31. // Message secondary classifier of message besides topic.
    32. .setTag(tag)
    33. // Key(s) of the message, another way to mark message besides message id.
    34. .setKeys("yourMessageKey-1ff69ada8e0e")
    35. // Message group decides the message delivery order.
    36. .setMessageGroup("youMessageGroup0")
    37. .setBody(body)
    38. .build();
    39. try {
    40. final SendReceipt sendReceipt = producer.send(message);
    41. LOGGER.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
    42. } catch (Throwable t) {
    43. LOGGER.error("Failed to send message", t);
    44. }
    45. // Close the producer when you don't need it anymore.
    46. producer.close();
    47. }
    48. }

    消费

    消费的代码如下:

    1. public class SimpleConsumerExample {
    2. private static final Logger LOGGER = LoggerFactory.getLogger(SimpleConsumerExample.class);
    3. private SimpleConsumerExample() {
    4. }
    5. public static void main(String[] args) throws ClientException, IOException {
    6. final ClientServiceProvider provider = ClientServiceProvider.loadService();
    7. // Credential provider is optional for client configuration.
    8. String accessKey = "yourAccessKey";
    9. String secretKey = "yourSecretKey";
    10. SessionCredentialsProvider sessionCredentialsProvider =
    11. new StaticSessionCredentialsProvider(accessKey, secretKey);
    12. String endpoints = "foobar.com:8080";
    13. ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
    14. .setEndpoints(endpoints)
    15. .setCredentialProvider(sessionCredentialsProvider)
    16. .build();
    17. String consumerGroup = "yourConsumerGroup";
    18. Duration awaitDuration = Duration.ofSeconds(30);
    19. String tag = "yourMessageTagA";
    20. String topic = "yourTopic";
    21. FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
    22. SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
    23. .setClientConfiguration(clientConfiguration)
    24. // Set the consumer group name.
    25. .setConsumerGroup(consumerGroup)
    26. // set await duration for long-polling.
    27. .setAwaitDuration(awaitDuration)
    28. // Set the subscription for the consumer.
    29. .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
    30. .build();
    31. // Max message num for each long polling.
    32. int maxMessageNum = 16;
    33. // Set message invisible duration after it is received.
    34. Duration invisibleDuration = Duration.ofSeconds(5);
    35. final List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
    36. for (MessageView message : messages) {
    37. try {
    38. consumer.ack(message);
    39. } catch (Throwable t) {
    40. LOGGER.error("Failed to acknowledge message, messageId={}", message.getMessageId(), t);
    41. }
    42. }
    43. // Close the simple consumer when you don't need it anymore.
    44. consumer.close();
    45. }
    46. }

    作者:绍舒

    原文链接

    本文为阿里云原创内容,未经允许不得转载。

  • 相关阅读:
    三台centos7部署redis6.2版本集群
    深入理解 Python 虚拟机:字典(dict)的实现原理及源码剖析
    APS自动排产-AP工厂高级计划
    Flink部署模式
    科技云报道:历经四年,RPA走向同质化?
    Kali-linux Arpspoof工具
    十七、redux
    论文笔记:Contrastive Trajectory Similarity Learning withDual-Feature Attention
    k8s学习-Secret(创建、使用、更新、删除等)
    初学者必读书籍——两个月速成Python
  • 原文地址:https://blog.csdn.net/weixin_43970890/article/details/126526420