• MySQL -> Hudi -> Hive


    I. Preparation

    1. Start the cluster, Hive, and MySQL
    start-all.sh
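
    Hive's metastore and MySQL also need to be up before table metadata can be synced later on. A minimal sketch, assuming a systemd-managed MySQL service and the standard Hive launcher scripts (service names and log paths here are assumptions):

    systemctl start mysqld                                             # MySQL service
    nohup hive --service metastore   > /tmp/metastore.log   2>&1 &    # Hive metastore
    nohup hive --service hiveserver2 > /tmp/hiveserver2.log 2>&1 &    # HiveServer2, only needed for beeline/JDBC access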
    
    2. Start spark-shell
    spark-shell \
    --master yarn \
    --jars /opt/software/hudi-spark3.1-bundle_2.12-0.12.0.jar \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
    --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'
    (Instead of --jars, the bundle can also be pulled from a repository with --packages org.apache.hudi:hudi-spark3.1-bundle_2.12:0.12.2.)
    3. Import the required classes
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.{SaveMode, SparkSession}

    Define the Hudi table name and base path:
    val tableName = "demotable"
    val hudiBasePath = "hdfs://bigdata1:9000/user/hudi/hudi_ods.db/" + tableName

    II. Query the MySQL data

    val DB_URL = "jdbc:mysql://bigdata1:3306/ds_db01?allowPublicKeyRetrieval=true&serverTimezone=UTC&useSSL=false"

    Connect to MySQL:
    val df21 = spark.read.format("jdbc")
      .option("driver", "com.mysql.jdbc.Driver")  // JDBC driver class name
      .option("url", DB_URL)                      // connection URL
      .option("dbtable", "customer_inf")          // table to read
      .option("user", "root")                     // user name -- change to your own
      .option("password", "123456")               // password -- change to your own
      .option("fetchsize", "50")                  // rows fetched per round trip
      .load()

     -------------------- Transform: add a static partition column --------------------
    import spark.implicits._

    Check what was read:
    println(df21.count())
    df21.show(10)

    III. Append write into Hudi

    val df22 = df21.withColumn("etl_date", lit("20220816"))   // static partition value
    // val df22 = df21.withColumn("etl_date", date_format(date_sub(current_date(), 1), "yyyyMMdd"))   // or: yesterday's date
    val dfWithTs = df22.withColumn("ts", current_timestamp()) // timestamp column used as the precombine field

    dfWithTs.write.format("hudi")
      .mode(SaveMode.Overwrite)                           // Overwrite (re)creates the table; use SaveMode.Append for later incremental writes
      .option("hoodie.insert.shuffle.parallelism", "2")   // insert parallelism: 2
      .option("hoodie.upsert.shuffle.parallelism", "2")   // upsert parallelism: 2
      .option(RECORDKEY_FIELD.key(), "customer_inf_id")   // record key field, the Hudi primary key
      .option(PARTITIONPATH_FIELD.key(), "etl_date")      // partition path field
      .option(PRECOMBINE_FIELD.key(), "ts")               // precombine field: the ts column added above
      .option(TBL_NAME.key(), tableName)
      // Setting "hoodie.timestamp.field" repeatedly (modified_time, birthday, etl_date,
      // register_time) overwrites the earlier values, so only the last one would take effect:
      .option("hoodie.timestamp.field", "register_time")
      .save(hudiBasePath)                                 // all options must come before save()
    Read the Hudi table back and verify:

    val env_data_df = spark.read.format("org.apache.hudi").load(hudiBasePath)

    println(env_data_df.count())

    env_data_df.show()

    IV. Exposing the Hudi table to Hive

    val sql_create_table =
      s"""
         |create table hudi_demo.demotable(
         |  customer_inf_id int,
         |  customer_id int,
         |  customer_name string,
         |  identity_card_type tinyint,
         |  identity_card_no string,
         |  mobile_phone string,
         |  customer_email string,
         |  gender string,
         |  customer_point int,
         |  register_time timestamp,
         |  birthday date,
         |  customer_level tinyint,
         |  customer_money decimal,
         |  modified_time string,
         |  ts timestamp,
         |  etl_date string
         |)
         |using hudi
         |tblproperties(
         |  primaryKey = 'customer_inf_id',
         |  type = 'cow'
         |)
         |options (
         |  hoodie.metadata.enable = 'true'
         |)
         |partitioned by (etl_date)
         |location '$hudiBasePath'
         |""".stripMargin
    spark.sql(sql_create_table)
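
    The statement above assumes the hudi_demo database already exists in the metastore; if it does not, create it first -- a minimal sketch:

    spark.sql("create database if not exists hudi_demo")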

    Query the partitions

    Querying from Hive
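
    For example (a sketch, run in the Hive CLI or beeline against the database and table created above):

    show partitions hudi_demo.demotable;
    select count(*) from hudi_demo.demotable where etl_date = '20220816';

    Running such a query is what produces the error below.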

    FAILED: RuntimeException java.lang.ClassNotFoundException: org.apache.hudi.hadoop.HoodieParquetInputFormat
    87308 [a3eed69d-1888-48fb-82f7-7254909d770f main] ERROR org.apache.hadoop.hive.ql.Driver  - FAILED: RuntimeException java.lang.ClassNotFoundException: org.apache.hudi.hadoop.HoodieParquetInputFormat
     

    Reference: 田昕峣 Richard's CSDN post "Resolving java.lang.ClassNotFoundException: org.apache.hudi.hadoop.HoodieParquetInputFormat when integrating Hudi with Hive".

    Cause

    Hive's classpath is missing the jar that provides org.apache.hudi.hadoop.HoodieParquetInputFormat.

    Inspecting Hudi's pom file shows it builds against Hive 2.3.1 by default.

    Fix: recompile and rebuild Hudi against the installed Hive version, as sketched below.
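
    A possible rebuild-and-deploy sequence -- the versions, paths, and build flags here are assumptions to adapt to your environment:

    # rebuild Hudi against the locally installed Hive version (the bundled default is 2.3.1)
    mvn clean package -DskipTests -Dspark3.1 -Dscala-2.12 -Dhive.version=3.1.2
    # put the hadoop-mr bundle on Hive's classpath and restart Hive
    cp packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-*.jar $HIVE_HOME/lib/

    Alternatively, the bundle jar can be made available for a single session with Hive's ADD JAR command.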

  • Original article: https://blog.csdn.net/m0_65077254/article/details/132801328