• Flink和Kafka连接时的精确一次保证


    Flink写入Kafka两阶段提交

    端到端的 exactly-once(精准一次)

    kafka -> Flink -> kafka

    1)输入端

    输入数据源端的 Kafka 可以对数据进行持久化保存,并可以重置偏移量(offset)

    2)Flink内部

    Flink 内部可以通过检查点机制保证状态和处理结果的 exactly-once 语义

    3)输出端

    两阶段提交(2PC)

    写入 Kafka 的过程实际上是一个两段式的提交:处理完毕得到结果,写入 Kafka 时是基于事务的“预提交”;等到检查点保存完毕,才会提交事务进行“正式提交”

    如果中间出现故障,事务进行回滚,预提交就会被放弃;恢复状态之后,也只能恢复所有已经确认提交的操作。

    必须的配置

    1)必须启用检查点

    2)指定 KafkaSink 的发送级别为 DeliveryGuarantee.EXACTLY_ONCE

    3)配置 Kafka 读取数据的消费者的隔离级别【默认kafka消费者隔离级别是读未提交,2PC第一阶段预提交数据也会被读到,下游消费者需要设置为读已提交

    4)事务超时配置

    【配置的事务超时时间 transaction.timeout.ms 默认是1小时,而Kafka 集群配置的事务最大超时时间 transaction.max.timeout.ms 默认是15 分钟。在检查点保存时间很长时,有可能出现 Kafka 已经认为事务超时了,丢弃了预提交的数据;而Sink任务认为还可以继续等待。如果接下来检查点保存成功,发生故障后回滚到这个检查点的状态,这部分数据就被真正丢掉了。因此checkpoint 间隔 < 事务超时时间 < max的15分钟

    代码实战

    kafka -> Flink -> kafka【Flink处理kafka来源数据再输出到kafka】

    public class KafkaEOSDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 【1】、启用检查点,设置为精准一次
            env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
            CheckpointConfig checkpointConfig = env.getCheckpointConfig();
            checkpointConfig.setCheckpointStorage("hdfs://hadoop102:8020/chk");
            checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    
            // 2.读取 kafka
            KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                    .setBootstrapServers("hadoop102:9092")
                    .setGroupId("default")
                    .setTopics("topic_1")
                    .setValueOnlyDeserializer(new SimpleStringSchema())
                    .setStartingOffsets(OffsetsInitializer.latest())
                    .build();
            DataStreamSource<String> kafkasource = env
                    .fromSource(kafkaSource,
                            WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource");
    
            /*
             3.写出到 Kafka
              精准一次 写入 Kafka,需要满足以下条件,【缺一不可】
              1、开启 checkpoint
              2、sink 设置保证级别为 精准一次
              3、sink 设置事务前缀
              4、sink 设置事务超时时间: checkpoint 间隔 < 事务超时时间 < max的15分钟
             */
            KafkaSink<String> kafkaSink = KafkaSink.<String>builder()// 指定 kafka 的地址和端口
                    .setBootstrapServers("hadoop102:9092")
                    // 指定序列化器:指定 Topic 名称、具体的序列化
                    .setRecordSerializer(
                            KafkaRecordSerializationSchema.<String>builder()
                                    .setTopic("ws")
                                    .setValueSerializationSchema(new SimpleStringSchema())
                                    .build()
                    )
                    // 【3.1】 精准一次,开启 2pc
                    .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    
                    // 【3.2】 精准一次,必须设置 事务的前缀
                    .setTransactionalIdPrefix("li-")
                    // 【3.3】 设置事务超时时间
                    .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000 + "")
                    .build();
            kafkasource.sinkTo(kafkaSink);
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    后续读取“ws”这个 topic 的消费者,要设置事务的隔离级别为“读已提交”

    public class KafkaEOSConsumer {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 消费 在前面使用【两阶段提交】写入的 Topic
            KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                    .setBootstrapServers("hadoop102:9092")
                    .setGroupId("default")
                    .setTopics("ws")
                    .setValueOnlyDeserializer(new SimpleStringSchema()).setStartingOffsets(OffsetsInitializer.latest())
                    // 作为 下游的消费者,要设置事务的隔离级别为 【读已提交】
                    .setProperty(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
                    .build();
            env
                    .fromSource(kafkaSource,
                            WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource")
                    .print();
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    处理程序以及消费程序如上设置才能真正实现端到端精准一次的保证。

  • 相关阅读:
    制作404页面的注意事项
    机器学习常见数据格式转换(xml_to_csv,csv_to_tfrecord)
    UWB高精度定位系统:引领精准定位技术的新纪元
    蓝桥杯DP算法——区间DP(C++)
    浅析网络编程
    面向对象设计模式之工厂方法模式
    Java基础 | 如何用Javadoc Tool写规范正确的java注释
    Ubuntu安装AndroidStudio
    一款兼容Win和Mac的iOS设备管理软件iMazing 3 for Windows新功能介绍
    学习鱼眼相机的总结
  • 原文地址:https://blog.csdn.net/qq_43417581/article/details/134432009