• flink 一个简单的wordcount


    package com.jackray.soullan.flink;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.AggregateOperator;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.operators.FlatMapOperator;
    import org.apache.flink.api.java.operators.UnsortedGrouping;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;
    
    public class WordCount {
    
        public static void main(String[] args) throws Exception {
            // todo 1、创建集群环境
            ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
    
            // todo 2、读取数据、从文件中读取
            DataSource<String> stringDataSource = executionEnvironment.readTextFile("input/mydata.txt");
    
            // todo 3、切分、转换(word 1), 基于interface写一个匿名类
            FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = stringDataSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    // todo 3.1 按照空格切分数组
                    String[] words = value.split(" ");
                    for (String word : words) {
                        Tuple2<String, Integer> wordTuple2 = Tuple2.of(word, 1);
                        // todo 3.2 使用collector 向下下游发送数据
                        out.collect(wordTuple2);
                    }
                }
            });
    
            // todo 4、各分组内聚
            UnsortedGrouping<Tuple2<String, Integer>> wordAndOneGroupBy = wordAndOne.groupBy(0);
    
            // todo 5、各个分组内聚合
            AggregateOperator<Tuple2<String, Integer>> sum = wordAndOneGroupBy.sum(1); //这里的1是位置
    
            // todo 6、输出
            sum.print();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    	
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.jackray.soulland</groupId>
        <artifactId>soulland</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <flink.version>1.17.0</flink.version>
        </properties>
    
        <dependencies>
            
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
    
        </dependencies>
    </project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
  • 相关阅读:
    vue3 源码解析(1)— reactive 响应式实现
    01_深度学习基础知识
    java伪共享问题
    Clickhouse分布式集群搭建
    XSS测试
    使用Scanner类进行控制台输入
    【表面缺陷检测】表面缺陷检测数据集汇总
    基于信通院 Serverless 工具链模型的实践:Serverless Devs
    opencv的相机校准和3D建模的理论知识
    torch.distributed.launch 指定端口rdzv_endpoint
  • 原文地址:https://blog.csdn.net/ppwwp/article/details/133256234