首先我们要知道:
foreachRDD 作用于 DStream中每一个时间间隔的 RDD
foreachPartition 作用于每一个时间间隔的RDD中的每一个 partition
foreach 作用于每一个时间间隔的 RDD 中的每一个元素。
foreachRDD是作用在driver端的一种最常见的输出方式,而其他都作用在executor端。
所以注意!在driver端建立类似连接的话(或者想作为共享变量的对象)
rdd是获取不到的!这就是为什么我在向hbase写DStream的时候迟迟写不进去的原因!
举例:
- //错误写法
- dstream.foreachRDD { rdd =>
- val connection = createNewConnection() // executed at the driver
- rdd.foreach { record =>
- connection.send(record) // executed at the worker
- }
- }
当然 我们接下来肯定是想着把连接写到foreach里面解决问题的,但是这样容易炸!
因为每次遍历RDD的时候都会产生一个连接 创建连接和关闭连接都很频繁 造成系统不必要的开销
于是我们就用foreachPartition解决问题!
- // 使用foreachPartitoin来减少连接的创建,RDD的每个partition创建一个链接
- dstream.foreachRDD { rdd =>
- rdd.foreachPartition { partitionOfRecords =>
- val connection = createNewConnection()
- partitionOfRecords.foreach(record => connection.send(record))
- connection.close()
- }
- }
还有优化手段 因为分区过多的话连接数也会变多 于是还可以用线程池
- // 使用静态连接池,可以增加连接的复用、减少连接的创建和关闭。
- dstream.foreachRDD { rdd =>
- rdd.foreachPartition { partitionOfRecords =>
- // ConnectionPool is a static, lazily initialized pool of connections
- val connection = ConnectionPool.getConnection()
- partitionOfRecords.foreach(record => connection.send(record))
- ConnectionPool.returnConnection(connection) // return to the pool for future reuse
- }
- }