• Flink快速上手 完整使用 (第二章)


    本教程使用到的工具
    NC链接

    一、环境准备

    系统环境为Windows 10。
    需提前安装Java 8。
    集成开发环境(IDE)使用IntelliJ IDEA,具体的安装流程参见IntelliJ官网。
    安装IntelliJ IDEA之后,还需要安装一些插件——Maven和Git。Maven用来管理项目依赖;通过Git可以轻松获取我们的示例代码,并进行本地代码的版本控制。

    二、创建项目

    1、创建项目在这里插入图片描述

    2、添加项目依赖

    在项目的pom文件中,增加标签设置属性,然后增加标签引入需要的依赖。我们需要添加的依赖最重要的就是Flink的相关组件,包括flink-java、flink-streaming-java,以及flink-clients(客户端,也可以省略)。另外,为了方便查看运行日志,我们引入slf4j和log4j进行日志管理。

    <properties>
        <flink.version>1.13.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>
    
    <dependencies>
    <!-- 引入Flink相关依赖-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
    </dependency>
    <!-- 引入日志管理相关依赖-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
    </dependency>
    </dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    在属性中,我们定义了,这指代的是所依赖的Scala版本。这有一点奇怪:Flink底层是Java,而且我们也只用Java API,为什么还会依赖Scala呢?这是因为Flink的架构中使用了Akka来实现底层的分布式通信,而Akka是用Scala开发的。我们本书中用到的Scala版本为2.12。

    3、配置日志管理

    在目录src/main/resources下添加文件:log4j.properties,内容配置如下:

    log4j.rootLogger=error, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
    
    • 1
    • 2
    • 3
    • 4

    4、编写代码

    搭好项目框架,接下来就是我们的核心工作——往里面填充代码。我们会用一个最简单的示例来说明Flink代码怎样编写:统计一段文字中,每个单词出现的频次。这就是传说中的WordCount程序——它是大数据领域非常经典的入门案例,地位等同于初学编程语言时的Hello World。

    我们的源码位于src/main/java目录下。首先新建一个包,命名为com.atguigu.wc,在这个包下我们将编写Flink入门的WordCount程序。

    我们已经知道,尽管Flink自身的定位是流式处理引擎,但它同样拥有批处理的能力。所以接下来,我们会针对不同的处理模式、不同的输入数据形式,分别讲述WordCount代码的实现。

    5) 批处理

    对于批处理而言,输入的应该是收集好的数据集。这里我们可以将要统计的文字,写入一个文本文档,然后读取这个文件处理数据就可以了。

    (1)在工程根目录下新建一个input文件夹,并在下面创建文本文件words.txt
    (2)在words.txt中输入一些文字,例如:

    hello world
    hello flink
    hello java
    
    • 1
    • 2
    • 3

    (3)在com.atguigu.chapter02包下新建Java类BatchWordCount,在静态main方法中编写测试代码。

    我们进行单词频次统计的基本思路是:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数,就是对应单词的频次。

    package com.example.wc;
    
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.AggregateOperator;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.operators.FlatMapOperator;
    import org.apache.flink.api.java.operators.UnsortedGrouping;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;
    
    
    /**
     * 批处理
     * WOrd Count
     */
    public class BachWordCount {
        public static void main(String[] args) throws Exception {
            //1、创建执行环境
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
    
            DataSource<String> textFile = executionEnvironment.readTextFile("input/words.txt");
    
            //将每行数据进行分词,转换成二元组类型
            FlatMapOperator<String, Tuple2<String, Long>> wordAndOneTuple = textFile.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                        //将一行文本进行分词
                        String[] words = line.split(" ");
                        //将每个单词转换成二元组输出
                        for (String word : words) {
                            out.collect(Tuple2.of(word, 1l));
                        }
                    })
                    .returns(Types.TUPLE(Types.STRING, Types.LONG));
    
            //按照word分组
            UnsortedGrouping<Tuple2<String, Long>> groupBy = wordAndOneTuple.groupBy(0);
    
            //5、分组内进行聚合统计
            AggregateOperator<Tuple2<String, Long>> operator = groupBy.sum(1);
            operator.print();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    (4) 代码说明

    ① Flink在执行应用程序前应该获取执行环境对象,也就是运行时上下文环境。
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    ② Flink同时提供了Java和Scala两种语言的API,有些类在两套API中名称是一样的。所以在引入包时,如果有Java和Scala两种选择,要注意选用Java的包。

    ③ 直接调用执行环境的readTextFile方法,可以从文件中读取数据。

    ④我们的目标是将每个单词对应的个数统计出来,所以调用flatmap方法可以对一行文字进行分词转换。将文件中每一行文字拆分成单词后,要转换成(word,count)形式的二元组,初始count都为1。returns方法指定的返回数据类型Tuple2,就是Flink自带的二元组数据类型。

    ⑤ 在分组时调用了groupBy方法,它不能使用分组选择器,只能采用位置索引或属性名称进行分组。

    // 使用索引定位
    dataStream.groupBy(0)
    // 使用类属性名称
    dataStream.groupBy("id")
    
    • 1
    • 2
    • 3
    • 4

    ⑤ 在分组之后调用sum方法进行聚合,同样只能指定聚合字段的位置索引或属性名称。

    (4) 运行程序,控制台会打印出结果:

    (java,1)
    (flink,1)
    (world,1)
    (hello,3)
    
    • 1
    • 2
    • 3
    • 4

    可以看到,我们将文档中的所有单词的频次,全部统计出来,以二元组的形式在控制台打印输出了。

    需要注意的是,这种代码的实现方式,是基于DataSet API的,也就是我们对数据的处理转换,是看作数据集来进行操作的。事实上Flink本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的API来实现。所以从Flink 1.12开始,官方推荐的做法是直接使用DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:
    $ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
    这样,DataSet API就已经处于“软弃用”(soft deprecated)的状态,在实际应用中我们只要维护一套DataStream API就可以了。这里只是为了方便大家理解,我们依然用DataSet API做了批处理的实现。

    6、流处理

    我们已经知道,用DataSet API可以很容易地实现批处理;与之对应,流处理当然可以用DataStream API来实现。对于Flink而言,流才是整个处理逻辑的底层核心,所以流批统一之后的DataStream API更加强大,可以直接处理批处理和流处理的所有场景。
    DataStream API作为“数据流”的处理接口,又怎样处理批数据呢?
    回忆一下上一章中我们讲到的Flink世界观。在Flink的视角里,一切数据都可以认为是流,流数据是无界流,而批数据则是有界流。所以批处理,其实就可以看作有界流的处理。
    对于流而言,我们会在获取输入数据后立即处理,这个过程是连续不断的。当然,有时我们的输入数据可能会有尽头,这看起来似乎就成了一个有界流;但是它跟批处理是截然不同的——在输入结束之前,我们依然会认为数据是无穷无尽的,处理的模式也仍旧是连续逐个处理。
    下面我们就针对不同类型的输入数据源,用具体的代码来实现流处理。

    1、读取文件

    我们同样试图读取文档words.txt中的数据,并统计每个单词出现的频次。这是一个“有界流”的处理,整体思路与之前的批处理非常类似,代码模式也基本一致。

    package com.example.wc;
    
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    /**
     * 流式处理
     * wordCount
     * 并行度就是当前任务分成多少分、做多线程并行处理的他的程度个数
     *
     * 没有设置多个
     *      就是电脑CPU核心数量
     */
    public class BoundedStreamWordCount {
        public static void main(String[] args) throws Exception {
    
            //1、创建流式执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            //读取文件
            DataStreamSource<String> textFile = env.readTextFile("input/words.txt");
    
            SingleOutputStreamOperator<Tuple2<String, Long>> returns = textFile.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            //包装二元组
                            out.collect(Tuple2.of(word, 1L));
                        }
                    })
                    .returns(Types.TUPLE(Types.STRING, Types.LONG));
    
            //分组
            //returns.keyBy(0) 这样提示放弃了
            KeyedStream<Tuple2<String, Long>, String> keyBy = returns.keyBy(data -> data.f0);
    
            //求和
            SingleOutputStreamOperator<Tuple2<String, Long>> sum = keyBy.sum(1);
    
            sum.print();
    
            env.execute();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    package com.example.wc;
    //package com.example.wc.StreamWordCount;
    
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    /**
     * 无界数据流
     */
    public class StreamWordCount {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
    
            //2、读取文本
            DataStreamSource<String> textFile = executionEnvironment.socketTextStream("hadoop102", 7777);
    
            SingleOutputStreamOperator<Tuple2<String, Long>> returns = textFile.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            //包装二元组
                            out.collect(Tuple2.of(word, 1L));
                        }
                    })
                    .returns(Types.TUPLE(Types.STRING, Types.LONG));
    
            //分组
            //returns.keyBy(0) 这样提示放弃了
            KeyedStream<Tuple2<String, Long>, String> keyBy = returns.keyBy(data -> data.f0);
    
            //求和
            SingleOutputStreamOperator<Tuple2<String, Long>> sum = keyBy.sum(1);
            sum.print();
            executionEnvironment.execute();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    代码说明和注意事项:

    socket文本流的读取需要配置两个参数:发送端主机名和端口号。这里代码中指定了主机

    在实际项目应用中,主机名和端口号这类信息往往可以通过配置文件,或者传入程序运行参数的方式来指定。

    socket文本流数据的发送,可以通过Linux系统自带的netcat工具进行模拟。

    (2)在windows环境的安装NC、发送数据进行测试:

    nc lp -7777
    hello flink
    hello world
    hello java
    
    
    • 1
    • 2
    • 3
    • 4
    • 5

    可以看到控制台输出结果如下

    4> (flink,1)
    2> (hello,1)
    3> (world,1)
    2> (hello,2)
    2> (hello,3)
    1> (java,1)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    二、本章总结

    本章主要实现一个Flink开发的入门程序——词频统计WordCount。通过批处理和流处理两种不同模式的实现,可以对Flink的API风格和编程方式有所熟悉,并且更加深刻地理解批处理和流处理的不同。另外,通过读取有界数据(文件)和无界数据(socket文本流)进行流处理的比较,我们也可以更加直观地体会到Flink流处理的方式和特点。

    这是我们Flink长征路上的第一步,是后续学习的基础。有了这番初体验,想必大家会发现Flink提供了非常易用的API,基于它进行开发并不是难事。之后我们会逐步深入展开,为大家打开Flink神奇世界的大门。

  • 相关阅读:
    ArrayList与顺序表
    XILINX XC7A200T-2FBG676C PLC可编程逻辑控制器
    Spring Ioc源码分析系列--Ioc容器注册BeanPostProcessor后置处理器以及事件消息处理
    Node.JS是什么
    计算机毕业设计(附源码)python在线考试主观题评分系统
    算法——前缀和之除自身以外数组的乘积、和为K的子数组、和可被K整除的子数组、连续数组、矩阵区域和
    【服务器搭建】教程一:没钱买服务器怎么玩 进来看
    Github上都在疯找的京东内部“架构师进阶手册”终于来了
    2024牛客寒假算法基础集训营4(视频讲解题目)
    AtCoder Beginner Contest 276 G - Count Sequences 差分
  • 原文地址:https://blog.csdn.net/qq_42082701/article/details/126241016