Using Spark to Read and Write Parquet Files: Verifying That Parquet Carries Its Own Header and Where NULL Values Come From [Java]
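When the landlord's (our employer's) cluster was upgraded from CDH 5.16 to CDP 7.1.5, we ran into a real headache of a NULL-value problem. On the old cluster, IBM DataStage did the computation and wrote the results into Hive's HDFS paths. As mere big-data apprentices and hired hands, we naturally had no idea what black magic DataStage had, but many fields that should have been NULL somehow automatically became "", an empty String. On CDP 7 we use Alibaba Cloud DataPhin, and after brute-force porting the HQL tasks we, naturally, did not add anything like: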
```sql
select
nvl(col1,'') as col1
from
db1.tb1
;
```
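So a huge pile of NULL values showed up, which also made the reports delivered to the business look ugly. Since we standardized on the Parquet format on CDP 7, I used Spark to read and write Parquet files to show the SQL Boys that Parquet carries its own header [schema], and to demonstrate that the NULL values are stored in the Parquet file blocks themselves, not in Hive. So there is no point racking our brains over Hive.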
The relevant parts of `pom.xml` (properties, dependencies and build plugins):

```xml
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <scala.version>2.12.12</scala.version>
    <scala.binary.version>2.12</scala.binary.version>
    <spark.version>3.3.0</spark.version>
    <encoding>UTF-8</encoding>
</properties>
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_${scala.binary.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.16.20</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.28</version>
    </dependency>
</dependencies>
<build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <testSourceDirectory>src/test/java</testSourceDirectory>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.5.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.18.1</version>
            <configuration>
                <useFile>false</useFile>
                <disableXmlReport>true</disableXmlReport>
                <includes>
                    <include>**/*Test.*</include>
                    <include>**/*Suite.*</include>
                </includes>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>2.3</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <filters>
                            <filter>
                                <artifact>*:*</artifact>
                                <excludes>
                                    <exclude>META-INF/*.SF</exclude>
                                    <exclude>META-INF/*.DSA</exclude>
                                    <exclude>META-INF/*.RSA</exclude>
                                </excludes>
                            </filter>
                        </filters>
                        <transformers>
                            <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            </transformer>
                        </transformers>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
```java
package com.zhiyong.parquet;

import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;

/**
 * @program: zhiyong_study
 * @description: Test writing Parquet
 * @author: zhiyong
 * @create: 2022-11-04 20:13
 **/
public class test1 {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("Test writing Parquet")
                .master("local[2]")
                .getOrCreate();

        Dataset<Long> ds1 = spark.range(10);
        Dataset<Row> df1 = ds1.toDF("id");
        //df1.show();//prints id, values 0->9

        Dataset<Row> df2 = df1.withColumn("name", functions.lit("战斗暴龙兽").cast(DataTypes.StringType));
        //df2.show();//prints id and name: 0|战斗暴龙兽 -> 9|战斗暴龙兽

        Dataset<Row> df3 = df2
                .withColumn("name_id", functions.concat_ws("^", df2.col("name"), df2.col("id")));
        //df3.show();//prints name and id concatenated: 0|战斗暴龙兽|战斗暴龙兽^0 -> 9|战斗暴龙兽|战斗暴龙兽^9

        Dataset<Row> df4 = df3.filter("id%3=1");//pick a few rows
        //df4.show();//only id=1,4,7 remain

        // Cast the NULL literal to string so name_id keeps StringType (a bare "null" is NullType)
        Dataset<Row> df5 = df4.selectExpr("id", "name", "cast(null as string) as name_id");
        //df5.show();
        //df5.filter("name_id is NULL").show();//confirms name_id is a real NULL

        // union() matches columns by position; unionAll() has been deprecated since Spark 2.0
        Dataset<Row> df6 = df3.filter("id%3!=1").union(df5);
        df6 = df6.sort(df6.col("id").asc());
        //df6.show();//sorted by id

        // Write to the local Windows path; Overwrite (instead of Append) keeps reruns from duplicating rows
        df6.write().mode(SaveMode.Overwrite).parquet("file:///E:/tempdata/20221104/parquet");

        // Fill the NULLs in name_id with "Na1" (in memory only; the Parquet files above still hold NULL)
        String[] str = {"name_id"};
        Dataset<Row> df7 = df6.na().fill("Na1", str);
        df7.show();

        spark.stop();
    }
}
```
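For convenience, I didn't even connect to Hive. I simply generated 10 rows of data, set the concatenated field name_id to NULL in 3 of them, and finally wrote the result directly to a local path on the Windows 10 host.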
The Parquet files have now been written to the local path E:\tempdata\20221104\parquet on Windows 10. Next, we can read these Parquet files and look at their contents.
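Before reading the files with Spark, one can check that each file is self-describing by looking at its raw layout. Per the Parquet format specification, a file starts with the 4-byte magic `PAR1` and ends with the footer (the Thrift-encoded `FileMetaData`, which holds the schema and row-group metadata), followed by a 4-byte little-endian footer length and `PAR1` again. Below is a minimal, stdlib-only sketch under that assumption; the class name `ParquetFooterPeek` is hypothetical, and it only locates the footer, it does not decode the Thrift structure.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical helper. Assumed layout (Parquet format spec):
 * "PAR1" | column chunks | footer (FileMetaData incl. schema) | footer length (4 bytes, LE) | "PAR1"
 */
public class ParquetFooterPeek {
    private static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

    /** Returns the footer length in bytes; throws if the file does not look like Parquet. */
    public static int footerLength(String path) throws IOException {
        try (RandomAccessFile f = new RandomAccessFile(path, "r")) {
            long size = f.length();
            // Smallest possible layout: head magic + footer length + tail magic
            if (size < 12) {
                throw new IOException("Too small to be a Parquet file: " + path);
            }
            byte[] head = new byte[4];
            f.seek(0);
            f.readFully(head);
            byte[] tail = new byte[8];               // footer length + tail magic
            f.seek(size - 8);
            f.readFully(tail);
            for (int i = 0; i < 4; i++) {
                if (head[i] != MAGIC[i] || tail[4 + i] != MAGIC[i]) {
                    throw new IOException("Missing PAR1 magic: " + path);
                }
            }
            // The 4 bytes just before the trailing magic: footer length, little-endian
            int len = ByteBuffer.wrap(tail, 0, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
            if (len < 0 || len > size - 12) {
                throw new IOException("Implausible footer length " + len + " in " + path);
            }
            return len;
        }
    }

    public static void main(String[] args) throws IOException {
        // Pass one or more part-*.parquet files written by test1
        for (String path : args) {
            System.out.println(path + " -> footer (schema + row-group metadata): "
                    + footerLength(path) + " bytes");
        }
    }
}
```

A non-zero footer length in every part file is consistent with the metadata living inside the file itself. To actually decode the schema, the parquet-hadoop library that spark-sql already depends on is the more practical route.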
```java
package com.zhiyong.parquet;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;

import java.util.ArrayList;
import java.util.List;

/**
 * @program: zhiyong_study
 * @description: Test reading Parquet
 * @author: zhiyong
 * @create: 2022-11-04 20:54
 **/
public class test2 {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("Test reading Parquet")
                .master("local[2]")
                .getOrCreate();

        // No schema is supplied: Spark takes it from the Parquet files themselves
        Dataset<Row> df1 = spark.read().parquet("file:///E:/tempdata/20221104/parquet");
        df1.show();

        // Print the header (column names) stored in the files
        String[] schema_name = df1.schema().names();
        System.out.print("Header: ");
        for (String s : schema_name) {
            System.out.print(s + "\t");
        }
        System.out.println();

        // Print each column's type and collect the names of all string columns
        List<String> stringCols = new ArrayList<>();
        StructField[] schema = df1.schema().fields();
        for (StructField structField : schema) {
            DataType dataType = structField.dataType();
            String name = structField.name();
            System.out.println("dataType = " + dataType + "\tname=" + name);
            if (dataType.equals(DataTypes.StringType)) {
                stringCols.add(name);
            }
        }

        // Replace NULL with "***" in every string column (an empty list is simply a no-op).
        // Note: df1.na().fill("***") without a column list also targets all string columns.
        Dataset<Row> fillDf = df1.na().fill("***", stringCols.toArray(new String[0]));
        fillDf.show();

        spark.stop();
    }
}
```
For convenience, the Parquet file blocks are read directly from this local path.
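Facts speak louder than words. test2 never supplies a schema, yet Spark recovers the column names and types: the Parquet file blocks themselves store the header information [Schema] (in the file footer), and the null values are stored in the data files themselves. So it's not only Hive that shows NULL when reading these Parquet files; Spark does too, and so does Impala...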
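Why does the NULL survive in the file at all? In Parquet, a nullable (OPTIONAL) column records a definition level for every row, and only non-null values go into the value stream, so NULL and "" are physically different things on disk. The sketch below is a simplified model of that idea for a flat, non-nested column; the class name `DefinitionLevelDemo` is hypothetical, and it deliberately leaves out the RLE/bit-packing of levels, value encodings, pages and compression that real Parquet writers apply.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Simplified model of an OPTIONAL, non-nested Parquet column:
 * definition level 1 = value present, 0 = null (no value is written for a null).
 */
public class DefinitionLevelDemo {

    /** Simplified "column chunk": one level per row plus the non-null values. */
    public static final class EncodedColumn {
        public final int[] definitionLevels;
        public final List<String> values;

        EncodedColumn(int[] definitionLevels, List<String> values) {
            this.definitionLevels = definitionLevels;
            this.values = values;
        }
    }

    public static EncodedColumn encode(List<String> column) {
        int[] levels = new int[column.size()];
        List<String> values = new ArrayList<>();
        for (int i = 0; i < column.size(); i++) {
            String v = column.get(i);
            if (v == null) {
                levels[i] = 0;      // NULL: only the level is recorded
            } else {
                levels[i] = 1;      // present; "" is still a real value
                values.add(v);
            }
        }
        return new EncodedColumn(levels, values);
    }

    public static List<String> decode(EncodedColumn enc) {
        List<String> out = new ArrayList<>();
        int next = 0;
        for (int level : enc.definitionLevels) {
            out.add(level == 1 ? enc.values.get(next++) : null);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> nameId = Arrays.asList("x^0", null, "");
        EncodedColumn enc = encode(nameId);
        System.out.println(Arrays.toString(enc.definitionLevels)); // [1, 0, 1]
        System.out.println(enc.values);                             // [x^0, ]
        System.out.println(decode(enc).equals(nameId));             // true
    }
}
```

Whatever engine reads the file (Hive, Spark, Impala), a row with definition level 0 can only come back as NULL, which is why the fix has to happen when the file is written.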
Hive has several thousand parameters in total: https://lizhiyong.blog.csdn.net/article/details/126634922
I also searched through the HiveConf.java source file and found no parameter closely related to Parquet and NULL handling.
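So, to solve the NULL problem, you have to fix the Parquet file blocks themselves and make sure they contain no NULL values in the first place, rather than hoping that some Hive parameter will automatically turn NULL into "" (an empty String) at read time.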
Of course, there is no great way to do this... Either, in the last step of every HQL task:
```sql
insert overwrite table db1.tb1
select
nvl(col1,'') as col1
from
db2.tb2
;
```
apply nvl to handle the nulls, or use Spark or Flink to repair the Parquet files by filling in the null values.
With na().fill(), as done for df7 in test1, the NULL values can be filled.
You can even fill nulls in all String-typed fields automatically, as test2 does by walking the schema.
Java makes this much more convenient than SQL.
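The same convenience can help with the nvl route: instead of hand-writing nvl for every column, the last-step HQL can be generated from a column list. The `NvlSqlGenerator` class below is a hypothetical sketch, not part of any tool mentioned above; it assumes a non-partitioned target table whose column order matches the map (Hive's `insert overwrite ... select` maps columns by position), and it only wraps columns declared as `string`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/**
 * Hypothetical helper: builds "insert overwrite ... select nvl(col,'') as col ..."
 * so the Parquet files written by Hive contain "" instead of NULL for string columns.
 * Assumes a non-partitioned target table; column names/types come from the caller.
 */
public class NvlSqlGenerator {

    public static String build(String targetTable, String sourceTable, Map<String, String> columns) {
        StringJoiner select = new StringJoiner(",\n", "select\n", "\n");
        for (Map.Entry<String, String> col : columns.entrySet()) {
            String name = col.getKey();
            String type = col.getValue().trim().toLowerCase();
            // Only plain string columns get nvl(...,''); everything else passes through unchanged
            select.add(type.equals("string") ? "nvl(" + name + ",'') as " + name : name);
        }
        return "insert overwrite table " + targetTable + "\n" + select + "from\n" + sourceTable + "\n;";
    }

    public static void main(String[] args) {
        Map<String, String> cols = new LinkedHashMap<>(); // insertion order = column order
        cols.put("id", "bigint");
        cols.put("name", "string");
        cols.put("name_id", "string");
        System.out.println(build("db1.tb1", "db2.tb2", cols));
    }
}
```

The column list would have to be kept in sync with the real table definition (for example, copied from `describe db1.tb1`); partitioned tables would additionally need a `partition (...)` clause, which this sketch does not handle.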
All in all, I couldn't come up with a really good solution...
Please credit the source when reposting: https://lizhiyong.blog.csdn.net/article/details/127697083