<!-- Flink streaming Java API (Scala 2.11 build), pinned to 1.14.6. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.14.6</version>
</dependency>
<!--
  Flink client library; its version is taken from the flink.version property.
  NOTE(review): flink.version is defined outside this snippet. Confirm it
  matches the 1.14.6 pinned above and that this artifact id resolves for
  that version.
-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>
1. 增添依赖(即上面的 Maven 依赖片段)
2. 在项目根目录下新建 input 目录,并在其中添加 word.txt 文件(程序读取的路径为 input/word.txt)
- package org.example;
- /*
- * @Author:huangzhiyang
- * @Date:2023/9/26
- * @Description:wc
- */
- import org.apache.flink.api.common.functions.FlatMapFunction;
- import org.apache.flink.api.java.ExecutionEnvironment;
- import org.apache.flink.api.java.operators.AggregateOperator;
- import org.apache.flink.api.java.operators.DataSource;
- import org.apache.flink.api.java.operators.FlatMapOperator;
- import org.apache.flink.api.java.operators.UnsortedGrouping;
- import org.apache.flink.api.java.tuple.Tuple2;
- import org.apache.flink.util.Collector;
-
- public class wordCountBatchDemo {
- public static void main(String[] args) throws Exception {
- // TODO: 2023/9/26 创建执行环境
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- // TODO: 2023/9/26 读取数据
- DataSource<String> lineDS = env.readTextFile("input/word.txt");
- // TODO: 2023/9/26 切分转换
- FlatMapOperator<String, Tuple2<String, Integer>> wordAndOne = lineDS.flatMap(new FlatMapFunction
>() { - @Override
- public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
- // TODO: 2023/9/26 按照空格切分单词
- String[] words = s.split(" ");
- // TODO: 2023/9/26 将单词转为tuple2
- for (String word : words) {
- Tuple2<String, Integer> tuple2 = Tuple2.of(word, 1);
- // TODO: 2023/9/26 使用collector向下游发送数据
- collector.collect(tuple2);
- }
- }
- });
- // TODO: 2023/9/26 按照word分组
- UnsortedGrouping<Tuple2<String, Integer>> wordAndOneGroupBY = wordAndOne.groupBy(0);
- // TODO: 2023/9/26 各分组内聚合
- AggregateOperator<Tuple2<String, Integer>> sum = wordAndOneGroupBY.sum(1);//1是位置,表示第二个元素
- // TODO: 2023/9/26 输出
- sum.print();
- }
- }