• 使用Flink批处理实现WordCount


    Flink作为一个非常优秀的大数据实时计算框架,在很多从事大数据开发的公司都是必备的技能,接下来我将通过Flink以批处理来实现入门案例WordCount

    1:步骤一

    idea新建设maven项目,并且自己配置好maven环境 

    2:步骤二

    在pom文件中加入下面的依赖和配置

    1. UTF-8
    2. 1.13.0
    3. 1.8
    4. 1.7.30
    5. 2.12
    6. ${java.version}
    7. ${java.version}
    8. org.apache.flink
    9. flink-java
    10. ${flink.version}
    11. org.apache.flink
    12. flink-streaming-java_${scala.binary.version}
    13. ${flink.version}
    14. org.apache.flink
    15. flink-clients_2.11
    16. ${flink.version}
    17. org.slf4j
    18. slf4j-log4j12
    19. ${slf4j.version}
    20. org.slf4j
    21. slf4j-api
    22. ${slf4j.version}
    23. org.apache.logging.log4j
    24. log4j-to-slf4j
    25. 2.14.0
    26. org.codehaus.mojo
    27. exec-maven-plugin
    28. 1.6.0
    29. java
    30. test
    31. org.apache.maven.plugins
    32. maven-compiler-plugin
    33. 8
    34. 8

    3:步骤三

    配置 log4j,在resources下面建一个文件log4j.properties,里面内容如下

    1. # Output pattern : date [thread] priority category - message
    2. log4j.rootLogger=WARN,CONSOLE,RollingFile
    3. #CONSOLE
    4. log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
    5. log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
    6. log4j.appender.CONSOLE.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n
    7. #RollingFile
    8. log4j.appender.RollingFile=org.apache.log4j.DailyRollingFileAppender
    9. log4j.appender.RollingFile.File=logs/signserver.log
    10. log4j.appender.RollingFile.layout=org.apache.log4j.PatternLayout
    11. log4j.appender.RollingFile.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n
    12. #Project default level
    13. log4j.logger.com.ntko.sign=debug

    4:步骤四

    新建一个input目录,并且新建words.txt文件,文件中输入如下内容

    hello world
    hello flink
    hello java

    5:步骤五

     新建一个BatchWordCount类,里面代码如下

    1. public class BatchWordCount {
    2. public static void main(String[] args) throws Exception {
    3. //1:创建执行环境
    4. ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
    5. //2:从文件读取数据
    6. DataSource dataSource = executionEnvironment.readTextFile("input/words.txt");
    7. //将每行数据进行分词,转换成二元组类型
    8. FlatMapOperator> returns = dataSource.
    9. flatMap((String line, Collector> out) -> {
    10. //将一行文本进行分词
    11. String[] words = line.split(" ");
    12. //将每个单词转换成二元组输出
    13. for (String word : words) {
    14. out.collect(Tuple2.of(word, 1L));
    15. }
    16. }).returns(Types.TUPLE(Types.STRING, Types.LONG));
    17. //按照word进行分组
    18. UnsortedGrouping> tuple2UnsortedGrouping = returns.groupBy(0);
    19. //分组内进行聚合统计
    20. AggregateOperator> sum = tuple2UnsortedGrouping.sum(1);
    21. sum.print();
    22. }
    23. }

    6:步骤六

    输出结果 可以看到flink单词出现了1次,world出现了1次,hello出现了三次,java出现了一次

  • 相关阅读:
    微信小程序接入火山引擎埋点数据
    算法的时间复杂度和空间复杂度
    R语言参数自抽样法Bootstrap:估计MSE、经验功效、杰克刀Jackknife、非参数自抽样法可视化
    数据结构:循环队列
    【Python】操作符梅花号*、乘号*的用法汇总与小结
    WZOI-222最小公倍数
    JavaSE之反射
    python异步编程之asyncio低阶API
    BATJ高频面试249道题:微服务+多线程+分布式+MyBatis +Spring
    javaScript:DOM中常用尺寸
  • 原文地址:https://blog.csdn.net/W_317/article/details/127941808