• Spark Streaming state management functions: updateStateByKey and mapWithState


    1. About updateStateByKey

        In every batch interval, updateStateByKey returns the full historical state for every key: newly added, changed, and unchanged alike. Because updateStateByKey requires checkpointing, the checkpoint grows very large once the amount of state is big, which hurts performance and efficiency.

    Example:

    package cn.yy.streaming

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.dstream.DStream
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    /**
     * Goal: count every word seen across the whole stream, not just within one batch.
     * The running totals are kept as state; each new batch is summed and folded into that state.
     * updateStateByKey does exactly this: it stores the intermediate state in the checkpoint.
     */
    object GlobalWordCount {
      def main(args: Array[String]): Unit = {
        // 1. Create the StreamingContext
        val conf = new SparkConf().setAppName("updateStateBykey").setMaster("local[6]")
        val ssc = new StreamingContext(conf, Seconds(1))
        ssc.sparkContext.setLogLevel("WARN")

        // 2. Read the data and build a DStream
        val source = ssc.socketTextStream(
          hostname = "192.168.100.100",
          port = 9999,
          storageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
        )

        // 3. Per-batch word tuples
        val wordsTuple: DStream[(String, Int)] = source.flatMap(_.split(" ")).map((_, 1))

        // 4. Global aggregation: updateStateByKey requires a checkpoint directory
        ssc.checkpoint("hdfs://192.168.100.100:9000/spark/data")

        def updateFunc(newValue: Seq[Int], runningValue: Option[Int]): Option[Int] = {
          // newValue: all values for this key in the current batch (a Seq because a batch can carry many values per key)
          // runningValue: the current intermediate state for this key
          val currBatchValue = newValue.sum
          val state = runningValue.getOrElse(0) + currBatchValue
          // The returned Some(state) is written back to the checkpoint as the new state
          Some(state)
        }

        val result: DStream[(String, Int)] = wordsTuple.updateStateByKey[Int](updateFunc _)

        // 5. Output
        result.print()
        ssc.start()
        ssc.awaitTermination()
      }
    }

    Input:

    [hadoop@hadoop000 ~]$ nc -lk 9999
    huluwa huluwa dawa erwa

    Spark output:

    18/11/13 14:19:00 INFO CheckpointWriter: Submitted checkpoint of time 1542089940000 ms to writer queue
    18/11/13 14:19:00 INFO CheckpointWriter: Saving checkpoint for time 1542089940000 ms to file 'hdfs://192.168.137.251:9000/spark/data/checkpoint-1542089940000'
    -------------------------------------------
    Time: 1542089940000 ms
    -------------------------------------------
    (huluwa,2)
    (dawa,1)
    (erwa,1)
    ...
    ...
    ...
    -------------------------------------------
    Time: 1542089945000 ms
    -------------------------------------------
    (huluwa,2)
    (dawa,1)
    (erwa,1)
    ...
    ...
    ...
    -------------------------------------------
    Time: 1542089950000 ms
    -------------------------------------------
    (huluwa,2)
    (dawa,1)
    (erwa,1)

    If no new data is sent, the previous result keeps being printed batch after batch.
    Now send one more line:

    [hadoop@hadoop000 ~]$ nc -lk 9999
    huluwa huluwa dawa erwa
    huluwa huluwa dae erwa

    Output:

    -------------------------------------------
    Time: 1542089970000 ms
    -------------------------------------------
    (dae,1)
    (huluwa,4)
    (dawa,1)
    (erwa,2)

    Keep sending data:

    [hadoop@hadoop000 ~]$ nc -lk 9999
    huluwa huluwa dawa erwa
    huluwa huluwa dae erwa
    huluwa huluwa dawa erwa
    huluwa huluwa dawa erwa
    huluwa huluwa dawa erwa
    huluwa huluwa dae erwa
    huluwa huluwa dae erwa

    Result:

    -------------------------------------------
    Time: 1542090080000 ms
    -------------------------------------------
    (dae,3)
    (huluwa,14)
    (dawa,4)
    (erwa,7)

    Looking inside the checkpoint directory, there are many state files named like checkpoint-1542090065000:

    [hadoop@hadoop000 data]$ hadoop fs -ls /spark/data
    Found 15 items
    -rw-r--r--   1 candybear supergroup 3514 2018-11-13 22:21 /spark/data/checkpoint-1542090065000
    -rw-r--r--   1 candybear supergroup 3518 2018-11-13 22:21 /spark/data/checkpoint-1542090065000.bk
    -rw-r--r--   1 candybear supergroup 3511 2018-11-13 22:21 /spark/data/checkpoint-1542090070000
    -rw-r--r--   1 candybear supergroup 3518 2018-11-13 22:21 /spark/data/checkpoint-1542090070000.bk
    -rw-r--r--   1 candybear supergroup 3514 2018-11-13 22:21 /spark/data/checkpoint-1542090075000
    -rw-r--r--   1 candybear supergroup 3518 2018-11-13 22:21 /spark/data/checkpoint-1542090075000.bk
    -rw-r--r--   1 candybear supergroup 3511 2018-11-13 22:21 /spark/data/checkpoint-1542090080000
    -rw-r--r--   1 candybear supergroup 3518 2018-11-13 22:21 /spark/data/checkpoint-1542090080000.bk
    -rw-r--r--   1 candybear supergroup 3512 2018-11-13 22:21 /spark/data/checkpoint-1542090085000
    -rw-r--r--   1 candybear supergroup 3516 2018-11-13 22:21 /spark/data/checkpoint-1542090085000.bk
    drwxr-xr-x   - candybear supergroup    0 2018-11-13 22:21 /spark/data/e02daf3a-0805-4612-b612-67ca34d32ff8
    drwxr-xrwx   - candybear supergroup    0 2018-11-13 22:21 /spark/data/receivedBlockMetadata

    These checkpoint files are all small files, which puts heavy pressure on HDFS. How to deal with that is covered further below.

    Appendix: the updateStateByKey source (the approach used before Spark 1.6):

    /**
     * Return a new "state" DStream where the state for each key is updated by applying
     * the given function on the previous state of the key and the new values of each key.
     * In every batch the updateFunc will be called for each state even if there are no new values.
     * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
     * @param updateFunc State update function. If `this` function returns None, then
     *                   corresponding state key-value pair will be eliminated.
     * @tparam S State type
     */
    def updateStateByKey[S: ClassTag](
        updateFunc: (Seq[V], Option[S]) => Option[S]
      ): DStream[(K, S)] = ssc.withScope {
      updateStateByKey(updateFunc, defaultPartitioner())
    }
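
    Two details in that docstring are worth exploiting: the update function is called for every existing key in every batch even when no new values arrive, and returning None removes the key from the state. Below is a minimal sketch that uses both to evict idle keys so the checkpointed state does not grow without bound; the names evictingUpdateFunc and maxIdleBatches, and the eviction rule itself, are illustrative assumptions, not part of the original post.

    // Hypothetical variant: state = (count, idleBatches). A key that has gone
    // maxIdleBatches batches without new values is dropped by returning None.
    val maxIdleBatches = 60

    def evictingUpdateFunc(newValues: Seq[Int],
                           running: Option[(Int, Int)]): Option[(Int, Int)] = {
      val (count, idle) = running.getOrElse((0, 0))
      if (newValues.nonEmpty) Some((count + newValues.sum, 0)) // fresh data: reset the idle counter
      else if (idle + 1 >= maxIdleBatches) None                // idle too long: evict the key
      else Some((count, idle + 1))                             // keep the state, bump the idle counter
    }

    // Used the same way as updateFunc in the example above:
    // val result: DStream[(String, (Int, Int))] =
    //   wordsTuple.updateStateByKey[(Int, Int)](evictingUpdateFunc _)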

    From Spark 1.6 on, mapWithState is available:

    /**
     * :: Experimental ::
     * Return a [[MapWithStateDStream]] by applying a function to every key-value element of
     * `this` stream, while maintaining some state data for each unique key. The mapping function
     * and other specification (e.g. partitioners, timeouts, initial state data, etc.) of this
     * transformation can be specified using `StateSpec` class. The state data is accessible in
     * as a parameter of type `State` in the mapping function.
     *
     * Example of using `mapWithState`:
     * {{{
     *    // A mapping function that maintains an integer state and return a String
     *    def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {
     *      // Use state.exists(), state.get(), state.update() and state.remove()
     *      // to manage state, and return the necessary string
     *    }
     *
     *    val spec = StateSpec.function(mappingFunction).numPartitions(10)
     *
     *    val mapWithStateDStream = keyValueDStream.mapWithState[StateType, MappedType](spec)
     * }}}
     *
     * @param spec          Specification of this transformation
     * @tparam StateType    Class type of the state data
     * @tparam MappedType   Class type of the mapped data
     */
    @Experimental
    def mapWithState[StateType: ClassTag, MappedType: ClassTag](
        spec: StateSpec[K, V, StateType, MappedType]
      ): MapWithStateDStream[K, V, StateType, MappedType] = {
      new MapWithStateDStreamImpl[K, V, StateType, MappedType](
        self,
        spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
      )
    }

    2. About mapWithState

        mapWithState also maintains global per-key state, but if a key receives no input it does not re-emit that key's previous state, so the output feels incremental.

        You write a mapping function to implement whatever logic you need. If some keys should start with preset values, initialState(RDD) seeds the state. A timeout can also be specified: a key that receives no update within the timeout expires. Your function has to handle this itself by checking state.isTimingOut() for expiring keys; if you skip that check and try to update the state of a timing-out key, an error is thrown. If new values arrive for a key after it has expired, its state is computed from scratch.

        At each batch interval mapWithState emits only the keys that are new or whose state changed; unchanged keys are not emitted. Its state is still checkpointed internally (the example below sets a checkpoint directory, and checkpoint files show up on HDFS just as before), but because far less data is returned per batch, its performance and efficiency are better than updateStateByKey's. A short sketch of the timeout and initial-state options comes next, followed by the full example.
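
    A minimal sketch of those two options, assuming ssc is an existing StreamingContext and pairs is a DStream[(String, Int)]; the function name mappingWithTimeout, the seeded keys and the 30-minute timeout are illustrative, not from the original post.

    import org.apache.spark.streaming.{Minutes, State, StateSpec}

    // Seed the state of some keys before the first batch arrives.
    val initialRDD = ssc.sparkContext.parallelize(Seq(("hadoop", 100), ("spark", 200)))

    def mappingWithTimeout(word: String, one: Option[Int], state: State[Int]): Option[(String, Int)] = {
      if (state.isTimingOut()) {
        // The key expired: no new data arrived within the timeout.
        // state.update() would throw here, so only observe (or log) the expiry.
        None
      } else {
        val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
        state.update(sum)          // allowed only while the key is not timing out
        Some((word, sum))
      }
    }

    val spec = StateSpec.function(mappingWithTimeout _)
      .initialState(initialRDD)    // keys seeded with starting counts
      .timeout(Minutes(30))        // keys idle for 30 minutes expire

    // val counts = pairs.mapWithState(spec)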

    Example:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Minutes, Seconds, State, StateSpec, StreamingContext}

    object MapWithStateApp {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("MapWithStateApp")
        val ssc = new StreamingContext(conf, Seconds(5))
        ssc.checkpoint("hdfs://192.168.100.100:9000/spark/data")

        val lines = ssc.socketTextStream("hadoop000", 9999)
        val words = lines.flatMap(_.split(" "))
        val pairs = words.map(x => (x, 1)).reduceByKey(_ + _)

        // Update the cumulative count using mapWithState.
        // This gives a DStream of state values (the cumulative count of each word).
        // Note: with .timeout(Minutes(60)) set below, a production version should also
        // check state.isTimingOut() before calling state.update(), as described above.
        val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
          val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
          val output = (word, sum)
          state.update(sum)
          output
        }

        val wordcounts = pairs.mapWithState(StateSpec.function(mappingFunc).timeout(Minutes(60)))
        wordcounts.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }

        mapWithState takes a StateSpec object, which wraps the state-management function. Here mappingFunc updates the state for a given key and returns the updated value; it is passed to mapWithState via StateSpec.function together with a state timeout. At each batch interval, Spark Streaming applies this function to update the state it maintains internally and returns the stream of values produced by mappingFunc.

    Type one line into the console:

    [hadoop@hadoop000 ~]$ nc -lk 9999
    hadoop spark spark hive

    Spark output:

    -------------------------------------------
    Time: 1542098100000 ms
    -------------------------------------------
    (hive,1)
    (spark,2)
    (hadoop,1)

    Send two more lines:

    [hadoop@hadoop000 ~]$ nc -lk 9999
    hadoop spark spark hive
    hadoop spark spark hive
    hadoop spark spark hive

    Output:

    -------------------------------------------
    Time: 1542098115000 ms
    -------------------------------------------
    (hive,3)
    (spark,6)
    (hadoop,3)

    Now send a line containing new words:

    [hadoop@hadoop000 ~]$ nc -lk 9999
    hadoop spark spark hive
    hadoop spark spark hive
    hadoop spark spark hive
    huluwa huluwa dawa erwa

    Notice that Spark only prints results for the keys that appeared in the new input:

    -------------------------------------------
    Time: 1542098120000 ms
    -------------------------------------------
    (huluwa,2)
    (dawa,1)
    (erwa,1)

    Sending the line huluwa huluwa dawa erwa once more:

    [hadoop@hadoop000 ~]$ nc -lk 9999
    hadoop spark spark hive
    hadoop spark spark hive
    hadoop spark spark hive
    huluwa huluwa dawa erwa
    huluwa huluwa dawa erwa

    -------------------------------------------
    Time: 1542098125000 ms
    -------------------------------------------
    (huluwa,4)
    (dawa,2)
    (erwa,2)

    Are the earlier counts still there? Let's check:

    [hadoop@hadoop000 ~]$ nc -lk 9999
    hadoop spark spark hive
    hadoop spark spark hive
    hadoop spark spark hive
    huluwa huluwa dawa erwa
    huluwa huluwa dawa erwa
    hadoop spark spark hive

    The output shows that the earlier totals are still kept; only the keys touched by the latest input are displayed:

    -------------------------------------------
    Time: 1542098135000 ms
    -------------------------------------------
    (hive,4)
    (spark,8)
    (hadoop,4)
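
    If you ever want to print the full state rather than only the keys updated in the latest batch, MapWithStateDStream exposes stateSnapshots(). A minimal sketch against the example above (wordcounts is the MapWithStateDStream from that example):

    // The state of ALL keys, not just the ones updated in the current batch.
    val fullState = wordcounts.stateSnapshots()   // DStream[(String, Int)]
    fullState.print()                             // prints every key's running count each batch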

    Opening the checkpoint directory shows that, just like with updateStateByKey, there are many small checkpoint-<timestamp> files:

    [hadoop@hadoop000 data]$ hadoop fs -ls /spark/data
    Found 12 items
    -rw-r--r--   1 candybear supergroup 3974 2018-11-14 00:35 /spark/data/checkpoint-1542098120000
    -rw-r--r--   1 candybear supergroup 3978 2018-11-14 00:35 /spark/data/checkpoint-1542098120000.bk
    -rw-r--r--   1 candybear supergroup 3975 2018-11-14 00:35 /spark/data/checkpoint-1542098125000
    -rw-r--r--   1 candybear supergroup 3979 2018-11-14 00:35 /spark/data/checkpoint-1542098125000.bk
    -rw-r--r--   1 candybear supergroup 3975 2018-11-14 00:35 /spark/data/checkpoint-1542098130000
    -rw-r--r--   1 candybear supergroup 3979 2018-11-14 00:35 /spark/data/checkpoint-1542098130000.bk
    -rw-r--r--   1 candybear supergroup 4037 2018-11-14 00:35 /spark/data/checkpoint-1542098135000
    -rw-r--r--   1 candybear supergroup 3979 2018-11-14 00:35 /spark/data/checkpoint-1542098135000.bk
    -rw-r--r--   1 candybear supergroup 4043 2018-11-14 00:35 /spark/data/checkpoint-1542098140000
    -rw-r--r--   1 candybear supergroup 4047 2018-11-14 00:35 /spark/data/checkpoint-1542098140000.bk
    drwxr-xr-x   - candybear supergroup    0 2018-11-14 00:35 /spark/data/d55f3470-753c-4735-9b18-1b2c75f3a300
    drwxr-xrwx   - candybear supergroup    0 2018-11-14 00:34 /spark/data/receivedBlockMetadata

    So how should the small files produced by updateStateByKey and mapWithState be handled, or avoided in the first place?
    The fix suggested here is simple: if you only need statistics over some time range, don't use these two operators at all. After processing each batch, attach the processing time to the result and save it to a database such as MySQL; when the history is needed, read it back and aggregate. That avoids the small files at the source. The stored format looks like this (a JDBC write sketch follows the table):

    +---------+-------+---------------+
    | word    | count | timestamp     |
    +---------+-------+---------------+
    | hive    |     1 | 1542098135000 |
    | spark   |     2 | 1542098135000 |
    | hadoop  |     1 | 1542098135000 |
    | hive    |     1 | 1542098140000 |
    | spark   |     2 | 1542098140000 |
    | hadoop  |     1 | 1542098140000 |
    | hive    |     1 | 1542098145000 |
    | spark   |     2 | 1542098145000 |
    | hadoop  |     1 | 1542098145000 |
    +---------+-------+---------------+
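
    A minimal sketch of that write path, assuming a MySQL table wordcount(word, count, timestamp) and placeholder JDBC connection details, neither of which is from the original post; pairs is the per-batch DStream[(String, Int)] (reduceByKey without any stateful operator):

    import java.sql.DriverManager

    pairs.foreachRDD { (rdd, batchTime) =>
      rdd.foreachPartition { iter =>
        // One connection per partition keeps the JDBC overhead manageable.
        // URL, user and password are placeholders.
        val conn = DriverManager.getConnection(
          "jdbc:mysql://localhost:3306/test", "user", "password")
        val stmt = conn.prepareStatement(
          "INSERT INTO wordcount (word, `count`, `timestamp`) VALUES (?, ?, ?)")
        try {
          iter.foreach { case (word, count) =>
            stmt.setString(1, word)
            stmt.setInt(2, count)
            stmt.setLong(3, batchTime.milliseconds)  // the batch's processing time
            stmt.executeUpdate()
          }
        } finally {
          stmt.close()
          conn.close()
        }
      }
    }

    Because each row carries its batch timestamp, any historical range can later be aggregated with an ordinary SQL query instead of keeping state in the streaming job.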

    3. The difference between updateStateByKey and mapWithState

        updateStateByKey returns, at every batch interval, the full history for all keys: new, changed, and unchanged alike. Because it requires checkpointing, the checkpoint becomes very large when the amount of state is big, which hurts performance and efficiency.

        mapWithState returns only the keys whose values changed. The benefit is that we can focus on the keys that actually changed; keys with no new input are simply not returned. Even with a large amount of state, the checkpoint does not take up nearly as much storage as with updateStateByKey, so it is more efficient (and is the recommended choice in production).

    4. When to use which

        updateStateByKey is suited to statistics over historical data, for example a user's average spend, number of purchases and total spend over different time ranges, or a site's traffic in different time windows.

        mapWithState is suited to low-latency, near-real-time scenarios, for example placing an order on a shopping site and getting the updated account balance back right after payment.

  • Original post: https://blog.csdn.net/u010147215/article/details/126308507