• Kafka消费一致性和幂等性分析


    1、前言

    分布式系统中,消息队列被广泛用于数据的传输和处理。其中,Kafka因其高吞吐量、可扩展性和容错性而备受关注。然而,在处理海量数据时,确保消息的一致性和幂等性十分重要。本文将通过代码示例,对Kafka消费一致性和幂等性进行分析。

    2、问题背景

    Kafka消费过程中,消费者从消息队列中获取消息并处理。然而,在某些场景下,可能会出现消息处理不一致的情况。例如,当消费者从Kafka中获取一条消息后,如果处理过程中发生异常,导致该消息未被正确标记为已处理,那么在下次消费时,该消息可能会被重复处理。这将对数据的一致性造成影响。

    3、场景

    假设我们有一个订单系统,它使用Kafka来处理订单的创建、更新和取消操作。当消费者从Kafka中获取一条订单创建消息时,它会将订单信息写入数据库。然而,在写入数据库的过程中,由于网络异常或数据库故障等原因,导致订单信息写入失败。此时,消费者未将该订单状态标记为已处理,那么在下次消费时,该订单信息可能会被重复写入数据库,从而导致数据的不一致。

    4、原因分析

    Kafka消费过程中出现不一致性的原因主要有以下几点:

    1. 消息处理失败

    当消费者获取到消息后,如果处理失败,未将该消息状态标记为已处理,就会导致消息的不一致。

    2. 消费者重试

    当消费者处理消息失败后,可能会进行重试。然而,如果未进行有效控制,可能会导致消息的重复处理。

    3. Kafka消费的故障转移

    当消费者出现故障时,Kafka会将该消费者已消费但未标记的消息重新推送到其他消费者进行消费。如果新消费者未正确处理这些消息,也会导致数据的不一致。

    5、解决方案

    为了解决Kafka消费过程中出现的不一致性问题,我们可以采取以下几种解决方案:

    1. 引入外部存储

    我们可以使用外部存储(如数据库)来记录消息的处理状态。当消费者获取到消息后,先查询外部存储,如果该消息已经被处理过,则直接跳过;否则,进行处理并将状态更新为已处理。这样可以确保每个消息只被处理一次,从而保证数据的一致性。

    2. 使用分布式锁

    我们可以使用分布式锁来保证同一时间只有一个消费者可以处理该消息。当消费者获取到消息后,先尝试获取分布式锁,如果获取成功,则进行处理并将状态更新为已处理;否则,说明其他消费者已经处理过该消息,直接跳过。这样可以避免消息的重复处理。

    3. 控制重试次数

    我们可以限制消费者的重试次数,避免因重试导致的消息重复处理。当消费者处理消息失败后,最多只能重试指定次数。超过重试次数后,消费者需要向Kafka发送一个特殊标识,表示该消息无法处理,Kafka会将该消息重新推送到其他消费者进行消费。这样可以避免因重试导致的消息重复处理。

    以下是一个简单的示例代码,用于记录消息的处理状态:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.springframework.jdbc.core.JdbcTemplate;
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class KafkaConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "test-group");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList("my-topic"));
    
            JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
    
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    String messageId = record.key();
    				String status = record.value();
    				// 校验是否已消费
                if (jdbcTemplate.queryForObject("SELECT COUNT(*) FROM processed_messages WHERE message_id = ?", Integer.class, messageId) > 0) {  
                    // Message has been processed, skip processing  
                    System.out.println("Message with ID " + messageId + " has been processed before, skipping...");  
                    continue;  
                }  
    
                // Process the message  
                try {  
                    // TODO: 消息处理逻辑
                    System.out.println("Processing message with ID " + messageId + "...");  
    
                    // 消息状态变更
                    jdbcTemplate.update("INSERT INTO processed_messages (message_id, status) VALUES (?, ?)", messageId, status);  
                } catch (Exception e) {  
                    // 可加入异常重试机制
                    System.out.println("Processing failed for message with ID " + messageId + ", retrying...");  
                    continue;  
                }  
            }  
        }  
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    在上述示例代码中,我们使用了Spring的JdbcTemplate来访问数据库。首先,我们通过Kafka的Consumer API从Kafka中获取消息,然后检查该消息是否已经被处理过。如果消息已经被处理过,则跳过该消息的处理;否则,进行消息的处理,并将消息标记为已处理。在处理消息时,可以根据实际业务逻辑进行相应的处理操作。如果处理失败,则将该消息重新放入Kafka中,以便进行重试。

    6、总结

    该示例代码演示了如何使用Kafka Consumer API从Kafka中获取消息,并通过检查消息是否已经被处理过来实现消息的一致性和幂等性。如果消息已经被处理过,则跳过该消息的处理;否则,进行消息的处理,并将消息标记为已处理。在处理消息时,可以根据实际业务逻辑进行相应的处理操作。如果处理失败,则将该消息重新放入Kafka中,以便进行重试。这种方法确保了每个消息只被处理一次,从而避免了数据的不一致性。

  • 相关阅读:
    怎么做手机App测试?app测试详细流程和方法介绍(即学即用宝典)
    23.CF911G Mass Change Queries 动态开点权值线段树+线段树合并
    【C语言基础】分享近期学习到的volatile关键字、__NOP__()函数以及# #if 1 #endif
    SaaSBase:什么是SAP(思爱普) ERP?
    Open3D 生成空间圆点云
    拓展培训开场白集锦
    北京东物流,南顺丰速运
    基于JAVA视频点播系统设计与实现 开题报告
    单目标追踪——【评测工具】Ubuntu20.04的Python版本的VOT评测工具
    风控策略的上线效果评估与调优
  • 原文地址:https://blog.csdn.net/a1774381324/article/details/132700972