• 6.2、Flink数据写入到Kafka


    目录

    1、添加POM依赖

    2、API使用说明

    3、序列化器

    3.1 使用预定义的序列化器

    3.2 使用自定义的序列化器

    4、容错保证级别

    4.1 至少一次 的配置

    4.2 精确一次 的配置

    5、这是一个完整的入门案例


    1、添加POM依赖

    Apache Flink 集成了通用的 Kafka 连接器,使用时需要根据生产环境的版本引入相应的依赖

    1. <dependency>
    2. <groupId>org.apache.flinkgroupId>
    3. <artifactId>flink-connector-kafkaartifactId>
    4. <version>1.17.1version>
    5. dependency>

    2、API使用说明

    KafkaSink 可将数据流写入一个或多个 Kafka topic。

    官网链接:官网链接

    1. DataStream stream = ...;
    2. KafkaSink sink = KafkaSink.builder() // 泛型为 输入输入的类型
    3. // TODO 必填项:配置 kafka 的地址和端口
    4. .setBootstrapServers(brokers)
    5. // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
    6. .setRecordSerializer(KafkaRecordSerializationSchema.builder()
    7. .setTopic("topic-name")
    8. .setValueSerializationSchema(new SimpleStringSchema())
    9. .build()
    10. )
    11. // TODO 必填项:配置容错保证级别 精准一次、至少一次、不做任何保证
    12. .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    13. .build();
    14. stream.sinkTo(sink);

    3、序列化

    序列化器的作用是将flink数据转换成 kafka的ProducerRecord

    3.1 使用预定义的序列化器

    功能:将 DataStream 数据转换为 Kafka消息中的value,key为默认值null,timestamp为默认值

    1. // 初始化 KafkaSink 实例
    2. KafkaSink kafkaSink = KafkaSink.builder()
    3. // TODO 必填项:配置 kafka 的地址和端口
    4. .setBootstrapServers("worker01:9092")
    5. // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
    6. .setRecordSerializer(
    7. KafkaRecordSerializationSchema.builder()
    8. .setTopic("20230912")
    9. .setValueSerializationSchema(new SimpleStringSchema())
    10. .build()
    11. )
    12. .build();

    3.2 使用自定义的序列化器

    功能:可以对 kafka消息的key、value、partition、timestamp进行赋值

    1. /**
    2. * 如果要指定写入kafka的key,可以自定义序列化器:
    3. * 1、实现 一个接口,重写 序列化 方法
    4. * 2、指定key,转成 字节数组
    5. * 3、指定value,转成 字节数组
    6. * 4、返回一个 ProducerRecord对象,把key、value放进去
    7. */
    8. // 初始化 KafkaSink 实例 (自定义 KafkaRecordSerializationSchema 实例)
    9. KafkaSink kafkaSink = KafkaSink.builder()
    10. // TODO 必填项:配置 kafka 的地址和端口
    11. .setBootstrapServers("worker01:9092")
    12. // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
    13. .setRecordSerializer(
    14. new KafkaRecordSerializationSchema() {
    15. @Nullable
    16. @Override
    17. public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
    18. String[] datas = element.split(",");
    19. byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
    20. byte[] value = element.getBytes(StandardCharsets.UTF_8);
    21. Long currTimestamp = System.currentTimeMillis();
    22. Integer partition = 0;
    23. return new ProducerRecord<>("20230913", partition, currTimestamp, key, value);
    24. }
    25. }
    26. )
    27. .build();

    4、容错保证级别

    KafkaSink 总共支持三种不同的语义保证(DeliveryGuarantee

    • DeliveryGuarantee.NONE   不提供任何保证
      • 消息有可能会因 Kafka broker 的原因发生丢失或因 Flink 的故障发生重复
    • DeliveryGuarantee.AT_LEAST_ONCE  至少一次
      • sink 在 checkpoint 时会等待 Kafka 缓冲区中的数据全部被 Kafka producer 确认。
      • 消息不会因 Kafka broker 端发生的事件而丢失,但可能会在 Flink 重启时重复,因为 Flink 会重新处理旧数据。
    • DeliveryGuarantee.EXACTLY_ONCE 精确一次
      • 该模式下,Kafka sink 会将所有数据通过在 checkpoint 时提交的事务写入。
      • 因此,如果 consumer 只读取已提交的数据(参见 Kafka consumer 配置 isolation.level),在 Flink 发生重启时不会发生数据重复。
      • 然而这会使数据在 checkpoint 完成时才会可见,因此请按需调整 checkpoint 的间隔。
      • 请确认事务 ID 的前缀(transactionIdPrefix)对不同的应用是唯一的,以保证不同作业的事务 不会互相影响!此外,强烈建议将 Kafka 的事务超时时间调整至远大于 checkpoint 最大间隔 + 最大重启时间,否则 Kafka 对未提交事务的过期处理会导致数据丢失。

    4.1 至少一次 的配置

    1. DataStream stream = ...;
    2. // 初始化 KafkaSink 实例
    3. KafkaSink kafkaSink = KafkaSink.builder()
    4. // TODO 必填项:配置 kafka 的地址和端口
    5. .setBootstrapServers("worker01:9092")
    6. // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
    7. .setRecordSerializer(
    8. KafkaRecordSerializationSchema.builder()
    9. .setTopic("20230912")
    10. .setValueSerializationSchema(new SimpleStringSchema())
    11. .build()
    12. )
    13. // TODO 必填项:配置容灾保证级别设置为 至少一次
    14. .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
    15. .build();
    16. stream.sinkTo(sink);

    4.2 精确一次 的配置

    1. // 如果是精准一次,必须开启checkpoint
    2. env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
    3. DataStream stream = ...;
    4. KafkaSink sink = KafkaSink.builder() // 泛型为 输入输入的类型
    5. // TODO 必填项:配置 kafka 的地址和端口
    6. .setBootstrapServers(brokers)
    7. // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
    8. .setRecordSerializer(KafkaRecordSerializationSchema.builder()
    9. .setTopic("topic-name")
    10. .setValueSerializationSchema(new SimpleStringSchema())
    11. .build()
    12. )
    13. // TODO 必填项:配置容灾保证级别设置为 精准一次
    14. .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    15. // 如果是精准一次,必须设置 事务的前缀
    16. .setTransactionalIdPrefix("flink-")
    17. // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
    18. .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "6000")
    19. .build();
    20. stream.sinkTo(sink);

    5、这是一个完整的入门案例

    需求:Flink实时读取 socket数据源,将读取到的数据写入到Kafka (要保证不丢失,不重复)

    开发语言:java1.8

    flink版本:flink1.17.0

    1. package com.baidu.datastream.sink;
    2. import org.apache.flink.api.common.serialization.SimpleStringSchema;
    3. import org.apache.flink.connector.base.DeliveryGuarantee;
    4. import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    5. import org.apache.flink.connector.kafka.sink.KafkaSink;
    6. import org.apache.flink.streaming.api.CheckpointingMode;
    7. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    9. import org.apache.kafka.clients.producer.ProducerConfig;
    10. // TODO flink 数据输出到kafka
    11. public class SinkKafka {
    12. public static void main(String[] args) throws Exception {
    13. // 1.获取执行环境
    14. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    15. env.setParallelism(2);
    16. // 如果是精准一次,必须开启checkpoint
    17. env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
    18. // 2.指定数据源
    19. DataStreamSource streamSource = env.socketTextStream("localhost", 9999);
    20. // 3.初始化 KafkaSink 实例
    21. KafkaSink kafkaSink = KafkaSink.builder()
    22. // TODO 必填项:配置 kafka 的地址和端口
    23. .setBootstrapServers("worker01:9092")
    24. // TODO 必填项:配置消息序列化器信息 Topic名称、消息序列化器类型
    25. .setRecordSerializer(
    26. KafkaRecordSerializationSchema.builder()
    27. .setTopic("20230912")
    28. .setValueSerializationSchema(new SimpleStringSchema())
    29. .build()
    30. )
    31. // TODO 必填项:配置容灾保证级别设置为 精准一次
    32. .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    33. // 如果是精准一次,必须设置 事务的前缀
    34. .setTransactionalIdPrefix("flink-")
    35. // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
    36. .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "6000")
    37. .build();
    38. streamSource.sinkTo(kafkaSink);
    39. // 3.触发程序执行
    40. env.execute();
    41. }
    42. }

  • 相关阅读:
    Java-GUI-AWT-组件-TextComponent类
    【SCAU数据挖掘】数据挖掘期末总复习题库简答题及解析——上
    力扣(LeetCode)2652. 倍数求和(C++)
    C++继承机制学习笔记
    数仓工具—Hive源码之SQL 解析(6)
    固体物理期末3套试题
    数字逻辑实践6-> 从数字逻辑到计算机组成 | 逻辑元件总结与注意事项
    【开发】视频集中存储/直播点播平台EasyDSS点播文件分类功能优化
    高忆管理:证券账户国债逆回购怎么操作?
    数据结构-双向链表
  • 原文地址:https://blog.csdn.net/weixin_42845827/article/details/132854512