DStream 上的操作与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输
出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及
各种 Window 相关的原语
无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每
一个 RDD。部分无状态转化操作列在了下表中。注意,针对键值对的 DStream 转化操作(比如
reduceByKey())要添加 import StreamingContext._才能在 Scala 中使用。
需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个 DStream 在内部
是由许多 RDD(批次)组成,且无状态转化操作是分别应用到每个 RDD 上的。
例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。
Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。即使这些函数并没有在 DStream
的 API 中暴露出来,通过该函数可以方便的扩展 Spark API。该函数每一批次调度一次。其实也
就是对 DStream 中的 RDD 应用转换。
package com.spack.bigdata.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* DStream转换
* Transform状态操作
*/
object SparkStreaming05_State_Transform {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val scc = new StreamingContext(sparkConf, Seconds(3))
scc.checkpoint("cp") //需要设定检查点路径
val lines = scc.socketTextStream("localhost", 9999)
//transform方法可以将底层RDD获取到后进行操作
//1、DStream功能不完善
//TODO 2、需要代码周期性的执行
// Code: Driver端
val newDS: DStream[String] = lines.transform(
rdd => {
//Code Driver端,(周期性执行) 向 Executor端 传数据
rdd.map(
str => {
//Code: Executor端 --RDD执行
str
}
)
}
)
//Code : Driver
val newDS1 = lines.map(
data => {
//Code : Executor端
data
}
)
scc.start()
scc.awaitTermination()
}
}
两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是
对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同。
package com.spack.bigdata.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* DStream转换
* join状态操作
*/
object SparkStreaming05_State_Join {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val scc = new StreamingContext(sparkConf, Seconds(3))
scc.checkpoint("cp") //需要设定检查点路径
val data9999 = scc.socketTextStream("localhost", 9999)
val data8888 = scc.socketTextStream("localhost", 8888)
val map9999: DStream[(String, Int)] = data9999.map((_, 9))
val map8888: DStream[(String, Int)] = data8888.map((_, 8))
//所谓的DStream的join操作,其实就是两个RDD的join
val joinDS: DStream[(String, (Int, Int))] = map9999.join(map8888)
joinDS.print()
scc.start()
scc.awaitTermination()
}
}
结果
Time: 1658284401000 ms
-------------------------------------------
(a,(9,8))
(a,(9,8))
(a,(9,8))
-------------------------------------------
传输
UpdateStateByKey 原语用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例
如流计算中累加 wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量
的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指
定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数
据为(键,状态) 对。
updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对
应的(键,状态)对组成的。
updateStateByKey 操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功
能,需要做下面两步:
- 定义状态,状态可以是一个任意的数据类型。
- 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更
新。
使用 updateStateByKey 需要对检查点目录进行配置,会使用检查点来保存状态。
更新版的 wordcount
package com.spack.bigdata.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* DStream转换
* 状态操作
*/
object SparkStreaming05_State {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val scc = new StreamingContext(sparkConf, Seconds(3))
scc.checkpoint("cp") //需要设定检查点路径
/**
* 无状态数据操作,只对当前的采集周期内的数据进行处理
* 在某些场合下,需要保留数据统计结果(状态),实现数据的汇总
* 使用有状态操作时,需要设定检查点路径
*/
val datas = scc.socketTextStream("localhost", 9999)
val wordToOne = datas.map((_, 1))
// val value = wordToOne.reduceByKey((_ + _))
/**
* updateStateByKey :根据Key对数据的状态进行更新
* 传递的参数中含有两个值
* 第一个值表示相同的Key的Value数据
* 第二个值表示缓冲区相同的Key的Value数据
*/
val state = wordToOne.updateStateByKey(
(seq: Seq[Int], opt: Option[Int]) => {
val newCount = opt.getOrElse(0) + seq.sum
Option(newCount) //更新
}
)
state.print()
scc.start()
scc.awaitTermination()
}
}
Window Operations 可以设置窗口的大小和滑动窗口的间隔来动态的获取当前 Steaming 的允许
状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。
➢ 窗口时长:计算内容的时间范围;
➢ 滑动步长:隔多久触发一次计算。
注意:这两者都必须为采集周期大小的整数倍。
WordCount 第三版:3 秒一个批次,窗口 12 秒,滑步 6 秒。
package com.spack.bigdata.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* DStream转换
* Window状态操作
*/
object SparkStreaming05_State_Window {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val scc = new StreamingContext(sparkConf, Seconds(3))
scc.checkpoint("cp") //需要设定检查点路径
val lines = scc.socketTextStream("localhost", 9999)
val wordToOne = lines.map((_, 1))
//TODO 窗口范围应该是采集周期的整数倍
//窗口可以滑动的、但是默认情况下,一个采集周期进行滑动
//这样的话,可能会出现重复数据的计算,为了避免这种情况,可以改变滑动的滑动(步长)
//TODO 第一个采集周期、第二个滑动步长
val windowDS: DStream[(String, Int)] = wordToOne.window(Seconds(6),Seconds(6))
val wordCount = windowDS.reduceByKey(_ + _)
wordCount.print()
scc.start()
scc.awaitTermination()
}
}
关于 Window 的操作还有如下方法:
(1)window(windowLength, slideInterval): 基于对源 DStream 窗化的批次进行计算返回一个
新的 Dstream;
(2)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;
(3)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间
流元素来创建一个新的单元素流;
(4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)
对的 DStream 上调用此函数,会返回一个新(K,V)对的 DStream,此处通过对滑动窗口中批次数
据使用 reduce 函数来整合每个 key 的 value 值。
(5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 这个函
数是上述函数的变化版本,每个窗口的 reduce 值都是通过用前一个窗的 reduce 值来递增计算。
通过 reduce 进入到滑动窗口数据并”反向 reduce”离开窗口的旧数据来实现这个操作。一个例
子是随着窗口滑动对 keys 的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于”
可逆的 reduce 函数”,也就是这些 reduce 函数有相应的”反 reduce”函数(以参数 invFunc 形式
传入)。如前述函数,reduce 任务的数量通过可选参数来配置。
val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1))
val ipCountDStream = ipDStream.reduceByKeyAndWindow(
{(x, y) => x + y},
{(x, y) => x - y},
Seconds(30),
Seconds(10))
//加上新进入窗口的批次中的元素 //移除离开窗口的老批次中的元素 //窗口时长// 滑动步长
countByWindow()和 countByValueAndWindow()作为对数据进行计数操作的简写。
countByWindow()返回一个表示每个窗口中元素个数的 DStream,而 countByValueAndWindow()
返回的 DStream 则包含窗口中每个值的个数
val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()}
val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30),
Seconds(10))
val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))
package com.spack.bigdata.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* DStream转换
* reduceByKeyAndWindow状态操作
*/
object SparkStreaming05_State_Window1 {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
val scc = new StreamingContext(sparkConf, Seconds(3))
scc.checkpoint("cp") //需要设定检查点路径
val lines = scc.socketTextStream("localhost", 9999)
val wordToOne = lines.map((_, 1))
/**
* reduceByKeyAndWindow: 当窗口范围比较大,但是滑动幅度比较小,那么可以采用增加数据和删除的方式
* 无需重复计算
*/
val windowDS: DStream[(String, Int)] =
wordToOne.reduceByKeyAndWindow(
(x: Int, y: Int) => {x+y},
(x: Int, y: Int) => {x-y},
Seconds(9),Seconds(3)
)
windowDS.print()
scc.start()
scc.awaitTermination()
}
}