• Generating and Reading Parquet Files


    1. What is Parquet

      Parquet is a columnar storage format designed for storing and processing large datasets efficiently. It is widely used in big-data processing and analytics ecosystems such as Apache Hadoop and Apache Spark.

      Compared with traditional row-oriented formats such as CSV and JSON, Parquet can significantly improve read/write performance and storage efficiency. Data is stored column by column rather than row by row, which uses storage more effectively, reduces I/O (only the columns a query needs have to be read), and yields much better compression ratios.

    2. Structure of a Parquet File

      Before parsing Parquet files it helps to understand their layout. A Parquet file contains a schema definition, a series of row groups, and a footer with metadata. The schema defines the columns and their data types. Each row group holds the data for a batch of rows, stored column by column as column chunks, and carries its own metadata such as the compression codec and the offsets of each column chunk, which readers use to locate and decode only the data they need.
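
      For a quick look at this structure, the file footer can be printed with ParquetFileReader. This is a minimal sketch of my own, assuming parquet-hadoop 1.9 or newer on the classpath (e.g. the 1.12.0 stack used in Approach 1 below) and the output.parquet file produced later in this article:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    public class InspectParquet {
        public static void main(String[] args) throws Exception {
            HadoopInputFile in = HadoopInputFile.fromPath(new Path("output.parquet"), new Configuration());
            try (ParquetFileReader reader = ParquetFileReader.open(in)) {
                ParquetMetadata footer = reader.getFooter();
                // the schema definition stored in the footer
                System.out.println(footer.getFileMetaData().getSchema());
                // one BlockMetaData per row group: row count, total size, per-column-chunk metadata
                for (BlockMetaData rowGroup : footer.getBlocks()) {
                    System.out.println("rows=" + rowGroup.getRowCount() + ", bytes=" + rowGroup.getTotalByteSize());
                }
            }
        }
    }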

    3. Reading and Writing Parquet in Java

    Approach 1: relying on a big-data (Hadoop) environment

      Maven dependencies:

            <dependency>
                <groupId>org.apache.parquet</groupId>
                <artifactId>parquet-avro</artifactId>
                <version>1.12.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>3.3.1</version>
            </dependency>
    
      Avro schema file:

    [root@local~]# vim schema.avsc
    {
            "type": "record",
            "name": "User",
            "fields": [{
                            "name": "field1",
                            "type": "string"
                    }, {
                            "name": "field2",
                            "type": "int"
                    }
            ]
    }
    
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;
    
    import org.apache.parquet.avro.AvroParquetReader;
    import org.apache.parquet.hadoop.ParquetReader;
    
    import java.io.File;
    import java.io.IOException;
    
    public class WriteToParquet {
    
        public static void main(String[] args) {
            try {
                // Create the Avro Schema object from the schema file
                Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
                // Alternative: parse the schema from an inline string instead of a file
                // Schema schema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"field1\",\"type\":\"string\"},{\"name\":\"field2\",\"type\":\"int\"}]}");
    
                // Create a GenericRecord and populate it
                GenericRecord record = new GenericData.Record(schema);
                record.put("field1", "value1");
                record.put("field2", 123);
    
                // Create the ParquetWriter
                ParquetWriter<GenericRecord> writer = AvroParquetWriter.<GenericRecord>builder(new Path("output.parquet"))
                        .withSchema(schema)
                        .withCompressionCodec(CompressionCodecName.SNAPPY)
                        .build();
    
                // Write the record to the Parquet file
                writer.write(record);
    
                // Close the ParquetWriter
                writer.close();
    
                // Create the ParquetReader
                ParquetReader<GenericRecord> reader = AvroParquetReader.<GenericRecord>builder(new Path("output.parquet"))
                        .build();
    
                // Read the records back from the Parquet file
                // GenericRecord record;
                while ((record = reader.read()) != null) {
                    // process each record
                    System.out.println(record.get("field1"));
                    System.out.println(record.get("field2"));
                }
    
                // Close the ParquetReader
                reader.close();
            } catch (
                    IOException e) {
                e.printStackTrace();
            }
        }
    }
    
      Package it into a jar with dependencies and run it:

    [root@local~ ]# java -cp /huiq/only-maven-1.0-SNAPSHOT-jar-with-dependencies.jar WriteToParquet
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    log4j:WARN No appenders could be found for logger (org.apache.htrace.core.Tracer).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    value1
    123
    
    [root@local~ ]# ll -a
    -rw-r--r-- 1 root root  51783396 Feb 27 17:45 only-maven-1.0-SNAPSHOT-jar-with-dependencies.jar
    -rw-r--r-- 1 root root       615 Feb 27 17:45 output.parquet
    -rw-r--r-- 1 root root        16 Feb 27 17:45 .output.parquet.crc
    -rw-r--r-- 1 root root       147 Feb 26 17:24 schema.avsc
    

    References:
    java写parquet
    java parquet AvroParquetWriter

    Pitfalls encountered:
    Pitfall 1: ClassNotFoundException: com.fasterxml.jackson.annotation.JsonMerge

      The dependencies I started with:

            <dependency>
                <groupId>org.apache.parquet</groupId>
                <artifactId>parquet-avro</artifactId>
                <version>1.12.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>3.0.0</version>
            </dependency>
    

      Error:

    Exception in thread "main" java.lang.NoClassDefFoundError: com/fasterxml/jackson/annotation/JsonMerge
            at com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector.(JacksonAnnotationIntrospector.java:50)
            at com.fasterxml.jackson.databind.ObjectMapper.(ObjectMapper.java:351)
            at org.apache.avro.Schema.(Schema.java:109)
            at org.apache.avro.Schema$Parser.parse(Schema.java:1413)
            at WriteToParquet.main(WriteToParquet.java:21)
    Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.annotation.JsonMerge
            at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
            at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
            ... 5 more
    

      Fix:

            <dependency>
                <groupId>org.apache.parquet</groupId>
                <artifactId>parquet-avro</artifactId>
                <version>1.12.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>3.0.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>com.fasterxml.jackson.core</groupId>
                        <artifactId>jackson-annotations</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    

      Cause: with hadoop-client 3.3.1 the resolved Maven dependency is jackson-annotations-2.11.3.jar, but with hadoop-client 3.0.0 it is jackson-annotations-2.7.8.jar, and running the program then throws the error above. After excluding jackson-annotations from the 3.0.0 dependency, the resolved jar became jackson-annotations-2.11.3.jar. Later I also tested jackson-annotations-2.6.7.jar and it worked fine.
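
      A quick way to check which Jackson versions each dependency actually resolves to is the Maven dependency tree, for example:

    [root@local~]# mvn dependency:tree -Dincludes=com.fasterxml.jackson.core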

      The next day a different error appeared:

    Exception in thread "main" java.lang.NoClassDefFoundError: com/fasterxml/jackson/annotation/JsonView
    	at com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector.(JacksonAnnotationIntrospector.java:36)
    	at com.fasterxml.jackson.databind.ObjectMapper.(ObjectMapper.java:271)
    	at org.apache.avro.Schema.(Schema.java:109)
    	at org.apache.avro.Schema$Parser.parse(Schema.java:1413)
    	at WriteToParquet.main(WriteToParquet.java:20)
    Caused by: java.lang.ClassNotFoundException: com.fasterxml.jackson.annotation.JsonView
    	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    	... 5 more
    

      Later I came to think the explanation above is not the real cause, only a symptom: sometimes jackson-annotations-2.7.8.jar works fine too. The deeper cause seems to be that the three com.fasterxml.jackson.core artifacts (jackson-core, jackson-databind, jackson-annotations) must all be on the same version.
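
      If that is indeed the cause, one way to enforce it is to pin all three artifacts in dependencyManagement. This is only a sketch under that assumption; 2.11.3 here is simply the version hadoop-client 3.3.1 resolved to above:

            <dependencyManagement>
                <dependencies>
                    <dependency>
                        <groupId>com.fasterxml.jackson.core</groupId>
                        <artifactId>jackson-core</artifactId>
                        <version>2.11.3</version>
                    </dependency>
                    <dependency>
                        <groupId>com.fasterxml.jackson.core</groupId>
                        <artifactId>jackson-databind</artifactId>
                        <version>2.11.3</version>
                    </dependency>
                    <dependency>
                        <groupId>com.fasterxml.jackson.core</groupId>
                        <artifactId>jackson-annotations</artifactId>
                        <version>2.11.3</version>
                    </dependency>
                </dependencies>
            </dependencyManagement>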

      Yet a few other combinations also worked, which really left me confused (the screenshots are omitted here).
      Still, the conclusion that the versions must match seems right, because later, when integrating with another project, this error came up:

    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/parquet/schema/LogicalTypeAnnotation$LogicalTypeAnnotationVisitor
    	at org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:157)
    	at org.apache.parquet.avro.AvroParquetWriter.access$200(AvroParquetWriter.java:36)
    	at org.apache.parquet.avro.AvroParquetWriter$Builder.getWriteSupport(AvroParquetWriter.java:190)
    	at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:533)
    	at WriteToParquet.main(WriteToParquet.java:31)
    Caused by: java.lang.ClassNotFoundException: org.apache.parquet.schema.LogicalTypeAnnotation$LogicalTypeAnnotationVisitor
    	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    	... 5 more
    

      Maven dependencies:

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.4.0</version>
            </dependency>

            <dependency>
                <groupId>org.apache.parquet</groupId>
                <artifactId>parquet-avro</artifactId>
                <version>1.12.0</version>
            </dependency>
    

      The parquet artifact versions resolved at the time are shown in one screenshot, and after they changed to the versions in a second screenshot the error no longer occurred (both screenshots omitted here).

      With that in mind, the fix was straightforward:

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.4.0</version>
            </dependency>

            <dependency>
                <groupId>org.apache.parquet</groupId>
                <artifactId>parquet-avro</artifactId>
                <version>1.10.0</version>
            </dependency>
    
    Pitfall 2: No FileSystem for scheme "file"

      When integrated into the project it failed with: org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "file"
      Fix: add the following code (an alternative maven-shade-plugin approach is sketched after the snippet):

                Configuration conf = new Configuration();
                conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
                conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
    
                // or, equivalently:
    //            conf.set("fs.hdfs.impl",
    //                    org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()
    //            );
    //            conf.set("fs.file.impl",
    //                    org.apache.hadoop.fs.LocalFileSystem.class.getName()
    //            );
                FileSystem fs = FileSystem.get(conf); // this line is required even though fs is never used afterwards
    
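      When this error only appears after packaging into a fat jar, a common cause is that the META-INF/services/org.apache.hadoop.fs.FileSystem files from hadoop-common and hadoop-hdfs overwrite each other during shading. A commonly cited alternative fix, which I have not tried here, is to merge those service files with the maven-shade-plugin's ServicesResourceTransformer:

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals><goal>shade</goal></goals>
                        <configuration>
                            <transformers>
                                <!-- merge META-INF/services so both LocalFileSystem and DistributedFileSystem stay registered -->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>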

    References:
    java.io.IOException: No FileSystem for scheme: file
    MapReduce 踩坑 - hadoop No FileSystem for scheme: file/hdfs
    FileSystem及其源码分析

    Pitfall 3: conflict with the spark-sql dependency
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.4.0</version>
            </dependency>
    

      Error:

    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/parquet/schema/LogicalTypeAnnotation
    	at org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:157)
    	at org.apache.parquet.avro.AvroParquetWriter.access$200(AvroParquetWriter.java:36)
    	at org.apache.parquet.avro.AvroParquetWriter$Builder.getWriteSupport(AvroParquetWriter.java:190)
    	at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:533)
    	at com.heheda.app.SparkWriteCsvToParquet.main(SparkWriteCsvToParquet.java:46)
    Caused by: java.lang.ClassNotFoundException: org.apache.parquet.schema.LogicalTypeAnnotation
    	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    	... 5 more
    

      My first idea:

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.4.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.parquet</groupId>
                        <artifactId>parquet-column</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    

      That then produced another error:

    Exception in thread "main" java.lang.AbstractMethodError: org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(Lorg/apache/parquet/bytes/BytesInput;IILorg/apache/parquet/column/statistics/Statistics;Lorg/apache/parquet/column/Encoding;Lorg/apache/parquet/column/Encoding;Lorg/apache/parquet/column/Encoding;)V
    	at org.apache.parquet.column.impl.ColumnWriterV1.writePage(ColumnWriterV1.java:59)
    	at org.apache.parquet.column.impl.ColumnWriterBase.writePage(ColumnWriterBase.java:387)
    	at org.apache.parquet.column.impl.ColumnWriteStoreBase.flush(ColumnWriteStoreBase.java:186)
    	at org.apache.parquet.column.impl.ColumnWriteStoreV1.flush(ColumnWriteStoreV1.java:29)
    	at org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:172)
    	at org.apache.parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:114)
    	at org.apache.parquet.hadoop.ParquetWriter.close(ParquetWriter.java:308)
    	at com.heheda.app.SparkWriteCsvToParquet.main(SparkWriteCsvToParquet.java:52)
    

    Note: the referenced article says this works without Hadoop, but I could not get it to. Submitted to a server with a Hadoop environment it runs fine, but locally in IDEA it errors out, leaving an empty parquet file or no file at all (a possible workaround is sketched after the stack trace):

    Exception in thread "main" java.lang.RuntimeException: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
    	at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:736)
    	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:271)
    	at org.apache.hadoop.util.Shell.getSetPermissionCommand(Shell.java:287)
    	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:978)
    	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.(RawLocalFileSystem.java:324)
    	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.(RawLocalFileSystem.java:294)
    	at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:439)
    	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:428)
    	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:459)
    	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.(ChecksumFileSystem.java:433)
    	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:521)
    	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
    	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
    	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1175)
    	at org.apache.parquet.hadoop.util.HadoopOutputFile.createOrOverwrite(HadoopOutputFile.java:81)
    	at org.apache.parquet.hadoop.ParquetFileWriter.(ParquetFileWriter.java:327)
    	at org.apache.parquet.hadoop.ParquetWriter.(ParquetWriter.java:292)
    	at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:646)
    	at WriteToParquet.main(WriteToParquet.java:33)
    Caused by: java.io.FileNotFoundException: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset. -see https://wiki.apache.org/hadoop/WindowsProblems
    	at org.apache.hadoop.util.Shell.fileNotFoundException(Shell.java:548)
    	at org.apache.hadoop.util.Shell.getHadoopHomeDir(Shell.java:569)
    	at org.apache.hadoop.util.Shell.getQualifiedBin(Shell.java:592)
    	at org.apache.hadoop.util.Shell.(Shell.java:689)
    	at org.apache.hadoop.util.StringUtils.(StringUtils.java:79)
    	at org.apache.hadoop.fs.FileSystem$Cache$Key.(FileSystem.java:3741)
    	at org.apache.hadoop.fs.FileSystem$Cache$Key.(FileSystem.java:3736)
    	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3520)
    	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
    	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:288)
    	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:524)
    	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
    	at org.apache.parquet.hadoop.util.HadoopOutputFile.fromPath(HadoopOutputFile.java:58)
    	at org.apache.parquet.hadoop.ParquetWriter$Builder.build(ParquetWriter.java:643)
    	... 1 more
    Caused by: java.io.FileNotFoundException: HADOOP_HOME and hadoop.home.dir are unset.
    	at org.apache.hadoop.util.Shell.checkHadoopHomeInner(Shell.java:468)
    	at org.apache.hadoop.util.Shell.checkHadoopHome(Shell.java:439)
    	at org.apache.hadoop.util.Shell.(Shell.java:516)
    	... 11 more
    
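      If it has to run on a machine without a Hadoop installation (for example a local Windows IDE), one workaround I would try (the same trick the haddopEnv() helper in Approach 2 below uses, and which I have only seen verified there for reading) is to point hadoop.home.dir at a directory containing a dummy bin/winutils.exe before any Hadoop class is loaded:

    import java.io.File;

    public class HadoopHomeWorkaround {
        // Call this before touching any Hadoop/Parquet classes.
        public static void fakeHadoopHome() throws Exception {
            File workDir = new File(".");
            System.setProperty("hadoop.home.dir", workDir.getAbsolutePath());
            new File("./bin").mkdirs();                     // Shell looks for ${hadoop.home.dir}/bin/winutils.exe
            new File("./bin/winutils.exe").createNewFile(); // an empty placeholder only silences the HADOOP_HOME check; writes may still need a real winutils.exe on Windows
        }
    }
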
    Approach 2: without a big-data environment

      Many examples online require a locally installed Hadoop environment to write Parquet; below is a way to write Parquet files without installing Hadoop.

      From: 列式存储格式之parquet读写

      Maven dependencies:

            <dependency>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro</artifactId>
                <version>1.8.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-core</artifactId>
                <version>1.2.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.parquet</groupId>
                <artifactId>parquet-hadoop</artifactId>
                <version>1.8.1</version>
            </dependency>

            <dependency>
                <groupId>org.apache.parquet</groupId>
                <artifactId>parquet-avro</artifactId>
                <version>1.8.1</version>
            </dependency>
    
    public class User {
        private String id;
        private String name;
        private String password;
    
        public User() {
        }
    
        public User(String id, String name, String password) {
            this.id = id;
            this.name = name;
            this.password = password;
        }
    
        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public String getPassword() {
            return password;
        }
    
        public void setPassword(String password) {
            this.password = password;
        }
    
        @Override
        public String toString() {
            return "User{" +
                    "id='" + id + '\'' +
                    ", name='" + name + '\'' +
                    ", password='" + password + '\'' +
                    '}';
        }
    }
    

    Note: the User entity class in this approach conflicts with the "name": "User" record in the schema.avsc file from the approach above, producing this error:

    Exception in thread "main" org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file file:/heheda/output.parquet
            at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254)
            at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132)
            at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136)
            at WriteToParquet.main(WriteToParquet.java:55)
    Caused by: java.lang.ClassCastException: User cannot be cast to org.apache.avro.generic.IndexedRecord
            at org.apache.avro.generic.GenericData.setField(GenericData.java:818)
            at org.apache.parquet.avro.AvroRecordConverter.set(AvroRecordConverter.java:396)
            at org.apache.parquet.avro.AvroRecordConverter$2.add(AvroRecordConverter.java:132)
            at org.apache.parquet.avro.AvroConverters$BinaryConverter.addBinary(AvroConverters.java:64)
            at org.apache.parquet.column.impl.ColumnReaderBase$2$6.writeValue(ColumnReaderBase.java:390)
            at org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:440)
            at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)
            at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
            at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:229)
            ... 3 more
    
    Writing:
    import org.apache.avro.reflect.ReflectData;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.hadoop.ParquetWriter;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
    import static org.apache.parquet.hadoop.metadata.CompressionCodecName.SNAPPY;
    
    public class WriteToParquet {
    
        public static void main(String[] args) {
            try {
                List<User> users = new ArrayList<>();
    
                User user1 = new User("1","huangchixin","123123");
                User user2 = new User("2","huangchixin2","123445");
                users.add(user1);
                users.add(user2);
    
                Path dataFile = new Path("output.parquet");
    
                ParquetWriter<User> writer = AvroParquetWriter.<User>builder(dataFile)
                        .withSchema(ReflectData.AllowNull.get().getSchema(User.class))
                        .withDataModel(ReflectData.get())
                        .withConf(new Configuration())
                        .withCompressionCodec(SNAPPY)
                        .withWriteMode(OVERWRITE)
                        .build();
    
                for (User user : users) {
                    writer.write(user);
                }
    
                writer.close();
            } catch (
                    IOException e) {
                e.printStackTrace();
            }
        }
    }
    

      Running it locally in IDEA works (output screenshot omitted).

    Reading with AvroParquetReader (the target class must be specified):
    import org.apache.avro.reflect.ReflectData;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetReader;
    import org.apache.parquet.hadoop.ParquetReader;
    
    import java.io.IOException;
    
    public class WriteToParquet {
    
        public static void main(String[] args) {
            try {
                Path dataFile = new Path("output.parquet");
                ParquetReader<User> reader = AvroParquetReader.<User>builder(dataFile)
                        .withDataModel(new ReflectData(User.class.getClassLoader()))
                        .disableCompatibility()
                        .withConf(new Configuration())
                        .build();
                    User user;
    
                    while ((user = reader.read()) != null) {
                        System.out.println(user);
                    }
            } catch (
                    IOException e) {
                e.printStackTrace();
            }
        }
    }
    


    Reading with ParquetFileReader (only a fake Hadoop home is needed):

      Maven configuration:

            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-core</artifactId>
                <version>1.2.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.parquet</groupId>
                <artifactId>parquet-hadoop</artifactId>
                <version>1.8.1</version>
            </dependency>

            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
            </dependency>

            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>11.0.2</version>
            </dependency>

            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.8.3</version>
            </dependency>
    

      Column entity:

    package com.kestrel;
    
    public class TableHead {
    
        /**
         * Column name
         */
        private String name;
        /**
         * Data type stored in the column
         */
        private String type;
        /**
         * Column index
         */
        private Integer index;

        public String getType() {
            return type;
        }
    
        public void setType(String type) {
            this.type = type;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public Integer getIndex() {
            return index;
        }
    
        public void setIndex(Integer index) {
            this.index = index;
        }
    }
    

      Parquet result entity:

    package com.kestrel;
    
    import java.util.List;
    
    public class TableResult {
        /**
         * Header information of the parsed file (currently only meaningful for arrow/csv files)
         */
        private List<TableHead> columns;
        /**
         * The data rows
         */
        private List<?> data;
    
        public List< TableHead> getColumns() {
            return columns;
        }
    
        public void setColumns(List< TableHead> columns) {
            this.columns = columns;
        }
    
        public List<?> getData() {
            return data;
        }
    
        public void setData(List<?> data) {
            this.data = data;
        }
    }
    

      Reading the parquet file:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.google.common.collect.Lists;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.column.page.PageReadStore;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
    import org.apache.parquet.format.converter.ParquetMetadataConverter;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    import org.apache.parquet.io.ColumnIOFactory;
    import org.apache.parquet.io.MessageColumnIO;
    import org.apache.parquet.io.RecordReader;
    import org.apache.parquet.schema.GroupType;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.OriginalType;
    import org.apache.parquet.schema.Type;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    
    import java.io.File;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    public class ReadParquet {
    
        private static final Log logger = LogFactory.getLog(ReadParquet.class);
    
        public static void main(String[] args) throws Exception {
            TableResult tableResult = parquetReaderV2(new File("output.parquet"));
            ObjectMapper mapper = new ObjectMapper();
            String jsonString = mapper.writerWithDefaultPrettyPrinter()
                    .writeValueAsString(tableResult);
            System.out.println(jsonString);
        }
    
        public static TableResult parquetReaderV2(File file) throws Exception {
            long start = System.currentTimeMillis();
            haddopEnv();
            Path path = new Path(file.getAbsolutePath());
    
            Configuration conf = new Configuration();
            TableResult table = new TableResult();
    
            // two-dimensional list holding the row data
            List<List<Object>> dataList = Lists.newArrayList();
    
            ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER);
    
            MessageType schema = readFooter.getFileMetaData().getSchema();
            ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path, readFooter.getBlocks(), schema.getColumns());
    //        with org.apache.parquet 1.9.0, the reader is constructed like this instead:
    //        ParquetFileReader r = new ParquetFileReader(conf, path, readFooter);
    
            PageReadStore pages = null;
            try {
                while (null != (pages = r.readNextRowGroup())) {
                    final long rows = pages.getRowCount();
                    logger.info(file.getName()+" 行数: " + rows);
    
                    final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
                    final RecordReader<Group> recordReader = columnIO.getRecordReader(pages,
                            new GroupRecordConverter(schema));
                    for (int i = 0; i <= rows; i++) {
    //                    System.out.println(recordReader.shouldSkipCurrentRecord());
                        final Group g = recordReader.read();
                        if (i == 0) {
                            // set the table header column names from the first record
                            table.setColumns(parquetColumn(g));
                            i++;
                        }
                        // collect this row's data
                        List<Object> row = getparquetData(table.getColumns(), g);
                        dataList.add(row);
                        // printGroup(g);
    
                    }
                }
            } finally {
                r.close();
            }
            logger.info(file.getName()+" 加载时间:"+(System.currentTimeMillis() - start));
    
            table.setData(dataList);
            return table;
        }
        
        private static List<Object> getparquetData(List<TableHead> columns, Group line) {
            List<Object> row = new ArrayList<>();
            Object cellStr = null;
            for (int i = 0; i < columns.size(); i++) {
                try {
                    switch (columns.get(i).getType()) {
                        case "DOUBLE":
                            cellStr = line.getDouble(i, 0);
                            break;
                        case "FLOAT":
                            cellStr = line.getFloat(i, 0);
                            break;
                        case "BOOLEAN":
                            cellStr = line.getBoolean(i, 0);
                            break;
                        case "INT96":
                            cellStr = line.getInt96(i, 0);
                            break;
                        case "LONG":
                            cellStr = line.getLong(i, 0);
                            break;
                        default:
                            cellStr = line.getValueToString(i, 0);
                    }
    
                } catch (RuntimeException e) {
                    // ignore unreadable cells and keep the previous value as a placeholder
                } finally {
                    row.add(cellStr);
                }
            }
            return row;
        }
    
        /**
         * Extract the table header (column) information from a record
         */
        private static List<TableHead> parquetColumn(Group line) {
            List<TableHead> columns = Lists.newArrayList();
            TableHead dto = null;
    
            GroupType groupType = line.getType();
    
            int fieldCount = groupType.getFieldCount();
            for (int i = 0; i < fieldCount; i++) {
                dto = new TableHead();
                Type type = groupType.getType(i);
                String fieldName = type.getName();
                OriginalType originalType = type.getOriginalType();
                String typeName = null;
                if (originalType != null) {
                    typeName = originalType.name();
                } else {
                    typeName = type.asPrimitiveType().getPrimitiveTypeName().name();
                }
                dto.setIndex(i);
                dto.setName(fieldName);
                dto.setType(typeName);
                columns.add(dto);
            }
            return columns;
        }
    
        public static void haddopEnv() throws IOException {
            File workaround = new File(".");
            System.getProperties().put("hadoop.home.dir", workaround.getAbsolutePath());
            new File("./bin").mkdirs();
            new File("./bin/winutils.exe").createNewFile();
        }
    }
    

    在这里插入图片描述

    Reading with ParquetReader and GroupReadSupport:
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.example.GroupReadSupport;
    
    public class ReadParquet {
    
        public static void main(String[] args) throws Exception {
            parquetReader("output.parquet");
        }
    
        static void parquetReader(String inPath) throws Exception{
            GroupReadSupport readSupport = new GroupReadSupport();
            ParquetReader<Group> reader = new ParquetReader<Group>(new Path(inPath),readSupport);
        // In newer versions the ParquetReader constructors are deprecated; build the reader with the builder instead:
    //        ParquetReader reader = ParquetReader.builder(readSupport, new Path(inPath)).build();
            Group line=null;
            while((line=reader.read())!=null){
                System.out.println(line.toString());
                
    //            System.out.println(line.getString("id",0));
    //            System.out.println(line.getString("name",0));
    //            System.out.println(line.getString("password",0));
    
            // for other data types, the accessors below can be used for reference
    //            System.out.println(line.getInteger("intValue",0));
    //            System.out.println(line.getLong("longValue",0));
    //            System.out.println(line.getDouble("doubleValue",0));
    //            System.out.println(line.getString("stringValue",0));
    //            System.out.println(new String(line.getBinary("byteValue",0).getBytes()));
    //            System.out.println(new String(line.getBinary("byteNone",0).getBytes()));
            }
            System.out.println("读取结束");
        }
    }
    


  • Original article: https://blog.csdn.net/m0_37739193/article/details/136300647