




示例:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._
/** Minimal example of the `map` transformation with an inline lambda. */
object mapTest {

  /**
   * Builds a stream from the integers 1 to 5, adds 10 to every element,
   * prints the mapped stream and runs the job under the name "map test".
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Obtain the execution environment for this job.
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

    // Source of five fixed integers, each shifted by 10.
    val shifted = streamEnv
      .fromElements(1, 2, 3, 4, 5)
      .map(_ + 10)

    shifted.print()

    // Run the job.
    streamEnv.execute("map test")
  }
}
执行结果:

1.创建自定义map类,继承 RichMapFunction 类
2.重写 RichMapFunction 类的 map 方法,在其中编写对输入参数 in 的处理逻辑
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/** Example of the `map` transformation using a custom `RichMapFunction`. */
object myMapTest {

  /**
   * Custom map function that turns each `Int` into a `String` showing the
   * input value and the input multiplied by three.
   */
  class MyMapFunc extends RichMapFunction[Int, String] {

    /**
     * @param in the incoming element
     * @return a string of the form `Input: <in>, OutPut:<in * 3>`
     */
    override def map(in: Int): String = {
      val tripled = in * 3
      s"Input: $in, OutPut:$tripled"
    }
  }

  /**
   * Builds a stream from the integers 1 to 5, applies [[MyMapFunc]] to every
   * element, prints the mapped stream and runs the job.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    // Obtain the execution environment for this job.
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

    // Source of five fixed integers.
    val numbers = streamEnv.fromElements(1, 2, 3, 4, 5)

    // Transform each element with the custom map function.
    val described = numbers.map(new MyMapFunc)

    // Print the mapped stream.
    described.print()

    // Run the job (no explicit job name is given).
    streamEnv.execute()
  }
}
输出结果:
