In Spark we routinely use the groupBy operator to aggregate data. Flink has not only a groupBy operator but also a keyBy operator, so what is the difference between the two?
The conclusion up front:
groupBy is used with the DataSet family of APIs; Table/SQL operations likewise use GROUP BY.
keyBy is used with the DataStream family of APIs.
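Conceptually, keyBy routes every record to a parallel subtask based on a hash of its key, so all records with the same key are processed by the same subtask. The sketch below is a simplified illustration of that routing idea only (Flink actually murmur-hashes the key's hashCode into key groups and maps key groups to subtasks); `KeyPartitionDemo` and `partitionFor` are illustrative names, not Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class KeyPartitionDemo {
    // Simplified stand-in for keyBy's routing: equal keys always map to
    // the same partition index in [0, parallelism).
    static int partitionFor(String key, int parallelism) {
        return Math.abs(key.hashCode() % parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4;
        List<String> words = Arrays.asList("java", "python", "java", "c", "python");
        Map<Integer, List<String>> partitions = new HashMap<>();
        for (String w : words) {
            partitions.computeIfAbsent(partitionFor(w, parallelism), p -> new ArrayList<>()).add(w);
        }
        // Both occurrences of "java" land in the same partition list.
        System.out.println(partitions);
    }
}
```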
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class GroupBy {

    public static void groupbycode() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataSet<String> text = env.fromElements("java python java python python c");
        DataSet<Tuple2<String, Integer>> dataSet = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                for (String word : value.split(" ")) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        });
        // Group by the word (tuple field 0) and sum the counts (field 1).
        dataSet = dataSet.groupBy(0)
                .sum(1);
        dataSet.print();
    }

    public static void main(String[] args) throws Exception {
        groupbycode();
    }
}
The above can be read as the batch version of word count; for a DataSet, the grouping operator is groupBy.
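To make the semantics of `groupBy(0).sum(1)` concrete, here is what that pipeline computes for the example input, sketched with a plain HashMap (`GroupSumDemo` and `wordCount` are illustrative names, not Flink API):

```java
import java.util.HashMap;
import java.util.Map;

public class GroupSumDemo {
    // What groupBy(0).sum(1) computes: group the (word, 1) tuples by the
    // word (field 0) and sum the counts (field 1).
    static Map<String, Integer> wordCount(String line) {
        Map<String, Integer> counts = new HashMap<>();
        for (String word : line.split(" ")) {
            counts.merge(word, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = wordCount("java python java python python c");
        System.out.println(counts); // counts: java=2, python=3, c=1
    }
}
```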
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamWordCount {

    public static final class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            for (String word : s.split(" ")) {
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                // Partition the stream by the word (field f0) ...
                .keyBy(value -> value.f0)
                // ... then sum the counts per word within 1-second tumbling windows.
                .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
                .sum(1);
        dataStream.print();
        env.execute("Window WordCount");
    }
}
The above is the streaming version of word count; for a DataStream, the operator is keyBy. Beyond the naming, the results differ in kind: groupBy returns a grouping that is only consumed by a following aggregation, while keyBy returns a KeyedStream, on which windows, keyed state, and timers become available.
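The 1-second tumbling window in the pipeline above assigns each record to exactly one window based on its processing time. A minimal sketch of the assignment rule, assuming a zero window offset (window start = timestamp rounded down to a window boundary); `TumblingWindowDemo` and `windowStart` are illustrative names, not Flink API:

```java
public class TumblingWindowDemo {
    // Tumbling window assignment: each timestamp (ms) belongs to exactly
    // one window of length windowSizeMs, whose start is the timestamp
    // rounded down to the nearest window boundary.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    public static void main(String[] args) {
        long size = 1000L; // 1-second tumbling window, as in the example
        System.out.println(windowStart(1234L, size)); // 1000
        System.out.println(windowStart(1999L, size)); // 1000
        System.out.println(windowStart(2000L, size)); // 2000
    }
}
```

Records whose window starts differ (e.g. timestamps 1999 and 2000 above) are aggregated separately, which is why the streaming job emits one count per word per second rather than a single global count.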