• Flink学习16:算子介绍map


    1.算子类型简介

     

     

     

    2.map

     

    示例:

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    
    object mapTest {
      def main(args: Array[String]): Unit = {
    
        //create env
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        //create the data source
        val ds = env.fromElements(1, 2, 3, 4, 5)
    
        val newDs = ds.map(x => x + 10)
        
        newDs.print()
    
        env.execute("map test")
    
    
    
    
      }
    
    }
    

    执行结果:

     

    3.自定义map

     

    核心操作:

    1.创建自定义map类,继承 RichMapFunction 类

    2.重写RichMapFunction 类的 map 方法,编辑 in 的处理逻辑

     示例:

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    
    object myMapTest {
    
    
    
    
      //override map func
      class MyMapFunc extends RichMapFunction[Int,String]{
        override def map(in: Int): String = {
          "Input: "+in.toString+", OutPut:"+(in*3).toString
          }
        }
    
      def main(args: Array[String]): Unit = {
    
        //create env
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        //create dataSource
        val ds = env.fromElements(1, 2, 3, 4, 5)
        
        //use myMap func to transformation
        val newDs = ds.map(new MyMapFunc)
    
        //pirnt
        newDs.print()
    
        //execute
        env.execute()
    
    
      }
    
    }
    

    输出结果:

     

  • 相关阅读:
    计算机网络:运输层
    VMware虚拟机安装Ubuntu教程
    P 算法与 K 算法
    chrome 插件 Mobile simulator
    XJTUSE-数据结构-homework2
    1.7.1、常见的计算机网络体系结构
    ArrayDeque类的常见用法(java)
    RabbitMQ相关问题
    D2. Sage‘s Birthday (hard version)
    数字孪生技术在光网络中的应用与问题
  • 原文地址:https://blog.csdn.net/hzp666/article/details/126259789