• Flink(七)【输出算子(Sink)】


    前言

            今天是我写博客的第 200 篇,恍惚间两年过去了,现在已经是大三的学长了。仍然记得两年前第一次写博客的时候,当时学的应该是 Java 语言,菜的一批,写了就删,怕被人看到丢脸。当时就想着自己一年之后,两年之后能学到什么水平,什么是 JDBC、什么是 MVC、SSM,在当时都是特别好奇的东西,不过都在后来的学习中慢慢接触到,并且好多已经烂熟于心了。

            那,今天我在畅想一下,一年后的今天,我又学到了什么水平?能否达到三花聚顶、草木山石皆可为码的超凡入圣的境界?拿没拿到心仪的 offer?和那个心动过的女孩相处怎么样了?哈哈哈哈哈


    输出算子(Sink)

    学完了 Flink 在不同执行环境(本地测试环境和集群环境)下的多种读取(多种数据源)和转换操作(多种转换算子),最后就是输出操作了。

    1、连接到外部系统

    Flink 1.12 之前,Sink 算子是通过调用 DataStream 的 addSink 方法来实现的:

    stream.addSink(new SinkFunction(...));

    从 Flink 1.12 开始,Flink 重构了 Sink 架构:

    stream.sinkTo(...)

    查看 Flink 支持的连接器

    需要我们自己导入依赖,比如上面的 Kfaka 和 DataGen 我们之前使用的时候都导入过相关依赖,需要知道,有的是只支持source,有的只支持sink,有的全都支持。

    1. <dependency>
    2. <groupId>org.apache.flinkgroupId>
    3. <artifactId>flink-connector-kafkaartifactId>
    4. <version>${flink.version}version>
    5. dependency>
    6. <dependency>
    7. <groupId>org.apache.flinkgroupId>
    8. <artifactId>flink-connector-datagenartifactId>
    9. <version>${flink.version}version>
    10. dependency>

    2、输出到文件

            Flink 专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的 Sink,它可以将分区文件写入 Flink支持的文件系统。
            它的主要操作是将数据写入桶(buckets),每个桶中的数据都可以分割成一个个大小有限的分区文件,这样一来就实现真正意义上的分布式文件存储。我们可以通过各种配置来控制“分桶”的操作;默认的分桶方式是基于时间的,我们每小时写入一个新的桶。换句话说,每个桶内保存的文件,记录的都是 1 小时的输出数据。
            FileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded,比如 Parquet)格式。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用 FileSink 的静态方法:

    • 行编码:FileSink.forRowFormat(basePath,rowEncoder)。
    • 批量编码:FileSink.forBulkFormat(basePath,bulkWriterFactory)。

    在创建行或批量编码 Sink 时,我们需要传入两个参数,用来指定存储桶的基本路径(basePath)和数据的编码逻辑(rowEncoder 或 bulkWriterFactory)。

    1. package com.lyh.sink;
    2. import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    3. import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    4. import org.apache.flink.api.common.typeinfo.TypeInformation;
    5. import org.apache.flink.api.common.typeinfo.Types;
    6. import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
    7. import org.apache.flink.configuration.MemorySize;
    8. import org.apache.flink.connector.datagen.source.DataGeneratorSource;
    9. import org.apache.flink.connector.datagen.source.GeneratorFunction;
    10. import org.apache.flink.connector.file.sink.FileSink;
    11. import org.apache.flink.core.fs.Path;
    12. import org.apache.flink.streaming.api.CheckpointingMode;
    13. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    14. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    15. import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
    16. import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
    17. import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
    18. import java.time.Duration;
    19. import java.time.ZoneId;
    20. /**
    21. * @author 刘xx
    22. * @version 1.0
    23. * @date 2023-11-18 9:51
    24. */
    25. public class SinkFile {
    26. public static void main(String[] args) throws Exception {
    27. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    28. env.setParallelism(2);
    29. // 必须开启 检查点 不然一直都是 .inprogress
    30. env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
    31. DataGeneratorSource dataGeneratorSource = new DataGeneratorSource(
    32. new GeneratorFunction() {
    33. @Override
    34. public String map(Long value) throws Exception {
    35. return "Number:"+value;
    36. }
    37. },
    38. Long.MAX_VALUE,
    39. RateLimiterStrategy.perSecond(10), // 每s 10条
    40. Types.STRING
    41. );
    42. DataStreamSource dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generate");
    43. // todo 输出到文件系统
    44. FileSink fileSink = FileSink.
    45. // 泛型方法 需要和输出结果的泛型保持一致
    46. forRowFormat(
    47. new Path("D:/Desktop"), // 指定输出路径 可以是 hdfs:// 路径
    48. new SimpleStringEncoder<>("UTF-8")) // 指定编码
    49. .withOutputFileConfig(OutputFileConfig.builder()
    50. .withPartPrefix("lyh")
    51. .withPartSuffix(".log")
    52. .build())
    53. // 按照目录分桶 一个小时一个目录(这里的时间格式别改为分钟 会报错: flink Relative path in absolute URI:)
    54. .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
    55. // 设置文件滚动策略-时间或者大小 10s 或 1KB 或 5min内没有新数据写入 滚动一次
    56. // 滚动的时候 文件就会更名为我们设定的格式(前缀)不再写入
    57. .withRollingPolicy(
    58. DefaultRollingPolicy.builder()
    59. .withRolloverInterval(Duration.ofSeconds(10L)) // 10s
    60. .withMaxPartSize(new MemorySize(1024)) // 1KB
    61. .withInactivityInterval(Duration.ofMinutes(5)) // 5min
    62. .build()
    63. )
    64. .build();
    65. dataGen.sinkTo(fileSink);
    66. env.execute();
    67. }
    68. }

    这里我们创建了一个简单的文件 Sink,通过.withRollingPolicy()方法指定了一个“滚动策略”。“滚动”的概念在日志文件的写入中经常遇到:因为文件会有内容持续不断地写入,所以我们应该给一个标准,到什么时候就开启新的文件,将之前的内容归档保存。也就是说,上面的代码设置了在以下 3 种情况下,我们就会滚动分区文件:
    ⚫ 至少包含 10 秒的数据
    ⚫ 最近 5 分钟没有收到新的数据
    ⚫ 文件大小已达到 1 KB

    通过 withOutputFileConfig()方法指定了输出的文件名前缀和后缀。

    需要特别注意的就是一定要开启检查点,否则我们的数据一直都是正在写入的状态(具体原因后面学习到检查点的时候会详细说)。

    运行结果:

    3、输出到 Kafka

    1. 需要添加 Kafka 依赖(之前导入过了)
    2. 启动 Kafka
    3. 编写示例代码
    1. package com.lyh.sink;
    2. import org.apache.flink.api.common.serialization.SimpleStringSchema;
    3. import org.apache.flink.connector.base.DeliveryGuarantee;
    4. import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
    5. import org.apache.flink.connector.kafka.sink.KafkaSink;
    6. import org.apache.flink.streaming.api.CheckpointingMode;
    7. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    8. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    9. import org.apache.kafka.clients.producer.ProducerConfig;
    10. /**
    11. * @author 刘xx
    12. * @version 1.0
    13. * @date 2023-11-18 11:20
    14. */
    15. public class SinkKafka {
    16. public static void main(String[] args) throws Exception {
    17. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    18. env.setParallelism(1);
    19. // 如果是 精准一次 必须开启 checkpoint
    20. env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
    21. SingleOutputStreamOperator sensorDS = env.socketTextStream("localhost", 9999);
    22. KafkaSink kafkaSink = KafkaSink.builder()
    23. // 指定 kafka 的地址和端口
    24. .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
    25. // 指定序列化器 我们是发送方 所以我们是生产者
    26. .setRecordSerializer(
    27. KafkaRecordSerializationSchema.builder()
    28. .setTopic("like")
    29. .setValueSerializationSchema(new SimpleStringSchema())
    30. .build()
    31. )
    32. // 写到 kafka 的一致性级别: 精准一次 / 至少一次
    33. // 如果是精准一次
    34. // 1.必须开启检查点 env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE)
    35. // 2.必须设置事务的前缀
    36. // 3.必须设置事务的超时时间: 大于 checkpoint间隔 小于 max 15分钟
    37. .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
    38. .setTransactionalIdPrefix("lyh-")
    39. .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,10*60*1000+"")
    40. .build();
    41. sensorDS.sinkTo(kafkaSink);
    42. env.execute();
    43. }
    44. }

    启动 kafka 并开启一个消费者:

    kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic like
    

    运行结果:

    需要特别注意的三点:

    如果是精准一次
     1.必须开启检查点 env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE)
     2.必须设置事务的前缀
     3.必须设置事务的超时时间: 大于 checkpoint间隔 小于 max 15分钟

    自定义序列化器

    我们上面用的自带的序列化器,但是如果我们有 key 的话,就需要自定义序列化器了,替换上面的代码:

    1. .setRecordSerializer(
    2. /**
    3. * 如果要指定写入 kafka 的key 就需要自定义序列化器
    4. * 实现一个接口 重写序列化方法
    5. * 指定key 转为 bytes[]
    6. * 指定value 转为 bytes[]
    7. * 返回一个 ProducerRecord(topic名,key,value)对象
    8. */
    9. new KafkaRecordSerializationSchema() {
    10. @Nullable
    11. @Override
    12. // ProducerRecord 返回一个生产者消息,key,value 分别对应两个字节数组
    13. public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
    14. String[] datas = element.split(",");
    15. byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
    16. byte[] value = element.getBytes(StandardCharsets.UTF_8);
    17. return new ProducerRecord<>("like",key,value);
    18. }
    19. }
    20. )

    运行结果: 

    4、输出到 MySQL

    添加依赖(1.17版本的依赖需要指定仓库才能找到,因为阿里云和默认的maven仓库是没有的):

    1. <dependency>
    2. <groupId>org.apache.flinkgroupId>
    3. <artifactId>flink-connector-jdbcartifactId>
    4. <version>1.17-SNAPSHOTversion>
    5. dependency>
    6. <dependency>
    7. <groupId>mysqlgroupId>
    8. <artifactId>mysql-connector-javaartifactId>
    9. <version>8.0.31version>
    10. dependency>
    11. ....
    12. <repositories>
    13. <repository>
    14. <id>apache-snapshotsid>
    15. <name>apache snapshotsname>
    16. <url>https://repository.apache.org/content/repositories/snapshots/url>
    17. repository>
    18. repositories>

    创建表格 

    编写代码,将输入的数据行分隔为对象参数,每行数据生成一个对象进行处理。 

    1. package com.lyh.sink;
    2. import com.lyh.bean.WaterSensor;
    3. import function.WaterSensorFunction;
    4. import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
    5. import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
    6. import org.apache.flink.connector.jdbc.JdbcSink;
    7. import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
    8. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    9. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    10. import org.apache.flink.streaming.api.functions.sink.SinkFunction;
    11. import java.sql.PreparedStatement;
    12. import java.sql.SQLException;
    13. /**
    14. * @author 刘xx
    15. * @version 1.0
    16. * @date 2023-11-18 12:32
    17. */
    18. public class SinkMySQL {
    19. public static void main(String[] args) throws Exception {
    20. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    21. env.setParallelism(1);
    22. SingleOutputStreamOperator sensorDS = env.socketTextStream("localhost", 9999)
    23. .map(new WaterSensorFunction()); //输入进来的数据自动转为 WaterSensor类型
    24. /**
    25. * todo 写入 mysql
    26. * 1.这里需要用旧的sink写法:addSink
    27. * 2.JDBC的4个参数
    28. * (1) 执行的sql语句
    29. * (2) 对占位符进行填充
    30. * (3) 执行选项 -> 攒批,重试
    31. * (4) 连接选项 -> driver,username,password,url
    32. */
    33. SinkFunction jdbcSink = JdbcSink.sink("insert into flink.ws values(?,?,?)",
    34. // 指定 sql 中占位符的值
    35. new JdbcStatementBuilder() {
    36. @Override
    37. public void accept(PreparedStatement stmt, WaterSensor sensor) throws SQLException {
    38. // 占位符从 1 开始
    39. stmt.setString(1, sensor.getId());
    40. stmt.setLong(2, sensor.getTs());
    41. stmt.setInt(3, sensor.getVc());
    42. }
    43. }, JdbcExecutionOptions.builder()
    44. .withMaxRetries(3) //最多重试3次(不包括第一次,共4次)
    45. .withBatchSize(100) //每收集100条记录进行一次写入
    46. .withBatchIntervalMs(3000) // 批次3s(即使没有达到100条记录,只要过了3s JDBCSink也会进行记录的写入),这有助于确保数据及时写入,而不是无限期地等待批处理大小达到。
    47. .build()
    48. , new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    49. .withUrl("jdbc:mysql://localhost:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
    50. .withDriverName("com.mysql.cj.jdbc.Driver")
    51. .withUsername("root")
    52. .withPassword("Yan1029.")
    53. // mysql 默认8小时不使用连接就主动断开连接
    54. .withConnectionCheckTimeoutSeconds(60) // 重试连接直接的间隔,上面我们设置最多重试3次,每次间隔60s
    55. .build()
    56. );
    57. sensorDS.addSink(jdbcSink);
    58. env.execute();
    59. }
    60. }

     查询结果:

    5、自定义 Sink 输出

    与 Source 类似,Flink 为我们提供了通用的 SinkFunction 接口和对应的 RichSinkDunction抽象类,只要实现它,通过简单地调用 DataStream 的.addSink()方法就可以自定义写入任何外部存储。

    这里我们自定义实现一个向 HBase 中插入数据的 Sink。

    注意:这里只是做一个简单的 Demo,下面的代码不难发现,我们只是对 nosq:student 表下的 info:name 进行了两次的覆盖。如果要实现复杂的处理功能,需要对数据类型进行定义,因为 HBase 的数据是按列存储的,所以对于复杂的 Hbase 表,我们难以通过 Java bean 来插入数据。而且,一般经常用的连接器,Flink 大部分已经提供了,开发中我们一般也很少自定义 Sink 输出。

    1. package com.lyh.sink;
    2. import com.lyh.utils.HBaseConnection;
    3. import org.apache.flink.configuration.Configuration;
    4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    5. import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    6. import org.apache.hadoop.hbase.TableName;
    7. import org.apache.hadoop.hbase.client.Connection;
    8. import org.apache.hadoop.hbase.client.Put;
    9. import org.apache.hadoop.hbase.client.Table;
    10. import java.nio.charset.StandardCharsets;
    11. /**
    12. * @author 刘xx
    13. * @version 1.0
    14. * @date 2023-11-18 15:59
    15. */
    16. public class SinkCustomHBase {
    17. public static void main(String[] args) throws Exception {
    18. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    19. env.setParallelism(1);
    20. env.fromElements("tom","bob").addSink(new RichSinkFunction() {
    21. public Connection con;
    22. @Override
    23. public void open(Configuration parameters) throws Exception {
    24. super.open(parameters);
    25. con = HBaseConnection.getConnection("hadoop102:2181");
    26. }
    27. @Override
    28. public void invoke(String value, Context context) throws Exception {
    29. super.invoke(value, context);
    30. Table table = con.getTable(TableName.valueOf("nosql","student"));
    31. Put put = new Put("1001".getBytes(StandardCharsets.UTF_8));
    32. put.addColumn("info".getBytes(StandardCharsets.UTF_8)
    33. ,"name".getBytes(StandardCharsets.UTF_8),
    34. value.getBytes(StandardCharsets.UTF_8));
    35. table.put(put);
    36. table.close();
    37. }
    38. @Override
    39. public void close() throws Exception {
    40. super.close();
    41. HBaseConnection.close();
    42. }
    43. });
    44. env.execute();
    45. }
    46. }

    这里用到一个简单的连接 HBase 的工具类:
     

    1. package com.lyh.utils;
    2. import org.apache.hadoop.conf.Configuration;
    3. import org.apache.hadoop.hbase.client.Connection;
    4. import org.apache.hadoop.hbase.client.ConnectionFactory;
    5. import java.io.IOException;
    6. /**
    7. * @author 刘xx
    8. * @version 1.0
    9. * @date 2023-11-18 16:04
    10. */
    11. public class HBaseConnection {
    12. private static Connection connection;
    13. public static Connection getConnection(String hosts) throws IOException {
    14. Configuration conf = new Configuration();
    15. conf.set("hbase.zookeeper.quorum", hosts);
    16. conf.setInt("hbase.rpc.timeout", 10000); // 设置最大超时 10 s
    17. connection = ConnectionFactory.createConnection(conf);
    18. return connection;
    19. }
    20. public static void close() throws IOException {
    21. if (connection!=null)
    22. connection.close();
    23. }
    24. }

     

  • 相关阅读:
    【附源码】Python计算机毕业设计民宿网站管理系统
    CentOS7 安装 NVIDIA Container Toolkit
    Vue.js快速入门之七:系统权限管理
    IDEA Service窗口
    表情包APP小程序制作开发功能有哪些?
    OTA设计思路
    100天精通Python(可视化篇)——第107天:Pyecharts绘制多种炫酷旭日图参数说明+代码实战
    计算机网络协议------从入门到深化
    MySql 数据库【约束】
    服务器与Tomcat的区别
  • 原文地址:https://blog.csdn.net/m0_64261982/article/details/134474275