• 使用Flink的各种技术实现WordCount逻辑


    使用Flink的各种技术实现WordCount逻辑

    在大数据程序中,WordCount程序实现了统计词频的作用,这个WordCount程序也往往在大数据分析处理中一直占着非常重要的地位。统计一天内某网站的访问次数,需要对网站排序后求其词频,统计一段时间内某个用户的登陆次数,也是对网站用户分组后的词频计算.....等等,很多的大数据应用示例都是在WordCount的基础之上进行改良发展,最终实现大数据分析的关键逻辑。

    对于wordcount程序来说,基本思想在于输入文件中有每行的英文单词组成的句子,通过行处理的思想,将每个句子的英文单词分割出来,以(单词,1)这种key-value的形式来计数,再通过分组排序,将相同的单词放在一组,最后对每组的单词进行汇总统计。其思想流程图如下图所示。

    对于wordcount程序的处理框架,Flink提供了不同级别的编程抽象,通过调用抽象的数据集调用算子构建DataFlow就可以实现对分布式的数据进行流式计算和离线计算,DataSet是批处理的抽象数据集,DataStream是流式计算的抽象数据集,他们的方法都分别为Source、Transformation、Sink

    1. Source主要负责数据的读取
    2. Transformation主要负责对数据的转换操作
    3. Sink负责最终计算好的结果数据输出。

    Source读取词频文件的数据,通过Transformation的转换操作将文件中的英文句子切分单词,并分组统计,最终可以Sink到控制台中输出结果。flink的思想结构如下图。

    根据这种思想结构,结合不同的Transformation转换函数,常用算子如:

    map算子对一个DataStream中的每个元素使用用户自定义的map函数进行处理,每个输入元素对应一个输出元素,最终整个数据流被转换成一个新的DataStream。输入每一个句子,就可以通过map后的函数split空格,然后返回形如(单词,1)的新数据流DataStream。

    FlatMap只要处理处理一个输入元素,通过后面的函数可以实现输出一个或者多个输出元素的时候,尤其表现在输出一个元素,如wordcount中输出形如(单词,1)的这种元组类型的元素。就可以用到flatMap()。

    reduce方法可以对分组后的元素进行统计处理。

    当然, wordcount 也可以结合到不同的情况中。如滑动窗口内的wordcount,就需要结合SlidingWindow。

    下面就结合不同情况下使用Flink实现wordCount的词频计算。

    一、第一种情况:读取words.txt文件通过flink进行流式分析。

    这里需要用到DataStream的Source源DataStreamSource.步骤如下。

    1、打开Intellij IDEA,然后点击FIle-->New-->project。

    2、打开Project对话框后,左边点击maven,右边不需要点击,只要确认jdk的版本,然后Next进入下一步。

    3、在弹出的对话框中,输入groupId和artifactId,然后继续点击Next进入下一步。

    4、最后Finish结束配置。

    5、在pom.xml中设置dependency的依赖包。

    6、建立flink的streaming流式wordcount程序。

    注意在敲程序时,设定lambda表达式,方式如下

    File -----> Project Structure...

    在弹出的对话框中,左边选择Modules,右边选择Lambda表达式8,如下图所示。

    最终代码如下。

    1. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    2. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    3. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    4. import org.apache.flink.streaming.api.datastream.KeyedStream;
    5. import org.apache.flink.api.java.tuple.Tuple2;
    6. import org.apache.flink.util.Collector;
    7. import org.apache.flink.api.common.typeinfo.Types;
    8. public class MyWordCount {
    9. public static void main(String[] args) throws Exception {
    10. StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
    11. DataStreamSource<String> lineds=env.readTextFile("f://data//words.txt");
    12. SingleOutputStreamOperator<Tuple2<String,Long>> wordstream=lineds.flatMap
    13. ((String line, Collector<Tuple2<String,Long>> out)->{
    14. String[] words=line.split(" ");
    15. for(String word:words){
    16. out.collect(Tuple2.of(word,1L));
    17. }
    18. }).returns(Types.TUPLE(Types.STRING,Types.LONG));
    19. KeyedStream<Tuple2<String,Long>,String> groupds=wordstream.keyBy(ds->ds.f0);
    20. groupds.sum(1).print();
    21. env.execute();
    22. }
    23. }

    程序中读取硬盘中的words.txt文件,运行后的输出结果如下图。

    前面就是线程号,后面进行wordcount词频的统计。

    二、第二种情况,通过flink对words.txt完成批处理的分析。

    这里需要使用Data的数据源Source。其它步骤与前面的DataStreamSource数据源的项目创建步骤一致,这里直接上代码。

    程序如下。

    1. import org.apache.flink.api.java.ExecutionEnvironment;
    2. import org.apache.flink.api.java.operators.DataSource;
    3. import org.apache.flink.api.java.operators.FlatMapOperator;
    4. import org.apache.flink.api.java.operators.UnsortedGrouping;
    5. import org.apache.flink.api.java.operators.AggregateOperator;
    6. import org.apache.flink.api.java.tuple.Tuple2;
    7. import org.apache.flink.util.Collector;
    8. public class YouWordCount {
    9. public static void main(String[] args) throws Exception{
    10. ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
    11. DataSource<String> lineds=env.readTextFile("f://data//words.txt");
    12. FlatMapOperator<String,Tuple2<String,Long>> wordds=lineds.flatMap((String line,Collector> out)->{
    13. String[] words=line.split(" ");
    14. for(String word:words){
    15. out.collect(Tuple2.of(word,1L));
    16. }
    17. });
    18. UnsortedGrouping<Tuple2<String,Long>> groupds=wordds.groupBy(0);
    19. AggregateOperator<Tuple2<String,Long>> result=groupds.sum(1);
    20. result.print();
    21. }
    22. }

    三、第三种情况:Flink收集流数据的word来计算词频

    Wordcount不但可以收集硬盘上的文件,还可以收集linux中的实时流。使用linux系统的 nc 指令提供实时流,命令如下:

    nc -l 9000
    

    后面的9000是流传输的端口号,用flink来监控linux中的实时流。

    代码如下。

    1. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    2. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    3. import org.apache.flink.api.java.tuple.Tuple2;
    4. import org.apache.flink.util.Collector;
    5. public class HeWordCount {
    6. public static void main(String[] args) throws Exception{
    7. StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
    8. DataStreamSource<String> lineds=env.socketTextStream("192.168.110.156",9000);
    9. lineds.flatMap((String line,Collector<Tuple2<String,Long>> out)->{
    10. String[] words=line.split(" ");
    11. for(String word:words){
    12. out.collect(Tuple2.of(word,1L));
    13. }
    14. }).keyBy(ds->ds.f0).sum(1).print();
    15. env.execute();
    16. }
    17. }

    四、Flink对实时流的Wordcount使用滚动窗口

    这里使用Flink程序对实时流可以使用滚动窗口计算5秒内的英文词频。

    谈到滚动窗口,不免需要说到Flink中的窗口。

    Flink按照时间生成Window,可以根据窗口实现原理的不同分成三类。

    1、滚动窗口(Tumbling Window)

    滚动窗口是将数据依据固定的窗口⻓度对数据进行切片。

    滚动窗口的特点是:时间对⻬、窗口⻓度固定,并且没有重叠。

    滚动窗口中有分配器,分配器会将每个元素分配到一个指定窗口大小的窗口中,滚动窗口有一个固定的大小,并且不会出现重叠。

    其原理图如下。

    2、滑动窗口(Sliding Window)

    滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口⻓度和滑动间隔组成。

    滑动窗口的特点是:时间对⻬、窗口⻓度固定,并且有重叠。

    滑动窗口中也有分配器,这个分配器将元素分配到固定⻓度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率,因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。

    其原理图如下图。

    3、会话窗口(Session Window)

    会话窗口是由一系列事件组合一个指定时间⻓度的timeout间隙组成,类似于web应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。

    会话窗口的特点是时间无对⻬。

    会话也叫session,会话窗口中也有分配器,session窗口分配器通过session活动来对元素进行分组,session窗口跟滚动窗口和滑动窗口相比,不会有重叠和固定的开始时间和结束时间的情况,相反,当它在一个固定的时间周期内不再收到元素,即非活动间隔产生,那个这个窗口就会关闭。

    一个session窗口通过一个session间隔来配置,这个session间隔定义了非活跃周期的⻓度。

    当这个非活跃周期产生,那么当前的session将关闭并且后续的元素将被分配到新的session窗口中去。

    其原理图如下所示。

    图中看出,时间是不对齐的。

    这个案例中使用时间对齐不重叠的滚动窗口。

    代码如下。

    1. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    2. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    3. import org.apache.flink.api.java.tuple.Tuple2;
    4. import org.apache.flink.util.Collector;
    5. import org.apache.flink.api.common.typeinfo.Types;
    6. import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    7. import org.apache.flink.streaming.api.windowing.time.Time;
    8. public class HeWordCount {
    9. public static void main(String[] args) throws Exception{
    10. StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
    11. DataStreamSource<String> lineds=env.socketTextStream("192.168.110.156",9000);
    12. lineds.flatMap((String line,Collector<Tuple2<String,Long>> out)->{
    13. String[] words=line.split(" ");
    14. for(String word:words){
    15. out.collect(Tuple2.of(word,1L));
    16. }
    17. }).returns(Types.TUPLE(Types.STRING,Types.LONG))
    18. .keyBy(ds->ds.f0)
    19. .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    20. .sum(1)
    21. .print();
    22. env.execute();
    23. }
    24. }

    启动linux虚拟机,在进行nc -l 9000,wordcount程序会统计实时的计算词频的数据流。

    在linux中启动nc命令后输入内容如下图所示。

     

    运行程序后的结果如下图所示。

     

    五、使用flink SQL来实现5秒内的wordcount词频。

    flink sql相当于使用了sql语句来实现5秒内的wordcount词频统计。

    使用flink SQL需要使用flink-sql的依赖。

    在程序实现上,使用flink-SQL实现5秒内的wordcount词频代码如下 。

    1. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    2. import org.apache.flink.streaming.api.datastream.DataStreamSource;
    3. import org.apache.flink.table.api.EnvironmentSettings;
    4. import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    5. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    6. import org.apache.flink.api.java.tuple.Tuple2;
    7. import org.apache.flink.util.Collector;
    8. import org.apache.flink.table.api.Table;
    9. import static org.apache.flink.table.api.Expressions.$;
    10. import org.apache.flink.api.common.typeinfo.Types;
    11. public class SheWordCount {
    12. public static void main(String[] args) throws Exception{
    13. StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
    14. EnvironmentSettings settings=EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    15. StreamTableEnvironment tableEnv=StreamTableEnvironment.create(env,settings);
    16. DataStreamSource<String> lineds=env.readTextFile("f://data//words.txt");
    17. SingleOutputStreamOperator<Tuple2<String,Long>> wordStream=lineds.flatMap
    18. ((String line,Collector<Tuple2<String,Long>> out)->{
    19. String[] words=line.split(" ");
    20. for(String word:words){
    21. out.collect(Tuple2.of(word,1L));
    22. }
    23. }).returns(Types.TUPLE(Types.STRING,Types.LONG));
    24. Table table=tableEnv.fromDataStream(wordStream,$("word"),$("count"));
    25. Table result=table.groupBy($("word"))
    26. .select($("word"),$("count").sum());
    27. tableEnv.toRetractStream(result,Types.TUPLE(Types.STRING,Types.LONG)).print();
    28. env.execute();
    29. }
    30. }

    至此,使用Flink在各种情况下计算wordcount词频统计基本介绍完毕,

    1. github地址:
    2. https://github.com/wawacode/flink_wordcount
  • 相关阅读:
    selenium-webdriver 阿里云ARMS 自动化巡检
    ROS采用vector动态传递数组参数
    嵌入式通信协议----Wi-Fi协议详解(二)(基于STM32+有人物联网WIFI模块)
    后端防止重复点击
    机器人中的数值优化(十二)——带约束优化问题简介、LP线性规划
    WMS仓储管理系统如何实现供应链协同
    ORACLE 查询SQL优化
    关于pycharm中句号变成点的问题
    维格表项目进度同步到钉钉群
    【LeetCode: 67. 二进制求和 | 位运算 】
  • 原文地址:https://blog.csdn.net/play_big_knife/article/details/128155875