对于有状态的 Flink 应用,推荐给每个算子都指定唯一用户 ID(UUID)。
严格地说,仅需要给有状态的算子设置就足够了。但是因为 Flink 的某些内置算子(如 window)是有状态的,而有些是无状态的,可能用户不是很清楚哪些内置算子是有状态的,哪些不是。
所以从实践经验上来说,建议每个算子都指定上 UUID。
默认情况下,算子 UID 是根据 JobGraph 自动生成的,JobGraph 的更改可能会导致 UUID 改变。手动指定算子 UUID ,可以让 Flink 有效地将算子的状态从 savepoint 映射到作业修改后(拓扑图可能也有改变)的正确的算子上。
比如替换原来的 Operator 实现、增加新的Operator、删除Operator等等,至少我们有可能与Savepoint中存储的Operator 状态对应上。这是 savepoint 在 Flink 应用中正常工作的一个基本要素。
Flink 算子的 UUID 可以通过 uid(String uid) 方法指定,通常也建议指定 name。
#算子.uid("指定 uid")
.reduce((value1, value2) -> Tuple3.of("uv", value2.f1, value1.f2 + value2.f2))
.uid("uv-reduce").name("uv-reduce”)
如果无法从 savepoint 重新启动,也可以在启动命令上增加如下参数:
在提交命令中添加 --allowNonRestoredState 跳过无法恢复的算子。
对于实时的流式处理系统来说,需要关注数据输入、计算和输出的及时性,所以处理延迟是一个比较重要的监控指标,特别是在数据量大或者软硬件条件不佳的环境下。
开箱即用的 LatencyMarker 机制来测量链路延迟。开启如下参数:
metrics.latency.interval: 30000 #默认 0,表示禁用,单位毫秒
监控的粒度,分为以下 3 档:
➢ single:每个算子单独统计延迟;
➢ operator(默认值):每个下游算子都统计自己与 Source 算子之间的延迟;
➢ subtask:每个下游算子的 sub-task 都统计自己与 Source 算子的 sub-task 之间的延迟。
metrics.latency.granularity: operator #默认 operator
一般情况下采用默认的 operator 粒度即可,这样在 Sink 端观察到的 latency metric 就是最想要的全链路(端到端)延迟。subtask 粒度太细,会增大所有并行度的负担,不建议使用。
LatencyMarker 不会参与到数据流的用户逻辑中的,而是直接被各算子转发并统计。
注意:
➢ 保证 Flink 集群内所有节点的时区、时间是同步的:ProcessingTimeService 产生时间戳最终是靠 System.currentTimeMillis()方法,可以用 ntp 等工具来配置。
➢ metrics.latency.interval 的时间间隔宜大不宜小:一般配置成 30000(30 秒)左右。一是因为延迟监控的频率可以不用太频繁,二是因为 LatencyMarker 的处理也要消耗一定性能。
可以通过下面的 metric 查看结果:
flink_taskmanager_job_latency_source_id_operator_id_operator_subtask_index_latency
# 启动命令添加
-Dpipeline.object-reuse=true
# 代码中配置
env.getConfig().enableObjectReuse();
当调用了 enableObjectReuse 方法后,Flink 会把中间深拷贝的步骤都省略掉,SourceFunction 产生的数据直接作为 MapFunction 的输入,可以减少 gc 压力。
但需要特别注意的是,这个方法不能随便调用,必须要确保下游 Function 只有一种,或者下游的Function 均不会改变对象内部的值。否则可能会有线程安全的问题。
当使用细粒度的滑动窗口(窗口长度远远大于滑动步长)时,重叠的窗口过多,一个数据会属于多个窗口,性能会急剧下降。
案例:以 3 分钟的频率实时计算 App 内各个子模块近 24 小时的 PV 和 UV。需要用粒度为 1440 / 3 = 480 的滑动窗口来实现它,但是细粒度的滑动窗口会带来性能问题。
➢ 状态
对于一个元素,会将其写入对应的(key, window)二元组所圈定的 windowState 状态中。如果粒度为 480,那么每个元素到来,更新 windowState 时都要遍历 480 个窗口并写入,开销是非常大的。
在采用 RocksDB 作为状态后端时,checkpoint 的瓶颈也尤其明显。
➢ 定时器
每一个(key, window)二元组都需要注册两个定时器:一是触发器注册的定时器,用于决定窗口数据何时输出;二是 registerCleanupTimer()方法注册的清理定时器,用于在窗口彻底过期(如 allowedLateness 过期)之后及时清理掉窗口的内部状态。
细粒度滑动窗口会造成维护的定时器增多,内存负担加重。
使用滚动窗口+在线存储+读时聚合的思路作为解决方案。
1.从业务的视角来看,往往窗口的长度是可以被步长所整除的,可以找到窗口长度和窗口步长的最小公约数作为时间分片(一个滚动窗口的长度);
2.每个滚动窗口将其周期内的数据做聚合,存到下游状态或打入外部在线存储(内存数据库如 Redis,LSM-based NoSQL 存储如 HBase);
3.扫描在线存储中对应时间区间(可以灵活指定)的所有行,并将计算结果返回给前端展示。
注意:Flink 1.13 对SQL 模块的 Window TVF 进行了一系列的性能优化,可以自动对滑动窗口进行切片解决细粒度滑动问题。