• Flink 细粒度滑动窗口性能优化


    图片

    大数据技术AI

    Flink/Spark/Hadoop/数仓,数据分析、面试,源码解读等干货学习资料

    118篇原创内容

    公众号

    1、概述

    1.1 细粒度滑动的影响

    当使用细粒度的滑动窗口(窗口长度远远大于滑动步长)时,重叠的窗口过多,一个数据会属于多个窗口,性能会急剧下降。

    图片

    以1分钟的频率实时计算App内各个子模块近24小时的PV和UV。

    我们需要用粒度为1440 / 1 = 1440的滑动窗口来实现它,但是细粒度的滑动窗口会带来性能问题,有两点:

    • 状态

    对于一个元素,会将其写入对应的(key, window)二元组所圈定的windowState状态中。如果粒度为1440,那么每个元素到来,更新windowState时都要遍历1440个窗口并写入,开销是非常大的。在采用RocksDB作为状态后端时,checkpoint的瓶颈也尤其明显。

    • 定时器

    每一个(key, window)二元组都需要注册两个定时器:一是触发器注册的定时器,用于决定窗口数据何时输出;二是registerCleanupTimer()方法注册的清理定时器,用于在窗口彻底过期(如allowedLateness过期)之后及时清理掉窗口的内部状态。

    细粒度滑动窗口会造成维护的定时器增多,内存负担加重。

    1.2 解决思路

    DataStreamAPI中,自己解决(https://issues.apache.org/jira/browse/FLINK-7001)

    图片

    我们一般使用滚动窗口+在线存储+读时聚合的思路作为解决方案:

    (1)从业务的视角来看,往往窗口的长度是可以被步长所整除的,可以找到窗口长度和窗口步长的最小公约数作为时间分片(一个滚动窗口的长度);

    (2)每个滚动窗口将其周期内的数据做聚合,存到下游状态或打入外部在线存储(内存数据库如Redis,LSM-based NoSQL存储如HBase);

    (3)扫描在线存储中对应时间区间(可以灵活指定)的所有行,并将计算结果返回给前端展示。

    1.3 Flink Sql

    Flink 1.13对SQL模块的 Window TVF 进行了一系列的性能优化,可以自动对滑动窗口进行切片解决细粒度滑动问题。

    https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-tvf/

    2、过滤数据

    2.1 过滤页面数据

     // 2.0 过滤页面数据
     .filter(x =>
             StringUtils.isNullOrWhitespaceOnly(x.getString("start"))
            )
     .keyBy(_.getJSONObject("common").getString("mid"))
     // 3.0 按照mid分组,过滤掉不是今天第一次访问的数据
     .filter(new FilterFirstMidFunc)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2.2 过滤第一个访问的数据

     package com.duo.app2.func
     
     import com.alibaba.fastjson.JSONObject
     import org.apache.flink.api.common.functions.RichFilterFunction
     import org.apache.flink.api.common.state.{StateTtlConfig, ValueState, ValueStateDescriptor}
     import org.apache.flink.configuration.Configuration
     
     import java.text.SimpleDateFormat
     
     class FilterFirstMidFunc extends RichFilterFunction[JSONObject]{
       private var firstVisitDateState: ValueState[String] = null
       private var simpleDateFormat: SimpleDateFormat = null
     
       override def open(parameters: Configuration): Unit = {
         simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd")
     
         val firstVisitDesc = new ValueStateDescriptor[String](
           "firstVisitDateState",
           classOf[String]
         )
         // 设置状态的过期时间ttl
         val ttlConfig = StateTtlConfig
           .newBuilder(org.apache.flink.api.common.time.Time.days(1))
           .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
           .build()
         firstVisitDesc.enableTimeToLive(ttlConfig)
     
         firstVisitDateState = getRuntimeContext.getState(firstVisitDesc)
     
       }
     
       override def filter(in: JSONObject) = {
         val lastPageId = in.getJSONObject("page").getString("last_page_id")
         //判断是否存在上一个页面
         if (lastPageId == null || lastPageId.length <= 0) {
           val firstVisitDate = firstVisitDateState.value()
           val ts = in.getLong("ts")
           val curDate = simpleDateFormat.format(ts)
           // 如果状态数据=null,或者 状态中时间!=当前时间,说明是第一次访问
           if (firstVisitDate == null || !firstVisitDate.equals(curDate)) {
             firstVisitDateState.update(curDate)
             true
           } else {
             false
           }
         } else {
           false
         }
       }
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    3、滑动窗口实现

    3.1 元组实现报错

     uvDS.map(x => 1L)
       .windowAll(SlidingProcessingTimeWindows.of(Time.hours(1), Time.minutes(1)))
       .reduce(
         _ + _,
         new ProcessAllWindowFunction[Long, (String, String, Long), TimeWindow] {
           private val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:SS")
     
           override def process(context: Context, elements: Iterable[Long], out: Collector[(String, String, Long)]): Unit = {
             val uvCount = elements.iterator.next()
             val startDate = format.format(context.window.getStart)
             val endDate = format.format(context.window.getEnd)
             out.collect((startDate, endDate, uvCount))
           }
         })
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    错误代码:

     type mismatch;
     found   : org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
     required: org.apache.flink.streaming.api.windowing.assigners.WindowAssigner[_ >: Long, ?]
     Note: Object <: Any (and org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows <: org.apache.flink.streaming.api.windowing.assigners.WindowAssigner[Object,org.apache.flink.streaming.api.windowing.windows.TimeWindow]), but Java-defined class WindowAssigner is invariant in type T.
     You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
     .windowAll(SlidingProcessingTimeWindows.of(Time.hours(1), Time.minutes(1)))
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    猜一下窗口不能识别map(x => 1L)中1L的类型

    3.2 样例类实现

     uvDS.map(x => {
       UV(1L)
     })
       .windowAll(SlidingProcessingTimeWindows.of(Time.hours(1), Time.minutes(1)))
       .reduce(
         (x: UV, y: UV) => UV(x.uvCount + y.uvCount),
         new UVReduceFunc
       )
       .print()
     
     case class UV(uvCount: Long)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    打印结果:

     (2022-05-24 19:00:00,2022-05-24 20:00:00,13784)
     (2022-05-24 19:01:00,2022-05-24 20:01:00,145162)
    
    • 1
    • 2

    4、滚动窗口+状态

    4.1 主程序

     val resultDS = uvDS.map(
       x => {
         UV(1L)
       }
     )
       .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(60)))
       .reduce(
         (x: UV, y: UV) => UV(x.uvCount + y.uvCount),
         new UVReduceFunc
       )
       .keyBy(_._1)
       .process(new SplitWindowAggFunc)
     
     resultDS.print()
     env.execute()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    4.2 聚合

     package com.duo.app2.func
     
     import com.duo.app2.source.UV
     import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
     import org.apache.flink.streaming.api.windowing.windows.TimeWindow
     import org.apache.flink.util.Collector
     
     import java.text.SimpleDateFormat
     
     class UVReduceFunc extends ProcessAllWindowFunction[UV, (String, String, Long), TimeWindow]{
       private val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
     
       override def process(context: Context, elements: Iterable[UV], out: Collector[(String, String, Long)]): Unit = {
         val uvCount = elements.iterator.next().uvCount
         val startDate = format.format(context.window.getStart)
         val endDate = format.format(context.window.getEnd)
         out.collect((startDate, endDate, uvCount))
       }
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    4.3 计算分片数据

     package com.duo.app2.func
     
     import com.duo.app2.source.UVResult
     import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
     import org.apache.flink.configuration.Configuration
     import org.apache.flink.streaming.api.functions.KeyedProcessFunction
     import org.apache.flink.util.Collector
     
     import scala.collection.mutable.ListBuffer
     
     class SplitWindowAggFunc extends KeyedProcessFunction[String, (String, String, Long), UVResult]{
       // 返回结果
       val resultAgg: UVResult = UVResult(null, null, 0L)
       // 存储分片数据的状态
       var listState: ValueState[ListBuffer[(String, String, Long)]] = null
       // 每次计算的临时变量
       var list: ListBuffer[(String, String, Long)] = new ListBuffer[(String, String, Long)]()
       // 分片数
       var splitNum: Int = 0
     
     
       override def open(parameters: Configuration): Unit = {
         listState = getRuntimeContext
           .getState(
             new ValueStateDescriptor[ListBuffer[(String, String, Long)]](
               "listState",
               classOf[ListBuffer[(String, String, Long)]]
             ))
         list = ListBuffer.empty
         // 1小时的窗口,1分钟的滑动 ===》 分片数=60分钟/1分钟
         splitNum = 60
       }
     
       override def processElement(value: (String, String, Long),
                                   ctx: KeyedProcessFunction[String, (String, String, Long), UVResult]#Context,
                                   out: Collector[UVResult]) = {
         // 清空结果
         resultAgg.uvCount = 0L
         if (listState.value() != null) {
           // 从状态中拿到分片数据
           list ++= listState.value()
         }
         // 将新的时间分片数据添加到list
         list += value
         // 一小时之前,集合是不满60的
         if (list.size >= splitNum) {
           // 如果集合+当前数据=61,则删除第一个
           if (list.size == (splitNum + 1)) {
             list.remove(0)
           }
         }
         // 遍历集合中分片数据
         for (elem <- list) {
           // 如果是第一个元素,拿到第一个元素的开始时间,即:窗口的开始时间
           if (list.head == elem) {
             resultAgg.start = elem._1
           }
           // 如果是最后一个元素,拿到最后一个元素的结束时间,即:窗口的结束时间
           if (list.last == elem) {
             resultAgg.end = elem._2
           }
           // 分片数据相加
           resultAgg.uvCount += elem._3
         }
         listState.clear()
         // 将分片数据存入状态中
         listState.update(list)
     
         out.collect(resultAgg)
       }
     }
     
     case class UVResult(var start: String, var end: String, var uvCount: Long)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 相关变量和状态

    • resultAgg:返回结果

    • listState:存储分片数据的状态

    • list:每次计算的临时变量

    • splitNum:分片数(1小时的窗口,1分钟的滑动,即:分片数=60分钟/1分钟)

    • 初始化

    • 初始化状态

    • 初始化分片数

    • 初始化list集合

    • 计算逻辑

    • 如果是第一个元素,拿到第一个元素的开始时间,即:窗口的开始时间

    • 如果是最后一个元素,拿到最后一个元素的结束时间,即:窗口的结束时间

    • 分片数据相加

    • 清空结果

    • 从状态中拿到分片数据

    • 将新的时间分片数据添加到list

    • 如果集合+当前数据=61,则删除第一个

    • 遍历集合中分片数据

    • 将分片数据存入状态中

    图片

  • 相关阅读:
    python简介常考面试题目:python是什么,有什么好处,python2和python3的主要区别
    【老生谈算法】matlab实现蒙特卡罗定积分源码——蒙特卡罗定积分
    Codeforces Round #810 (Div. 2) D. Rain (线段树差分)
    用于验证的verilog语法--1
    计算机网络——理论知识总结(上)
    关于排序算法的小结
    计算机毕业设计Java华夏球迷俱乐部网站设计与实现(源码+系统+mysql数据库+lw文档)
    强化学习------DQN算法
    下载Windows 10操作系统和在VMware虚拟机中配置完成
    赛码网输入输出(js v8)问题并配置赛码网vscode本地环境
  • 原文地址:https://blog.csdn.net/hyunbar/article/details/125435245