Big Data Flink Series, Part 2: A Basic WordCount Example


    Flink supports both batch processing and stream processing, and since Flink 1.12 the batch and stream APIs have been unified. Development can be done in Java or Scala; this article uses Java. Below, WordCount is used as an example to walk through the Flink programming workflow.
    Prerequisites:

    • IntelliJ IDEA
    • Maven
    • JDK 1.8

    1. Maven Dependencies

    <properties>
        <flink.version>1.15.0</flink.version>
        <flink.scala.version>2.12</flink.scala.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_${flink.scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    
    
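    A note on these dependencies: flink-clients is what allows the job to run in the IDE's local environment; without it, getExecutionEnvironment() cannot find an executor at runtime. The flink-hadoop-compatibility dependency is not actually needed for the examples below and can be omitted when only local files are read.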

    2. Sample Data

    Contents of wordcount.txt:
    zhangsan,lisi,wangwu
    ajdhaj,hdgaj,zhangsan
    lisi,wangwu

    3. WordCount over an Offline File with the DataSet API

    package com.first.example;

    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.AggregateOperator;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.operators.FlatMapOperator;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;


    /**
     * @author xxxx
     * @date 2023-09-26 12:48
     */
    public class DataSetWordCount {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // Read the text file line by line
            DataSource<String> dataSource = env.readTextFile("E:\\Centos\\flinkExercise\\data\\wordcount.txt");
            // Split each line on commas and emit a (word, 1) pair per word
            FlatMapOperator<String, Tuple2<String, Integer>> tuple2FlatMapOperator = dataSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    String[] split = s.split(",");
                    for (String s1 : split) {
                        if (!StringUtils.isEmpty(s1)) {
                            collector.collect(new Tuple2<>(s1, 1));
                        }
                    }
                }
            });

            // Group by the word (field 0) and sum the counts (field 1)
            AggregateOperator<Tuple2<String, Integer>> total = tuple2FlatMapOperator
                    .groupBy(0)
                    .sum(1);
            // Print the result (this also triggers execution)
            total.print();
        }
    }
    
    

    The programming model starts from obtaining the execution environment (env); readTextFile reads the file; flatMap transforms the data; finally groupBy and sum perform the aggregation. With the DataSet API, print() itself triggers execution, so no explicit env.execute() call is needed.
    Result:
    (ajdhaj,1)
    (wangwu,2)
    (lisi,2)
    (hdgaj,1)
    (zhangsan,2)
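    As an aside, the anonymous FlatMapFunction can be replaced with a lambda, but Java's type erasure then hides the Tuple2<String, Integer> element type, so Flink needs an explicit hint via returns(). A minimal sketch (the inline fromElements data is just for illustration):

    package com.first.example;

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class LambdaWordCount {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.fromElements("zhangsan,lisi", "lisi,wangwu")
                    .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                        for (String word : line.split(",")) {
                            if (!word.isEmpty()) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        }
                    })
                    // Type hint: the lambda loses Tuple2<String, Integer> to erasure
                    .returns(Types.TUPLE(Types.STRING, Types.INT))
                    .groupBy(0)
                    .sum(1)
                    .print();
        }
    }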

    4. WordCount in Streaming Mode with the DataStream API

    package com.first.example;

    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;


    /**
     * @date 2023-09-26 13:21
     */
    public class StreamingWordCount {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
            // Read the file as a (bounded) stream
            DataStreamSource<String> textFile = senv.readTextFile("E:\\Centos\\flinkExercise\\data\\wordcount.txt");
            // Split each line on commas and emit a (word, 1) pair per word
            DataStream<Tuple2<String, Integer>> tuple2SingleOutputStreamOperator = textFile.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    String[] split = s.split(",");
                    for (String s1 : split) {
                        if (!StringUtils.isEmpty(s1)) {
                            collector.collect(Tuple2.of(s1, 1));
                        }
                    }
                }
            });
            // Key by the word and keep a running sum of the counts
            tuple2SingleOutputStreamOperator.keyBy(ele -> ele.f0)
                    .sum(1).print(">>>");
            // Unlike the DataSet API, a streaming job must be launched explicitly
            senv.execute("first streaming....");
        }
    }
    
    

    Execution flow:
    Obtain the execution environment; read the file as a stream; transform it with flatMap; then aggregate with the keyBy and sum operators. Where the DataSet API grouped by field position (groupBy(0)), the DataStream API uses a key selector (ele -> ele.f0), and nothing runs until senv.execute() is called.

    Result (the N> prefix is the index of the parallel subtask that emitted each record):

    :6> (lisi,1)
    :4> (wangwu,1)
    :4> (zhangsan,1)
    :6> (hdgaj,1)
    :6> (lisi,2)
    :4> (zhangsan,2)
    :4> (wangwu,2)
    :2> (ajdhaj,1)

    Comparing the batch and stream results: batch emits one final count per word, while streaming emits an updated count for every record it processes.
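    Reading a file produces a bounded stream, so the job above still terminates on its own. To observe truly continuous processing over an unbounded source, the file source can be swapped for a socket. A minimal sketch, assuming a local socket server (for example nc -lk 9999) is running:

    package com.first.example;

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class SocketWordCount {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
            // Unbounded source: every line typed into the socket becomes a record
            senv.socketTextStream("localhost", 9999)
                    .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                        for (String word : line.split(",")) {
                            if (!word.isEmpty()) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        }
                    })
                    .returns(Types.TUPLE(Types.STRING, Types.INT))
                    .keyBy(t -> t.f0)
                    .sum(1)
                    .print();
            // Runs until cancelled; a socket source never reaches end-of-input
            senv.execute("socket wordcount");
        }
    }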

    5. Batch Statistics with the Unified Batch/Stream API

    Since Flink 1.12, batch and stream workloads are both expressed through the unified DataStream API; only the runtime execution mode changes.

    package com.first.example;
    
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    /**
     * 批流完全联合统一
     * @date 2023-09-26 15:38
     */
    public class BatchStreamWordCount {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
            // The default mode is STREAMING; switch it to BATCH here
            senv.setRuntimeMode(RuntimeExecutionMode.BATCH);
    
            DataStreamSource<String> textFile = senv.readTextFile("E:\\Centos\\flinkExercise\\data\\wordcount.txt");
            DataStream<Tuple2<String, Integer>> tuple2SingleOutputStreamOperator = textFile.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    String[] split = s.split(",");
                    for (String s1 : split) {
                        if (!StringUtils.isEmpty(s1)) {
                            collector.collect(Tuple2.of(s1, 1));
                        }
                    }
                }
            });
            tuple2SingleOutputStreamOperator.keyBy(ele -> ele.f0)
                    .sum(1).print(">>>");
            senv.execute("batch wordcount");
        }
    }
    
    

    The flow is essentially the same as the streaming version; the only addition is setting the runtime execution mode:
    senv.setRuntimeMode(RuntimeExecutionMode.BATCH);

    Result:

    :2> (ajdhaj,1)
    :4> (wangwu,2)
    :4> (zhangsan,2)
    :6> (lisi,2)
    :6> (hdgaj,1)
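    Hard-coding the mode is convenient for a demo, but the Flink documentation recommends not baking it into the program and instead passing it at submission time, e.g. bin/flink run -Dexecution.runtime-mode=BATCH ..., so the same jar can run in either mode.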

    The next part covers Flink installation and deployment.
