在跑调度任务时候,有时候子任务需要依赖前置任务的输出,但类似读取 Parquet 或者 Orc 文件时,如果不判断目录是否为空,在输出为空时会报错,所以需要 check 一下,此外Hadoop通常在写入数据时会在目录中生成一个名为_SUCCESS的文件来表示写入操作已成功完成,我们在检测时要排除这个文件
- from py4j.java_gateway import java_import
- from pyspark.sql import SparkSession
-
- # 初始化SparkSession
- spark = SparkSession.builder.appName("Example").getOrCreate()
-
- # 导入Hadoop FileSystem类
- java_import(spark._jvm, 'org.apache.hadoop.fs.Path')
- java_import(spark._jvm, 'org.apache.hadoop.fs.FileSystem')
-
- # 定义要检查的路径
- FEATURE_OUTPUT_PATH = "your_path_here"
-
- # 获取Hadoop Configuration
- hadoop_conf = spark._jsc.hadoopConfiguration()
-
- # 获取FileSystem对象
- fs = spark._jvm.FileSystem.get(hadoop_conf)
-
- # 检查路径是否存在
- path = spark._jvm.Path(FEATURE_OUTPUT_PATH)
-
- if fs.exists(path):
- # 获取目录下所有的文件和子目录
- status_list = fs.listStatus(path)
- non_success_files = [file_status.getPath().getName() for file_status in status_list if
- file_status.getPath().getName() != "_SUCCESS"]
-
- # 检查除_SUCCESS文件外是否还有其他文件
- if non_success_files:
- # 读取Parquet文件
- table = spark.read.format('parquet').option('header', 'true').load(FEATURE_OUTPUT_PATH)
- else:
- print("The directory is empty or only contains a _SUCCESS file.")
- else:
- print("The path does not exist.")
注意这段脚本能使用的前提是,执行的机器上已经安装和配置了 HDFS 的 shell 命令
- import subprocess
-
- out=subprocess.check_output("hadoop fs -ls /tmp/file.txt",shell=True)
-
- out=out.strip()
-
- out=out.split("\n")
-
- for l in out:
-
- if l.endswith(".txt"):
-
- print "file exit"
- else:
- print "file not exit"