• Flink学习(七)-单词统计


    前言

    Flink是流批一体的框架。因此既可以处理以流的方式处理,也可以按批次处理。

    一、代码基础格式

    1. //1st 设置执行环境
    2. xxxEnvironment env = xxxEnvironment.getEnvironment;
    3. //2nd 设置流
    4. DataSource xxxDS=env.xxxx();
    5. //3rd 设置转换
    6. Xxx transformation =xxxDS.xxxx();
    7. //4th 设置sink
    8. transformation.print();
    9. //5th 可能需要
    10. env.execute();

    二、Demo1 批处理

    • 源码

    1. public static void main(String[] args) throws Exception {
    2. //1,创建一个执行环境
    3. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    4. //2,获取输入流
    5. DataSource lineDS = env.readTextFile("input/word.txt");
    6. //3,处理数据
    7. FlatMapOperator> wordDS = lineDS.flatMap(new FlatMapFunction>() {
    8. @Override
    9. public void flatMap(String value, Collector> collector) throws Exception {
    10. //3.1 分隔字符串
    11. String[] values = value.split(" ");
    12. //3.2 汇总统计
    13. for (String word : values) {
    14. Tuple2 wordTuple = Tuple2.of(word, 1);
    15. collector.collect(wordTuple);
    16. }
    17. }
    18. });
    19. //4,按单词聚合
    20. UnsortedGrouping> tuple2UnsortedGrouping = wordDS.groupBy(0);
    21. //5,分组内聚合
    22. AggregateOperator> sum = tuple2UnsortedGrouping.sum(1);
    23. //6,输出结果
    24. sum.print();
    25. }
    • 效果展示

    三、Demo2 流处理

    • 源码

    1. public static void main(String[] args) throws Exception {
    2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    3. DataStreamSource lineDS = env.readTextFile("input/word.txt");
    4. SingleOutputStreamOperator> wordDS = lineDS.flatMap(new FlatMapFunction>() {
    5. @Override
    6. public void flatMap(String value, Collector> collector) throws Exception {
    7. String[] words = value.split(" ");
    8. for (String word : words) {
    9. Tuple2 temp = Tuple2.of(word, 1);
    10. collector.collect(temp);
    11. }
    12. }
    13. });
    14. KeyedStream, Tuple> wordCountKeyBy = wordDS.keyBy(0);
    15. SingleOutputStreamOperator> sum = wordCountKeyBy.sum(1);
    16. sum.print();
    17. env.execute();
    18. }
    • 效果展示

    四、Demo3 无边界流处理

    • 源码

    1. public static void main(String[] args) throws Exception {
    2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    3. DataStreamSource lineDS = env.socketTextStream("192.168.3.11", 9999);
    4. SingleOutputStreamOperator> sum = lineDS.flatMap(
    5. (String value, Collector> out) -> {
    6. String[] words = value.split(" ");
    7. for (String word : words) {
    8. out.collect(Tuple2.of(word, 1));
    9. }
    10. }
    11. ).returns(Types.TUPLE(Types.STRING, Types.INT))
    12. .keyBy(value -> value.f0)
    13. .sum(1);
    14. sum.print();
    15. env.execute();
    16. }
    • 效果展示 

    往192.168.3.11的9999端口上持续输送数据流,程序端会出现如下统计

  • 相关阅读:
    keepalived+nginx高可用 脑裂监控
    计算机设计大赛 深度学习猫狗分类 - python opencv cnn
    小白能理解的奈奎斯特采样及延伸出的理论
    小程序分销商城功能展示;
    【SQL 语言艺术】数据库三范式
    分析思路:数据结构
    Python中的索引和切片
    爬虫 day 02 bs4的作用
    如何使用 PHP 的内置 Web 服务器快速测试网站
    [RK3568][Android11] Tasklet
  • 原文地址:https://blog.csdn.net/a80C51/article/details/138046933