updateStateByKey returns the full accumulated history for every key at each batch interval: new keys, changed keys, and unchanged keys alike. Because updateStateByKey requires checkpointing, the checkpoint data grows very large as the state grows, which hurts performance and efficiency.
Example:
- package cn.yy.streaming
-
- import org.apache.spark.SparkConf
- import org.apache.spark.storage.StorageLevel
- import org.apache.spark.streaming.dstream.DStream
- import org.apache.spark.streaming.{Seconds, StreamingContext}
-
- /**
- * Requirement: count every word that appears over the entire stream, not just within one batch.
- * We keep the intermediate result as state: each incoming batch is counted and added to that state, giving the running total.
- * updateStateByKey does exactly this---it stores the intermediate state in the checkpoint.
- */
- object GlobalWordCount {
- def main(args: Array[String]): Unit = {
- // 1. Create the Context
- val conf = new SparkConf().setAppName("updateStateBykey").setMaster("local[6]")
- val ssc = new StreamingContext(conf, Seconds(1))
- ssc.sparkContext.setLogLevel("WARN")
-
- // 2. Read the data and create a DStream
- val source = ssc.socketTextStream(
- hostname = "192.168.100.100",
- port = 9999,
- storageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
- )
-
- // 3. Per-batch word count
- val wordsTuple: DStream[(String, Int)] = source.flatMap(_.split(" ")).map((_, 1))
-
- // 4. Global aggregation---updateStateByKey requires a checkpoint directory
- ssc.checkpoint("hdfs://192.168.100.100:9000/spark/data")
-
- def updateFunc(newValue: Seq[Int], runningValue: Option[Int]): Option[Int] = {
- // newValue : all the values for this key in the current batch---it is a Seq because it holds every value of a single key within one batch
- // runningValue : the current intermediate result (the running state)
- val currBatchValue = newValue.sum
- val state = runningValue.getOrElse(0) + currBatchValue
- // The Some(state) returned here goes back into the checkpoint as the stored state
- Some(state)
- }
-
- val result: DStream[(String, Int)] = wordsTuple.updateStateByKey[Int](updateFunc _)
-
- // 5. Output
- result.print()
- ssc.start()
- ssc.awaitTermination()
- }
- }
Input:
- [hadoop@hadoop000 ~]$ nc -lk 9999
- huluwa huluwa dawa erwa
Spark output:
- 18/11/13 14:19:00 INFO CheckpointWriter: Submitted checkpoint of time 1542089940000 ms to writer queue
- 18/11/13 14:19:00 INFO CheckpointWriter: Saving checkpoint for time 1542089940000 ms to file 'hdfs://192.168.137.251:9000/spark/data/checkpoint-1542089940000'
- -------------------------------------------
- Time: 1542089940000 ms
- -------------------------------------------
- (huluwa,2)
- (dawa,1)
- (erwa,1)
- ...
- ...
- ...
- -------------------------------------------
- Time: 1542089945000 ms
- -------------------------------------------
- (huluwa,2)
- (dawa,1)
- (erwa,1)
- ...
- ...
- ...
- -------------------------------------------
- Time: 1542089950000 ms
- -------------------------------------------
- (huluwa,2)
- (dawa,1)
- (erwa,1)
If no new data arrives, the previous results keep being printed at every batch interval.
Enter one more line of data:
- [hadoop@hadoop000 ~]$ nc -lk 9999
- huluwa huluwa dawa erwa
- huluwa huluwa dae erwa
Output:
- -------------------------------------------
- Time: 1542089970000 ms
- -------------------------------------------
- (dae,1)
- (huluwa,4)
- (dawa,1)
- (erwa,2)
Keep entering data:
- [hadoop@hadoop000 ~]$ nc -lk 9999
- huluwa huluwa dawa erwa
- huluwa huluwa dae erwa
- huluwa huluwa dawa erwa
- huluwa huluwa dawa erwa
- huluwa huluwa dawa erwa
- huluwa huluwa dae erwa
- huluwa huluwa dae erwa
Result:
- -------------------------------------------
- Time: 1542090080000 ms
- -------------------------------------------
- (dae,3)
- (huluwa,14)
- (dawa,4)
- (erwa,7)
Looking into the checkpoint directory, there are many state files named like checkpoint-1542090065000:
- [hadoop@hadoop000 data]$ hadoop fs -ls /spark/data
- Found 15 items
- -rw-r--r-- 1 ÃÎcandybear supergroup 3514 2018-11-13 22:21 /spark/data/checkpoint-1542090065000
- -rw-r--r-- 1 ÃÎcandybear supergroup 3518 2018-11-13 22:21 /spark/data/checkpoint-1542090065000.bk
- -rw-r--r-- 1 ÃÎcandybear supergroup 3511 2018-11-13 22:21 /spark/data/checkpoint-1542090070000
- -rw-r--r-- 1 ÃÎcandybear supergroup 3518 2018-11-13 22:21 /spark/data/checkpoint-1542090070000.bk
- -rw-r--r-- 1 ÃÎcandybear supergroup 3514 2018-11-13 22:21 /spark/data/checkpoint-1542090075000
- -rw-r--r-- 1 ÃÎcandybear supergroup 3518 2018-11-13 22:21 /spark/data/checkpoint-1542090075000.bk
- -rw-r--r-- 1 ÃÎcandybear supergroup 3511 2018-11-13 22:21 /spark/data/checkpoint-1542090080000
- -rw-r--r-- 1 ÃÎcandybear supergroup 3518 2018-11-13 22:21 /spark/data/checkpoint-1542090080000.bk
- -rw-r--r-- 1 ÃÎcandybear supergroup 3512 2018-11-13 22:21 /spark/data/checkpoint-1542090085000
- -rw-r--r-- 1 ÃÎcandybear supergroup 3516 2018-11-13 22:21 /spark/data/checkpoint-1542090085000.bk
- drwxr-xr-x - ÃÎcandybear supergroup 0 2018-11-13 22:21 /spark/data/e02daf3a-0805-4612-b612-67ca34d32ff8
- drwxr-xrwx - ÃÎcandybear supergroup 0 2018-11-13 22:21 /spark/data/receivedBlockMetadata
These checkpoint files are all small files, which puts a lot of pressure on HDFS. How do we deal with that? It is discussed below.
Appendix: the updateStateByKey source (this was the API to use before version 1.6):
- /**
- * Return a new "state" DStream where the state for each key is updated by applying
- * the given function on the previous state of the key and the new values of each key.
- * In every batch the updateFunc will be called for each state even if there are no new values.
- * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
- * @param updateFunc State update function. If `this` function returns None, then
- * corresponding state key-value pair will be eliminated.
- * @tparam S State type
- */
- def updateStateByKey[S: ClassTag](
- updateFunc: (Seq[V], Option[S]) => Option[S]
- ): DStream[(K, S)] = ssc.withScope {
- updateStateByKey(updateFunc, defaultPartitioner())
- }
Since version 1.6 there is mapWithState:
- /**
- * :: Experimental ::
- * Return a [[MapWithStateDStream]] by applying a function to every key-value element of
- * `this` stream, while maintaining some state data for each unique key. The mapping function
- * and other specification (e.g. partitioners, timeouts, initial state data, etc.) of this
- * transformation can be specified using `StateSpec` class. The state data is accessible in
- * as a parameter of type `State` in the mapping function.
- *
- * Example of using `mapWithState`:
- * {{{
- * // A mapping function that maintains an integer state and return a String
- * def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = {
- * // Use state.exists(), state.get(), state.update() and state.remove()
- * // to manage state, and return the necessary string
- * }
- *
- * val spec = StateSpec.function(mappingFunction).numPartitions(10)
- *
- * val mapWithStateDStream = keyValueDStream.mapWithState[StateType, MappedType](spec)
- * }}}
- *
- * @param spec Specification of this transformation
- * @tparam StateType Class type of the state data
- * @tparam MappedType Class type of the mapped data
- */
- @Experimental
- def mapWithState[StateType: ClassTag, MappedType: ClassTag](
- spec: StateSpec[K, V, StateType, MappedType]
- ): MapWithStateDStream[K, V, StateType, MappedType] = {
- new MapWithStateDStreamImpl[K, V, StateType, MappedType](
- self,
- spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]]
- )
- }
mapWithState also maintains global per-key state, but if a key receives no new input, its previous state is not re-emitted; the output feels incremental.
You write your own mapping function to implement the logic you need. If keys need initial values, use initialState(RDD) to seed them. You can also specify a timeout: a key that receives no update within the configured duration becomes invalid. This has to be handled inside your function by checking state.isTimingOut(); if the key gets new values after timing out, it is computed again from scratch. If you do not check isTimingOut and try to update a timed-out key, an error is thrown.
At each batch interval mapWithState emits only the keys that are new or whose state has changed; unchanged keys are not emitted. mapWithState still needs a checkpoint directory, but because far less data is returned per batch, its performance and efficiency are better than updateStateByKey's.
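To illustrate the timeout and initialState hooks just described, here is a minimal sketch of a mapping function that checks state.isTimingOut() before updating the state, together with a StateSpec seeded from an RDD; countWithTimeout, buildSpec, and seedRDD are hypothetical names used only for illustration.
- import org.apache.spark.rdd.RDD
- import org.apache.spark.streaming.{Minutes, State, StateSpec}
-
- // Safe to combine with a timeout: state.update() must not be called on a key that is timing out.
- def countWithTimeout(word: String, one: Option[Int], state: State[Int]): (String, Int) = {
-   if (state.isTimingOut()) {
-     // The key received no data within the timeout; just emit its last known count.
-     (word, state.get())
-   } else {
-     val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
-     state.update(sum)
-     (word, sum)
-   }
- }
-
- // seedRDD is a hypothetical RDD[(String, Int)] of initial counts, e.g. loaded from storage.
- def buildSpec(seedRDD: RDD[(String, Int)]): StateSpec[String, Int, Int, (String, Int)] =
-   StateSpec.function(countWithTimeout _)
-     .initialState(seedRDD)
-     .timeout(Minutes(60))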
Example:
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming.{Minutes, Seconds, State, StateSpec, StreamingContext}
-
- object MapWithStateApp {
- def main(args: Array[String]): Unit = {
- val conf = new SparkConf().setMaster("local[2]").setAppName("MapWithStateApp")
- val ssc = new StreamingContext(conf, Seconds(5))
- ssc.checkpoint("hdfs://192.168.100.100:9000/spark/data")
- val lines = ssc.socketTextStream("hadoop000", 9999)
- val words = lines.flatMap(_.split(" "))
- val pairs = words.map(x => (x, 1)).reduceByKey(_ + _)
- // Update the cumulative count using mapWithState
- // This will give a DStream made of state (which is the cumulative count of the words)
- val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
- val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
- val output = (word, sum)
- state.update(sum)
- output
- }
-
- val wordcounts = pairs.mapWithState(StateSpec.function(mappingFunc).timeout(Minutes(60)))
- wordcounts.print()
- ssc.start()
- ssc.awaitTermination()
- }
- }
mapWithState takes a StateSpec object, which encapsulates the state-management function. We define the state-update function mappingFunc, which updates the state for a given key and returns the updated result; we pass it to mapWithState via StateSpec and also set a state timeout. At each batch interval, Spark Streaming updates the internally maintained state according to our update function and returns the stream of results produced by mappingFunc.
Enter one line of data in the console:
- [hadoop@hadoop000 ~]$ nc -lk 9999
- hadoop spark spark hive
Spark output:
- -------------------------------------------
- Time: 1542098100000 ms
- -------------------------------------------
- (hive,1)
- (spark,2)
- (hadoop,1)
Enter two more lines of data:
- [hadoop@hadoop000 ~]$ nc -lk 9999
- hadoop spark spark hive
- hadoop spark spark hive
- hadoop spark spark hive
Output:
- -------------------------------------------
- Time: 1542098115000 ms
- -------------------------------------------
- (hive,3)
- (spark,6)
- (hadoop,3)
Now enter a line containing new words:
- [hadoop@hadoop000 ~]$ nc -lk 9999
- hadoop spark spark hive
- hadoop spark spark hive
- hadoop spark spark hive
- huluwa huluwa dawa erwa
Notice that Spark only prints results for the keys that appeared in the new input:
- -------------------------------------------
- Time: 1542098120000 ms
- -------------------------------------------
- (huluwa,2)
- (dawa,1)
- (erwa,1)
- [hadoop@hadoop000 ~]$ nc -lk 9999
- hadoop spark spark hive
- hadoop spark spark hive
- hadoop spark spark hive
- huluwa huluwa dawa erwa
- huluwa huluwa dawa erwa
- -------------------------------------------
- Time: 1542098125000 ms
- -------------------------------------------
- (huluwa,4)
- (dawa,2)
- (erwa,2)
Are the earlier results still there? Let's verify:
- [hadoop@hadoop000 ~]$ nc -lk 9999
- hadoop spark spark hive
- hadoop spark spark hive
- hadoop spark spark hive
- huluwa huluwa dawa erwa
- huluwa huluwa dawa erwa
- hadoop spark spark hive
The output shows that the earlier counts are still maintained; they are just emitted selectively (see the note after this output):
- -------------------------------------------
- Time: 1542098135000 ms
- -------------------------------------------
- (hive,4)
- (spark,8)
- (hadoop,4)
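If you do need the complete state of every key after a batch (what updateStateByKey would print), MapWithStateDStream also exposes stateSnapshots(); a one-line sketch against the wordcounts stream from the example above:
- // Emits the full (word, runningCount) state for every key after each batch.
- wordcounts.stateSnapshots().print()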
Opening the checkpoint directory, just as with updateStateByKey, there are many small checkpoint-<timestamp> files:
- [hadoop@hadoop000 data]$ hadoop fs -ls /spark/data
- Found 12 items
- -rw-r--r-- 1 ÃÎcandybear supergroup 3974 2018-11-14 00:35 /spark/data/checkpoint-1542098120000
- -rw-r--r-- 1 ÃÎcandybear supergroup 3978 2018-11-14 00:35 /spark/data/checkpoint-1542098120000.bk
- -rw-r--r-- 1 ÃÎcandybear supergroup 3975 2018-11-14 00:35 /spark/data/checkpoint-1542098125000
- -rw-r--r-- 1 ÃÎcandybear supergroup 3979 2018-11-14 00:35 /spark/data/checkpoint-1542098125000.bk
- -rw-r--r-- 1 ÃÎcandybear supergroup 3975 2018-11-14 00:35 /spark/data/checkpoint-1542098130000
- -rw-r--r-- 1 ÃÎcandybear supergroup 3979 2018-11-14 00:35 /spark/data/checkpoint-1542098130000.bk
- -rw-r--r-- 1 ÃÎcandybear supergroup 4037 2018-11-14 00:35 /spark/data/checkpoint-1542098135000
- -rw-r--r-- 1 ÃÎcandybear supergroup 3979 2018-11-14 00:35 /spark/data/checkpoint-1542098135000.bk
- -rw-r--r-- 1 ÃÎcandybear supergroup 4043 2018-11-14 00:35 /spark/data/checkpoint-1542098140000
- -rw-r--r-- 1 ÃÎcandybear supergroup 4047 2018-11-14 00:35 /spark/data/checkpoint-1542098140000.bk
- drwxr-xr-x - ÃÎcandybear supergroup 0 2018-11-14 00:35 /spark/data/d55f3470-753c-4735-9b18-1b2c75f3a300
- drwxr-xrwx - ÃÎcandybear supergroup 0 2018-11-14 00:34 /spark/data/receivedBlockMetadata
So how should we handle, or better yet avoid, all these small files produced by updateStateByKey and mapWithState?
The solution is actually simple: to compute statistics over some time range, you do not have to use either operator at all. Process each batch, append the processing timestamp to the result, and save it to a database such as MySQL; when the totals are needed, read the historical rows back and aggregate them. This avoids the small files at the source. The rows stored in the database look like this (a write-side sketch follows the table):
- +---------+-------+---------------+
- | word | count | timestamp |
- +---------+-------+---------------+
- | hive | 1 | 1542098135000 |
- | spark | 2 | 1542098135000 |
- | hadoop | 1 | 1542098135000 |
- | hive | 1 | 1542098140000 |
- | spark | 2 | 1542098140000 |
- | hadoop | 1 | 1542098140000 |
- | hive | 1 | 1542098145000 |
- | spark | 2 | 1542098145000 |
- | hadoop | 1 | 1542098145000 |
- +---------+-------+---------------+
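A minimal write-side sketch of this approach, assuming a MySQL table word_counts(word, count, timestamp) reachable at jdbcUrl; the table name, column names, and connection details are assumptions, and a real job would use a connection pool:
- import java.sql.DriverManager
- import org.apache.spark.streaming.dstream.DStream
-
- def saveBatchCounts(counts: DStream[(String, Int)], jdbcUrl: String, user: String, password: String): Unit = {
-   counts.foreachRDD { (rdd, time) =>
-     rdd.foreachPartition { partition =>
-       // One connection per partition; every record carries the batch processing time.
-       val conn = DriverManager.getConnection(jdbcUrl, user, password)
-       val stmt = conn.prepareStatement("INSERT INTO word_counts(word, count, timestamp) VALUES (?, ?, ?)")
-       partition.foreach { case (word, count) =>
-         stmt.setString(1, word)
-         stmt.setInt(2, count)
-         stmt.setLong(3, time.milliseconds) // batch time, as shown in the table above
-         stmt.addBatch()
-       }
-       stmt.executeBatch()
-       stmt.close()
-       conn.close()
-     }
-   }
- }
When a report is needed, a plain GROUP BY over the saved rows, filtered by the timestamp range of interest, reproduces the totals that updateStateByKey would otherwise have kept in state.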
To summarize: updateStateByKey returns the full accumulated history at every batch interval, covering new, changed, and unchanged keys alike. Because it requires checkpointing, the checkpoints become very large as the state grows, which hurts performance and efficiency.
mapWithState only returns the keys whose values have changed. The benefit is that we only pay attention to keys that actually changed; keys with no new input are simply not returned. Even with large amounts of data, the checkpoint does not take up nearly as much storage as with updateStateByKey, so it is more efficient (recommended in production).
updateStateByKey is suited to statistics over historical data, such as a user's average spend, number of purchases, and total spend per time range, or a site's traffic by time range.
mapWithState fits more real-time, low-latency scenarios, such as returning your updated account balance right after you pay for an order on an e-commerce site.