目录
目录
做了一个操作,发现数据不均衡:
sp_bp_pic_df = chengren_sp_bp_link_df.select("pic_large_url_list", "pic").dropDuplicates(["pic_large_url_list", "pic"]).repartition(300).cache()

要确认 Spark 任务在使用 repartition 对数据进行了重新分区后,任务输入数据大小仍存在不均衡,可以通过以下步骤进行验证和分析:
使用 RDD.glom() 方法可以查看每个分区的数据量。glom() 方法将每个分区的数据转换为一个列表,从而可以检查每个分区的大小。
- # 获取每个分区的大小
- partition_sizes = sp_bp_pic_df.rdd.glom().map(len).collect()
-
- # 打印每个分区的大小
- for i, size in enumerate(partition_sizes):
- print(f"Partition {i}: {size} records")
DataFrame API 检查分区可以通过 DataFrame API 来检查每个分区的数据量,确保数据分布均匀。
- # 获取每个分区的大小
- partition_sizes = sp_bp_pic_df.rdd.mapPartitionsWithIndex(
- lambda idx, it: [(idx, len(list(it)))], preservesPartitioning=True).collect()
-
- # 打印每个分区的大小
- for idx, size in partition_sizes:
- print(f"Partition {idx}: {size} records")
在运行 Spark 作业时,可以通过 Spark UI 查看每个任务的输入数据大小和分区情况。
打开 Spark UI:
http://localhost:4040 查看 Spark UI。查看 Stages 页面:
查看 Tasks 页面:
可以在代码中添加日志记录,输出每个分区的数据量,以便在日志中查看分区情况。
- import logging
-
- # 配置日志
- logging.basicConfig(level=logging.INFO)
- logger = logging.getLogger(__name__)
-
- # 获取每个分区的大小并记录日志
- partition_sizes = sp_bp_pic_df.rdd.mapPartitionsWithIndex(
- lambda idx, it: [(idx, len(list(it)))], preservesPartitioning=True).collect()
-
- for idx, size in partition_sizes:
- logger.info(f"Partition {idx}: {size} records")
尽管您已经使用 repartition 对数据进行了重新分区,但任务输入数据大小仍存在不均衡的情况。这可能由以下几个原因引起:
即使在重新分区后,数据分布仍可能不均衡。例如,如果某些键的频率远高于其他键,重新分区也可能无法完全平衡数据。
repartition 使用随机分区策略,可能会导致某些分区的数据量仍然较大。如果数据本身存在倾斜,随机分区可能无法解决问题。
在 repartition 之前进行的操作(如 dropDuplicates)可能会影响数据的分布,导致某些分区的数据量较大。
为了进一步优化数据分布,可以尝试以下几种方法:
首先,检查数据的分布情况,以确定是否存在数据倾斜。可以通过以下代码查看每个分区的数据量:
- from pyspark.sql import Row
-
- # 获取每个分区的大小
- partition_sizes = sp_bp_pic_df.rdd.glom().map(len).collect()
-
- # 打印每个分区的大小
- for i, size in enumerate(partition_sizes):
- print(f"Partition {i}: {size} records")
coalesce 减少分区如果数据量较小,可以尝试使用 coalesce 减少分区数量,以提高数据的均匀分布:
sp_bp_pic_df = sp_bp_pic_df.coalesce(100).cache()
如果数据存在显著的倾斜,可以使用自定义分区器来更好地平衡数据。例如,可以使用 hash 分区器:
- from pyspark.sql.functions import hash
-
- # 添加一个分区键
- sp_bp_pic_df = sp_bp_pic_df.withColumn("partition_key", hash(col("pic_large_url_list")))
-
- # 按分区键进行重新分区
- sp_bp_pic_df = sp_bp_pic_df.repartition(300, "partition_key").drop("partition_key").cache()
在进行 repartition 之前,尽量减少不必要的操作(如 dropDuplicates)对数据分布的影响。可以在重新分区后进行这些操作:
# 先重新分区,再去重 sp_bp_pic_df = chengren_sp_bp_link_df.repartition(300, "pic_large_url_list", "pic").dropDuplicates(["pic_large_url_list", "pic"]).cache()
尽管已经使用 repartition 进行了分区,但任务输入数据大小仍可能不均衡。通过检查数据分布、使用自定义分区器、优化数据预处理等方法,可以进一步优化数据分布,减少任务输入数据大小的不均衡,提高作业的整体性能和效率。