• SPark学习笔记:13 Spark Streaming 的Transform算子和Action算子


    概述

    和RDD类似,DStreams也有一些转换算子用于处输入流中的数据。DStream中有很多转换算子和RDD的转换算子一样,同时也提供了一些额外的算子。此文将总结DStreams的各种算子的使用。

    Transformations on DStreams

    map

    作用在DStream上,用法同RDD的map.一个输入对应一个输出。

    flatMap

    说明:对源DStream中的每一个元素,作为flatMap函数的输入进行计算处理生成一个新的DStream,一个输入对应一个或者多个输出

    val line: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.0.52",8888)
    val sensorDs:DStream[String] = line.flatMap(data=>data.split(","))
    
    • 1
    • 2

    filter

    说明:过滤符合条件的记录,true保留,false过滤

    val line: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.0.52",8888)
    val sensorDs:DStream[(String,SensorReading)] = line
      .filter(_.nonEmpty)
        .map(data=>{
        val arr = data.split(",")
          (arr(0),SensorReading(arr(0),arr(1).toLong,arr(2).toDouble))
    })
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    repartiton

    说明:重分区

    union

    说明:合并两个DStream,DStream的元素的数据类型必须一致

    count

    说明:统计DStream中元素的个数,和RDD的count操作不同,DStream的count是一个懒加载的操作。

    countByValue

    说明: 对DStream中的元素按照VALUE进行统计,输出(V,Long)类型的DStream。

    reduce

    说明: 对DStream[K]中的每个对象进行reduce运算,输出DStream[K]类型的数据

    sensorDs.reduce{ case(first:(String,SensorReading),second:(String,SensorReading))=>
        if(first._2.temperature>second._2.temperature){
          first
        }else{
          second
        }
    }.print()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    reduceByKey

    说明: 对DStream[K,V]类型的DStream中的元素按照key分组,进行reduce运算,输出DStream[K,V]类型的数据

    sensorDs.reduceByKey((first:SensorReading,second:SensorReading)=>{
      if(first.temperature>second.temperature){
        first
      }else{
        second
      }
    }).print()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    join

    说明 类似于关系型数据库表的join操作,连接两个DStream,作用在DStream[K,V]和DStream[K,W]的两个DStream上,输出一个DStream[K,(V,W)]类型的DStream。

    cogroup

    说明 作用在两个DStream[K,V]和DStream[K,W]类型的DStream上,输出一个新的DStream[K,SEQ[V],SEQ[W]]类型的DStream。

    transform

    说明: Transform允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以很方便的扩展DStream的API。该函数每一批次调度一次。

    val conf:SparkConf = new SparkConf()
    conf.setMaster("local[*]").setAppName("DStreamTestApp")
    
    val ssc:StreamingContext = new StreamingContext(conf,Seconds(3))
    import StreamingContext._
    val line: ReceiverInputDStream[String] = ssc.socketTextStream("192.168.0.52",8888)
    val sensorDs:DStream[(String,SensorReading)] = line
      .filter(_.nonEmpty)
        .map(data=>{
        val arr = data.split(",")
          (arr(0),SensorReading(arr(0),arr(1).toLong,arr(2).toDouble))
    })
    val transedDs:DStream[SensorReading] = sensorDs.transform(data=>{
      //data是一个RDD,可以使用RDD的API进行操作
      val data2:RDD[(String,SensorReading)] = data.filter(el=>{
        if(el._2.temperature>60){
          true
        }else{
          false
        }
      })
      //取最大温度的记录,并转将RDD(String,SensorReading)转换为RDD(SensorReading)
      val data3:RDD[SensorReading] = data2.reduceByKey((first,second)=>{
        if(first.temperature>second.temperature){
          first
        }else{
          second
        }
      }).map(_._2)
      
      //结果返回另一个RDD
      data3
    })
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 该函数的使用场景有很多,比如从文件中读取一个DataSet,然后可以使用该方法与实时流中的DStream中的RDD进程合并等操作。

    updateStateByKey

    说明: updateStateByKey用于记录历史记录的状态值,有时候我们需要在DStream中跨批次卫华状态(例如WordCount中统计Word的累加值)。针对这种情况,updateStateByKey提供了一个对状态变量的访问。对于键值形式的DStream,给定一个由(键、事件)对个偶成的DStream,并传递一个指定如何根据新的事件更新每个键值对应状态的函数,他可以构建出一个新的DStream。
    updateStateByKey操作使得我们可以在用新的信息进行更新时保持任意的状态。只要两步,我们就可以使用这个功能:

    • 定义状态,状态可以是一个任意的数据类型
    • 定义状态更新函数,此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。

    示例一:wordcount,统计输入流中每个word出现的频率。

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object StateApp {
    
      def main(args: Array[String]): Unit = {
        val conf:SparkConf = new SparkConf()
        conf.setAppName("StateAppTest").setMaster("local[*]")
    
        //构建StreamContext
        val ssc:StreamingContext = new StreamingContext(conf,Seconds(3))
        
        //使用updateStateByKey算子,需要设定checkpoint的目录
        ssc.checkpoint("./checkpoint")
        
        //构建一个socket文本流
        val strDs:DStream[String] = ssc.socketTextStream("192.168.0.52",8888)
        
        //构建DStream[(String,Long)]键值对类型的DStream
        val paris:DStream[(String,Long)] = strDs.filter(_.nonEmpty)
          .flatMap(data=>{data.split(",")}).map((_,1))
        
        //定义一个LONG类型的状态,并定义状态更新函数
        paris.updateStateByKey[Long]((values:Seq[Long],state:Option[Long])=>{
         //状态更新函数有两个参数:
         //参数一:是新的批次的以Key分组后的值的序列
         //参数二:是上一批次处理完毕时记录的状态的值
         
         //取上一批次的状态值
          val prev_ttls:Long = state.getOrElse(0L)
          //当前批次的值处理
          val current_ttls = values.foldLeft(0L)((data1,data2)=>data1+data2)
          
          //更新状态,为上一次的值+这一批次的值
          Some(prev_ttls+current_ttls)
        }).print()
    
        ssc.start()
        ssc.awaitTermination()
    
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    示例二:记录每一个温度传感器的最高温度

    import com.hjt.yxh.hw.sparksql.SensorReading
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object StateApp {
    
      def main(args: Array[String]): Unit = {
        val conf:SparkConf = new SparkConf()
        conf.setAppName("StateAppTest").setMaster("local[*]")
    
        //构建StreamContext
        val ssc:StreamingContext = new StreamingContext(conf,Seconds(3))
        ssc.checkpoint("./checkpoint")
        val strDs:DStream[String] = ssc.socketTextStream("192.168.0.52",8888)
    
        val sensorDs:DStream[(String,SensorReading)] =
          strDs.filter(_.nonEmpty)
            .map(data=>{
            val arry = data.split(",")
            val sensor = SensorReading(arry(0),arry(1).toLong,arry(2).toDouble)
              (sensor.id,sensor)
          })
    
          val updateSensorState = (values:Seq[SensorReading],state:Option[SensorReading])=>{
            //定义的状态类型是SensorReading
            val prevSensor = state.getOrElse(values.apply(0))
            
            //迭代,记录温度最高的Sensor
            val max = values.foldLeft(prevSensor)((maxSensor,data)=>{
              if(data.temperature>maxSensor.temperature){
                data
              }else{
                maxSensor
              }
            })
            //更新状态
            Some(max)
          }
            
        sensorDs.updateStateByKey[SensorReading](updateSensorState).print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45

    Tips: 包括windows的相关算子在内,以上所有的转换算子都是“懒执行”的,如果整个应用中都没有行动算子,那么相关的计算操作将不会被执行。

    Spark Streaming的行动算子 Output Operations On DStream

    输出操作允许将DStream中的数据推送到外部系统,比如数据库或者文件系统。由于输出操作实际允许外部系统使用转换后的数据,所以他们会触发所有的转换算子的执行。(同RDD的行动算子)

    print

    说明: 在Driver节点上打印出DStream的每一批次中的前10条记录。通常用于开发调试阶段。

    saveAsTextFiles(prefix,[suffix])

    说明: 将DSteam的内容保存为文本文件,每一个批次生成一个文件,文件名以prefix前缀-时间(毫秒)[.fuffix]后缀命名。

    saveASObjectFiles(prefix,[suffix])

    说明: 将DSteam的内容保存为一个序列化的对象文件,使用java的Object序列化。每一个批次生成一个文件,文件名以prefix前缀-时间(毫秒)[.fuffix]后缀命名。

    saveAsHadoopFiles(prefix, [suffix])

    说明: 将DSteam的内容保存为hadoop的文件,使用java的Object序列化。每一个批次生成一个文件,文件名以prefix前缀-时间(毫秒)[.fuffix]后缀命名。

    foreachRDD(func)

    说明:
    这是最通用的输出操作,即将函数func用于产生于stream的每一个RDD。其中参数传入的函数 func 应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。通用的输出操作foreachRDD(),它用来对DStream中的RDD运行任意计算。这和 transform()有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark 中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如MySQL的外部数据库中。

    //写入到数据库中
    sensorStream.foreachRDD(rdd=>{
      //在Driver端执行
      //todolist
      println("executor at driver end")
    
      rdd.foreachPartition(
        rddPartiton=>{
          //在Executor端执行
          //创建数据库连接
          println("executor at driver Executor")
            for (elem <- rddPartiton) {
            println(elem)
              //每条记录执行一次
    
          }
          //在Executor端执行,每个Partition执行一次
    
        }
      )
    })
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    注意:

    • 连接不能写在 driver 层面,因为数据的存取操作是在Executor中完成的,在Driver端创建的连接没办法在Executor中使用。(跨机器或者跨了JVM进程了)
    • 如果写在 foreach则每个RDD中的每一条数据都创建,得不偿失;
    • 增加 foreachPartition,在分区创建(获取)
  • 相关阅读:
    编译器优化:何为别名分析
    Oracle故障诊断方法
    TDengine 3.0 三大创新详解
    交替合并字符串
    概率论与数理统计
    【精】一张图搞懂是SQL join,图解SQL的7种JOIN
    MedNeRF:用于从单个X射线重建3D感知CT投影的医学神经辐射场
    鄙视测试,理解测试,成为测试
    【Educoder离散数学实训】生成真值表
    SOLIDWORKS参数化设计之干涉检查
  • 原文地址:https://blog.csdn.net/wangzhongyudie/article/details/126391904