• Flink学习(七)-单词统计


    前言

    Flink是流批一体的框架。因此既可以处理以流的方式处理,也可以按批次处理。

    一、代码基础格式

    1. //1st 设置执行环境
    2. xxxEnvironment env = xxxEnvironment.getEnvironment;
    3. //2nd 设置流
    4. DataSource xxxDS=env.xxxx();
    5. //3rd 设置转换
    6. Xxx transformation =xxxDS.xxxx();
    7. //4th 设置sink
    8. transformation.print();
    9. //5th 可能需要
    10. env.execute();

    二、Demo1 批处理

    • 源码

    1. public static void main(String[] args) throws Exception {
    2. //1,创建一个执行环境
    3. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    4. //2,获取输入流
    5. DataSource lineDS = env.readTextFile("input/word.txt");
    6. //3,处理数据
    7. FlatMapOperator> wordDS = lineDS.flatMap(new FlatMapFunction>() {
    8. @Override
    9. public void flatMap(String value, Collector> collector) throws Exception {
    10. //3.1 分隔字符串
    11. String[] values = value.split(" ");
    12. //3.2 汇总统计
    13. for (String word : values) {
    14. Tuple2 wordTuple = Tuple2.of(word, 1);
    15. collector.collect(wordTuple);
    16. }
    17. }
    18. });
    19. //4,按单词聚合
    20. UnsortedGrouping> tuple2UnsortedGrouping = wordDS.groupBy(0);
    21. //5,分组内聚合
    22. AggregateOperator> sum = tuple2UnsortedGrouping.sum(1);
    23. //6,输出结果
    24. sum.print();
    25. }
    • 效果展示

    三、Demo2 流处理

    • 源码

    1. public static void main(String[] args) throws Exception {
    2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    3. DataStreamSource lineDS = env.readTextFile("input/word.txt");
    4. SingleOutputStreamOperator> wordDS = lineDS.flatMap(new FlatMapFunction>() {
    5. @Override
    6. public void flatMap(String value, Collector> collector) throws Exception {
    7. String[] words = value.split(" ");
    8. for (String word : words) {
    9. Tuple2 temp = Tuple2.of(word, 1);
    10. collector.collect(temp);
    11. }
    12. }
    13. });
    14. KeyedStream, Tuple> wordCountKeyBy = wordDS.keyBy(0);
    15. SingleOutputStreamOperator> sum = wordCountKeyBy.sum(1);
    16. sum.print();
    17. env.execute();
    18. }
    • 效果展示

    四、Demo3 无边界流处理

    • 源码

    1. public static void main(String[] args) throws Exception {
    2. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    3. DataStreamSource lineDS = env.socketTextStream("192.168.3.11", 9999);
    4. SingleOutputStreamOperator> sum = lineDS.flatMap(
    5. (String value, Collector> out) -> {
    6. String[] words = value.split(" ");
    7. for (String word : words) {
    8. out.collect(Tuple2.of(word, 1));
    9. }
    10. }
    11. ).returns(Types.TUPLE(Types.STRING, Types.INT))
    12. .keyBy(value -> value.f0)
    13. .sum(1);
    14. sum.print();
    15. env.execute();
    16. }
    • 效果展示 

    往192.168.3.11的9999端口上持续输送数据流,程序端会出现如下统计

  • 相关阅读:
    【建造者设计模式详解】Java/JS/Go/Python/TS不同语言实现
    ubuntu生成pem证书连接服务器(已验证)
    二十三、商城 - 商品录入-新增商品(11)
    【PG】PostgreSQL 预写日志(WAL)、checkpoint、LSN
    【RocketMQ系列十二】RocketMQ集群核心概念之主从复制&生产者负载均衡策略&消费者负载均衡策略
    配置国际化解析文件及生效探究
    四工业控制系统的攻击场景研究
    SpringBoot SpringBoot 开发实用篇 5 整合第三方技术 5.22 RabbitMQ 安装
    使用AWS的API Gateway实现websocket
    《数据结构与算法》之队列与链表复习
  • 原文地址:https://blog.csdn.net/a80C51/article/details/138046933