• SparkStreaming的foreachPartition理解


    首先我们要知道:

    foreachRDD 作用于 DStream中每一个时间间隔的 RDD

    foreachPartition 作用于每一个时间间隔的RDD中的每一个 partition

    foreach 作用于每一个时间间隔的 RDD 中的每一个元素。

    foreachRDD是作用在driver端的一种最常见的输出方式,而其他都作用在executor端。

    所以注意!在driver端建立类似连接的话(或者想作为共享变量的对象)

    rdd是获取不到的!这就是为什么我在向hbase写DStream的时候迟迟写不进去的原因!

    举例:

    1. //错误写法
    2. dstream.foreachRDD { rdd =>
    3. val connection = createNewConnection() // executed at the driver
    4. rdd.foreach { record =>
    5. connection.send(record) // executed at the worker
    6. }
    7. }

    当然 我们接下来肯定是想着把连接写到foreach里面解决问题的,但是这样容易炸!

    因为每次遍历RDD的时候都会产生一个连接 创建连接和关闭连接都很频繁 造成系统不必要的开销

    于是我们就用foreachPartition解决问题!

    1. // 使用foreachPartitoin来减少连接的创建,RDD的每个partition创建一个链接
    2. dstream.foreachRDD { rdd =>
    3. rdd.foreachPartition { partitionOfRecords =>
    4. val connection = createNewConnection()
    5. partitionOfRecords.foreach(record => connection.send(record))
    6. connection.close()
    7. }
    8. }

    还有优化手段 因为分区过多的话连接数也会变多 于是还可以用线程池

    1. // 使用静态连接池,可以增加连接的复用、减少连接的创建和关闭。
    2. dstream.foreachRDD { rdd =>
    3. rdd.foreachPartition { partitionOfRecords =>
    4. // ConnectionPool is a static, lazily initialized pool of connections
    5. val connection = ConnectionPool.getConnection()
    6. partitionOfRecords.foreach(record => connection.send(record))
    7. ConnectionPool.returnConnection(connection) // return to the pool for future reuse
    8. }
    9. }

  • 相关阅读:
    AutoJs学习-实现区域截图+文字识别+摇一摇截图+截图查看器
    Docker安装Seata
    nuc980学习笔记6 - 手把手教 源码编译和YAFFS2文件系统制作
    Data Guard archive gap检测及解决方案
    如何实现一个IO口读取多个设备信息
    王道操作系统___第三章02
    leetcode:67. 二进制求和
    springcloud之Eureka
    USACO美国信息学奥赛竞赛12月份开赛,中国学生备赛指南
    liunx的攻击
  • 原文地址:https://blog.csdn.net/weixin_51981189/article/details/128136812