<properties>
    <spark.version>3.2.1</spark.version>
    <scala.version>2.13</scala.version>
    <iceberg.version>0.13.2</iceberg.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-spark3</artifactId>
        <version>${iceberg.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.iceberg</groupId>
        <artifactId>iceberg-spark3-runtime</artifactId>
        <version>${iceberg.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.10.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-hadoop</artifactId>
        <version>1.12.0</version>
    </dependency>
    <!-- other dependencies omitted... -->
</dependencies>
val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[*]")
// Configure a Hive catalog named hive_prod
.config("spark.sql.catalog.hive_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hive_prod.type", "hive")
.config("spark.sql.catalog.hive_prod.uri", "thrift://node03:9083")
.config("iceberg.engine.hive.enabled", "true")
// Configure a Hadoop catalog named hadoop_prod
.config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hadoop_prod.type", "hadoop")
.config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://node01:8020/spark_iceberg")
.getOrCreate()
// 1. Create a table. hive_prod: catalog name, default: an existing Hive database, test: name of the Iceberg table to create
spark.sql(
"""
| create table if not exists hive_prod.default.test(id int, name string, age int) using iceberg
""".stripMargin)
// 2. Insert data
spark.sql(
"""
| insert into hive_prod.default.test values(1, 'zhangsan', 23),(2, 'lisi', 14),(3, 'wangwu', 35)
""".stripMargin)
// 3. Query data
spark.sql(
"""
| select * from hive_prod.default.test
""".stripMargin).show()
// 4. Drop the table
spark.sql(
"""
| drop table hive_prod.default.test
""".stripMargin)
Notes:
- When creating a table, the table name takes the form:
${hive catalog name}.${Hive database name}.${Iceberg table name}
- After the table is created, the test table is visible in Hive as an external table, and the corresponding data directory appears under the Hive warehouse directory.
- After dropping the table, the data is deleted but the table directory remains; to remove the data completely, delete the table directory as well.
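To delete the leftover directory, the HDFS path can be removed directly. Below is a minimal sketch using the Hadoop FileSystem API; the warehouse path is an assumption and depends on your Hive configuration, so check the actual table location first.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hedged sketch: recursively delete the leftover directory of the dropped hive_prod.default.test table
// (assumed location under the default Hive warehouse)
val fs = FileSystem.get(new java.net.URI("hdfs://node01:8020"), new Configuration())
fs.delete(new Path("/user/hive/warehouse/test"), true)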
// 1. Create a table. hadoop_prod: Hadoop catalog name, default: database name, test: name of the Iceberg table to create
spark.sql(
"""
| create table if not exists hadoop_prod.default.test(id int,name string,age int) using iceberg
""".stripMargin);
// 2. Insert data
spark.sql(
"""
| insert into hadoop_prod.default.test values(1, 'zhangsan', 23),(2, 'lisi', 14),(3, 'wangwu', 35)
""".stripMargin);
// 3. Query data
spark.sql(
"""
| select * from hadoop_prod.default.test
""".stripMargin).show();
Notes:
- When creating a table, the table name takes the form:
${hadoop catalog name}.${any database name}.${Iceberg table name}
- After the table is created, the table directory appears under the warehouse path configured for hadoop_prod. To query this table from Hive, create a Hive external table that points at that location:
CREATE TABLE hdfs_iceberg (
id int,
name string,
age int
) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://node01:8020/spark_iceberg/default/test'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');
// 5. Drop the table: after the Iceberg table is dropped, its data is deleted but the database directory remains.
spark.sql(
"""
| drop table hadoop_prod.default.test
""".stripMargin);
// 1. Create a normal (non-partitioned) table
spark.sql(
"""
| create table if not exists hadoop_prod.default.normal_tbl(id int,name string,age int) using iceberg
""".stripMargin)
// 2. Create a partitioned table, partitioned by the loc column
spark.sql(
"""
| create table if not exists hadoop_prod.default.partition_tbl(id int,name string,age int,loc string) using iceberg partitioned by (loc)
""".stripMargin)
// 3. Insert data into the partitioned table
spark.sql(
"""
| insert into table hadoop_prod.default.partition_tbl values (1,"zs",18,"bj"),(3,"ww",20,"sz"),(2,"ls",19,"sh"),(4,"ml",21,"gz")
""".stripMargin)
// 4. Query
spark.sql("select * from hadoop_prod.default.partition_tbl").show()
// Create partitioned table partition_tbl1, partitioned by years(register_ts), i.e. by year
spark.sql(
"""
| create table if not exists hadoop_prod.default.partition_tbl1(id int ,name string,age int,register_ts timestamp) using iceberg
| partitioned by (years(register_ts))
""".stripMargin)
// Insert data
spark.sql(
"""
| insert into hadoop_prod.default.partition_tbl1 values
| (1,'zs',18,cast(1608469830 as timestamp)),
| (2,'ls',19,cast(1634559630 as timestamp)),
| (3,'ww',20,cast(1603096230 as timestamp)),
| (4,'ml',21,cast(1639920630 as timestamp)),
| (5,'tq',22,cast(1608279630 as timestamp)),
| (6,'gb',23,cast(1576843830 as timestamp))
""".stripMargin)
// Query
spark.sql("select * from hadoop_prod.default.partition_tbl1").show()
// Create partitioned table partition_tbl2, partitioned by months(register_ts), i.e. by "year-month"
spark.sql(
"""
| create table if not exists hadoop_prod.default.partition_tbl2(id int ,name string,age int,register_ts timestamp) using iceberg
| partitioned by (months(register_ts))
""".stripMargin)
// Insert data
spark.sql(
"""
| insert into hadoop_prod.default.partition_tbl2 values
| (1,'zs',18,cast(1608469830 as timestamp)),
| (2,'ls',19,cast(1634559630 as timestamp)),
| (3,'ww',20,cast(1603096230 as timestamp)),
| (4,'ml',21,cast(1639920630 as timestamp)),
| (5,'tq',22,cast(1608279630 as timestamp)),
| (6,'gb',23,cast(1576843830 as timestamp))
""".stripMargin)
// Query
spark.sql("select * from hadoop_prod.default.partition_tbl2").show()
// Create partitioned table partition_tbl3, partitioned by days(register_ts), i.e. by "year-month-day"
spark.sql(
"""
| create table if not exists hadoop_prod.default.partition_tbl3(id int ,name string,age int,register_ts timestamp) using iceberg
| partitioned by (days(register_ts))
""".stripMargin)
// Insert data
spark.sql(
"""
| insert into hadoop_prod.default.partition_tbl3 values
| (1,'zs',18,cast(1608469830 as timestamp)),
| (2,'ls',19,cast(1634559630 as timestamp)),
| (3,'ww',20,cast(1603096230 as timestamp)),
| (4,'ml',21,cast(1639920630 as timestamp)),
| (5,'tq',22,cast(1608279630 as timestamp)),
| (6,'gb',23,cast(1576843830 as timestamp))
""".stripMargin)
// Query
spark.sql("select * from hadoop_prod.default.partition_tbl3").show()
// Create partitioned table partition_tbl4, partitioned by hours(register_ts), i.e. by "year-month-day-hour"
spark.sql(
"""
| create table if not exists hadoop_prod.default.partition_tbl4(id int ,name string,age int,register_ts timestamp) using iceberg
| partitioned by (hours(register_ts))
""".stripMargin)
// Insert data
spark.sql(
"""
| insert into hadoop_prod.default.partition_tbl4 values
| (1,'zs',18,cast(1608469830 as timestamp)),
| (2,'ls',19,cast(1634559630 as timestamp)),
| (3,'ww',20,cast(1603096230 as timestamp)),
| (4,'ml',21,cast(1639920630 as timestamp)),
| (5,'tq',22,cast(1608279630 as timestamp)),
| (6,'gb',23,cast(1576843830 as timestamp))
""".stripMargin)
// Query
spark.sql("select * from hadoop_prod.default.partition_tbl4").show()
// Create a table
spark.sql("create table hadoop_prod.default.my_tb1(id int,name string,age int) using iceberg")
// Insert data into the table
spark.sql("insert into table hadoop_prod.default.my_tb1 values (1,'zs',18),(3,'ww',20),(2,'ls',19),(4,'ml',21)")
// Query the data
spark.sql("select * from hadoop_prod.default.my_tb1").show()
// CTAS: create table as select
spark.sql("create table hadoop_prod.default.my_tb2 using iceberg as select id,name,age from hadoop_prod.default.my_tb1")
// Query
spark.sql("select * from hadoop_prod.default.my_tb2").show()
// Create a table
spark.sql("create table hadoop_prod.default.my_tb3(id int,name string,age int) using iceberg")
// Insert data into the table
spark.sql("insert into table hadoop_prod.default.my_tb3 values (1,'zs',18),(3,'ww',20),(2,'ls',19),(4,'ml',21)")
// Query the data
spark.sql("select * from hadoop_prod.default.my_tb3").show()
// RTAS: replace table as select
spark.sql("replace table hadoop_prod.default.my_tb2 using iceberg as select * from hadoop_prod.default.my_tb3")
// Query
spark.sql("select * from hadoop_prod.default.my_tb2").show()
spark.sql("drop table hadoop_prod.default.mytb1")
// Create a table
spark.sql("create table hadoop_prod.default.test(id int,name string,age int) using iceberg")
// Insert data into the table
spark.sql("insert into table hadoop_prod.default.test values (1,'zs',18),(2,'ls',19),(3,'ww',20)")
// Query
spark.sql("select * from hadoop_prod.default.test").show()
// Add columns: add gender and loc columns to the test table
spark.sql("alter table hadoop_prod.default.test add column gender string, loc string")
// Drop a column: drop the age column from the test table
spark.sql("alter table hadoop_prod.default.test drop column age")
// Query again
spark.sql("select * from hadoop_prod.default.test").show()
/*
The resulting table has lost the age column and gained the gender and loc columns
+---+----+------+----+
| id|name|gender| loc|
+---+----+------+----+
| 1| zs| null|null|
| 2| ls| null|null|
| 3| ww| null|null|
+---+----+------+----+
*/
// Rename a column
spark.sql("alter table hadoop_prod.default.test rename column gender to xxx")
// Query again
spark.sql("select * from hadoop_prod.default.test").show()
/*
The gender column has been renamed to xxx
+---+----+----+----+
| id|name| xxx| loc|
+---+----+----+----+
| 1| zs|null|null|
| 2| ls|null|null|
| 3| ww|null|null|
+---+----+----+----+
*/
-- Create a normal (non-partitioned) table
create table if not exists hadoop_prod.default.my_tab(id int,name string,loc string,ts timestamp) using iceberg;
-- Insert data into the table
insert into hadoop_prod.default.my_tab values (1,'zs','shenzhen',cast(1608469830 as timestamp));
-- Add the loc column as a partition field, then insert data
alter table hadoop_prod.default.my_tab add partition field loc;
insert into hadoop_prod.default.my_tab values (2,'li','wuhan',cast(1634559630 as timestamp));
-- Add a transform of the ts column as a partition field, then insert data
alter table hadoop_prod.default.my_tab add partition field years(ts);
insert into hadoop_prod.default.my_tab values (3,'ww','beijing',cast(1576843830 as timestamp));
-- Drop the loc partition field, then insert data
alter table hadoop_prod.default.my_tab drop partition field loc;
insert into hadoop_prod.default.my_tab values (4,'zl','shanghai',cast(1639920630 as timestamp));
-- Drop the years(ts) partition field, then insert data
alter table hadoop_prod.default.my_tab drop partition field years(ts);
insert into hadoop_prod.default.my_tab values (5,'tq','shanghai',cast(1634559630 as timestamp));
// 1. Read Iceberg data with SQL
spark.sql("select * from hadoop_prod.default.my_tb1").show()
// 2. Read with the DataFrame API (the SQL approach above is recommended)
// Option 1
val df1: DataFrame = spark.table("hadoop_prod.default.my_tb1")
df1.show()
// Option 2
val df2: DataFrame = spark.read.format("iceberg").load("hdfs://node01:8020/spark_iceberg/default/my_tb1")
df2.show()
All snapshots held by an Iceberg table can be listed by querying ${catalog_name}.${db_name}.${iceberg_table}.snapshots:
// View Iceberg table snapshot information
spark.sql("select * from hadoop_prod.default.my_tb1.snapshots").show(false)
The table history can be viewed by querying ${catalog_name}.${db_name}.${iceberg_table}.history:
// View Iceberg table history
spark.sql("select * from hadoop_prod.default.my_tb1.history").show(false)
/*
+-----------------------+-------------------+---------+-------------------+
|made_current_at |snapshot_id |parent_id|is_current_ancestor|
+-----------------------+-------------------+---------+-------------------+
|2022-07-30 00:15:07.942|8409100036511820619|null |true |
+-----------------------+-------------------+---------+-------------------+
*/
The data files of an Iceberg table can be inspected by querying ${catalog_name}.${db_name}.${iceberg_table}.files:
// View the data files of an Iceberg table
spark.sql("select * from hadoop_prod.default.my_tb1.files").show(false)
The manifests can be inspected by querying ${catalog_name}.${db_name}.${iceberg_table}.manifests:
// Query manifests
spark.sql("select * from hadoop_prod.default.my_tb1.manifests").show(false)
The data of a specific snapshot can be read by passing the snapshot-id option; this works with the DataFrame API:
// Query the data of a specified snapshot
spark.read.option("snapshot-id", 8409100036511820619L).format("iceberg")
.load("hdfs://node01:8020/spark_iceberg/default/my_tb1")
.show()
/*
+---+----+---+
| id|name|age|
+---+----+---+
| 1| zs| 18|
| 3| ww| 20|
| 2| ls| 19|
| 4| ml| 21|
+---+----+---+
*/
CALL ${catalog_name}.system.set_current_snapshot('${db.table}', snapshot_id)
-- Insert data
insert into table hadoop_prod.default.my_tb1 values (5,'tq',17),(6,'qq',20);
-- Query the table data
select * from hadoop_prod.default.my_tb1;
-- Query the table snapshots
select * from hadoop_prod.default.my_tb1.snapshots;
-- Set the current snapshot to read
call hadoop_prod.system.set_current_snapshot('default.my_tb1', 8409100036511820619);
-- Query the table data again
select * from hadoop_prod.default.my_tb1;
-- call hadoop_prod.system.set_current_snapshot('default.my_tb1', 2564591926167696280);
// Query data as of a timestamp (milliseconds)
spark.read.option("as-of-timestamp", "1659466148000")
.format("iceberg")
.load("hdfs://node01:8020/spark_iceberg/default/my_tb1")
.show()
/*
+---+----+---+
| id|name|age|
+---+----+---+
| 1| zs| 18|
| 3| ww| 20|
| 2| ls| 19|
| 4| ml| 21|
+---+----+---+
*/
A table can also be rolled back to a point in time with CALL ${catalog_name}.system.rollback_to_timestamp('${db.table}', TIMESTAMP '<timestamp>'), as follows:
-- Roll back the table to the state at the given timestamp
call hadoop_prod.system.rollback_to_timestamp('default.my_tb1', TIMESTAMP '2022-07-30 18:00:00');
-- Query the table data
select * from hadoop_prod.default.my_tb1;
// Roll back to a snapshot with the Java API
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hadoop.HadoopCatalog

spark.sql("select * from hadoop_prod.default.my_tb1").show() // before rollback
val conf = new Configuration()
val catalog = new HadoopCatalog(conf, "hdfs://node01:8020/spark_iceberg")
val table = catalog.loadTable(TableIdentifier.of("default", "my_tb1"))
table.manageSnapshots().rollbackTo(8409100036511820619L).commit()
spark.sql("select * from hadoop_prod.default.my_tb1").show() // after rollback
CALL ${catalog_name}.system.rollback_to_snapshot('${db.table}', snapshot_id)
-- Roll back to the specified snapshot
call hadoop_prod.system.rollback_to_snapshot('default.my_tb1', 8409100036511820619);
/******************* 1. Create Iceberg table my_test *******************/
spark.sql("create table if not exists hadoop_prod.default.my_test(id int,name string,age int) using iceberg")
/******************* 2. Insert a batch of data into my_test *******************/
import spark.implicits._
val df: DataFrame = spark.read.textFile(this.getClass.getClassLoader.getResource("nameinfo.txt").getPath)
.map(line => {
val arr: Array[String] = line.split(",")
(arr(0).toInt, arr(1), arr(2).toInt)
}).toDF("id", "name", "age")
df.writeTo("hadoop_prod.default.my_test").append()
/******************* 3. Compact small data files *******************/
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.Table
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.expressions.Expressions
import org.apache.iceberg.hadoop.HadoopCatalog
import org.apache.iceberg.spark.actions.SparkActions

val catalog = new HadoopCatalog(new Configuration(), "hdfs://node01:8020/spark_iceberg")
val table: Table = catalog.loadTable(TableIdentifier.of("default", "my_test"))
SparkActions.get().rewriteDataFiles(table)
.filter(Expressions.greaterThanOrEqual("id", 1))
.option("target-file-size-bytes", "10240") // 10KB
.execute()
/******************* 4. Expire old snapshots *******************/
table.expireSnapshots().expireOlderThan(1659658697197L).commit()
CALL ${catalog_name}.system.expire_snapshots('${db.table}', TIMESTAMP 'yyyy-MM-dd HH:mm:ss.000', N)
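For example, a minimal sketch expiring old snapshots of the my_test table used above; the timestamp and retention count are illustrative values, not taken from the original walkthrough.
// Hedged sketch: expire snapshots older than the given timestamp, keeping at least the last 5
spark.sql("call hadoop_prod.system.expire_snapshots('default.my_test', TIMESTAMP '2022-08-05 10:00:00.000', 5)")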
CREATE TABLE ${catalog_name}.${db_name}.${table_name} (
id bigint,
name string,
loc string
) using iceberg
PARTITIONED BY (loc)
TBLPROPERTIES (
-- whether to delete old metadata files after each table commit
'write.metadata.delete-after-commit.enabled' = 'true',
-- number of previous metadata files to keep
'write.metadata.previous-versions-max' = '3'
)
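The same properties can also be set on an existing table. A minimal sketch against the hadoop_prod.default.test table created earlier (the property values are illustrative):
// Hedged sketch: turn on metadata cleanup for an existing Iceberg table
spark.sql(
  """
    | alter table hadoop_prod.default.test set tblproperties (
    |   'write.metadata.delete-after-commit.enabled' = 'true',
    |   'write.metadata.previous-versions-max' = '3'
    | )
  """.stripMargin)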
insert into inserts data into an Iceberg table and has two syntax forms:
INSERT INTO tbl VALUES (1,"zs",18),(2,"ls",19)
INSERT INTO tbl SELECT ...
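For example, a minimal sketch of the second form, assuming the my_tb2 and my_tb3 tables created earlier still exist:
// Hedged sketch: copy rows from my_tb3 into my_tb2 with INSERT INTO ... SELECT
spark.sql("insert into hadoop_prod.default.my_tb2 select id, name, age from hadoop_prod.default.my_tb3")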
merge into performs row-level updates and deletes; it is supported since Spark 3.x, and it works by rewriting the data files that contain the rows to be deleted or updated. merge into updates a target table from the result of a query: rows are matched on a join-like condition, and the matched (or unmatched) rows are then acted on. The syntax is:
MERGE INTO tbl t
USING (SELECT ...) s
ON t.id = s.id
-- delete
WHEN MATCHED AND ... THEN DELETE
-- update
WHEN MATCHED AND ... THEN UPDATE SET ...
-- update with multiple conditions
WHEN MATCHED AND ... AND ... THEN UPDATE SET ...
-- no match: insert into the target table
WHEN NOT MATCHED AND ... THEN INSERT (col1,col2...) VALUES(s.col1,s.col2 ...)
-- Create table t_a and insert data
create table hadoop_prod.default.t_a(id int, name string, age int) using iceberg;
insert into hadoop_prod.default.t_a values(1, 'zs', 13), (2, 'ls', 24), (3, 'ww', 35);
-- Create table t_b and insert data
create table hadoop_prod.default.t_b(id int, name string, age int, tp string) using iceberg;
insert into hadoop_prod.default.t_b values(1, 'zs', 23, 'del'),(2, 'ls', 14, 'upd'), (4, 'ww', 25, 'add');
Match t_b against t_a on id: if the tp column in t_b is 'del', delete the matching row in t_a; if tp is 'upd', update the other columns of the matching row in t_a; rows in t_b that have no match in t_a are inserted into t_a.
merge into hadoop_prod.default.t_a t1 using (select id, name, age, tp from hadoop_prod.default.t_b) t2 on t1.id = t2.id
when matched and t2.tp = 'del' then delete
when matched and t2.tp = 'upd' then update set t1.name = t2.name, t1.age = t2.age
when not matched then insert (id, name, age) values (t2.id, t2.name, t2.age);
Note: when updating, the source query may contain at most one row matching a given target row; otherwise the merge fails with an error.
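If the source may contain several rows per key, one way to satisfy this constraint is to deduplicate the source query first. A minimal sketch, reusing t_a and t_b above and keeping an arbitrary row per id with row_number:
// Hedged sketch: deduplicate the source by id before merging so each target row matches at most once
spark.sql(
  """
    | merge into hadoop_prod.default.t_a t1
    | using (
    |   select id, name, age, tp from (
    |     select *, row_number() over (partition by id order by id) as rn
    |     from hadoop_prod.default.t_b
    |   ) tmp where rn = 1
    | ) t2
    | on t1.id = t2.id
    | when matched and t2.tp = 'del' then delete
    | when matched and t2.tp = 'upd' then update set t1.name = t2.name, t1.age = t2.age
    | when not matched then insert (id, name, age) values (t2.id, t2.name, t2.age)
  """.stripMargin)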
insert overwrite replaces data in an Iceberg table. It replaces all of the matching data, so when only part of the data needs to be replaced, merge into is usually the better choice. There are two cases when using insert overwrite:
-- Create partitioned table test1 and insert data
create table hadoop_prod.default.test1 (id int, name string, loc string) using iceberg partitioned by (loc);
insert into hadoop_prod.default.test1 values(1, 'zs', 'bj'), (2, 'ls', 'sz');
-- Create normal table test2 and insert data
create table hadoop_prod.default.test2 (id int, name string, loc string) using iceberg;
insert into hadoop_prod.default.test2 values(10, 'x1', 'gd'), (11, 'x2', 'wh');
-- Create normal table test3 and insert data
create table hadoop_prod.default.test3 (id int, name string, loc string) using iceberg;
insert into hadoop_prod.default.test3 values(3, 'ww', 'bj'), (4, 'ml', 'sh'), (5, 'tq', 'gz');
insert overwrite hadoop_prod.default.test2
select id, name, loc from hadoop_prod.default.test3;
insert overwrite hadoop_prod.default.test1
select id, name, loc from hadoop_prod.default.test3;
-- Drop table test1, recreate it as a partitioned table, and insert data
drop table hadoop_prod.default.test1;
create table hadoop_prod.default.test1 (id int, name string, loc string) using iceberg partitioned by (loc);
insert into hadoop_prod.default.test1 values(1, 'zs', 'bj'), (2, 'ls', 'sz');
-- With a static partition, do not also select the loc column in the query, otherwise the column is duplicated
insert overwrite hadoop_prod.default.test1
partition (loc = 'js')
select id, name from hadoop_prod.default.test3;
Note: insert overwrite with a static partition reads the test3 data and overwrites only the specified static partition of test1; data in the other partitions is unaffected.
Delete from removes rows that match the given where condition. If the condition matches an entire partition of the Iceberg table, Iceberg only updates metadata; if it matches individual rows, Iceberg rewrites the data files that contain the affected rows.
-- Create table t_del and insert data
create table hadoop_prod.default.t_del (id int, name string, age int) using iceberg;
insert into hadoop_prod.default.t_del values (1, 'zs', 13), (2, 'ls', 14), (3, 'ww', 25), (4, 'zl', 36), (5, 'tq', 27), (6, 'gb', 18);
-- Delete a range of rows from t_del by condition
delete from hadoop_prod.default.t_del where id > 3 and id < 6;
-- Delete a single row from t_del by condition
delete from hadoop_prod.default.t_del where id = 2;
-- Create table t_upd and insert data
create table hadoop_prod.default.t_upd (id int, name string, age int) using iceberg;
insert into hadoop_prod.default.t_upd values (1, 'zs', 13), (2, 'ls', 14), (3, 'ww', 25), (4, 'zl', 36), (5, 'tq', 27), (6, 'gb', 18);
-- Update a range of rows in t_upd by condition
update hadoop_prod.default.t_upd set name = 'xxx', age = 30 where id > 3;
df.writeTo(tbl).create()              // equivalent to CREATE TABLE ... AS SELECT ...
df.writeTo(tbl).replace()             // equivalent to REPLACE TABLE ... AS SELECT ...
df.writeTo(tbl).append()              // equivalent to INSERT INTO ...
df.writeTo(tbl).overwritePartitions() // equivalent to a dynamic INSERT OVERWRITE ...
// 1. Prepare data; write to an Iceberg table and a partitioned table with the DataFrame API
val nameJsonList = List[String](
"{\"id\":1,\"name\":\"zs\",\"age\":18,\"loc\":\"beijing\"}",
"{\"id\":2,\"name\":\"ls\",\"age\":19,\"loc\":\"shanghai\"}",
"{\"id\":3,\"name\":\"ww\",\"age\":20,\"loc\":\"beijing\"}",
"{\"id\":4,\"name\":\"ml\",\"age\":21,\"loc\":\"shanghai\"}")
import spark.implicits._
val df: DataFrame = spark.read.json(nameJsonList.toDS)
// 2. Create normal table df_tbl1 and write the data to it; the DataFrame columns become the Iceberg table columns
df.writeTo("hadoop_prod.default.df_tbl1").create()
// 3. Query table hadoop_prod.default.df_tbl1 and inspect how the data is stored
spark.read.table("hadoop_prod.default.df_tbl1").show()
/*
+---+---+--------+----+
|age| id| loc|name|
+---+---+--------+----+
| 18| 1| beijing| zs|
| 19| 2|shanghai| ls|
| 20| 3| beijing| ww|
| 21| 4|shanghai| ml|
+---+---+--------+----+
*/
// 4. Create partitioned table df_tbl2 and write the data to it; the DataFrame columns become the Iceberg table columns
df.sortWithinPartitions($"loc") // when writing to a partitioned table, rows must be sorted by the partition column
.writeTo("hadoop_prod.default.df_tbl2")
.partitionedBy($"loc") // multiple columns can be listed here for a composite partition
.create()
// 5. Query partitioned table hadoop_prod.default.df_tbl2 and inspect how the data is stored
spark.read.table("hadoop_prod.default.df_tbl2").show()
/*
+---+---+--------+----+
|age| id| loc|name|
+---+---+--------+----+
| 18| 1| beijing| zs|
| 19| 2|shanghai| ls|
| 20| 3| beijing| ww|
| 21| 4|shanghai| ml|
+---+---+--------+----+
*/
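Building on df_tbl2 created above, a minimal sketch of overwritePartitions(), which dynamically replaces only the partitions present in the incoming DataFrame; the extra row is illustrative and not part of the original data set.
// Hedged sketch: overwrite only the 'beijing' partition of df_tbl2 with a new row
val updDF = List((5L, "tl", 22L, "beijing")).toDF("id", "name", "age", "loc")
updDF.sortWithinPartitions($"loc") // sort by the partition column before writing
  .writeTo("hadoop_prod.default.df_tbl2")
  .overwritePartitions()
spark.read.table("hadoop_prod.default.df_tbl2").show()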