• Flink(一)【WordCount 快速入门】


    前言

            学完了 Hadoop、Spark,本想着先把 Kafka、Flume 这些工具先学完的,但想了想还是把核心的技术先学完最后再去把那些工具学学。

            最近心有点累哈哈哈,偷偷立个 flag,反正也没人看,明年的今天来这里还愿哈,愿望这种事情我是从来是不会说出来的,毕竟言以泄败,事以密成嘛。

    那我隐晦低表达一下,摘录自《解忧杂货店》的一条句子:

            这是克朗对自己梦想的描述,其实他不是自不量力,而是假如放弃了这个梦想,他的生活就失去了光,他未来的几十年生活会枯燥无味,会活的没有一点激情。
            就像一个曾经自己深爱过的姑娘一样,明明无法在一起,却还是始终记挂着,因为心里眼里只有她,所以别人在你眼中,都会黯然失色的,没有色彩的东西,又怎么能投入激情去爱呢?

            我的愿望有两个,在上面中有所体现,但我希望结果不要是遗憾,第一个愿望明年这会大概知道结果了,第二个愿望应该会晚一点,也许在2025年的春天,也许会更早一点...

    API 环境搭建

    添加依赖

    pom.xml

    1. <properties>
    2. <flink.version>1.13.0flink.version>
    3. <java.version>1.8java.version>
    4. <scala.binary.version>2.12scala.binary.version>
    5. <slf4j.version>1.7.30slf4j.version>
    6. properties>
    7. <dependencies>
    8. <dependency>
    9. <groupId>org.apache.flinkgroupId>
    10. <artifactId>flink-javaartifactId>
    11. <version>${flink.version}version>
    12. dependency>
    13. <dependency>
    14. <groupId>org.apache.flinkgroupId>
    15. <artifactId>flink-streaming-java_${scala.binary.version}artifactId>
    16. <version>${flink.version}version>
    17. dependency>
    18. <dependency>
    19. <groupId>org.apache.flinkgroupId>
    20. <artifactId>flink-clients_${scala.binary.version}artifactId>
    21. <version>${flink.version}version>
    22. dependency>
    23. <dependency>
    24. <groupId>org.slf4jgroupId>
    25. <artifactId>slf4j-apiartifactId>
    26. <version>${slf4j.version}version>
    27. dependency>
    28. <dependency>
    29. <groupId>org.slf4jgroupId>
    30. <artifactId>slf4j-log4j12artifactId>
    31. <version>${slf4j.version}version>
    32. dependency>
    33. <dependency>
    34. <groupId>org.apache.logging.log4jgroupId>
    35. <artifactId>log4j-to-slf4jartifactId>
    36. <version>2.14.0version>
    37. dependency>
    38. dependencies>

    log4j.properties 

    1. log4j.rootLogger=error, stdout
    2. log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    3. log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    4. log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

     入门案例

    0、数据准备

    在 根目录下创建 words.txt

    1. hello flink
    2. hello java
    3. hello spark
    4. hello hadoop

    1、批处理

    批处理所用到的算子API 都继承自 DataSet,而新版的 Flink 已经做到了流批一体,这里只做演示,以后这类 API 应该是要被弃用了。

    1. import org.apache.flink.api.common.typeinfo.Types;
    2. import org.apache.flink.api.java.ExecutionEnvironment;
    3. import org.apache.flink.api.java.operators.AggregateOperator;
    4. import org.apache.flink.api.java.operators.DataSource;
    5. import org.apache.flink.api.java.operators.FlatMapOperator;
    6. import org.apache.flink.api.java.operators.UnsortedGrouping;
    7. import org.apache.flink.api.java.tuple.Tuple2;
    8. import org.apache.flink.util.Collector;
    9. public class BatchWordCount {
    10. public static void main(String[] args) throws Exception {
    11. // 1. 创建一个执行批式数据处理环境
    12. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    13. // 2. 从文件中读取数据 String类型 批式数据处理环境得到的 DataSource 继承自 DataSet
    14. DataSource lineDS = env.readTextFile("input/words.txt");
    15. // 3. 将每行数据转换成一个二元组类型
    16. // 输入类型: String 输出类型: Tuple2
    17. FlatMapOperator> wordAndOne =
    18. // String lines: 输入数据行 Collector> out: 输出类型
    19. lineDS.flatMap((String line, Collector> out) -> {
    20. String[] words = line.split(" ");
    21. for (String word : words) {
    22. out.collect(Tuple2.of(word, 1L));
    23. }
    24. }).returns(Types.TUPLE(Types.STRING, Types.LONG)); //使用 Java 泛型的时候, 由于泛型擦除的存在, 需要显示信息返回返回值类型
    25. // 4. 根据 word 分组
    26. UnsortedGrouping> wordGroup = wordAndOne.groupBy(0); // 0 是索引位置
    27. // 5. 分组内进行聚合
    28. AggregateOperator> res = wordGroup.sum(1); // 1 也是索引位置
    29. // 6. 打印结果
    30. res.print();
    31. }
    32. }

    运行结果:

    1. (hadoop,1)
    2. (flink,1)
    3. (hello,4)
    4. (java,1)
    5. (spark,1)
    6. Process finished with exit code 0

    因为现在已经是流批一体的框架了,所以提交 Flink 批处理任务需要用下面的语句:

    $ bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

    2、流处理

    2.1、有界数据流处理

    这里我们用离线数据(提前创建好的文件)用流处理API DataStream 的算子来做处理。

    1. import org.apache.flink.api.common.typeinfo.Types;
    2. import org.apache.flink.api.java.tuple.Tuple2;
    3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    4. import org.apache.flink.streaming.api.datastream.KeyedStream;
    5. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    7. import org.apache.flink.util.Collector;
    8. public class BoundedStreamWordCount {
    9. public static void main(String[] args) throws Exception {
    10. // 1. 创建一个流式的执行环境
    11. StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    12. // 2. 流式数据处理环境得到的 DataSource 继承自 DataStream
    13. DataStreamSource lineDS = env.readTextFile("input/words.txt");
    14. // 3. flatMap 打散数据 返回元组
    15. SingleOutputStreamOperator> wordAndOne = lineDS.flatMap((String line, Collector> out) -> {
    16. String[] words = line.split(" ");
    17. for (String word : words) {
    18. out.collect(Tuple2.of(word, 1L));
    19. }
    20. }).returns(Types.TUPLE(Types.STRING, Types.LONG));
    21. // 4. 根据 word 分组
    22. KeyedStream, String> wordGroupByKey = wordAndOne.keyBy(t -> t.f0);
    23. // 5. 根据键对索引为 1 处的值进行合并
    24. SingleOutputStreamOperator> res = wordGroupByKey.sum(1);
    25. // 6. 输出结果
    26. res.print();
    27. // 7. 执行
    28. env.execute(); // 这里我们的数据是有界的,但是真正开发环境是无界的,这里需要用execute方法等待新数据的到来
    29. }
    30. }

    运行结果:

    1. 3> (java,1)
    2. 13> (flink,1)
    3. 1> (spark,1)
    4. 5> (hello,1)
    5. 5> (hello,2)
    6. 5> (hello,3)
    7. 5> (hello,4)
    8. 15> (hadoop,1)

            我们可以发现,输出的单词的顺序是乱序的,因为集群模式下数据流不是在本地执行的,而是在多个节点中执行,所以也就无法保证先输入的单词最先输出。

            Idea下Flink API 会使用多线程来模拟集群下的多节点并行处理,而我们每行数据前面的 "编号>" 代表的就是线程的 id(对应 Flink 运行时占据的最小资源,也叫任务槽),默认使用当前电脑的所有 CPU 数。

            我们还可以发现,hello是同一个节点上处理的,这是因为我们在做分组的时候,把分组后的数据分到了同一个节点(子任务)上。

    2.2、无界数据流处理

    这里我们使用 netcat 来模拟产生数据流

    1. import org.apache.flink.api.common.typeinfo.Types;
    2. import org.apache.flink.api.java.tuple.Tuple2;
    3. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    4. import org.apache.flink.streaming.api.datastream.KeyedStream;
    5. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    7. import org.apache.flink.util.Collector;
    8. public class UnBoundedStreamWordCount {
    9. public static void main(String[] args) throws Exception {
    10. // 1. 创建一个流式的执行环境
    11. StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    12. // 2. 流式数据处理环境得到的 DataSource 继承自 DataStream
    13. ParameterTool parameterTool = ParameterTool.fromArgs(args);
    14. String host = parameterTool.get("host");
    15. Integer port = parameterTool.getInt("port");
    16. DataStreamSource lineDS = env.socketTextStream(host,port);
    17. // 3. flatMap 打散数据 返回元组
    18. SingleOutputStreamOperator> wordAndOne = lineDS.flatMap((String line, Collector> out) -> {
    19. String[] words = line.split(" ");
    20. for (String word : words) {
    21. out.collect(Tuple2.of(word, 1L));
    22. }
    23. }).returns(Types.TUPLE(Types.STRING, Types.LONG));
    24. // 4. 根据 word 分组
    25. KeyedStream, String> wordGroupByKey = wordAndOne.keyBy(t -> t.f0);
    26. // 5. 根据键对索引为 1 处的值进行合并
    27. SingleOutputStreamOperator> res = wordGroupByKey.sum(1);
    28. // 6. 输出结果
    29. res.print();
    30. // 7. 执行
    31. env.execute(); // 这里我们的数据是有界的,但是真正开发环境是无界的,这里需要用execute方法等待新数据的到来
    32. }
    33. }

    运行结果: 

            可以看到,处理是相当快的,毕竟数据量很小,但是会想到 SparkStreaming 的处理过程,我们之前用 SparkStreaming 的时候还需要设置 Reciver 的接收间隔,而我们的 Flink 则是真正的实时处理。

    总结

            Flink 的学习终于开始了,还是一样的要求,不照搬视频课件内容,每行代码要有自己的思考,每行博客也要是自己思考的总结。

            还有,最近感觉愈发词穷,该多看书了,以后养成每次博客加一条书摘的习惯。

  • 相关阅读:
    分布式事务解决方案
    基于51单片机火灾监测自动灭火装置Proteus仿真
    C语言:写文件(附完整源码)
    c# - - - CentOS 7 部署ASP.Net Core项目
    虚拟环境和包
    UI设计是什么意思?一文给你讲清楚
    六、colab训练模型
    C++~auto关键字
    wallys/QCN9074/WiFi 6 (802.11ax) 4×4 MU-MIMO 2.4GHz Single Band Wireless Module
    被DDOS了怎么办 要如何应对
  • 原文地址:https://blog.csdn.net/m0_64261982/article/details/134251332