• 【入门Flink】- 02Flink经典案例-WordCount


    WordCount

    需求:统计一段文字中,每个单词出现的频次

    添加依赖

    	<properties>
            <flink.version>1.17.0flink.version>
        properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-streaming-javaartifactId>
                <version>${flink.version}version>
            dependency>
            <dependency>
                <groupId>org.apache.flinkgroupId>
                <artifactId>flink-clientsartifactId>
                <version>${flink.version}version>
            dependency>
        dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    1.批处理

    基本思路:先逐行读入文件数据,然后将每一行文字拆分成单词;接着按照单词分组,统计每组数据的个数。

    1.1.数据准备

    resources目录下新建一个 input 文件夹,并在下面创建文本文件words.txt

    words.txt

    hello flink
    hello world
    hello java
    
    • 1
    • 2
    • 3

    1.2.代码编写

    public class BatchWordCount {
        public static void main(String[] args) throws Exception {
            // 1. 创建执行环境
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // 2. 从文件读取数据 按行读取(存储的元素就是每行的文本)
            String filePath = Objects.requireNonNull(
                   BatchWordCount.class.getClassLoader().getResource("input/words.txt")).getPath();
            DataSource<String> lineDS = env.readTextFile(filePath);
    
            // 3. 转换数据格式
            FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(
                    new FlatMapFunction<String, Tuple2<String, Long>>() {
                        @Override
                        public void flatMap(String line, Collector<Tuple2<String, Long>> out) {
                            String[] words = line.split(" ");
                            for (String word : words) {
                                out.collect(Tuple2.of(word, 1L));
                            }
                        }
                    });
    
            // 4. 按照 word 进行分组
            UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);
    
            // 5. 分组内聚合统计
            AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);
    
            // 6. 打印结果
            sum.print();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    打印结果如下:(结果正确)

    image-20231031193224024

    上述代码是基于 DataSet API 的,也就是对数据的处理转换,是看作数据集来进行操作的。

    事实上 Flink 本身是流批统一的处理架构,批量的数据集本质上也是流,没有必要用两套不同的 API 来实现。从Flink 1.12 开始,官方推荐的做法是直接使用 DataStream API,在提交任务时通过将执行模式设为BATCH来进行批处理:

    bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar
    
    • 1

    2.流处理

    DataStreamAPI可以直接处理批处理和流处理的所有场景

    2.1读取文件

    还是上述words.txt文件

    代码实现:

    public class StreamWordCount {
        public static void main(String[] args) throws Exception {
            // 1. 创建流式执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 2.读取文件
            String filePath = Objects.requireNonNull(
                    StreamWordCount.class.getClassLoader().getResource("input/words.txt")).getPath();
            DataStreamSource<String> lineStream = env.readTextFile(filePath);
    
            // 3. 转换、分组、求和,得到统计结果
            SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                        @Override
                        public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
                            String[] words = line.split(" ");
                            for (String word : words) {
                                out.collect(Tuple2.of(word, 1L));
                            }
                        }
                    }).keyBy(data -> data.f0)
                    .sum(1);
    
            // 4. 打印
            sum.print();
            // 5. 执行
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    与批处理程序BatchWordCount有几点不同:

    • 创建执行环境的不同,流处理程序使用的是 StreamExecutionEnvironment
    • 转换处理之后,得到的数据对象类型不同。
    • 分组操做调用的是 keyBy 方法,可以传入一个匿名函数作为键选择器(KeySelector),指定当前分组的key。
    • 最后执行execute方法,开始执行任务。

    2.2读取Socket文件流

    实际生产中,真正的数据多是无界的,需要持续地捕获数据。为了模拟这种场景,可以监听 socket 端口,然后向该端口不断的发送数据。

    1. 简单改动,只需将StreamWordCount 代码中读取文件数据的 readTextFile 方法,替换成读取socket文本流的方法socketTextStream
    public class StreamSocketWordCount {
        public static void main(String[] args) throws Exception {
            // 1. 创建流式执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // 2.读取文件
            DataStreamSource<String> lineStream = env.socketTextStream("124.222.253.33", 7777);
    
            // 3. 转换、分组、求和,得到统计结果
            SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                        @Override
                        public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
                            String[] words = line.split(" ");
                            for (String word : words) {
                                out.collect(Tuple2.of(word, 1L));
                            }
                        }
                    }).keyBy(data -> data.f0)
                    .sum(1);
    
            // 4. 打印
            sum.print();
            // 5. 执行
            env.execute();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    1. 在 Linux 环境的主机 124.222.253.33 上,执行下列命令,发送数据进行测试
    nc -lk 7777
    
    • 1

    注意:要先启动端口,后启动 StreamSocketWordCount 程序,否则会报超时连接异常。

    1. 从Linux发送数据

    1、输入“hello flink”,输出如下内容

    image-20231031201232801

    2、再输入“hello world”,输出如下内容

    image-20231031201316467

    Flink 还具有一个类型提取系统,可以分析函数的输入和返回类型,自动获取类型信息,从而获得对应的序列化器和反序列化器。但是,由于 Java 中泛型擦除的存在,在某些特殊情况下(比如 Lambda 表达式中),自动提取的信息是不够精细的,对于 flatMap 里传入的 Lambda 表达式,系统只能推断出返回的是Tuple2类型,而无法得到 Tuple2。需要显式地告诉系统当前的返回类型,才能正确地解析出完整数据

  • 相关阅读:
    Python爬取公交线路信息及站点shp数据 文末附数据下载地址
    Vue的基本使用
    《JAVASE系列》String 类
    k8s常用命令
    Qt 综合练习小项目--反金币(2/2)
    spring cloud搭建教程
    deepspeed 训练多机多卡报错 ncclSystemError Last error
    VMware Workstation中桥接模式、NAT模式、仅主机模式
    什么是RabbitMQ
    等保评测是什么意思
  • 原文地址:https://blog.csdn.net/qq_43417581/article/details/134176911