• Flink: the difference between groupBy and keyBy


    1. The difference between groupBy and keyBy

    In Spark we often use the groupBy operator to aggregate data. Flink, however, has not only a groupBy operator but also a keyBy operator, so where exactly is the difference between the two?
    The short answer:
    groupBy is used with the DataSet family of APIs; Table/SQL operations likewise aggregate with groupBy (a sketch follows below).
    keyBy is used with the DataStream family of APIs.
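
    Since Table/SQL also aggregates with groupBy, here is a minimal sketch of the SQL form. It is only an illustration: the class name and the in-memory data are made up, and it assumes a batch-mode TableEnvironment (Flink 1.13+) with the Table planner on the classpath.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;

    public class SqlGroupBy {
        public static void main(String[] args) {
            // Batch-mode Table environment
            TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

            // A small in-memory table; the default column f0 is renamed to "word"
            Table words = tEnv.fromValues("java", "python", "java", "python", "python", "c").as("word");
            tEnv.createTemporaryView("words", words);

            // Table/SQL expresses the same aggregation with GROUP BY
            tEnv.executeSql("SELECT word, COUNT(*) AS cnt FROM words GROUP BY word").print();
        }
    }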

    2. A simple groupBy example

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;
    
    public class GroupBy {
        public static void groupbycode() throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataSet<String> text = env.fromElements("java python java python python c");
            // Split the input line into (word, 1) tuples
            DataSet<Tuple2<String, Integer>> dataSet = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    for(String word: value.split(" ")) {
                        out.collect(new Tuple2<>(word, 1));
                    }
                }
            });
            dataSet = dataSet.groupBy(0)
                    .sum(1);
            dataSet.print();
        }
    
        public static void main(String[] args) throws Exception {
            groupbycode();
        }
    }
    

    The code above can be seen as the batch version of word count: for a DataSet, the grouping operation used is groupBy.
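
    As a side note, groupBy does not have to take a positional field index; like the keyBy call in the next section, it can also take a KeySelector. The following is only a rough sketch (the class name and the inlined test data are made up for illustration), and it combines the counts with reduce:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;

    public class GroupByKeySelector {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);

            // The same (word, 1) pairs as above, built directly for brevity
            DataSet<Tuple2<String, Integer>> words = env.fromElements(
                    new Tuple2<>("java", 1), new Tuple2<>("python", 1),
                    new Tuple2<>("java", 1), new Tuple2<>("python", 1),
                    new Tuple2<>("python", 1), new Tuple2<>("c", 1));

            // Group by the word via a KeySelector, then add up the counts per key
            words.groupBy(new KeySelector<Tuple2<String, Integer>, String>() {
                        @Override
                        public String getKey(Tuple2<String, Integer> t) {
                            return t.f0;
                        }
                    })
                    .reduce((a, b) -> new Tuple2<>(a.f0, a.f1 + b.f1))
                    .print();
        }
    }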

    3. A simple keyBy example

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    
    public class StreamWordCount {
    
        public static final class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                for(String word: s.split(" ")) {
                    collector.collect(new Tuple2<>(word, 1));
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<Tuple2<String, Integer>> dataStream = env
                    .socketTextStream("localhost", 9999)
                    .flatMap(new Splitter())
                    .keyBy(value -> value.f0)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
                    .sum(1);
    
            dataStream.print();
            env.execute("Window WordCount");
        }
    }
    

    The code above is the streaming version of word count: for a DataStream, the operator used is keyBy.
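
    Note that keyBy by itself only partitions the stream by key and returns a KeyedStream; the aggregation does not have to go through a window. Below is a minimal sketch of a rolling per-key count, reusing the Splitter above (assuming the same package; the socket host and port are placeholders, as in the original example):

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RollingWordCount {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // keyBy partitions by word; sum(1) then emits a running count per key for every input
            DataStream<Tuple2<String, Integer>> counts = env
                    .socketTextStream("localhost", 9999)
                    .flatMap(new StreamWordCount.Splitter())
                    .keyBy(value -> value.f0)
                    .sum(1);

            counts.print();
            env.execute("Rolling WordCount");
        }
    }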

  • Original post: https://blog.csdn.net/bitcarmanlee/article/details/126902557