• 如何实现RabbitMQ、kafaka、rocketmq等消息队列的消息有序


    概述

    解决思路:将需要保证先后顺序的消息放到同一个队列,只用一个消费者去消费该队列。即保证入队有序,出队后的顺序交给消费者自己保证

    1. 大多数的项目是不需要保证 mq 消息顺序一致性的问题,只有在一些特定的场景可能会
      需要,比如 MySQL 与 Redis 实现异步同步数据;
    2. 所有消息需要投递到同一个 mq 服务器,同一个分区模型中存放,最终被同一个消费者
      消费,核心原理:设定相同的消息 key,根据相同的消息 key 计算 hash 存放在同一个分区
      中。
      3.如果保证了消息顺序一致性有可能降低我们消费者消费的速率。

    消息分区:对于 Kafka 和 RocketMQ,可以通过消息分区的方式将相关的消息发送到同一个分区中,然后消费者按照分区顺序来消费消息,确保消息的有序性。对于 RabbitMQ,可以使用 Exchange 和 Queue 的绑定方式来确保相关消息发送到同一个队列,并由单个消费者按顺序消费。
    undefined 消息标识:在消息中增加序号或者唯一标识,消费者根据消息的序号或者标识来进行顺序处理。
    单一消费者:在 RabbitMQ 和 RocketMQ 中,可以保证每个队列只有一个消费者来消费消息,从而保证消息的顺序性。对于 Kafka,可以采用单个消费者组来消费特定的分区,确保消息有序。
    全局排序服务:对于需要全局有序的场景,可以引入全局排序服务,将乱序的消息发送到该服务进行排序后再发送到消息队列,消费者按照顺序来消费。
    引入时间戳:在消息中添加时间戳,消费者可以根据消息的时间戳来进行排序和处理。
    以上是一些常见的实现消息有序处理的方法,具体选择哪种方式取决于业务场景和对消息有序性的要求。需要根据具体情况选择合适的技术和策略来实现有序消息处理。
    Kafka 最适合处理流数据,在同一主题同一分区内保证消息顺序,
    RabbitMQ 对流中消息的顺序只提供基本的保证。

    RabbitMQ消息有序

    拆分多个 queue(消息队列),每个 queue(消息队列) 一个 consumer(消费者),就是多一些 queue
    (消息队列)而已,确实是麻烦点;
    或者就一个 queue (消息队列)但是对应一个 consumer(消费者),然后这个 consumer(消费者)内
    部用内存队列做排队,然后分发给底层不同的 worker 来处理。

    kafka消息有序

    1 个 Topic 只对应一个 Partition。
    (推荐)发送消息的时候指定 key/Partition。
    该topic强制采用一个分区,所有消息放到一个队列里,这样能达到全局顺序性。但是会损失高并发特性。
    undefined 局部有序,采用路由机制,将同一个订单的不同状态消息存储在一个分区partition,单线程消费。比如Kafka就提供了一个接口扩展org.apache.kafka.clients.Partitioner,方便开发人员按照自己的业务场景来定制路由规则。
    partatation 同一个分片
    消息队列保证顺序性整体思路就这样啦。比如 Kafka 全局有序消息,就这种思想体现: 就✁生产者发消息时,1 个 Topic只能对应 1 个 Partition, 一个 Consumer,内部单线程消费。
    但这样吞吐量低,一般保证消息局部有序即可。在发消息时候指定Partition Key,Kafka 对其进行Hash 计算,根据计算结果决定放入哪个Partition。这样 Partition Key 相同消息会放在同一个 Partition。然后多消费者单线程消费指定Partition。
    一个topic,一个partition,一个consumer消费一个内存queue即可
    Kafka 怎么保证消息是有序的
    消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。Kafka 通过偏 移量(offset)来保证消息在分区内的顺序性。发送消息的时候指定 key/Partition
    Kafka如何保证消息的全局有序
    全局使用一个生产者,全局使用一个消费者,全局使用一个分区严格来说,kafka只能保证同一个分区内的消息存储的有序性
    在 Kafka 中近似实现消息有序消费的方法:

    1. 将相关消息发送到同一个分区:确保相关的消息被发送到同一个分区中,这样可以在消费端针对该分区进行顺序消费。
    2. 在消费者端实现有序消费:在消费端,使用单线程消费同一个分区中的消息,这样可以保证在该消费者实例中的消息有序消费。
      下面是一个简单的示例代码,展示如何在 Kafka 中实现简单的消息有序消费:
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class OrderedKafkaConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("topic_name"));
    
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message: " + record.value());
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    在上述代码中,我们创建了一个 Kafka 消费者,并订阅了一个名为 “topic_name” 的主题。消费者会从该主题中拉取消息并逐条消费。请注意,这种方式仅适用于单个消费者实例消费单个分区的情况下。
    需要注意的是,由于 Kafka 的设计特性,完全实现全局的消息有序消费是比较困难的。因此,在实际应用中,建议根据业务需求和场景来选择合适的方案,并在可能的情况下尽量避免依赖全局的消息顺序。

    如何rocketmq的消息有序

    RocketMQ 提供了一种简单而有效的方法来确保消息的有序性,即通过消息队列中的顺序消费。以下是实现 RocketMQ 消息有序性的基本思路:
    undefined 消息发送有序性:在发送消息时,可以为每条消息设置一个自定义的 key(例如订单号、用户ID等),保证同一个 key 的消息会被发送到同一个队列或同一个消息分区中。
    undefined 消费者顺序消费:在消费消息时,保证消费者按照顺序从队列或分区中拉取消息,并且只有一个消费者消费该队列或分区的消息。这样就能确保消息的有序性。
    undefined 单线程消费:为了确保每个消费者实例只有一个线程来消费消息,在消费消息时可以采用单线程消费的方式,避免多线程并发消费导致消息顺序混乱。
    undefined 设置消费模式:在 RocketMQ 中,可以通过设置消费者的消费模式为Orderly,来保证消息的顺序消费。Orderly 模式确保了同一个队列中的消息是有序消费的。
    undefined 消息处理幂等性:在消费消息时,需要保证消息处理的幂等性,即同一条消息被消费多次也不会产生影响。这样可以避免由于消息重复消费导致的数据错误。
    要实现 RocketMQ 中消息的有序消费,可以按照以下步骤进行:

    1. 发送有序消息:在发送消息时,确保将相关的消息按顺序发送到同一个消息队列或者同一个消息 Topic 下。
    2. 消费者端实现有序消费:在消费消息时,保证消息的有序性,可以通过设置消费者的消费模式为Orderly来实现。消费者使用顺序消费模式时,会从同一个队列中依次拉取消息,确保消息的有序性。
      下面是一个简单的Java代码示例,演示如何在RocketMQ中实现有序消费:
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    
    public class OrderedConsumer {
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
            consumer.setNamesrvAddr("localhost:9876");
            
            // 设置消费模式为顺序消费
            consumer.setMessageModel(MessageModel.CLUSTERING);
            
            consumer.subscribe("topic_name", "*");
            
            // 注册顺序消息监听器
            consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
                for (MessageExt msg : msgs) {
                    System.out.println("Received message: " + new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            });
            
            consumer.start();
            System.out.println("Consumer started.");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    在上面的代码中,我们创建了一个RocketMQ消费者,并设置了消费模式为顺序消费。然后注册了一个顺序消息监听器,确保消息按顺序被消费。

  • 相关阅读:
    ELK8.4安装配置错误记录
    如何编写有效的接口测试
    Kotlin 使用@BindingAdapter编译出错
    深度学习 paper 代码复现参考
    【LIN总线测试】——LIN主节点物理层测试
    DataFrame,数据列筛选代替遍历每一行数据去判断,大大提高数据过滤速度
    Git - 命令的操作规范
    [AIGC] 字节跳动面试题:简单说说 JVM 的垃圾回收机制
    中压电缆和高压电缆有哪些型号?它们的的执行标准是什么?
    【MATLAB教程案例14】基于ACO蚁群优化算法的函数极值计算matlab仿真及其他应用
  • 原文地址:https://blog.csdn.net/Fireworkit/article/details/136524376