启动spark-shell
- spark-shell \
- > --jars /opt/software/hudi-spark3.1-bundle_2.12-0.12.0.jar \
- > --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'\
- > --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
2
hudi内置数据生成器,生成10条json数据
- scala> :paste
- // Entering paste mode (ctrl-D to finish)
-
- import org.apache.hudi.QuickstartUtils._
- import scala.collection.JavaConversions._
- import org.apache.spark.sql.SaveMode._
- import org.apache.hudi.DataSourceReadOptions._
- import org.apache.hudi.DataSourceWriteOptions._
- import org.apache.hudi.config.HoodieWriteConfig._
- import org.apache.hudi.common.model.HoodieRecord
-
- val tableName="hudi_trips_cow"
- val basePath ="file:///tmp/hudi_trips_cow"
- val dataGen = new DataGenerator
-
- val inserts=convertToStringList(dataGen.generateInserts(10))
3加载到DF,写入hudi,实现简单etl处理
- scala> :paste
- // Entering paste mode (ctrl-D to finish)
-
- val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
- df.write.format("hudi").
- options(getQuickstartWriteConfigs).
- option(PRECOMBINE_FIELD_OPT_KEY, "ts").
- option(RECORDKEY_FIELD_OPT_KEY, "uuid").
- option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
- option(TABLE_NAME, tableName).
- mode(Overwrite).
- save(basePath)
4读取存储数据及注册临时表
- scala> :paste
- // Entering paste mode (ctrl-D to finish)
-
- val tripsSnapshotDF = spark.read.format("hudi").load(basePath + "/*/*/*/*")
- tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
- spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()