• ALTER TABLE partition operations: dynamically adding single-level and multi-level partition fields, and dynamically dropping partition fields


    1.8.5.6 ALTER TABLE Partition Operations

    ALTER TABLE partition operations include adding and dropping partition fields. They are supported starting with Spark 3.x (Spark 2.4 does not support them), and they require adding the spark.sql.extensions property to the Spark configuration with the value org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions. Partition transforms are also supported when adding a partition field. The syntax is as follows:

    • Add a partition field: ALTER TABLE ... ADD PARTITION FIELD
    • Drop a partition field: ALTER TABLE ... DROP PARTITION FIELD
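
    Besides plain columns, ADD PARTITION FIELD also accepts Iceberg's partition transforms (years, months, days, hours, bucket; there is also a truncate transform, whose argument order in Spark DDL has differed between Iceberg versions, so check the docs of the version you run). The following is a minimal sketch of the variants, written against the hadoop_prod.default.mytbl table used in the steps below; which transforms are available depends on the Iceberg version in use:

    // Hedged sketch: partition-transform variants accepted by ADD PARTITION FIELD.
    // Each statement only evolves the partition spec; no existing data is rewritten.
    spark.sql("alter table hadoop_prod.default.mytbl add partition field years(ts)")      // partition by year of ts
    spark.sql("alter table hadoop_prod.default.mytbl add partition field months(ts)")     // partition by month of ts
    spark.sql("alter table hadoop_prod.default.mytbl add partition field days(ts)")       // partition by day of ts
    spark.sql("alter table hadoop_prod.default.mytbl add partition field hours(ts)")      // partition by hour of ts
    spark.sql("alter table hadoop_prod.default.mytbl add partition field bucket(16, id)") // hash id into 16 buckets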

    The detailed steps are as follows:

    1. Create table mytbl and insert data
    val spark: SparkSession = SparkSession.builder().master("local").appName("SparkOperateIceberg")
      // Register a Hadoop catalog named hadoop_prod
      .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
      .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://mycluster/sparkoperateiceberg")
      // Required for ALTER TABLE ... ADD/DROP PARTITION FIELD
      .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
      .getOrCreate()

    // 1. Create an ordinary (unpartitioned) table
    spark.sql(
      """
        |create table hadoop_prod.default.mytbl(id int,name string,loc string,ts timestamp) using iceberg
      """.stripMargin)

    // 2. Insert data into the table and query it
    spark.sql(
      """
        |insert into hadoop_prod.default.mytbl values
        |(1,'zs',"beijing",cast(1608469830 as timestamp)),
        |(3,'ww',"shanghai",cast(1603096230 as timestamp))
      """.stripMargin)
    spark.sql("select * from hadoop_prod.default.mytbl").show()

    The data layout in HDFS and the query result are as follows:

    2. Add the loc column as a partition field, insert data, and query
    // 3. Add the loc column as a partition field; this requires the
    //    spark.sql.extensions = org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions setting
    spark.sql(
      """
        |alter table hadoop_prod.default.mytbl add partition field loc
      """.stripMargin)

    // 4. Insert more data into mytbl; earlier rows stay in the unpartitioned layout, new rows are partitioned by loc
    spark.sql(
      """
        |insert into hadoop_prod.default.mytbl values
        |(5,'tq',"hangzhou",cast(1608279630 as timestamp)),
        |(2,'ls',"shandong",cast(1634559630 as timestamp))
      """.stripMargin)
    spark.sql("select * from hadoop_prod.default.mytbl").show()

    The data layout in HDFS and the query result are as follows:
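
    With the Hadoop catalog configured above, the corresponding layout on HDFS should look roughly like the sketch below. This is an illustration only (actual data file names are generated by Iceberg and will differ); it assumes the default write layout, where files written before the ALTER sit directly under data/ and files written afterwards go into loc=<value>/ directories:

    hdfs://mycluster/sparkoperateiceberg/default/mytbl/
        metadata/            -- table metadata, manifest lists and manifests
        data/
            xxxxx.parquet    -- rows inserted before the ALTER (unpartitioned layout)
            loc=hangzhou/    -- rows inserted after adding the loc partition field
            loc=shandong/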

    Note: adding a partition field is a metadata-only operation; it does not rewrite existing table data. New data is written using the new partition spec, while existing data remains in its original layout.
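
    One way to see this split without browsing HDFS is through Iceberg's metadata tables, which can also be queried from Spark. A hedged sketch against the hadoop_prod.default.mytbl table (the partitions metadata table exists in Iceberg's Spark integration, but its exact columns may differ between versions):

    // Each row of the partitions metadata table summarizes one partition:
    // rows written before the ALTER appear under a null loc partition,
    // rows written afterwards are grouped by their loc value.
    spark.sql("select partition, record_count, file_count from hadoop_prod.default.mytbl.partitions").show(false)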

    My test:

    Test code:

    package com.shujia.spark.iceberg

    import org.apache.spark.sql.SparkSession

    object AlterTablePartition {
      def main(args: Array[String]): Unit = {
        /**
         * ALTER TABLE partition operations include adding and dropping partition fields.
         * They are supported starting with Spark 3.x (not Spark 2.4) and require the
         * spark.sql.extensions property to be set to
         * org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions.
         * Partition transforms are also supported when adding a partition field. Syntax:
         *   Add a partition field:  ALTER TABLE ... ADD PARTITION FIELD
         *   Drop a partition field: ALTER TABLE ... DROP PARTITION FIELD
         */
        val spark: SparkSession = SparkSession
          .builder()
          .appName("SparkOperateIceberg")
          // Register a Hive catalog named hive_prod
          .config("spark.sql.catalog.hive_prod", "org.apache.iceberg.spark.SparkCatalog")
          .config("spark.sql.catalog.hive_prod.type", "hive")
          .config("spark.sql.catalog.hive_prod.uri", "thrift://master:9083")
          .config("iceberg.engine.hive.enabled", "true")
          // Required for ALTER TABLE ... ADD/DROP PARTITION FIELD
          .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
          .enableHiveSupport()
          .getOrCreate()

        // 1. Create an ordinary (unpartitioned) table
        spark.sql(
          """
            |create table if not exists hive_prod.iceberg.repartition1
            |(id int,name string,loc string,ts timestamp) using iceberg
          """.stripMargin)

        // 2. Insert data into the table and query it
        spark.sql(
          """
            |insert into hive_prod.iceberg.repartition1 values
            |(1,'zs',"beijing",cast(1608469830 as timestamp)),
            |(3,'ww',"shanghai",cast(1603096230 as timestamp))
          """.stripMargin)
        spark.sql("select * from hive_prod.iceberg.repartition1").show()

        // 3. Add the loc column as a partition field; requires the spark.sql.extensions config above
        spark.sql(
          """
            |alter table hive_prod.iceberg.repartition1 add partition field loc
          """.stripMargin)

        // 4. Insert more data; earlier rows stay unpartitioned, new rows are partitioned by loc
        spark.sql(
          """
            |insert into hive_prod.iceberg.repartition1 values
            |(5,'tq',"hangzhou",cast(1608279630 as timestamp)),
            |(6,'xx',"hangzhou",cast(1608279631 as timestamp)),
            |(2,'ls',"shandong",cast(1634559632 as timestamp))
          """.stripMargin)
        spark.sql("select * from hive_prod.iceberg.repartition1").show()

        // Submit command:
        // spark-submit --master yarn --class com.shujia.spark.iceberg.AlterTablePartition spark-1.0.jar
      }
    }

    3. Add a transform of the ts column as a partition field, insert data, and query
    // 5. Add a transform of the ts column as a partition field
    spark.sql(
      """
        |alter table hadoop_prod.default.mytbl add partition field years(ts)
      """.stripMargin)

    // 6. Insert more data into mytbl; earlier data keeps its old layout, new data uses the new partition spec
    spark.sql(
      """
        |insert into hadoop_prod.default.mytbl values
        |(4,'ml',"beijing",cast(1639920630 as timestamp)),
        |(6,'gb',"tianjin",cast(1576843830 as timestamp))
      """.stripMargin)
    spark.sql("select * from hadoop_prod.default.mytbl").show()

    The data layout in HDFS and the query result are as follows:
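
    To confirm that the table now carries both partition fields, the files metadata table can be queried in a similarly hedged way; every data file records the partition values it was written with (the year transform's field is named ts_year by default). Files written before any ALTER show null for both fields, files written under the loc-only spec show only a loc value, and files written after adding years(ts) carry both:

    // Hedged sketch: inspect the partition struct recorded for each data file.
    spark.sql("select file_path, partition from hadoop_prod.default.mytbl.files").show(false)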

    My test: adding another partition field on top of the existing single-level partition

    Test code:

    package com.shujia.spark.iceberg

    import org.apache.spark.sql.SparkSession

    object AlterTable2Partitions {
      def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession
          .builder()
          .appName("SparkOperateIceberg")
          // Register a Hive catalog named hive_prod
          .config("spark.sql.catalog.hive_prod", "org.apache.iceberg.spark.SparkCatalog")
          .config("spark.sql.catalog.hive_prod.type", "hive")
          .config("spark.sql.catalog.hive_prod.uri", "thrift://master:9083")
          .config("iceberg.engine.hive.enabled", "true")
          // Required for ALTER TABLE ... ADD/DROP PARTITION FIELD
          .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
          .enableHiveSupport()
          .getOrCreate()

        // 5. Add a transform of the ts column as a partition field
        spark.sql(
          """
            |alter table hive_prod.iceberg.repartition1 add partition field years(ts)
          """.stripMargin)

        // 6. Insert more data into repartition1; earlier data keeps its old layout, new data uses the new spec
        spark.sql(
          """
            |insert into hive_prod.iceberg.repartition1 values
            |(4,'ml',"beijing",cast(1639920630 as timestamp)),
            |(4,'mm',"beijing",cast(1639920639 as timestamp)),
            |(6,'gb',"tianjin",cast(1576843830 as timestamp))
          """.stripMargin)
        spark.sql("select * from hive_prod.iceberg.repartition1").show()

        // Submit command:
        // spark-submit --master yarn --class com.shujia.spark.iceberg.AlterTable2Partitions spark-1.0.jar
      }
    }

    4. Drop the loc partition field
    // 7. Drop the loc partition field from mytbl
    spark.sql(
      """
        |alter table hadoop_prod.default.mytbl drop partition field loc
      """.stripMargin)

    // 8. Insert more data into mytbl and query it
    spark.sql(
      """
        |insert into hadoop_prod.default.mytbl values
        |(4,'ml',"beijing",cast(1639920630 as timestamp)),
        |(6,'gb',"tianjin",cast(1576843830 as timestamp))
      """.stripMargin)
    spark.sql("select * from hadoop_prod.default.mytbl").show()

    The data layout in HDFS and the query result are as follows:

    Note: because the table still retains the partition field derived from the ts transform, the loc partition value of data inserted after the drop is null.
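
    Only the partition spec changes here: the loc column itself stays in the table schema and keeps its values (for format v1 tables Iceberg keeps the dropped field in the spec as a void transform, which is why it still appears, now as null). Ordinary filters on loc therefore keep working; the column just no longer drives the physical layout of newly written files. A small sketch:

    // The loc data column is untouched by DROP PARTITION FIELD; filtering by it still works,
    // it simply no longer determines where new data files are placed.
    spark.sql("select * from hadoop_prod.default.mytbl where loc = 'beijing'").show()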

    My test:

    Test code:

    package com.shujia.spark.iceberg

    import org.apache.spark.sql.SparkSession

    object DeleteTablePartition {
      def main(args: Array[String]): Unit = {
        /**
         * Drop one partition field.
         */
        val spark: SparkSession = SparkSession
          .builder()
          .appName("SparkOperateIceberg")
          // Register a Hive catalog named hive_prod
          .config("spark.sql.catalog.hive_prod", "org.apache.iceberg.spark.SparkCatalog")
          .config("spark.sql.catalog.hive_prod.type", "hive")
          .config("spark.sql.catalog.hive_prod.uri", "thrift://master:9083")
          .config("iceberg.engine.hive.enabled", "true")
          // Required for ALTER TABLE ... ADD/DROP PARTITION FIELD
          .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
          .enableHiveSupport()
          .getOrCreate()

        // 7. Drop the loc partition field from repartition1
        spark.sql(
          """
            |alter table hive_prod.iceberg.repartition1 drop partition field loc
          """.stripMargin)

        // 8. Insert more data into repartition1 and query it
        spark.sql(
          """
            |insert into hive_prod.iceberg.repartition1 values
            |(4,'ml',"beijing",cast(1639920630 as timestamp)),
            |(6,'gb',"tianjin",cast(1576843830 as timestamp))
          """.stripMargin)
        spark.sql("select * from hive_prod.iceberg.repartition1").show()

        // Submit command:
        // spark-submit --master yarn --class com.shujia.spark.iceberg.DeleteTablePartition spark-1.0.jar
      }
    }

    5. Drop the years(ts) partition field
    // 9. Drop the years(ts) partition field from mytbl
    spark.sql(
      """
        |alter table hadoop_prod.default.mytbl drop partition field years(ts)
      """.stripMargin)

    // 10. Insert more data into mytbl and query it
    spark.sql(
      """
        |insert into hadoop_prod.default.mytbl values
        |(5,'tq',"hangzhou",cast(1608279630 as timestamp)),
        |(2,'ls',"shandong",cast(1634559630 as timestamp))
      """.stripMargin)
    spark.sql("select * from hadoop_prod.default.mytbl").show()

    The data layout in HDFS and the query result are as follows:

    My test:

    Test code:

    package com.shujia.spark.iceberg

    import org.apache.spark.sql.SparkSession

    object DeleteTable2Partitions {
      def main(args: Array[String]): Unit = {
        /**
         * Drop the remaining partition field after the first one has already been dropped.
         */
        val spark: SparkSession = SparkSession
          .builder()
          .appName("SparkOperateIceberg")
          // Register a Hive catalog named hive_prod
          .config("spark.sql.catalog.hive_prod", "org.apache.iceberg.spark.SparkCatalog")
          .config("spark.sql.catalog.hive_prod.type", "hive")
          .config("spark.sql.catalog.hive_prod.uri", "thrift://master:9083")
          .config("iceberg.engine.hive.enabled", "true")
          // Required for ALTER TABLE ... ADD/DROP PARTITION FIELD
          .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
          .enableHiveSupport()
          .getOrCreate()

        // 9. Drop the years(ts) partition field from repartition1
        spark.sql(
          """
            |alter table hive_prod.iceberg.repartition1 drop partition field years(ts)
          """.stripMargin)

        // 10. Insert more data into repartition1 and query it
        spark.sql(
          """
            |insert into hive_prod.iceberg.repartition1 values
            |(5,'tq',"hangzhou",cast(1608279630 as timestamp)),
            |(2,'ls',"shandong",cast(1634559630 as timestamp))
          """.stripMargin)
        spark.sql("select * from hive_prod.iceberg.repartition1").show()

        // Submit command:
        // spark-submit --master yarn --class com.shujia.spark.iceberg.DeleteTable2Partitions spark-1.0.jar
      }
    }

  • Original article: https://blog.csdn.net/weixin_48370579/article/details/127813404