• Flink Data Sink


    本专栏案例代码和数据集链接:   https://download.csdn.net/download/shangjg03/88477960

    1. Data Sinks

    在使用 Flink 进行数据处理时,数据经 Data Source 流入,然后通过系列 Transformations 的转化,最终可以通过 Sink 将计算结果进行输出,Flink Data Sinks 就是用于定义数据流最终的输出位置。Flink 提供了几个较为简单的 Sink API 用于日常的开发,具体如下:

    1.1 writeAsText

    `writeAsText` 用于将计算结果以文本的方式并行地写入到指定文件夹下,除了路径参数是必选外,该方法还可以通过指定第二个参数来定义输出模式,它有以下两个可选值:

    WriteMode.NO_OVERWRITE:当指定路径上不存在任何文件时,才执行写出操作;

    WriteMode.OVERWRITE:不论指定路径上是否存在文件,都执行写出操作;如果原来已有文件,则进行覆盖。

    使用示例如下:

     streamSource.writeAsText("D:\\out"FileSystem.WriteMode.OVERWRITE);

    以上写出是以并行的方式写出到多个文件,如果想要将输出结果全部写出到一个文件,需要设置其并行度为 1:

    streamSource.writeAsText("D:\\out", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

    1.2 writeAsCsv

    `writeAsCsv` 用于将计算结果以 CSV 的文件格式写出到指定目录,除了路径参数是必选外,该方法还支持传入输出模式,行分隔符,和字段分隔符三个额外的参数,其方法定义如下:

    writeAsCsv(String path, WriteMode writeMode, String rowDelimiter, String fieldDelimiter) 
    
    

    1.3 print /printToErr

    `print / printToErr` 是测试当中最常用的方式,用于将计算结果以标准输出流或错误输出流的方式打印到控制台上。

    1.4 writeUsingOutputFormat

    采用自定义的输出格式将计算结果写出,上面介绍的 `writeAsText` 和 `writeAsCsv` 其底层调用的都是该方法,源码如下:

    1. public DataStreamSink<T> writeAsText(String path, WriteMode writeMode) {
    2. TextOutputFormat<T> tof = new TextOutputFormat<>(new Path(path));
    3.     tof.setWriteMode(writeMode);
    4. return writeUsingOutputFormat(tof);
    5. }

    1.5 writeToSocket

    `writeToSocket` 用于将计算结果以指定的格式写出到 Socket 中,使用示例如下:

    streamSource.writeToSocket("192.168.0.226", 9999, new SimpleStringSchema());
    
    

    2. Streaming Connectors

    除了上述 API 外,Flink 中还内置了系列的 Connectors 连接器,用于将计算结果输入到常用的存储系统或者消息中间件中,具体如下:

    - Apache Kafka (支持 source 和 sink)

    - Apache Cassandra (sink)

    - Amazon Kinesis Streams (source/sink)

    - Elasticsearch (sink)

    - Hadoop FileSystem (sink)

    - RabbitMQ (source/sink)

    - Apache NiFi (source/sink)

    - Google PubSub (source/sink)

    除了内置的连接器外,你还可以通过 Apache Bahir 的连接器扩展 Flink。Apache Bahir 旨在为分布式数据分析系统 (如 Spark,Flink) 等提供功能上的扩展,当前其支持的与 Flink Sink 相关的连接器如下:

    - Apache ActiveMQ (source/sink)

    - Apache Flume (sink)

    - Redis (sink)

    - Akka (sink)

    这里接着在 Data Sources 章节介绍的整合 Kafka Source 的基础上,将 Kafka Sink 也一并进行整合,具体步骤如下。

    3.整合 Kafka Sink

    3.1 addSink

    Flink 提供了 addSink 方法用来调用自定义的 Sink 或者第三方的连接器,想要将计算结果写出到 Kafka,需要使用该方法来调用 Kafka 的生产者 FlinkKafkaProducer,具体代码如下:

    1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    2. // 1.指定Kafka的相关配置属性
    3. Properties properties = new Properties();
    4. properties.setProperty("bootstrap.servers", "192.168.200.0:9092");
    5. // 2.接收Kafka上的数据
    6. DataStream<String> stream = env
    7. .addSource(new FlinkKafkaConsumer<>("flink-stream-in-topic", new SimpleStringSchema(), properties));
    8. // 3.定义计算结果到 Kafka ProducerRecord 的转换
    9. KafkaSerializationSchema<String> kafkaSerializationSchema = new KafkaSerializationSchema<String>() {
    10.     @Override
    11. public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
    12. return new ProducerRecord<>("flink-stream-out-topic", element.getBytes());
    13. }
    14. };
    15. // 4. 定义Flink Kafka生产者
    16. FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("flink-stream-out-topic",
    17.                                                                     kafkaSerializationSchema,
    18.                                                                     properties,
    19. FlinkKafkaProducer.Semantic.AT_LEAST_ONCE, 5);
    20. // 5. 将接收到输入元素*2后写出到Kafka
    21. stream.map((MapFunction<String, String>value -> value + value).addSink(kafkaProducer);
    22. env.execute("Flink Streaming");

    3.2 创建输出主题

    创建用于输出测试的主题:

    1. bin/kafka-topics.sh --create \
    2. --bootstrap-server master:9092 \
    3. --replication-factor 1 \
    4. --partitions 1  \
    5. --topic flink-stream-out-topic
    6. # 查看所有主题
    7.  bin/kafka-topics.sh --list --bootstrap-server master:9092

    3.3 启动消费者

    启动一个 Kafka 消费者,用于查看 Flink 程序的输出情况:

    bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic flink-stream-out-topic

    3.4 测试结果

    在 Kafka 生产者上发送消息到 Flink 程序,观察 Flink 程序转换后的输出情况,具体如下:

    可以看到 Kafka 生成者发出的数据已经被 Flink 程序正常接收到,并经过转换后又输出到 Kafka 对应的 Topic 上。

    4. 自定义 Sink

    除了使用内置的第三方连接器外,Flink 还支持使用自定义的 Sink 来满足多样化的输出需求。想要实现自定义的 Sink ,需要直接或者间接实现 SinkFunction 接口。通常情况下,我们都是实现其抽象类 RichSinkFunction,相比于 SinkFunction ,其提供了更多的与生命周期相关的方法。两者间的关系如下:

    这里我们以自定义一个 FlinkToMySQLSink 为例,将计算结果写出到 MySQL 数据库中,具体步骤如下:

    4.1 导入依赖

    首先需要导入 MySQL 相关的依赖:

    1. <dependency>
    2. <groupId>mysql</groupId>
    3. <artifactId>mysql-connector-java</artifactId>
    4. <version>8.0.16</version>
    5. </dependency>

    4.2 自定义 Sink

    继承自 RichSinkFunction,实现自定义的 Sink :

    1. public class FlinkToMySQLSink extends RichSinkFunction<Employee> {
    2. private PreparedStatement stmt;
    3. private Connection conn;
    4.     @Override
    5. public void open(Configuration parameters) throws Exception {
    6. Class.forName("com.mysql.cj.jdbc.Driver");
    7.         conn = DriverManager.getConnection("jdbc:mysql://192.168.0.229:3306/employees" +
    8. "?characterEncoding=UTF-8&serverTimezone=UTC&useSSL=false",
    9. "root",
    10. "123456");
    11. String sql = "insert into emp(name, age, birthday) values(?, ?, ?)";
    12.         stmt = conn.prepareStatement(sql);
    13. }
    14.     @Override
    15. public void invoke(Employee value, Context context) throws Exception {
    16.         stmt.setString(1value.getName());
    17.         stmt.setInt(2value.getAge());
    18.         stmt.setDate(3value.getBirthday());
    19.         stmt.executeUpdate();
    20. }
    21.     @Override
    22. public void close() throws Exception {
    23. super.close();
    24. if (stmt != null) {
    25.             stmt.close();
    26. }
    27. if (conn != null) {
    28.             conn.close();
    29. }
    30. }
    31. }

    4.3 使用自定义 Sink

    想要使用自定义的 Sink,同样是需要调用 addSink 方法,具体如下:

    1. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    2. Date date = new Date(System.currentTimeMillis());
    3. DataStreamSource<Employee> streamSource = env.fromElements(
    4. new Employee("hei", 10date),
    5. new Employee("bai", 20date),
    6. new Employee("ying", 30date));
    7. streamSource.addSink(new FlinkToMySQLSink());
    8. env.execute();

    4.4 测试结果

    启动程序,观察数据库写入情况:

    数据库成功写入,代表自定义 Sink 整合成功。

  • 相关阅读:
    前端进击笔记第八节 一个网络请求是怎么进行的?
    通信基石Socket结合OOP实现程序间的通信
    【甄选靶场】Vulnhub百个项目渗透——45、项目四十五:bulldog-1(waf绕过)
    CF1710C-XOR Triangle【dp】
    【Visual Leak Detector】源码调试 VLD 库
    【无标题】django中logging配置
    大模型引发“暴力计算”,巨头加速推进液冷“降温”
    2-37.1 EmpProject综合案例
    FFmpeg 中 -f 命令参数详解
    c++ openssl实现https
  • 原文地址:https://blog.csdn.net/shangjg03/article/details/134071293