• Flink / Scala- BroadCast 广播流数据先到再处理 Source 数据


    一.引言

    Flink 支持增加 DataStream KeyBy 之后 conncet BroadCastStream 形成 BroadConnectedStream,广播流内数据一般为不间断更新的上下文信息,在本例中,需要针对数据流中的用户信息,基于用于信息 + 广播流内的物料库实现推荐逻辑,针对 BroadConnectedStream 流,需要实现 KeyedBroadCastProcessFunction 完成用户流与广播流的处理,主要方法为:

    ProcessElement - 根据用户流生成用户信息,根据物料库进行推荐

    ProcessBroadcastElement - 获取物料库,并同步至 Context

    由于任务启动时第一批物料库生成需要一定时间,而用户流则源源不断,从而导致物料库生成之前的来的用户都没有物料库进行推荐,为了保证不遗漏用户推荐,这里需要实现数据等待逻辑,让先到的用户流等待广播流的物料库生成完毕再进行推荐,从而保证不遗漏用户。 

    二.While True 尝试

    一开始尝试带入离线的思维,既然物料库未生成无法完成推荐,则进行 while 判断和 TimeUnit 时间等待,重复判断物料库是否生成并造成线程阻塞,待物料库生成完毕再开始推荐,好处是保证不丢弃一个用户,坏处是前期需要线程堵塞,如果用户流数据过大则背压严重。

    1. override def processElement(bs: BatchSendInfo, readOnlyContext: KeyedBroadcastProcessFunction[Int, BatchSendInfo, MaterialDataBase, SendInfo]#ReadOnlyContext, collector: Collector[SendInfo]): Unit = {
    2. materialDataBase = readOnlyContext.getBroadcastState(materialDBDescriptor).get("MaterialDBContext") // DB
    3. // 第一批造成堵塞,知道物料库生成
    4. while (materialDataBase == null) {
    5. TimeUnit.SECONDS.sleep(60)
    6. materialDataBase = readOnlyContext.getBroadcastState(materialDBDescriptor).get("MaterialDBContext") // DB
    7. }
    8. val sendInfos = RankUtil.batchRank(bs.userObjects, materialDataBase)
    9. sendInfos.foreach(collector.collect)
    10. }
    11. override def processBroadcastElement(db: MaterialDataBase, context: KeyedBroadcastProcessFunction[Int, BatchSendInfo, MaterialDataBase, SendInfo]#Context, collector: Collector[SendInfo]): Unit = {
    12. val broadCastValue: BroadcastState[String, MaterialDataBase] = context.getBroadcastState(materialDBDescriptor)
    13. // 更新 DB
    14. if (db.isValid) {
    15. broadCastValue.put("MaterialDBContext", db)
    16. }
    17. }

    BatchSendInfo 内存储一批待推荐的用户类,下述统称 UserObject,我的思路是 whilt true 检查物料库是否生成,未生成则等待 60s 再重新从 readOnlyContext 上下文中获取,待物料库不为 null 时执行 BatchRank 的批量排序逻辑,看上去很美好,但是实践后得到的是死循环。

    原因分析:

    对于当前处理的 bs: BatchSendInfo,其 context 在 processBroadcastElement 后已经不再更新,我理想化的情况是等到新的 MaterialDataBase 传输后在这里更新 context,但是由于 context 在当前 processFunction 内不再更新,所以我的 while true 是死循环,所以这个方案 pass,这个方案只能适用于 MaterialDataBase 在另外线程生成并能更新到当前线程的场景。

    三.ValueState 缓存尝试 👍

    还有另外一种方法,就是当物料库不可用时,将先到的数据存到 ValueState 中并设置延时处理,延时时长可以设定为物料库初始化时间左右,待 onTimer 时判断物料库状态,如果物料库初始化成功则执行推荐逻辑,未成功则继续存储至 ValueState,其实本质上和 While True 类似,只不过变成一直存储了,缺点是如果前期数据过多会造成缓存量较大,不过可以通过加大 Heap 或者采用 RocksDB 轻松解决。

    1. override def processElement(bs: BatchSendInfo, readOnlyContext: KeyedBroadcastProcessFunction[Int, BatchSendInfo, MaterialDataBase, SendInfo]#ReadOnlyContext, collector: Collector[SendInfo]): Unit = {
    2. materialDataBase = readOnlyContext.getBroadcastState(materialDBDescriptor).get("MaterialDBContext") // DB
    3. val lastBatchUserObject = state.value
    4. val combineBS = if (lastBatchUserObject == null) {
    5. bs
    6. } else {
    7. val allUser = new ArrayBuffer[DpaUserObject]()
    8. allUser ++= bs.userObjects
    9. allUser ++= lastBatchUserObject.userObjects
    10. BatchSendInfo(allUser.toArray, readOnlyContext.getCurrentKey)
    11. }
    12. if (materialDataBase == null) {
    13. // 物料库不可用
    14. readOnlyContext.timerService.registerEventTimeTimer(System.currentTimeMillis() + expireTime)
    15. state.update(combineBS)
    16. } else {
    17. // 物料库可用
    18. val sendInfos = RankUtil.batchRank(combineBS.userObjects, materialDataBase)
    19. sendInfos.foreach(sendInfo => {
    20. collector.collect(sendInfo)
    21. })
    22. }
    23. }
    24. override def processBroadcastElement(db: MaterialDataBase, context: KeyedBroadcastProcessFunction[Int, BatchSendInfo, MaterialDataBase, SendInfo]#Context, collector: Collector[SendInfo]): Unit = {
    25. val broadCastValue: BroadcastState[String, MaterialDataBase] = context.getBroadcastState(materialDBDescriptor)
    26. // 更新 DB
    27. if (db.isValid) {
    28. broadCastValue.put("MaterialDBContext", db)
    29. }
    30. }

    ProcessBroadcastElement 方法未改变,只是修改了 ProcessElement  方法:

    A.lastBatchUserObject 判断当前 key 是否存在已经缓存的批用户

    B.CombineBS 用户合并当前 key 需要处理的用户批

    C.如果物料库为 null,则将当前批用户存入 ValueState 并设置 expire 过期时间,这个时间可以基于你物料库生成时间,例如物料库正常情况下50s生成,则设置60s过期,保证到期后物料库可用,不需要持续缓存

    D.如果物料库已经可用则直接执行 BatchRank 推荐逻辑

    所以这里主要就两件事,合并批用户,判断物料库状态决定批用户是存储还是计算。

    除了 Process 函数,还包含 onTimer 函数:

    1. override def onTimer(timestamp: Long, ctx: KeyedBroadcastProcessFunction[Int, BatchSendInfo, MaterialDataBase, SendInfo]#OnTimerContext, out: Collector[SendInfo]): Unit = {
    2. val batchBS = state.value()
    3. materialDataBase = ctx.getBroadcastState(materialDBDescriptor).get("MaterialDBContext") // DB
    4. if (!batchBS.equals(null) && !materialDataBase.equals(null)) {
    5. // 物料库可用,批量下发
    6. val sendInfos = RankUtil.batchRank(batchBS.userObjects, materialDataBase)
    7. sendInfos.foreach(sendInfo => {
    8. out.collect(sendInfo)
    9. })
    10. } else {
    11. // 清除状态
    12. state.clear()
    13. }
    14. }

    onTimer 单独处理到期的批用户,这里重新获取 materialDataBase,如果批用户和物料库都不为 null 则执行批推荐逻辑,否则清理批用户 state.clear(),我这里会损失数据,如果不想损失数据则将 else 逻辑修改为与 ProcessElement 一致,如果物料库经过 expireTime 还未成功,则继续缓存数据,直到下一个 expireTime 周期,循环往复

    1. context.timerService.registerEventTimeTimer(System.currentTimeMillis() + expireTime)
    2. state.update(combineBS)

    四.总结

    上述代码中的 BatchSendInfo 可以看做是自己的 Source 类,MaterialDataBase 可以看做是自己的广播流上下文,面对需要等到广播流初始化完毕的需求则修改上述对应代码即可,expireTime 则根据广播流变量初始化时间进行设定,缓存方法本地测试缓存159批数据,到期处理159批数据,延迟和存储要求都不高,非常的奈斯~

  • 相关阅读:
    【LeetCode周赛】LeetCode第364场周赛
    比特熊故事汇独家|英特尔“非典型性女博士”的大跨步人生
    java计算机毕业设计基层党支部建设平台MyBatis+系统+LW文档+源码+调试部署
    详解KMP
    云表:MES系统是工业4.0数字化转型的核心
    three.js点滴yan(整理后)
    set_input_delay如何使用?
    React Hooks批量更新问题
    搜维尔科技:【周刊】适用于虚拟现实VR中的OptiTrack
    kubernetes-nvidia-plugin设计解读
  • 原文地址:https://blog.csdn.net/BIT_666/article/details/126285741