Flink is a unified stream- and batch-processing framework, so the same program can process data either as a continuous stream or as a batch.
Regardless of which API is used, a Flink program follows the same five steps:

```java
// 1. Create the execution environment
xxxEnvironment env = xxxEnvironment.getExecutionEnvironment();

// 2. Create the source (read the data)
DataSource xxxDS = env.xxxx();

// 3. Apply transformations
Xxx transformation = xxxDS.xxxx();

// 4. Define the sink
transformation.print();

// 5. Trigger execution, when required
env.execute();
```
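The WordCount programs below fill this skeleton in with real sources and transformations. As an even smaller illustration, the five steps map onto concrete DataStream calls roughly like this (the `fromElements` source and the `toUpperCase` map are placeholder choices for the sketch, not part of the examples that follow):

```java
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(); // 1. environment

env.fromElements("hello", "flink")   // 2. source
   .map(String::toUpperCase)         // 3. transformation
   .print();                         // 4. sink

env.execute();                       // 5. trigger the job
```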
A batch implementation of WordCount, written with the DataSet API:

```java
public static void main(String[] args) throws Exception {
    // 1. Create the batch execution environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // 2. Read the input file
    DataSource<String> lineDS = env.readTextFile("input/word.txt");
    // 3. Process the data: split each line into (word, 1) tuples
    FlatMapOperator<String, Tuple2<String, Integer>> wordDS =
            lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
            // 3.1 Split the line on spaces
            String[] values = value.split(" ");
            // 3.2 Emit one (word, 1) tuple per word
            for (String word : values) {
                Tuple2<String, Integer> wordTuple = Tuple2.of(word, 1);
                collector.collect(wordTuple);
            }
        }
    });
    // 4. Group by word (tuple field 0)
    UnsortedGrouping<Tuple2<String, Integer>> tuple2UnsortedGrouping = wordDS.groupBy(0);
    // 5. Sum the counts (tuple field 1) within each group
    AggregateOperator<Tuple2<String, Integer>> sum = tuple2UnsortedGrouping.sum(1);
    // 6. Print the result
    sum.print();
}
```
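Note that there is no env.execute() at the end: with the DataSet API, print() collects the result to the client and triggers execution itself. For illustration only, assuming input/word.txt contained the two lines "hello world" and "hello flink", the job would print (hello,2), (world,1) and (flink,1), in no particular order.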
The same WordCount written with the DataStream API, reading the file as a bounded stream:

```java
public static void main(String[] args) throws Exception {
    // 1. Create the streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 2. Read the input file as a bounded stream
    DataStreamSource<String> lineDS = env.readTextFile("input/word.txt");

    // 3. Split each line into (word, 1) tuples
    SingleOutputStreamOperator<Tuple2<String, Integer>> wordDS =
            lineDS.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
            String[] words = value.split(" ");
            for (String word : words) {
                Tuple2<String, Integer> temp = Tuple2.of(word, 1);
                collector.collect(temp);
            }
        }
    });

    // 4. Key the stream by word (tuple field 0), then sum the counts per key
    KeyedStream<Tuple2<String, Integer>, Tuple> wordCountKeyBy = wordDS.keyBy(0);
    SingleOutputStreamOperator<Tuple2<String, Integer>> sum = wordCountKeyBy.sum(1);

    // 5. Print the running counts and trigger execution
    sum.print();
    env.execute();
}
```
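Run against the bounded file, this streaming job prints every incremental update, e.g. (hello,1) and then (hello,2), because the keyed sum emits a new result for each incoming record. Since Flink unifies stream and batch processing, the same DataStream pipeline can also be executed with batch semantics, in which case only the final count per word is emitted. A minimal sketch (the mode can equally be supplied at submission time via the execution.runtime-mode option):

```java
// RuntimeExecutionMode lives in org.apache.flink.api.common
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Execute the same DataStream pipeline with batch semantics:
// only the final count per word is emitted instead of every incremental update.
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
```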
A streaming WordCount that reads an unbounded stream from a socket, written with lambdas:

```java
public static void main(String[] args) throws Exception {
    // 1. Create the streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 2. Read an unbounded stream from a socket
    DataStreamSource<String> lineDS = env.socketTextStream("192.168.3.11", 9999);

    // 3. Split each line into (word, 1) tuples, key by word, and sum the counts
    SingleOutputStreamOperator<Tuple2<String, Integer>> sum = lineDS.flatMap(
            (String value, Collector<Tuple2<String, Integer>> out) -> {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
    )
    // Java erases the lambda's generic types, so the output type must be declared explicitly
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .keyBy(value -> value.f0)
    .sum(1);

    // 4. Print the running counts and trigger execution
    sum.print();
    env.execute();
}
```
Keep sending lines of text to port 9999 on 192.168.3.11 and the program will continuously print updated word counts.
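For a quick test, open the port on that host with netcat (nc -lk 9999) and type lines of words. When the print sink runs with parallelism greater than one, each output line is prefixed with the index of the subtask that produced it. For illustration only, the input lines "hello flink" followed by "hello world" would produce something like:

```
3> (hello,1)
5> (flink,1)
3> (hello,2)
8> (world,1)
```

Records with the same key always go to the same subtask, which is why both updates for "hello" share the same prefix.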