• Spark Streaming状态管理函数


    本帖转载如下地址

    Spark Streaming状态管理函数
    (一)—updateStateByKey和mapWithState
    https://blog.csdn.net/m0_37914799/article/details/84702378
    (二)—updateStateByKey的使用(scala版)
    https://blog.csdn.net/m0_37914799/article/details/84703057
    (三)—MapWithState的使用(scala版)
    https://blog.csdn.net/m0_37914799/article/details/84703854

    结论
      mapWithState它会按照时间线在每一个批次间隔返回之前的发生改变的或者新的key的状态,不发生变化的不返回;同时mapWithState可以不用设置checkpoint,返回的数据量少。而updateStateByKey统计的是全局Key的状态,就算没有数据输入也会在每个批次的时候返回之前的Key的状态,当数据量大时而且使用checkpoint会占用较大的存储。因此,mapWithState性能和效率要比updateStateByKey好。

    mapWithState 官方demo
    https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
     

    mapWithState过期时间设置
    mapWithState(...).timeout(3s))


    时间参数说明:timeout()传入一个时间间隔参数,如果一个key在大于此间隔没有此key的数据流入,则被认为是空闲的。如上设置为3s。


    但实际情况,3s过后发现key并没有过期,也不会被清除,大概30S之后被清除。
    当超时时,状态数据并不会立即删除当前key的数据,而是打上“删除标记”。

    1. override def remove(key: K): Unit = {
    2. val stateInfo = deltaMap(key)
    3. if (stateInfo != null) {
    4. stateInfo.markDeleted() //only marked for deletion
    5. } else {
    6. val newInfo = new StateInfo[S](deleted = true)
    7. deltaMap.update(key, newInfo)
    8. }
    9. }

    如上,超时的events 会收集到 deltaMap中。

    当doFullScan为true的时候,才会触发过期key的清除,updateRecordWithData()负责全面扫描清除过期key。doFullScan的默认值是false。

    1. override def checkpoint(): Unit = {
    2. super.checkpoint()
    3. doFullScan = true
    4. }
    5. ...
    6. removeTimedoutData = doFullScan // remove timedout data only when full scan is enabled
    7. ...
    8. // Get the timed out state records, call the mapping function on each and collect the
    9. // data returned
    10. if (removeTimedoutData && timeoutThresholdTime.isDefined) {
    11. ...

    默认情况下,发生10个迭代时,才会清除,因此本例中当我设置窗口为3s时,checkpoint周期就是30s,30s才会清理一次过期key。


    一段英文描述
    a key this current batch timeout then the key have to wait for "doFullScan = true" which means "batchtime*DEFAULT_CHECKPOINT_DURATION_MULTIPLIER(defalut:10)"

  • 相关阅读:
    servlet开发-通过Tomcat部署一个简单的webapp
    被华为、阿里等知名4位一线技术专家联手吹爆的SSM实战文档
    MySQL 怎么保证备份数据的一致性?
    java计算机毕业设计医院人事档案管理系源码+系统+mysql数据库+lw文档
    【编程题】【Scratch四级】2020.06 正话反说
    煤炉、Newegg测评自养号环境搭建技术
    从C语言到C++(五)
    破茧化蝶,从Ring Bus到Mesh网络,CPU片内总线的进化之路
    图像算法七 —— 自助采样法 & Bagging算法 & 随机森林
    内网学习笔记(4)
  • 原文地址:https://blog.csdn.net/willyan2007/article/details/126746887