DStream上的操作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。
无状态转化操作
无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。部分无状态转化操作列在了下表中。注意,针对键值对的DStream转化操作(比如 reduceByKey())要添加import StreamingContext._才能在Scala中使用。
需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上的。
例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。
Transform允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API。该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。
package com.atguigu.transform
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
object Transform {
def main(args: Array[String]): Unit = {
//创建SparkConf
val sparkConf: SparkConf = new SparkConf().setMaster(“local[*]”).setAppName(“WordCount”)
//创建StreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(3))
//创建DStream
val lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream(“hadoop102”, 9999)
//转换为RDD操作
val wordAndCountDStream: DStream[(String, Int)] = lineDStream.transform(rdd => {
val words: RDD[String] = rdd.flatMap(_.split(” “))
val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
val value: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)
value
})
//打印
wordAndCountDStream.print
//启动
ssc.start()
ssc.awaitTermination()
}
}