• pySpark,执行算子show(1)导致并行度不对的问题


    spark里,一个action算子会生成一个job,一个重分区算子形成宽依赖会划分出前后两个stage。

    spark会先读取代码然后生成执行计划,这个过程中,每个rdd和dataframe的分区数会计算出来,在driver端print的内容也会打印出来(代码日志),等于把整个spark代码加载的一个过程。

    然后再执行转换算子里的操作,操作里的打印内容在executor端打印出来,

    最后执行action算子的操作,把action算子的结果在driver端(代码日志)打印出来。

    1. def read_hive_table(self, start=-1, length=1):
    2. 读取hdfs路径生成rdd转df
    3. print "1. df的分区个数:", task_df.rdd.getNumPartitions()
    4. df.show(1)
    5. return df
    6. def process_detour_reason(self, data_df):
    7. print "2. df的分区个数:", data_df.rdd.getNumPartitions()
    8. 定义udf1
    9. data_df_1 = data_df.repartition(8).withColumn(XXX, udf1(XXXX))
    10. print "3. df的分区个数:", data_df_2.rdd.getNumPartitions()
    11. data_df_1.show(1)
    12. def process(self):
    13. table_data_df = self.read_hive_table(-1, 1)
    14. table_result_df_1 = self.process_detour_reason(table_data_df)
    15. self.fs.write_fs_for_df(table_result_df_2, "XXXX")

    当执行到函数process_detour_reason时,会发现打印内容如下:

    1. 2. df的分区个数: 10
    2. 3. df的分区个数: 8
    3. ----process_detour_reason函数任务执行中-----

    此时正在执行process_detour_reason函数,df的每一条数据在调用udf进行处理,但是发现此时task并未按照预期的开8个分区并行计算,而是只运行了1个task。

    原因如下:

    spark中一个action算子触发一个job,会将整个job里的转换算子全部执行一遍(数据转换),最后通过action算子进行数据输出到driver端;

    也就是说,因为有action算子show(1)的存在,df的repartition和withColumn这两个数据转换算子会得以生效,但是show(1)比较特殊,因为其只需要返回1条数据,所以内部做了优化,只会计算一个分区的数据,然后返回一条到driver端,这也就是为什么在spark的ui中看到这一步骤只有一个task在执行;

    因为在这个job里,8个分区,只会执行1个分区的数据,计算完毕后,通过show(1)返回给driver端;

    那剩下7个分区的数据什么时候执行呢?在最后saveAs将df的数据存储下来时,由于save也是一个action算子,所以触发save算子时,会执行剩下7个分区的udf计算。

    总结:

    1. 一般一个spark代码里,除非涉及数据的临时保存,需要用到persist、cache等,只需要在代码结尾处设置一个action算子即可,这样action算子之前的所有转换操作都会得到执行;

    2. spark代码的预加载机制,会在生成dag图时,把代码里的print执行出来,所以并不是print1,udf执行,再print2;而是print1,print2,加载完毕dag生成,udf执行;

    3. rdd转dataframe后,分区数不变;

    4. df.repartition().转换算子,会先重分区,再执行转换算子;

    5. repartition()用于增加或减少分区数,coalesc()只用于减少分区数;

    6. show(1)一般用于代码调试,实际运算时,建议用count()作为执行算子,这样不会对重分区造成影响;

    7. 查询df的分区数:df.rdd.getNumPartitions()

    spark代码的执行过程:

    先从前往后执行,加载代码,遇到action算子后,生成一个job,然后从后往前划分stage,然后再从前往后执行。

  • 相关阅读:
    EDI系统如何恢复历史映射关系?
    搜维尔科技: 使用 Xsens 和 HTC Vive进行电影制作案例
    MYSQL高可用架构之MHA实战(真实可用)
    Go语言 错误处理
    JEE.XML配置文件
    微信小程序基础学习(5):使用 npm包、全局数据共享、分包
    二氧化钛纳米粒TIO2修饰多肽R8/CTT2/CCK8/GE11/cTAT/CPP/RVG29/SP94(无机纳米粒子偶联多肽)
    为什么 Go 语言 struct 要使用 tags
    ps2021神经ai滤镜无法使用,ps2021神经滤镜出现错误
    创业合伙必读之:合伙企业登记指南
  • 原文地址:https://blog.csdn.net/wx1528159409/article/details/127534156