• Flink Data Source


    一、内置 Data Source

    Flink Data Source 用于定义 Flink 程序的数据来源,Flink 官方提供了多种数据获取方法,用于帮助开发者简单快速地构建输入流,具体如下:

    1.1 基于文件构建

    1. readTextFile(path):按照 TextInputFormat 格式读取文本文件,并将其内容以字符串的形式返回。示例如下:

    env.readTextFile(filePath).print();
    
    • 1

    2. readFile(fileInputFormat, path) :按照指定格式读取文件。

    3. readFile(inputFormat, filePath, watchType, interval, typeInformation):按照指定格式周期性的读取文件。其中各个参数的含义如下:

    • inputFormat:数据流的输入格式。
    • filePath:文件路径,可以是本地文件系统上的路径,也可以是 HDFS 上的文件路径。
    • watchType:读取方式,它有两个可选值,分别是 FileProcessingMode.PROCESS_ONCEFileProcessingMode.PROCESS_CONTINUOUSLY:前者表示对指定路径上的数据只读取一次,然后退出;后者表示对路径进行定期地扫描和读取。需要注意的是如果 watchType 被设置为 PROCESS_CONTINUOUSLY,那么当文件被修改时,其所有的内容 (包含原有的内容和新增的内容) 都将被重新处理,因此这会打破 Flink 的 exactly-once 语义。
    • interval:定期扫描的时间间隔。
    • typeInformation:输入流中元素的类型。

    使用示例如下:

    final String filePath = "D:\\log4j.properties";
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.readFile(new TextInputFormat(new Path(filePath)),
                 filePath,
                 FileProcessingMode.PROCESS_ONCE,
                 1,
                 BasicTypeInfo.STRING_TYPE_INFO).print();
    env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    1.2 基于集合构建

    1. fromCollection(Collection):基于集合构建,集合中的所有元素必须是同一类型。示例如下:

    env.fromCollection(Arrays.asList(1,2,3,4,5)).print();
    
    • 1

    2. fromElements(T …): 基于元素构建,所有元素必须是同一类型。示例如下:

    env.fromElements(1,2,3,4,5).print();
    
    • 1

    3. generateSequence(from, to):基于给定的序列区间进行构建。示例如下:

    env.generateSequence(0,100);
    
    • 1

    4. fromCollection(Iterator, Class):基于迭代器进行构建。第一个参数用于定义迭代器,第二个参数用于定义输出元素的类型。使用示例如下:

    env.fromCollection(new CustomIterator(), BasicTypeInfo.INT_TYPE_INFO).print();
    
    • 1

    其中 CustomIterator 为自定义的迭代器,这里以产生 1 到 100 区间内的数据为例,源码如下。需要注意的是自定义迭代器除了要实现 Iterator 接口外,还必须要实现序列化接口 Serializable ,否则会抛出序列化失败的异常:

    import java.io.Serializable;
    import java.util.Iterator;
    
    public class CustomIterator implements Iterator<Integer>, Serializable {
        private Integer i = 0;
    
        @Override
        public boolean hasNext() {
            return i < 100;
        }
    
        @Override
        public Integer next() {
            i++;
            return i;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    5. fromParallelCollection(SplittableIterator, Class):方法接收两个参数,第二个参数用于定义输出元素的类型,第一个参数 SplittableIterator 是迭代器的抽象基类,它用于将原始迭代器的值拆分到多个不相交的迭代器中。

    1.3 基于 Socket 构建

    Flink 提供了 socketTextStream 方法用于构建基于 Socket 的数据流,socketTextStream 方法有以下四个主要参数:

    • hostname:主机名;
    • port:端口号,设置为 0 时,表示端口号自动分配;
    • delimiter:用于分隔每条记录的分隔符;
    • maxRetry:当 Socket 临时关闭时,程序的最大重试间隔,单位为秒。设置为 0 时表示不进行重试;设置为负值则表示一直重试。示例如下:
     env.socketTextStream("192.168.0.229", 9999, "\n", 3).print();
    
    • 1

    二、自定义 Data Source

    2.1 SourceFunction

    除了内置的数据源外,用户还可以使用 addSource 方法来添加自定义的数据源。自定义的数据源必须要实现 SourceFunction 接口,这里以产生 [0 , 1000) 区间内的数据为例,代码如下:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    env.addSource(new SourceFunction<Long>() {
        
        private long count = 0L;
        private volatile boolean isRunning = true;
    
        public void run(SourceContext<Long> ctx) {
            while (isRunning && count < 1000) {
                // 通过collect将输入发送出去 
                ctx.collect(count);
                count++;
            }
        }
    
        public void cancel() {
            isRunning = false;
        }
    
    }).print();
    env.execute();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    2.2 ParallelSourceFunction 和 RichParallelSourceFunction

    上面通过 SourceFunction 实现的数据源是不具有并行度的,即不支持在得到的 DataStream 上调用 setParallelism(n) 方法,此时会抛出如下的异常:

    Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
    
    • 1

    如果你想要实现具有并行度的输入流,则需要实现 ParallelSourceFunction 或 RichParallelSourceFunction 接口,其与 SourceFunction 的关系如下图:

    在这里插入图片描述

    ParallelSourceFunction 直接继承自 ParallelSourceFunction,具有并行度的功能。RichParallelSourceFunction 则继承自 AbstractRichFunction,同时实现了 ParallelSourceFunction 接口,所以其除了具有并行度的功能外,还提供了额外的与生命周期相关的方法,如 open() ,closen() 。

    三、Streaming Connectors

    3.1 内置连接器

    除了自定义数据源外, Flink 还内置了多种连接器,用于满足大多数的数据收集场景。当前内置连接器的支持情况如下:

    • Apache Kafka (支持 source 和 sink)
    • Apache Cassandra (sink)
    • Amazon Kinesis Streams (source/sink)
    • Elasticsearch (sink)
    • Hadoop FileSystem (sink)
    • RabbitMQ (source/sink)
    • Apache NiFi (source/sink)
    • Twitter Streaming API (source)
    • Google PubSub (source/sink)

    除了上述的连接器外,你还可以通过 Apache Bahir 的连接器扩展 Flink。Apache Bahir 旨在为分布式数据分析系统 (如 Spark,Flink) 等提供功能上的扩展,当前其支持的与 Flink 相关的连接器如下:

    • Apache ActiveMQ (source/sink)
    • Apache Flume (sink)
    • Redis (sink)
    • Akka (sink)
    • Netty (source)

    随着 Flink 的不断发展,可以预见到其会支持越来越多类型的连接器,关于连接器的后续发展情况,可以查看其官方文档:Streaming Connectors 。在所有 DataSource 连接器中,使用的广泛的就是 Kafka,所以这里我们以其为例,来介绍 Connectors 的整合步骤。

    3.2 整合 Kakfa

    1. 导入依赖

    整合 Kafka 时,一定要注意所使用的 Kafka 的版本,不同版本间所需的 Maven 依赖和开发时所调用的类均不相同,具体如下:

    Maven 依赖Flink 版本Consumer and Producer 类的名称Kafka 版本
    flink-connector-kafka-0.8_2.111.0.0 +FlinkKafkaConsumer08
    FlinkKafkaProducer08
    0.8.x
    flink-connector-kafka-0.9_2.111.0.0 +FlinkKafkaConsumer09
    FlinkKafkaProducer09
    0.9.x
    flink-connector-kafka-0.10_2.111.2.0 +FlinkKafkaConsumer010
    FlinkKafkaProducer010
    0.10.x
    flink-connector-kafka-0.11_2.111.4.0 +FlinkKafkaConsumer011
    FlinkKafkaProducer011
    0.11.x
    flink-connector-kafka_2.111.7.0 +FlinkKafkaConsumer
    FlinkKafkaProducer
    >= 1.0.0

    这里我使用的 Kafka 版本为 kafka_2.12-2.2.0,添加的依赖如下:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.9.0</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2. 代码开发

    这里以最简单的场景为例,接收 Kafka 上的数据并打印,代码如下:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    Properties properties = new Properties();
    // 指定Kafka的连接位置
    properties.setProperty("bootstrap.servers", "hadoop001:9092");
    // 指定监听的主题,并定义Kafka字节消息到Flink对象之间的转换规则
    DataStream<String> stream = env
        .addSource(new FlinkKafkaConsumer<>("flink-stream-in-topic", new SimpleStringSchema(), properties));
    stream.print();
    env.execute("Flink Streaming");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    3.3 整合测试

    1. 启动 Kakfa

    Kafka 的运行依赖于 zookeeper,需要预先启动,可以启动 Kafka 内置的 zookeeper,也可以启动自己安装的:

    # zookeeper启动命令
    bin/zkServer.sh start
    
    # 内置zookeeper启动命令
    bin/zookeeper-server-start.sh config/zookeeper.properties
    
    • 1
    • 2
    • 3
    • 4
    • 5

    启动单节点 kafka 用于测试:

    # bin/kafka-server-start.sh config/server.properties
    
    • 1

    2. 创建 Topic

    # 创建用于测试主题
    bin/kafka-topics.sh --create \
                        --bootstrap-server hadoop001:9092 \
                        --replication-factor 1 \
                        --partitions 1  \
                        --topic flink-stream-in-topic
    
    # 查看所有主题
     bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    3. 启动 Producer

    这里 启动一个 Kafka 生产者,用于发送测试数据:

    bin/kafka-console-producer.sh --broker-list hadoop001:9092 --topic flink-stream-in-topic
    
    • 1

    4. 测试结果

    在 Producer 上输入任意测试数据,之后观察程序控制台的输出:

    在这里插入图片描述

    程序控制台的输出如下:

    在这里插入图片描述

    可以看到已经成功接收并打印出相关的数据。

  • 相关阅读:
    全面理解链表数据结构:各种节点操作、做题技巧,易错点分析与题目清单(C++代码示例,不断更新)
    github:配置ssh密钥
    海外推广必备|如何制定领英LinkedIn营销战略?
    软件测试/测试开发丨App自动化—CSS 定位与原生定位
    WPF中使用WinForm Chart控件(1)----实时曲线
    贪心算法 Problem N 1013 求最大盈利
    微信小程序:独家全新娱乐性超高的喝酒神器
    Docker的基础命令
    服务器数据恢复—云服务器mysql数据库表被truncate的数据恢复案例
    Python文件操作1--知识点
  • 原文地址:https://blog.csdn.net/mxk4869/article/details/125533927