The DataStream API consists of three main parts: DataSource, Transformation, and DataSink.
Code:

    import org.apache.flink.streaming.api.scala._

    object StreamUnionScala {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        val text1 = env.fromCollection(Array(1, 2, 3, 4, 5))
        val text2 = env.fromCollection(Array(6, 7, 8, 9, 10))

        // union merges streams of the same element type into a single stream
        val unionStream = text1.union(text2)

        unionStream.print().setParallelism(1)
        env.execute("StreamUnionScala")
      }
    }
Result:

After two streams are connected, they are merely placed into the same stream; internally each keeps its own data and form unchanged, and the two streams remain independent of each other. connect returns a ConnectedStreams, on which you must use functions such as CoMap and CoFlatMap, analogous to map and flatMap.
    import org.apache.flink.streaming.api.functions.co.CoMapFunction
    import org.apache.flink.streaming.api.scala._

    object StreamConnectScala {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment

        val text1 = env.fromElements("user:libaowen,age:18")
        val text2 = env.fromElements("user:jack_age:20")

        val connectStream = text1.connect(text2)
        connectStream.map(new CoMapFunction[String, String, String] {
          // map1 handles elements from the first stream
          override def map1(value: String): String = value.replace(",", "-")
          // map2 handles elements from the second stream
          override def map2(value: String): String = value.replace("_", "-")
        }).print().setParallelism(1)

        env.execute("StreamConnectScala")
      }
    }
Result:

A stream produced by split cannot be split a second time, and the split method has been marked as deprecated; the official recommendation now is to implement stream splitting with side outputs instead.
    import org.apache.flink.streaming.api.functions.ProcessFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector

    object StreamSideOutputScala {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val text = env.fromCollection(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))

        // Split the stream by whether the value is <= 5.
        // First define two side-output tags to hold the split data.
        val lowTag = new OutputTag[Int]("low") {}   // holds values <= 5
        val highTag = new OutputTag[Int]("high") {} // holds values > 5

        val subOutputStream = text.process(new ProcessFunction[Int, Int] {
          override def processElement(value: Int,
                                      ctx: ProcessFunction[Int, Int]#Context,
                                      collector: Collector[Int]): Unit = {
            if (value <= 5) {
              ctx.output(lowTag, value)
            } else {
              ctx.output(highTag, value)
            }
          }
        })

        // Retrieve the stream of values <= 5
        val lowStream = subOutputStream.getSideOutput(lowTag)
        // Retrieve the stream of values > 5
        val highStream = subOutputStream.getSideOutput(highTag)

        lowStream.print().setParallelism(1)
        highStream.print().setParallelism(1)
        env.execute("StreamSideOutputScala")
      }
    }
Result:
The difference between rescale and rebalance is that rebalance performs a full repartition (every upstream subtask round-robins over every downstream subtask), while rescale does not (each upstream subtask round-robins only over a local group of downstream subtasks).
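As an illustration, here is a small plain-Scala sketch (not the Flink API; the subtask counts and function names are hypothetical) that simulates which downstream subtasks each upstream subtask reaches under the two strategies:

```scala
object RescaleVsRebalanceSim {
  // rebalance: each upstream subtask round-robins over ALL downstream subtasks,
  // so every downstream subtask can receive records from every upstream subtask
  def rebalance(upstream: Int, downstream: Int, recordsPerTask: Int): Map[Int, Set[Int]] =
    (0 until upstream).map { up =>
      up -> (0 until recordsPerTask).map(i => i % downstream).toSet
    }.toMap

  // rescale: the downstream subtasks are split evenly into groups, and each
  // upstream subtask round-robins only within its own local group
  def rescale(upstream: Int, downstream: Int, recordsPerTask: Int): Map[Int, Set[Int]] = {
    val groupSize = downstream / upstream
    (0 until upstream).map { up =>
      up -> (0 until recordsPerTask).map(i => up * groupSize + i % groupSize).toSet
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    // Upstream parallelism 2, downstream parallelism 4, 8 records per subtask
    println(rebalance(2, 4, 8)) // each upstream subtask reaches all 4 downstream subtasks
    println(rescale(2, 4, 8))   // upstream 0 reaches only {0, 1}; upstream 1 only {2, 3}
  }
}
```

With upstream parallelism 2 and downstream parallelism 4, rebalance lets each upstream subtask send to all four downstream subtasks, whereas rescale confines each upstream subtask to its local pair, avoiding the extra network shuffle of a full repartition.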