• 深入解析Kafka消息传递的可靠性保证机制


    深入解析Kafka消息传递的可靠性保证机制

    Kafka在设计上提供了不同层次的消息传递保证,包括at most once(至多一次)、at least once(至少一次)和exactly once(精确一次)。每种保证通过不同的机制实现,下面详细介绍Kafka如何实现这些消息传递保证。

    1. At Most Once(至多一次)

    在这种模式下,消息可能会丢失,但不会被重复传递。这通常发生在消费者在处理消息之前提交了偏移量,导致即使消息处理失败,也认为已经处理完成。

    实现机制:

    • 消费者配置enable.auto.commit=true,并且默认提交偏移量的时间间隔较短(auto.commit.interval.ms)。
    • 消费者在处理消息之前提交偏移量,处理过程中如果发生故障,消息不会被重新处理。

    2. At Least Once(至少一次)

    在这种模式下,消息不会丢失,但可能会被重复传递。消费者确保在处理消息后才提交偏移量,故障恢复后会重新处理未提交偏移量的消息。

    实现机制:

    • 消费者配置enable.auto.commit=false,手动提交偏移量。
    • 消费者在处理完每条消息后调用consumer.commitSync()或consumer.commitAsync()提交偏移量。

    示例代码

    Properties props = new Properties();
    props.put("bootstrap.servers", "your_kafka_broker:9092");
    props.put("group.id", "test_group");
    props.put("enable.auto.commit", "false");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("your_topic"));
    
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                // 处理消息
            }
            consumer.commitSync(); // 确保消息处理后提交偏移量
        }
    } finally {
        consumer.close();
    }
    

    3. Exactly Once(精确一次)

    在这种模式下,消息既不会丢失也不会重复传递。Kafka通过引入幂等性生产者和事务性API来实现这种保证。

    实现机制

    • 幂等性生产者:确保生产者在重试发送时不会产生重复消息。通过配置enable.idempotence=true启用幂等性。
    • 事务性生产者和消费者:确保在生产和消费过程中可以使用事务,使消息的生产和消费操作要么全部成功要么全部失败。

    配置幂等性生产者

    Properties props = new Properties();
    props.put("bootstrap.servers", "your_kafka_broker:9092");
    props.put("acks", "all");
    props.put("retries", Integer.MAX_VALUE);
    props.put("enable.idempotence", "true");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    

    使用事务性生产者和消费者

    // 配置事务性生产者
    Properties props = new Properties();
    props.put("bootstrap.servers", "your_kafka_broker:9092");
    props.put("acks", "all");
    props.put("retries", Integer.MAX_VALUE);
    props.put("enable.idempotence", "true");
    props.put("transactional.id", "my-transactional-id"); // 唯一的事务ID
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    producer.initTransactions(); // 初始化事务
    
    try {
        producer.beginTransaction(); // 开始事务
        producer.send(new ProducerRecord<>("your_topic", "key", "value"));
        // 其他的发送操作
        producer.commitTransaction(); // 提交事务
    } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
        producer.close();
    } catch (KafkaException e) {
        producer.abortTransaction(); // 中止事务
        producer.close();
    }
    

    在消费者端,可以使用Kafka Streams API或者事务性消费模式确保精确一次语义。

    总结

    Kafka提供了不同层次的消息传递保证,通过合适的配置和使用模式,用户可以根据应用需求选择合适的保证模式:

    • At Most Once:适用于对数据丢失不敏感的应用。
    • At Least Once:适用于不能接受数据丢失但可以接受重复数据的应用。
    • Exactly Once:适用于对数据一致性要求非常高的应用。

    通过合理配置生产者、消费者和broker,可以在不同场景下实现合适的消息传递保证。

  • 相关阅读:
    快鲸智慧楼宇数字化管理平台,赋能商业地产快速转型升级
    PTA 7-78 烤肉饼(*)
    Java单例模式
    Zookeeper
    JavaScript判断字符串是否为数字类型:Number.isInteger、isNaN、正则表达式比较
    python学习笔记(三)
    maven 阿里源配置完整 亲测有效
    小乌龟,git使用教程,gitLab打版教程
    nonebot 原神角色查询插件
    如何解决MidJourney错过付费后被暂停
  • 原文地址:https://blog.csdn.net/qq_38411796/article/details/139550706