背压(back pressure)机制主要用于解决流处理系统中,业务流量在短时间内剧增,造成巨大的流量毛刺,数据流入速度远高于数据处理速度,对流处理系统构成巨大的负载压力的问题。
如果不能处理流量毛刺或者持续的数据过高速率输入,可能导致Executor端出现OOM的情况或者任务崩溃。
就是 通过限制最大消费速度(这个要人为压测预估)
可以配置spark.streaming.receiver.maxRate
参数来限制每个receiver没每秒最大可以接收的数据量
可以配置 spark.streaming.kafka.maxRatePerPartition
参数来限制每个kafka分区最多读取的数据量。
新版的背压机制不需要手动干预,spark streaming 能够根据当前数据量以及集群状态来预估下个批次最优速率。
flink 的背压特性是逐渐反向背压,从下游的算子开始逐渐排查是哪个算子处理数据处理不过来了。 然后上游减缓发送速度。当fink自动逐级背压处理不过来的时候就需要人为手动来干预了。
背压监测通过反复获取正在运行的任务的堆栈跟踪的样本来工作,JobManager 对作业重复调用 Thread.getStackTrace()。
下面是官方提供的示意图:
如果采样(samples)显示任务线程卡在某个内部方法调用中,则表示该任务存在背压。
默认情况下,JobManager 每50ms为每个任务触发100个堆栈跟踪,来确定背压。在Web界面中看到的比率表示在内部方法调用中有多少堆栈跟踪被阻塞,例如,0.01表示该方法中只有1个被卡住。状态和比率的对照如下:
为了不使堆栈跟踪样本对 TaskManager 负载过高,每60秒会刷新采样数据。
可以使用以下配置 JobManager 的采样数:
背压状态可以大致锁定背压可能存在的算子,但具体背压是由于当前Task自身处理速度慢还是由于下游Task处理慢导致的,需要通过metric监控进一步判断。
**原理:**BackPressure界面会周期性的对Task线程栈信息采样,通过线程被阻塞在请求Buffer的频率来判断节点是否处于背压状态。计算缓冲区阻塞线程数与总线程数的比值 rate。其中,rate < 0.1 为 OK,0.1 <= rate <= 0.5 为 LOW,rate > 0.5 为 HIGH。
缓冲区的数据处理不过来,barrier流动慢,导致checkpoint生成时间长, 出现超时的现象。input 和 output缓冲区都占满。
首先,背压不会直接导致系统的崩盘,只是处在一个不健康的运行状态。
(1)背压会导致流处理作业数据延迟的增加。
(2)影响到Checkpoint,导致失败,导致状态数据保存不了,如果上游是kafka数据源,在一致性的要求下,可能会导致offset的提交不上。
原理: 由于Flink的Checkpoint机制需要进行Barrier对齐,如果此时某个Task出现了背压,Barrier流动的速度就会变慢,导致Checkpoint整体时间变长,如果背压很严重,还有可能导致Checkpoint超时失败。
(3)影响state的大小,还是因为checkpoint barrier对齐要求。导致state变大。
原理: 接受到较快的输入管道的barrier后,它后面数据会被缓存起来但不处理,直到较慢的输入管道的barrier也到达。这些被缓存的数据会被放到state 里面,导致state变大。
Flink不需要一个特殊的机制来处理背压,因为Flink中的数据传输相当于已经提供了应对背压的机制。所以只有从代码上与资源上去做一些调整。
(1)背压部分原因可能是由于数据倾斜造成的,我们可以通过 Web UI 各个 SubTask 的 指标值来确认。Checkpoint detail 里不同 SubTask 的 State size 也是一个分析数据倾斜的有用指标。解决方式把数据分组的 key 预聚合来消除数据倾斜。
(2)代码的执行效率问题,阻塞或者性能问题。
(3)TaskManager 的内存大小导致背压。
参考:
https://blog.csdn.net/may_fly/article/details/103922862
https://www.zhihu.com/question/345381979