• Flink - 读取 Parquet 文件 By Scala / Java


    一.引言

    parquet 文件常见与 Flink、Spark、Hive、Streamin、MapReduce 等大数据场景,通过列式存储和元数据存储的方式实现了高效的数据存储与检索,下面介绍 Flink 场景下如何读取 Parquet。Parquet 相关知识可以参考:Spark - 一文搞懂 parquet

    二.Parquet Read By Scala

    1.依赖准备与环境初始化

    1. import org.apache.hadoop.fs.FileSystem
    2. import org.apache.flink.formats.parquet.ParquetRowInputFormat
    3. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    4. import org.apache.flink.streaming.api.scala._
    5. import org.apache.parquet.hadoop.ParquetFileReader
    6. import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
    7. import org.apache.parquet.schema.Type.Repetition
    8. import org.apache.parquet.schema.{MessageType, PrimitiveType, Type}

    Flink 读取 parquet 除了正常 Flink 环境相关依赖外,还需要加载单独的 Parquet 组件:

    1. <dependency>
    2. <groupId>org.apache.parquetgroupId>
    3. <artifactId>parquet-avroartifactId>
    4. <version>1.10.0version>
    5. dependency>
    6. <dependency>
    7. <groupId>org.apache.flinkgroupId>
    8. <artifactId>flink-parquet_2.12artifactId>
    9. <version>1.9.0version>
    10. dependency>

    本文基于 Flink-1.13.1 + scala-2.12.8 + hadoop-2.6.0 的运行环境,不同版本下可能需要更换上述 parquet 相关依赖。下面初始化 Flink ExecutionEnvironment,因为流式处理的原因,这里初始化环境类型为 Stream:

        val env = StreamExecutionEnvironment.getExecutionEnvironment

    2.推断 Schem 读取 Parquet

    parquet 通过列式存储数据,所以需要 schema 标定每一列的数据类型与名称,与 Spark 类似, Flink 也可以通过 Parquet 文件推断其对应 schema 并读取 Parquet。

    1. def readParquetWithInferSchema(env: StreamExecutionEnvironment): Unit = {
    2. val filePath = "./test.parquet"
    3. val configuration = new org.apache.hadoop.conf.Configuration(true)
    4. val parquetFileReader = ParquetFileReader.readFooter(configuration, new org.apache.hadoop.fs.Path(filePath))
    5. val schema: MessageType = parquetFileReader.getFileMetaData.getSchema
    6. println(s"Schema: $schema")
    7. val hdfs: FileSystem = org.apache.hadoop.fs.FileSystem.get(configuration)
    8. val rowData = env.readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(filePath), schema), filePath).setParallelism(1)
    9. rowData.map(row => {
    10. val source = row.getField(1)
    11. val flag = row.getField(35)
    12. source + "\t" + flag
    13. }).setParallelism(1).print()
    14. }

    通过 parquetFileReader 获取元数据 MetaData 并获取 parquet 对应 schema,最终通过 env.readFile 方法指定 InputFormat 为 ParquetRowInputFormat 读取 parquet 文件,先看一下打印出来的 schema 形式:

    由于读取的 parquet 为 SparkSession 生成,所以列名采用了 Spark 的默认形式 _c1,_c2 ...

        env.execute("ReadParquet")

    调用执行方法运行上述 print demo 打印最终结果。

    Tips:

    这里的 Row 类型为 org.apache.flink.types.Row 而不再是 org.apache.spark.sql.Row,获取元素的方法也不再是 row.getString 或其他,而是采用 getFiled 传入 position 或者 列名 得到,索引从 0 开始。

    3.指定 schema 读取 Parquet

    除了 infer 推理得到 schema 外,读取也支持自定义 schema,与 spark 类似,这里也提供了 PrimitiveType 指定每一列的数据类型,并合并为 MessageType 得到最终的 schema。

    1. def readParquetWithAssignSchema(env: StreamExecutionEnvironment): Unit = {
    2. val filePath = "./test.parquet"
    3. val id = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "_c0")
    4. val source = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "_c1")
    5. val flag = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "_c35")
    6. val typeArray = Array(id, source, flag)
    7. val typeListAsJava = java.util.Arrays.asList(typeArray: _*).asInstanceOf[java.util.List[Type]]
    8. val schema = new MessageType("schema", typeListAsJava)
    9. println(schema)
    10. val rowData = env.readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(filePath), schema), filePath).setParallelism(1)
    11. rowData.map(row => {
    12. val source = row.getField(1)
    13. val flag = row.getField(2)
    14. source + "\t" + flag
    15. }).setParallelism(1).print()
    16. }

    上面读取的 test.parquet 有 40+ col,这里只读取第 1,2,35 列,所以单独指定 id,source,flag 三列生成 PrimitiveType 并添加至 MessageType 形成 schema,由于 MessageType 为 Java 参数,所以需要通过 asList + asInstance 进行转化,看一下当前的 schema 情况:

        env.execute("ReadParquet")

    调用执行方法执行上述 print 逻辑即可。

    Tips:

    这里列名给出了 _c0, _c1,_c35,但是读取是 position 索引只能选取 0,1,2,因为 schema 数量决定了读取 Row 的列数,而 schema 的列名决定了读取的内容,在该 schema 基础下读取 getField(35) 会报数组越界  java.lang.ArrayIndexOutOfBoundsException: 

    三. Parquet Read By Java

    java 读取与 scala 大同小异,主要差别是 map 变为 MapFunction,这里直接贴完整函数方法:

    1. import org.apache.flink.api.common.functions.FilterFunction;
    2. import org.apache.flink.api.common.functions.MapFunction;
    3. import org.apache.flink.formats.parquet.ParquetRowInputFormat;
    4. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    5. import org.apache.flink.types.Row;
    6. import org.apache.hadoop.conf.Configuration;
    7. import org.apache.hadoop.fs.FileSystem;
    8. import org.apache.parquet.hadoop.ParquetFileReader;
    9. import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    10. import org.apache.parquet.schema.MessageType;
    11. /**
    12. * @title: ReadParquetByJava
    13. * @Author DDD
    14. * @Date: 2022/7/21 8:36 上午
    15. * @Version 1.0
    16. */
    17. public class ReadParquetByJava {
    18. public static void main(String[] args) throws Exception {
    19. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    20. String path = "./test.parquet";
    21. Configuration configuration = new org.apache.hadoop.conf.Configuration(true);
    22. FileSystem hdfs = org.apache.hadoop.fs.FileSystem.get(configuration);
    23. ParquetMetadata parquetFileReader = ParquetFileReader.readFooter(configuration, new org.apache.hadoop.fs.Path(path));
    24. MessageType schema = parquetFileReader.getFileMetaData().getSchema();
    25. System.out.println("-----Schema-----");
    26. System.out.println(schema);
    27. env.readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(path), schema), path)
    28. .setParallelism(1)
    29. .map(new MapFunction() {
    30. @Override
    31. public String map(Row row) throws Exception {
    32. try {
    33. String source = String.valueOf(row.getField(1));
    34. String flag = String.valueOf(row.getField(35));
    35. return source + "\t" + flag;
    36. } catch (Exception e) {
    37. e.printStackTrace();
    38. return null;
    39. }
    40. }
    41. }).print();
    42. env.execute("ReadParquetByJava");
    43. }
    44. }

    四.总结

    Parquet 通过其列式存储与空间压缩应用于多种大数据场景,上面给出了 parquet 文件转 DataStream 的两种方式,同理也可以使用 DataSet 加载为静态数据,上面两个方法都给出了 hdfs: FileSystem 变量但都没有使用,下面说下使用场景:

    一般分布式任务读取时对应的 parquet 文件不是一个而是多个,所以需要从目标目录中找出第一个合法的 parquet 文件供 ParquetFileReader 解析对应的 schema,hdfs 的任务就是通过目标路径获取第一个合法文件使用。

    1. def getFirstFilePath(hdfsPath: String, hdfs: FileSystem): String = {
    2. val files = hdfs.listFiles(new org.apache.hadoop.fs.Path(hdfsPath), false)
    3. var flag = true
    4. var firstFile = ""
    5. while (flag) {
    6. if (files.hasNext) {
    7. firstFile = files.next().getPath.getName
    8. if (!firstFile.equalsIgnoreCase(s"_SUCCESS")
    9. && !firstFile.startsWith(".")
    10. && firstFile.endsWith(".parquet")) {
    11. flag = false
    12. }
    13. } else {
    14. flag = false
    15. }
    16. }
    17. hdfsPath + "/" + firstFile
    18. }

    合法的判断需要三个条件:

    A.不包含 _SUCCESS

    B.不以 '.' 开头

    C.以 '.parquet' 结尾

  • 相关阅读:
    奇异矩阵与非奇异矩阵(广义逆)
    Golang第二章:程序结构
    商铺分类介绍
    AOP之Java动态代理
    Jenkins远程构建项目超时的问题
    java--拼图游戏
    06-nginx缓存集成
    jenkins配置gitlab凭据
    Android移动安全攻防实战 第二章
    不用Swagger,那我用啥?
  • 原文地址:https://blog.csdn.net/BIT_666/article/details/125905038