1.8.5.6 ALTER TABLE Partition Operations
The ALTER TABLE partition operations cover adding and dropping partition fields. They are supported from Spark 3.x (Spark 2.4 does not support them) and require the spark.sql.extensions property to be set to org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions in the Spark configuration. Partition transforms can also be used when adding a partition field (see the sketch after the syntax list below). The syntax is:
- Add partition syntax: ALTER TABLE ... ADD PARTITION FIELD
- Drop partition syntax: ALTER TABLE ... DROP PARTITION FIELD
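Besides identity partitioning on a plain column, ADD PARTITION FIELD also accepts Iceberg's partition transforms. A minimal sketch, written against the demo table hadoop_prod.default.mytbl created below; exactly which transforms are available depends on the Iceberg and Spark versions in use:
- // years()/months()/days()/hours() derive a time-based partition from a timestamp column;
- // bucket(N, col) hashes a column into N buckets
- spark.sql("ALTER TABLE hadoop_prod.default.mytbl ADD PARTITION FIELD days(ts)")
- spark.sql("ALTER TABLE hadoop_prod.default.mytbl ADD PARTITION FIELD bucket(16, id)")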
The concrete steps are as follows:
- Create the table mytbl and insert data
- import org.apache.spark.sql.SparkSession
-
- val spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg")
- //specify the hadoop catalog; the catalog name is hadoop_prod
- .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
- .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
- .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")
- .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
- .getOrCreate()
-
- //1. create an ordinary (unpartitioned) table
- spark.sql(
- """
- | create table hadoop_prod.default.mytbl(id int,name string,loc string,ts timestamp) using iceberg
- """.stripMargin)
- //2. insert data into the table and query it
- spark.sql(
- """
- |insert into hadoop_prod.default.mytbl values
- |(1,'zs',"beijing",cast(1608469830 as timestamp)),
- |(3,'ww',"shanghai",cast(1603096230 as timestamp))
- """.stripMargin)
- spark.sql("select * from hadoop_prod.default.mytbl").show()
The data layout in HDFS and the query result are as follows:


- Add the loc column as a partition field, then insert data and query
- //3. add loc as a partition field; this requires the spark.sql.extensions = IcebergSparkSessionExtensions config shown above
- spark.sql(
- """
- |alter table hadoop_prod.default.mytbl add partition field loc
- """.stripMargin)
-
- //4. insert more data into mytbl; earlier data is unpartitioned, data written from now on is partitioned by loc
- spark.sql(
- """
- |insert into hadoop_prod.default.mytbl values
- |(5,'tq',"hangzhou",cast(1608279630 as timestamp)),
- |(2,'ls',"shandong",cast(1634559630 as timestamp))
- """.stripMargin )
- spark.sql("select * from hadoop_prod.default.mytbl").show()
The data layout in HDFS and the query result are as follows:


Note: adding a partition field is a metadata-only operation. It does not rewrite existing table data; new data is written using the new partition layout, while existing data stays in its original layout.
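One way to observe this (a hedged sketch, assuming the catalog and table names above) is to query Iceberg's files metadata table, which lists each data file together with its partition tuple: files written before the ALTER keep their original unpartitioned layout and report loc as null, while files written afterwards carry a loc value.
- // inspect data files and their partition values via the "files" metadata table
- spark.sql("select file_path, partition from hadoop_prod.default.mytbl.files").show(false)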
My test:
Test code:
- package com.shujia.spark.iceberg
-
- import org.apache.spark.sql.SparkSession
-
- object AlterTablePartition {
- def main(args: Array[String]): Unit = {
-
- /**
- * The alter partition operations cover adding and dropping partition fields. They are supported
- * from Spark 3.x (Spark 2.4 does not support them) and require the spark.sql.extensions property
- * to be set to org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions.
- * Partition transforms are also supported when adding a partition field. Syntax:
- * add:  ALTER TABLE ... ADD PARTITION FIELD
- * drop: ALTER TABLE ... DROP PARTITION FIELD
- *
- */
-
- val spark: SparkSession = SparkSession
- .builder()
- .appName("SparkOperateIceberg")
- //specify the hive catalog; the catalog name is hive_prod
- .config("spark.sql.catalog.hive_prod", "org.apache.iceberg.spark.SparkCatalog")
- .config("spark.sql.catalog.hive_prod.type", "hive")
- .config("spark.sql.catalog.hive_prod.uri", "thrift://master:9083")
- .config("iceberg.engine.hive.enabled", "true")
- // required below so that loc can be added as a partition field
- .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
- .enableHiveSupport()
- .getOrCreate()
-
-
- //1. create an ordinary table
- spark.sql(
- """
- | create table if not exists hive_prod.iceberg.repartition1
- | (id int,name string,loc string,ts timestamp) using iceberg
- |
- """.stripMargin)
- //2. insert data into the table and query it
- spark.sql(
- """
- |insert into hive_prod.iceberg.repartition1 values
- |(1,'zs',"beijing",cast(1608469830 as timestamp)),
- |(3,'ww',"shanghai",cast(1603096230 as timestamp))
- |
- """.stripMargin)
-
- spark.sql("select * from hive_prod.iceberg.repartition1").show()
-
-
- //3. add loc as a partition field; requires the spark.sql.extensions =
- //   IcebergSparkSessionExtensions config set above
- spark.sql(
- """
- |alter table hive_prod.iceberg.repartition1 add partition field loc
- |
- """.stripMargin)
-
- //4. insert more data into repartition1; earlier data is unpartitioned, new data is partitioned by loc
- spark.sql(
- """
- |insert into hive_prod.iceberg.repartition1 values
- |(5,'tq',"hangzhou",cast(1608279630 as timestamp)),
- |(6,'xx',"hangzhou",cast(1608279631 as timestamp)),
- |(2,'ls',"shandong",cast(1634559632 as timestamp))
- |
- """.stripMargin )
-
- spark.sql("select * from hive_prod.iceberg.repartition1").show()
-
- //spark-submit command for this job
- //spark-submit --master yarn --class com.shujia.spark.iceberg.AlterTablePartition spark-1.0.jar
- }
- }

- Add a transform of the ts column as a partition field, then insert data and query
- //5. add ts as a partition field via the years() partition transform
- spark.sql(
- """
- |alter table hadoop_prod.default.mytbl add partition field years(ts)
- """.stripMargin)
-
- //6. insert more data into mytbl; earlier data has no year partition, new data does
- spark.sql(
- """
- |insert into hadoop_prod.default.mytbl values
- |(4,'ml',"beijing",cast(1639920630 as timestamp)),
- |(6,'gb',"tianjin",cast(1576843830 as timestamp))
- """.stripMargin )
- spark.sql("select * from hadoop_prod.default.mytbl").show()
The data layout in HDFS and the query result are as follows:


My test: adding a second partition field on top of the first one
Test code:
- package com.shujia.spark.iceberg
-
- import org.apache.spark.sql.SparkSession
-
- object AlterTable2Partitions {
- def main(args: Array[String]): Unit = {
-
- val spark: SparkSession = SparkSession
- .builder()
- .appName("SparkOperateIceberg")
- //specify the hive catalog; the catalog name is hive_prod
- .config("spark.sql.catalog.hive_prod", "org.apache.iceberg.spark.SparkCatalog")
- .config("spark.sql.catalog.hive_prod.type", "hive")
- .config("spark.sql.catalog.hive_prod.uri", "thrift://master:9083")
- .config("iceberg.engine.hive.enabled", "true")
- // required below so that ALTER TABLE ... ADD PARTITION FIELD works
- .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
- .enableHiveSupport()
- .getOrCreate()
-
-
- //5. add ts as a partition field via the years() partition transform
- spark.sql(
- """
- |alter table hive_prod.iceberg.repartition1 add partition field years(ts)
- """.stripMargin)
-
- //6. insert more data into repartition1; earlier data has no year partition, new data does
- spark.sql(
- """
- |insert into hive_prod.iceberg.repartition1 values
- |(4,'ml',"beijing",cast(1639920630 as timestamp)),
- |(4,'mm',"beijing",cast(1639920639 as timestamp)),
- |(6,'gb',"tianjin",cast(1576843830 as timestamp))
- |
- """.stripMargin )
-
-
- spark.sql("select * from hive_prod.iceberg.repartition1").show()
-
- //spark-submit command for this job
- //spark-submit --master yarn --class com.shujia.spark.iceberg.AlterTable2Partitions spark-1.0.jar
- }
- }

- Drop the loc partition field
- //7. drop the loc partition field from mytbl
- spark.sql(
- """
- |alter table hadoop_prod.default.mytbl drop partition field loc
- """.stripMargin)
- //8. insert more data into mytbl and query
- spark.sql(
- """
- |insert into hadoop_prod.default.mytbl values
- |(4,'ml',"beijing",cast(1639920630 as timestamp)),
- |(6,'gb',"tianjin",cast(1576843830 as timestamp))
- """.stripMargin )
- spark.sql("select * from hadoop_prod.default.mytbl").show()
The data layout in HDFS and the query result are as follows:

Note: because the table still has the partition derived from the ts transform, data inserted after the drop shows loc as null in its partition.
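A hedged way to confirm this (same catalog and table names as above): list the partition tuples currently present via the partitions metadata table. Dropping a partition field does not rewrite old data, and in format v1 tables the dropped field is kept in the spec as a void transform, which is why the partition tuples and directory names still show loc=null while years(ts) keeps its value.
- // list the partition tuples and row counts currently in the table
- spark.sql("select partition, record_count from hadoop_prod.default.mytbl.partitions").show(false)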

My test:
Test code:
- package com.shujia.spark.iceberg
-
- import org.apache.spark.sql.SparkSession
-
- object DeleteTablePartition {
- def main(args: Array[String]): Unit = {
-
- /**
- *
- * Drop one partition field
- *
- */
- val spark: SparkSession = SparkSession
- .builder()
- .appName("SparkOperateIceberg")
- //specify the hive catalog; the catalog name is hive_prod
- .config("spark.sql.catalog.hive_prod", "org.apache.iceberg.spark.SparkCatalog")
- .config("spark.sql.catalog.hive_prod.type", "hive")
- .config("spark.sql.catalog.hive_prod.uri", "thrift://master:9083")
- .config("iceberg.engine.hive.enabled", "true")
- // required below so that ALTER TABLE ... DROP PARTITION FIELD works
- .config("spark.sql.extensions","org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
- .enableHiveSupport()
- .getOrCreate()
-
- //7. drop the loc partition field from repartition1
- spark.sql(
- """
- |alter table hive_prod.iceberg.repartition1 drop partition field loc
- """.stripMargin)
- //8. insert more data into repartition1 and query
- spark.sql(
- """
- |insert into hive_prod.iceberg.repartition1 values
- |(4,'ml',"beijing",cast(1639920630 as timestamp)),
- |(6,'gb',"tianjin",cast(1576843830 as timestamp))
- |
- """.stripMargin )
-
- spark.sql("select * from hive_prod.iceberg.repartition1").show()
- //spark-submit command for this job
- //spark-submit --master yarn --class com.shujia.spark.iceberg.DeleteTablePartition spark-1.0.jar
- }
- }

- Drop the years(ts) partition field
- //9. drop the years(ts) partition field from mytbl
- spark.sql(
- """
- |alter table hadoop_prod.default.mytbl drop partition field years(ts)
- """.stripMargin)
- //10. insert more data into mytbl and query
- spark.sql(
- """
- |insert into hadoop_prod.default.mytbl values
- |(5,'tq',"hangzhou",cast(1608279630 as timestamp)),
- |(2,'ls',"shandong",cast(1634559630 as timestamp))
- """.stripMargin )
- spark.sql("select * from hadoop_prod.default.mytbl").show()
The data layout in HDFS and the query result are as follows:


My test:
Test code:
- package com.shujia.spark.iceberg
-
- import org.apache.spark.sql.SparkSession
-
- object DeleteTable2Partitions {
- def main(args: Array[String]): Unit = {
-
- /**
- *
- * Drop a second partition field after the first one has been dropped
- *
- */
- val spark: SparkSession = SparkSession
- .builder()
- .appName("SparkOperateIceberg")
- //specify the hive catalog; the catalog name is hive_prod
- .config("spark.sql.catalog.hive_prod", "org.apache.iceberg.spark.SparkCatalog")
- .config("spark.sql.catalog.hive_prod.type", "hive")
- .config("spark.sql.catalog.hive_prod.uri", "thrift://master:9083")
- .config("iceberg.engine.hive.enabled", "true")
- // required below so that ALTER TABLE ... DROP PARTITION FIELD works
- .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
- .enableHiveSupport()
- .getOrCreate()
-
- //9. drop the years(ts) partition field from repartition1
- spark.sql(
- """
- |alter table hive_prod.iceberg.repartition1 drop partition field years(ts)
- """.stripMargin)
- //10. insert more data into repartition1 and query
- spark.sql(
- """
- |insert into hive_prod.iceberg.repartition1 values
- |(5,'tq',"hangzhou",cast(1608279630 as timestamp)),
- |(2,'ls',"shandong",cast(1634559630 as timestamp))
- """.stripMargin )
- spark.sql("select * from hive_prod.iceberg.repartition1").show()
-
-
- //spark-submit command for this job
- //spark-submit --master yarn --class com.shujia.spark.iceberg.DeleteTable2Partitions spark-1.0.jar
- }
- }

