
flatMap 算子的作用:
1. 将数据切分
2. 拍扁(flatten),把每条数据切出的多个元素逐个输出

示例:
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/** Minimal flatMap demo: splits each input sentence into words and prints them. */
object flatMapTest {
  def main(args: Array[String]): Unit = {
    // Set up the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Source of three fixed sentences; flatMap flattens each line into words.
    val sentences = env.fromElements("hadoop is good", "spark is fast", "flink is better")
    val words = sentences.flatMap(line => line.split(" "))

    words.print()
    // Trigger the actual job execution.
    env.execute()
  }
}
输出结果:


自定义 flatMap 函数的步骤:
1. 实现(extends)FlatMapFunction 接口
2. 重写 flatMap 方法
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.util.Collector
/** Demo of a custom FlatMapFunction with a length threshold. */
object myFlatMapTest {

  /** Emits the words of a line only when the line exceeds `maxWordLength`
    * characters; shorter lines produce no output at all.
    *
    * NOTE(review): despite the parameter name, the threshold is compared
    * against the full line's length, not individual word lengths.
    */
  class myFlatMap(maxWordLength: Int) extends FlatMapFunction[String, String] {
    override def flatMap(t: String, collector: Collector[String]): Unit =
      if (t.length > maxWordLength) {
        for (word <- t.split(" ")) collector.collect(word)
      }
  }

  def main(args: Array[String]): Unit = {
    // Set up the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // First line (47 chars) passes the 25-char threshold and is split;
    // second line (18 chars) is dropped entirely.
    val ds = env.fromElements(
      "this is a good morning, long long long and long",
      "that day is friday")

    // Apply the custom flatMap function.
    val flatMapedDs = ds.flatMap(new myFlatMap(25))

    flatMapedDs.print()
    // Trigger the actual job execution.
    env.execute()
  }
}
输出结果:
