在流式处理系统中,流量控制(rate control/rate limit)是一个非常重要的话题。对系统进行流控,主要目的是为了保证运行的稳定性,防止突发大流量造成整个系统的扰动(throttle),长时间或剧烈的扰动甚至会使系统宕机。另外,为了保证系统的吞吐量最大化,也需要设计合理的流控门槛,避免系统空转使资源利用率降低。
Spark Streaming作为基于微批次(micro-batch)的流处理框架,其流量的理想状态就是官方文档中所说的“batches of data should be processed as fast as they are being generated”,即每一批次的处理时长batch_process_time需要小于(但是又比较接近)我们设定的批次间隔batch_interval。如果batch_process_time > batch_interval,说明程序的处理能力不足,积累的数据越来越多,最终会造成Executor内存溢出。如果batch_process_time << batch_interval,说明系统有很长时间是空闲的,应该适当提升流量。
Spark Streaming通过Executor里的Receiver组件源源不断地接收外部数据,并通过BlockManager将外部数据转化为Spark中的块进行存储。Spark Streaming机制的简单框图如下所示。

要限制Receiver接收数据的速率,可以在SparkConf中设置配置项spark.streaming.receiver.maxRate,单位为数据条数/秒。如果采用的是基于Direct Stream方式的Kafka连接,不经过Receiver,就得设置配置项spark.streaming.kafka.maxRatePerPartition来限流,单位是每分区的数据条数/秒。
这两种方式的优点是设置非常简单,只需要通过实际业务的吞吐量估算一下使批次间隔和处理耗时基本达到平衡的速率就可以了。缺点是一旦业务量发生变化,就只能手动修改参数并重启Streaming程序。另外,人为估计的参数毕竟有可能不准,设置得太激进或太保守都不好。
所以,Spark后来提出了动态流量控制的方案,能够根据当前系统的处理速度智能地调节流量阈值,名为反压(back pressure)机制。其在1.5版本开始加入,ASF JIRA中对应的issue是SPARK-7398。要启用它,只需要将配置项spark.streaming.backpressure.enabled设为true就可以(默认值为false)。
反压机制看似简单,但它背后有一套非常精巧的控制逻辑,下面就来深入看一看。
动态流量控制器
o.a.s.streaming.scheduler.RateController抽象类是动态流量控制的核心。其源码不甚长,抄录如下。
- private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
- extends StreamingListener with Serializable {
- init()
-
- protected def publish(rate: Long): Unit
-
- @transient
- implicit private var executionContext: ExecutionContext = _
-
- @transient
- private var rateLimit: AtomicLong = _
-
- private def init() {
- executionContext = ExecutionContext.fromExecutorService(
- ThreadUtils.newDaemonSingleThreadExecutor("stream-rate-update"))
- rateLimit = new AtomicLong(-1L)
- }
-
- private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
- ois.defaultReadObject()
- init()
- }
-
- private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
- Future[Unit] {
- val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
- newRate.foreach { s =>
- rateLimit.set(s.toLong)
- publish(getLatestRate())
- }
- }
-
- def getLatestRate(): Long = rateLimit.get()
-
- override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
- val elements = batchCompleted.batchInfo.streamIdToInputInfo
-
- for {
- processingEnd <- batchCompleted.batchInfo.processingEndTime
- workDelay <- batchCompleted.batchInfo.processingDelay
- waitDelay <- batchCompleted.batchInfo.schedulingDelay
- elems <- elements.get(streamUID).map(_.numRecords)
- } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
- }
- }
可见,RateController抽象类继承自StreamingListener特征,表示它是一个Streaming监听器。在之前的Spark Core源码精读系列文章中已经讲过了监听器和事件总线机制,因此不再多说了。
RateController的主要工作如下:
监听StreamingListenerBatchCompleted事件,该事件表示一个批次已经处理完成。
从该事件的BatchInfo实例中取得:处理完成的时间戳processingEndTime、实际处理时长processingDelay(从批次的第一个job开始处理到最后一个job处理完成经过的时间)、调度时延schedulingDelay(从批次被提交给Streaming JobScheduler到第一个job开始处理经过的时间)。
另外从事件的StreamInputInfo实例中取得批次输入数据的条数numRecords。
将取得的以上4个参数传递给速率估算器RateEstimator,计算出新的流量阈值,并将其发布出去。
通过RateController的子类ReceiverRateController实现的publish()抽象方法可知,新的流量阈值是发布给了ReceiverTracker。
- private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
- extends RateController(id, estimator) {
- override def publish(rate: Long): Unit =
- ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
- }
不过下面先看速率估算器RateEstimator的实现,稍后再回来看ReceiverTracker之后的事情。
基于PID机制的速率估算器
o.a.s.streaming.scheduler.rate.RateEstimator是一个很短的特征,其中只给出了计算流量阈值的方法compute()的定义。它还有一个伴生对象用于创建速率估算器的实例,其中写出了更多关于反压机制的配置参数。
- object RateEstimator {
- def create(conf: SparkConf, batchInterval: Duration): RateEstimator =
- conf.get("spark.streaming.backpressure.rateEstimator", "pid") match {
- case "pid" =>
- val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0)
- val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2)
- val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0)
- val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100)
- new PIDRateEstimator(batchInterval.milliseconds, proportional, integral, derived, minRate)
-
- case estimator =>
- throw new IllegalArgumentException(s"Unknown rate estimator: $estimator")
- }
- }
目前RateEstimator的唯一实现类是PIDRateEstimator,亦即spark.streaming.backpressure.rateEstimator配置项的值只能为pid。其具体代码如下。
- private[streaming] class PIDRateEstimator(
- batchIntervalMillis: Long,
- proportional: Double,
- integral: Double,
- derivative: Double,
- minRate: Double
- ) extends RateEstimator with Logging {
- private var firstRun: Boolean = true
- private var latestTime: Long = -1L
- private var latestRate: Double = -1D
- private var latestError: Double = -1L
-
- def compute(
- time: Long,
- numElements: Long,
- processingDelay: Long,
- schedulingDelay: Long
- ): Option[Double] = {
- this.synchronized {
- if (time > latestTime && numElements > 0 && processingDelay > 0) {
- val delaySinceUpdate = (time - latestTime).toDouble / 1000
- val processingRate = numElements.toDouble / processingDelay * 1000
- val error = latestRate - processingRate
-
- val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis
- val dError = (error - latestError) / delaySinceUpdate
-
- val newRate = (latestRate - proportional * error -
- integral * historicalError -
- derivative * dError).max(minRate)
-
- latestTime = time
- if (firstRun) {
- latestRate = processingRate
- latestError = 0D
- firstRun = false
- None
- } else {
- latestRate = newRate
- latestError = error
- Some(newRate)
- }
- } else {
- None
- }
- }
- }
- }
PIDRateEstimator充分运用了工控领域中常见的PID控制器的思想。所谓PID控制器,即比例(Proportional)-积分(Integral)-微分(Derivative)控制器,本质上是一种反馈回路(loop feedback)。它把收集到的数据和一个设定值(setpoint)进行比较,然后用它们之间的差计算新的输入值,该输入值可以让系统数据尽量接近或者达到设定值。
下图示出PID控制器的基本原理。

亦即:

其中e(t)代表误差,即设定值与回授值之间的差。也就是说,比例单元对应当前误差,积分单元对应过去累积误差,而微分单元对应将来误差。控制三个单元的增益因子分别为Kp、Ki、Kd。
回到PIDRateEstimator的源码来,对应以上的式子,我们可以得知:
处理速率的设定值其实就是上一批次的处理速率latestRate,回授值就是这一批次的速率processingRate,误差error自然就是两者之差。
过去累积误差在这里体现为调度时延的过程中数据积压的速度,也就是schedulingDelay * processingRate / batchInterval。
将来误差就是上面算出的error对时间微分的结果。
将上面三者综合起来,就可以根据Spark Streaming在上一批次以及这一批次的处理速率,估算出一个合适的用于下一批次的流量阈值。比例增益Kp由spark.streaming.backpressure.pid.proportional控制,默认值1.0;积分增益Ki由spark.streaming.backpressure.pid.integral控制,默认值0.2;微分增益Kd由spark.streaming.backpressure.pid.derived控制,默认值0.0。
除了上述参数之外,还有两个参数与反压机制相关。一是spark.streaming.backpressure.initialRate,用于控制初始化时的处理速率。二是spark.streaming.backpressure.pid.minRate,用于控制最小处理速率,默认值100条/秒。
通过RPC发布流量阈值
回来看ReceiverTracker,顾名思义,它负责追踪Receiver的状态。其sendRateUpdate()方法如下。
- def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
- if (isTrackerStarted) {
- endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
- }
- }
其中endpoint是RPC端点的引用,具体来说,是ReceiverTrackerEndpoint的引用。这个方法会将流ID与新的流量阈值包装在UpdateReceiverRateLimit消息中发送过去。
ReceiverTrackerEndpoint收到这条消息后,会再将其包装为UpdateRateLimit消息并发送给Receiver注册时的RPC端点(位于ReceiverSupervisorImpl类中)。
- private val endpoint = env.rpcEnv.setupEndpoint(
- "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
- override val rpcEnv: RpcEnv = env.rpcEnv
-
- override def receive: PartialFunction[Any, Unit] = {
- case StopReceiver =>
- logInfo("Received stop signal")
- ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
- case CleanupOldBlocks(threshTime) =>
- logDebug("Received delete old batch signal")
- cleanupOldBlocks(threshTime)
- case UpdateRateLimit(eps) =>
- logInfo(s"Received a new rate limit: $eps.")
- registeredBlockGenerators.asScala.foreach { bg =>
- bg.updateRate(eps)
- }
- }
- })
可见,收到该消息之后调用了BlockGenerator.updateRate()方法。BlockGenerator是RateLimiter的子类,它负责将收到的流数据转化成块存储。updateRate()方法是在RateLimiter抽象类中实现的。
- private[receiver] def updateRate(newRate: Long): Unit =
- if (newRate > 0) {
- if (maxRateLimit > 0) {
- rateLimiter.setRate(newRate.min(maxRateLimit))
- } else {
- rateLimiter.setRate(newRate)
- }
- }
这里最终借助了Guava中的限流器RateLimiter实现限流(Spark是不会重复造轮子的),其中maxRateLimit就是前面提到过的spark.streaming.receiver.maxRate参数。至此,新的流量阈值就设置好了。
以上就是与反压机制有关的全部细节,整个流程可以用下面的框图表示。

还有最后一个小问题,流量阈值设定好之后是如何生效的?这其实已经超出了本文的范畴,简单看一下。
Receiver在收到一条数据之后,会调用BlockGenerator.addData()方法,将数据存入缓存。然后再从缓存取数据,并包装成一个个block。
- def addData(data: Any): Unit = {
- if (state == Active) {
- waitToPush()
- synchronized {
- if (state == Active) {
- currentBuffer += data
- } else {
- throw new SparkException(
- "Cannot add data as BlockGenerator has not been started or has been stopped")
- }
- }
- } else {
- throw new SparkException(
- "Cannot add data as BlockGenerator has not been started or has been stopped")
- }
- }
注意到在真正存入缓存之前,先调用了waitToPush()方法,它本质上就是Guava的RateLimiter.acquire()方法。
- @CanIgnoreReturnValue
- public double acquire() {
- return acquire(1);
- }
-
- @CanIgnoreReturnValue
- public double acquire(int permits) {
- long microsToWait = reserve(permits);
- stopwatch.sleepMicrosUninterruptibly(microsToWait);
- return 1.0 * microsToWait / SECONDS.toMicros(1L);
- }
-
- @Override
- final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
- resync(nowMicros);
- long returnValue = nextFreeTicketMicros;
- double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
- double freshPermits = requiredPermits - storedPermitsToSpend;
- long waitMicros =
- storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
- + (long) (freshPermits * stableIntervalMicros);
-
- this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
- this.storedPermits -= storedPermitsToSpend;
- return returnValue;
- }
Guava的限流器是计算机网络中经典限流方法——令牌桶(token bucket)算法的典型实现。acquire()方法的作用是从RateLimiter获取一个令牌(这里叫permit),如果能够取到令牌才将数据缓存,如果不能取到令牌就会被阻塞。RateLimiter.setRate()方法就是通过改变向令牌桶中放入令牌的速率(参数名称permitsPerSecond)来实现流量控制的。
关于令牌桶算法的细节,可以参见英文维基,也可以参考Guava源码,内容十分丰富。下图只是一个简单的示意。
