• Spark Streaming状态管理函数


    本帖转载如下地址

    Spark Streaming状态管理函数
    (一)—updateStateByKey和mapWithState
    https://blog.csdn.net/m0_37914799/article/details/84702378
    (二)—updateStateByKey的使用(scala版)
    https://blog.csdn.net/m0_37914799/article/details/84703057
    (三)—MapWithState的使用(scala版)
    https://blog.csdn.net/m0_37914799/article/details/84703854

    结论
      mapWithState它会按照时间线在每一个批次间隔返回之前的发生改变的或者新的key的状态,不发生变化的不返回;同时mapWithState可以不用设置checkpoint,返回的数据量少。而updateStateByKey统计的是全局Key的状态,就算没有数据输入也会在每个批次的时候返回之前的Key的状态,当数据量大时而且使用checkpoint会占用较大的存储。因此,mapWithState性能和效率要比updateStateByKey好。

    mapWithState 官方demo
    https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
     

    mapWithState过期时间设置
    mapWithState(...).timeout(3s))


    时间参数说明:timeout()传入一个时间间隔参数,如果一个key在大于此间隔没有此key的数据流入,则被认为是空闲的。如上设置为3s。


    但实际情况,3s过后发现key并没有过期,也不会被清除,大概30S之后被清除。
    当超时时,状态数据并不会立即删除当前key的数据,而是打上“删除标记”。

    1. override def remove(key: K): Unit = {
    2. val stateInfo = deltaMap(key)
    3. if (stateInfo != null) {
    4. stateInfo.markDeleted() //only marked for deletion
    5. } else {
    6. val newInfo = new StateInfo[S](deleted = true)
    7. deltaMap.update(key, newInfo)
    8. }
    9. }

    如上,超时的events 会收集到 deltaMap中。

    当doFullScan为true的时候,才会触发过期key的清除,updateRecordWithData()负责全面扫描清除过期key。doFullScan的默认值是false。

    1. override def checkpoint(): Unit = {
    2. super.checkpoint()
    3. doFullScan = true
    4. }
    5. ...
    6. removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled
    7. ...
    8. // Get the timed out state records, call the mapping function on each and collect the
    9. // data returned
    10. if (removeTimedoutData && timeoutThresholdTime.isDefined) {
    11. ...

    默认情况下,发生10个迭代时,才会清除,因此本例中当我设置窗口为3s时,checkpoint周期就是30s,30s才会清理一次过期key。


    一段英文描述
    a key this current batch timeout then the key have to wait for "doFullScan = true" which means "batchtime*DEFAULT_CHECKPOINT_DURATION_MULTIPLIER(defalut:10)"

  • 相关阅读:
    【动手学深度学习】李沐——循环神经网络
    centos7 arm服务器编译安装onnxruntime-gpu
    Web自动化成长之路:selenium中三种等待方式/三大切换操作
    使用html2canva截图生成图片
    Zero-Reference Deep Curve Estimation for Low-Light Image Enhancement
    计算机毕业设计选题推荐-一周穿搭推荐微信小程序/安卓APP-项目实战
    更新Xcode 版本后运行项目出现错误 Unable to boot the Simulator 解决方法
    sqlalchemy 连接池
    OpenAI 现已开始考虑自研 AI 芯片战略
    (附源码)ssm在线学习网站 毕业设计 080833
  • 原文地址:https://blog.csdn.net/willyan2007/article/details/126746887