• Kafka 数据重复怎么办?(案例)


    一、前言

    数据重复这个问题其实也是挺正常,全链路都有可能会导致数据重复。

    通常,消息消费时候都会设置一定重试次数来避免网络波动造成的影响,同时带来副作用是可能出现消息重复。

    整理下消息重复的几个场景:

    1. 生产端: 遇到异常,基本解决措施都是 重试
      • 场景一:leader分区不可用了,抛 LeaderNotAvailableException 异常,等待选出新 leader 分区。
      • 场景二:Controller 所在 Broker 挂了,抛 NotControllerException 异常,等待 Controller 重新选举。
      • 场景三:网络异常、断网、网络分区、丢包等,抛 NetworkException 异常,等待网络恢复。
    2. 消费端: poll 一批数据,处理完毕还没提交 offset ,机子宕机重启了,又会 poll 上批数据,再度消费就造成了消息重复。

    怎么解决?

    先来了解下消息的三种投递语义:

    • 最多一次(at most once): 消息只发一次,消息可能会丢失,但绝不会被重复发送。例如: mqttQoS = 0
    • 至少一次(at least once): 消息至少发一次,消息不会丢失,但有可能被重复发送。例如: mqttQoS = 1
    • 精确一次(exactly once): 消息精确发一次,消息不会丢失,也不会被重复发送。例如: mqttQoS = 2

    了解了这三种语义,再来看如何解决消息重复,即如何实现精准一次,可分为三种方法:

    1. Kafka 幂等性 Producer 保证生产端发送消息幂等。局限性,是只能保证单分区且单会话(重启后就算新会话)
    2. Kafka 事务: 保证生产端发送消息幂等。解决幂等 Producer 的局限性。
    3. **消费端幂等: ** 保证消费端接收消息幂等。蔸底方案。

    1)Kafka 幂等性 Producer

    **幂等性指:**无论执行多少次同样的运算,结果都是相同的。即一条命令,任意多次执行所产生的影响均与一次执行的影响相同。

    幂等性使用示例:在生产端添加对应配置即可

    1. Properties props = new Properties();
    2. props.put("enable.idempotence", ture); // 1. 设置幂等
    3. props.put("acks", "all"); // 2. 当 enable.idempotence 为 true,这里默认为 all
    4. props.put("max.in.flight.requests.per.connection", 5); // 3. 注意
    5. 复制代码
    1. 设置幂等,启动幂等。
    2. 配置 acks,注意:一定要设置 acks=all,否则会抛异常。
    3. 配置 max.in.flight.requests.per.connection 需要 <= 5,否则会抛异常 OutOfOrderSequenceException
      • 0.11 >= Kafka < 1.1, max.in.flight.request.per.connection = 1
      • Kafka >= 1.1, max.in.flight.request.per.connection <= 5

    为了更好理解,需要了解下 Kafka 幂等机制:

    1. Producer 每次启动后,会向 Broker 申请一个全局唯一的 pid。(重启后 pid 会变化,这也是弊端之一)
    2. Sequence Numbe:针对每个 都对应一个从0开始单调递增的 Sequence,同时 Broker端会缓存这个 seq num
    3. 判断是否重复:Broker 里对应的队列 ProducerStateEntry.Queue(默认队列长度为 5)查询是否存在
      • 如果 nextSeq == lastSeq + 1,即 服务端seq + 1 == 生产传入seq,则接收。
      • 如果 nextSeq == 0 && lastSeq == Int.MaxValue,即刚初始化,也接收。
      • 反之,要么重复,要么丢消息,均拒绝。

    这种设计针对解决了两个问题:

    1. 消息重复: 场景 Broker 保存消息后还没发送 ack 就宕机了,这时候 Producer 就会重试,这就造成消息重复。
    2. 消息乱序: 避免场景,前一条消息发送失败而其后一条发送成功,前一条消息重试后成功,造成的消息乱序。

    那什么时候该使用幂等:

    1. 如果已经使用 acks=all,使用幂等也可以。
    2. 如果已经使用 acks=0 或者 acks=1,说明你的系统追求高性能,对数据一致性要求不高。不要使用幂等。

    2)Kafka 事务

    使用 Kafka 事务解决幂等的弊端:单会话且单分区幂等。

    Tips 这块篇幅较长,这先稍微提及下使用,之后另起一篇。

    事务使用示例:分为生产端 和 消费端

    1. Properties props = new Properties();
    2. props.put("enable.idempotence", ture); // 1. 设置幂等
    3. props.put("acks", "all"); // 2. 当 enable.idempotence 为 true,这里默认为 all
    4. props.put("max.in.flight.requests.per.connection", 5); // 3. 最大等待数
    5. props.put("transactional.id", "my-transactional-id"); // 4. 设定事务 id
    6. Producer<String, String> producer = new KafkaProducer<String, String>(props);
    7. // 初始化事务
    8. producer.initTransactions();
    9. try{
    10. // 开始事务
    11. producer.beginTransaction();
    12. // 发送数据
    13. producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value"));
    14. // 数据发送及 Offset 发送均成功的情况下,提交事务
    15. producer.commitTransaction();
    16. } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    17. // 数据发送或者 Offset 发送出现异常时,终止事务
    18. producer.abortTransaction();
    19. } finally {
    20. // 关闭 Producer 和 Consumer
    21. producer.close();
    22. consumer.close();
    23. }
    24. 复制代码

    这里消费端 Consumer 需要设置下配置:isolation.level 参数

    • read_uncommitted 这是默认值,表明 Consumer 能够读取到 Kafka 写入的任何消息,不论事务型 Producer 提交事务还是终止事务,其写入的消息都可以读取。如果你用了事务型 Producer,那么对应的 Consumer 就不要使用这个值。

    • read_committed 表明 Consumer 只会读取事务型 Producer 成功提交事务写入的消息。当然了,它也能看到非事务型 Producer 写入的所有消息。

    3)消费端幂等

    “如何解决消息重复?” 这个问题,其实换一种说法:就是如何解决消费端幂等性问题。

    只要消费端具备了幂等性,那么重复消费消息的问题也就解决了。

    典型的方案是使用:消息表,来去重:

    • 上述栗子中,消费端拉取到一条消息后,开启事务,将消息Id 新增到本地消息表中,同时更新订单信息。
    • 如果消息重复,则新增操作 insert 会异常,同时触发事务回滚。


     

    二、案例:Kafka 幂等性 Producer 使用

    环境搭建可参考:链接

    准备工作如下:

    1. Zookeeper:本地使用 Docker 启动

      1. $ docker run -d --name zookeeper -p 2181:2181 zookeeper
      2. a86dff3689b68f6af7eb3da5a21c2dba06e9623f3c961154a8bbbe3e9991dea4
      3. 复制代码
    2. Kafka:版本 2.7.1,源码编译启动(看上文源码搭建启动)

    3. 启动生产者:Kafka 源码中 exmaple

    4. 启动消息者:可以用 Kafka 提供的脚本

      1. # 举个栗子:topic 需要自己去修改
      2. $ cd ./kafka-2.7.1-src/bin
      3. $ ./kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic
      4. 复制代码

    创建 topic 1副本,2 分区

    1. $ ./kafka-topics.sh --bootstrap-server localhost:9092 --topic myTopic --create --replication-factor 1 --partitions 2
    2. # 查看
    3. $ ./kafka-topics.sh --bootstrap-server broker:9092 --topic myTopic --describe
    4. 复制代码

    生产者代码:

    1. public class KafkaProducerApplication {
    2. private final Producer<String, String> producer;
    3. final String outTopic;
    4. public KafkaProducerApplication(final Producer<String, String> producer,
    5. final String topic) {
    6. this.producer = producer;
    7. outTopic = topic;
    8. }
    9. public void produce(final String message) {
    10. final String[] parts = message.split("-");
    11. final String key, value;
    12. if (parts.length > 1) {
    13. key = parts[0];
    14. value = parts[1];
    15. } else {
    16. key = null;
    17. value = parts[0];
    18. }
    19. final ProducerRecord<String, String> producerRecord
    20. = new ProducerRecord<>(outTopic, key, value);
    21. producer.send(producerRecord,
    22. (recordMetadata, e) -> {
    23. if(e != null) {
    24. e.printStackTrace();
    25. } else {
    26. System.out.println("key/value " + key + "/" + value + "\twritten to topic[partition] " + recordMetadata.topic() + "[" + recordMetadata.partition() + "] at offset " + recordMetadata.offset());
    27. }
    28. }
    29. );
    30. }
    31. public void shutdown() {
    32. producer.close();
    33. }
    34. public static void main(String[] args) {
    35. final Properties props = new Properties();
    36. props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    37. props.put(ProducerConfig.ACKS_CONFIG, "all");
    38. props.put(ProducerConfig.CLIENT_ID_CONFIG, "myApp");
    39. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    40. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    41. final String topic = "myTopic";
    42. final Producer<String, String> producer = new KafkaProducer<>(props);
    43. final KafkaProducerApplication producerApp = new KafkaProducerApplication(producer, topic);
    44. String filePath = "/home/donald/Documents/Code/Source/kafka-2.7.1-src/examples/src/main/java/kafka/examples/input.txt";
    45. try {
    46. List<String> linesToProduce = Files.readAllLines(Paths.get(filePath));
    47. linesToProduce.stream().filter(l -> !l.trim().isEmpty())
    48. .forEach(producerApp::produce);
    49. System.out.println("Offsets and timestamps committed in batch from " + filePath);
    50. } catch (IOException e) {
    51. System.err.printf("Error reading file %s due to %s %n", filePath, e);
    52. } finally {
    53. producerApp.shutdown();
    54. }
    55. }
    56. }
    57. 复制代码

    启动生产者后,控制台输出如下:

    启动消费者:

    1. $ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic
    2. 复制代码

    修改配置 acks

    启用幂等的情况下,调整 acks 配置,生产者启动后结果是怎样的:

    • 修改配置 acks = 1
    • 修改配置 acks = 0

    会直接报错:

    1. Exception in thread "main" org.apache.kafka.common.config.ConfigException: Must set acks to all in order to use the idempotent producer.
    2. Otherwise we cannot guarantee idempotence.
    3. 复制代码

    修改配置 max.in.flight.requests.per.connection

    启用幂等的情况下,调整此配置,结果是怎样的:

    • max.in.flight.requests.per.connection > 5 会怎样?

    当然会报错:

    1. Caused by: org.apache.kafka.common.config.ConfigException: Must set max.in.flight.requests.per.connection to at most 5 to use the idempotent producer.
    2. 复制代码

  • 相关阅读:
    2022 Java生态系统报告:Java 11超Java 8、Oracle在缩水、Amazon在崛起
    HTML5期末大作业:美食网页主题网站设计与实现——HTML+CSS+JavaScript月饼美食食品企业网站html模板9页面
    竞赛选题 深度学习+python+opencv实现动物识别 - 图像识别
    FPGA笔试
    PyTorch深度学习实战(18)——目标检测基础
    1999-2021地级市GDP及一二三产业GDP数据
    MySQL双主模式(2022/11/19)
    如何根据性能需求进行场景设计?
    Python3 + Appium + 安卓模拟器实现APP自动化测试并生成测试报告
    加密原生消费产品的未来:Web3 数字身份如何发挥实际作用
  • 原文地址:https://blog.csdn.net/BASK2312/article/details/128183328