• 6. 从ods(贴源层)到 dwd(数据明细层)的两种处理方式(spark)-dsl


    6. 从ods(贴源层)到 dwd(数据明细层)的两种处理方式(spark)

    6.1 使用spark dsl 方式处理

    6.1.1 注意事项

    1. # 开启hive元数据支持,开启之后在spark中可以直接读取hive中的表,但是开启之后就不能再本地云心的了
    2. .enableHiveSupport()
    3. # 这下脚本都是作用在dwd层,所以必须在dwd的用户下执行,可能会报权限不够,需要我们申请权限

    6.1.2 项目结构如下:

    1.是脚本,内容如下:

    1. # 分区
    2. ds=$1
    3. # 执行任务
    4. spark-submit \
    5. --master yarn-client \
    6. --class com.wt.dwd.DwdFcjNwrsSellbargainMskDay \
    7. ../target/dwd-1.0-SNAPSHOT.jar \
    8. $ds
    9. # 增加分区
    10. hive -e "alter table dwd.dwd_gsj_reg_investor_msk_d add IF NOT EXISTS partition (ds='$ds')"
    11. # 注意:
    12. 1.如果换行的话,后面必须加上 \
    13. 2.因为jar包如果不指定路径的话会找不到
    14. 3.可以在最后面动态的增加分区,然后再动态的传入变量

    2.是在dwd中运行的hive建表语句

    1. -- hive建表语句
    2. -- hive建表语句
    3. CREATE external TABLE IF NOT EXISTS dwd.dwd_fcj_nwrs_sellbargain_msl_d(
    4. id STRING comment '身份证号码',
    5. r_fwzl STRING comment '房产地址',
    6. htydjzmj STRING comment '合同中约定房子面积',
    7. tntjzmj STRING comment '房子内建筑面积',
    8. ftmj STRING comment '房子分摊建筑面积',
    9. time_tjba STRING comment '商品房备案时间',
    10. htzj STRING comment '合同总价'
    11. )PARTITIONED BY
    12. (
    13. ds STRING
    14. )
    15. ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    16. STORED AS textfile
    17. location '/daas/motl/dwd/dwd_fcj_nwrs_sellbargain_msl_d/';

    3,是代码运行的逻辑,主要是处理数据

    1. package com.wt.dwd
    2. import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
    3. object DwdFcjNwrsSellbargainMskDay {
    4. def main(args: Array[String]): Unit = {
    5. /**
    6. * 1. 创建spark环境
    7. *
    8. */
    9. val spark: SparkSession = SparkSession
    10. .builder
    11. //.master("local")
    12. .enableHiveSupport() //开启hive元数据支持,开启之后在spark中可以直接读取hive中的表,但是开启之后就不能再本地云心的了
    13. .getOrCreate()
    14. import spark.implicits._
    15. import org.apache.spark.sql.functions._
    16. /**
    17. * 获取时间分区的字段
    18. *
    19. */
    20. val ds: String = args.head
    21. /**
    22. * 2. 读取获取购房合同中的表
    23. * 必须带上库名,否则读不到
    24. *
    25. * 不可能读取所有的数据,我们只需要读取每一天的数据
    26. *
    27. */
    28. val sellbargain: DataFrame = spark
    29. .table("ods.ods_t_fcj_nwrs_sellbargain")
    30. .where($"ds" === ds)
    31. //对原始的数据进行托名 对id进行脱敏,然后将r_fwzl中的数字变成 * 号(通过正则表达式替换)
    32. val resultDF: DataFrame = sellbargain.select(
    33. upper(md5($"id")) as "id",
    34. regexp_replace($"r_fwzl", "\\d", "*") as "r_fwzl",
    35. $"htydjzmj",
    36. $"tntjzmj",
    37. $"ftmj",
    38. $"time_tjba",
    39. $"htzj"
    40. )
    41. resultDF
    42. .write
    43. .format("csv")
    44. .mode(SaveMode.Overwrite)
    45. .option("sep","\t")
    46. .save(s"/daas/motl/dwd/dwd_fcj_nwrs_sellbargain_msl_d/ds=$ds")
    47. //提交到集群运行 spark-submit --master yarn-client --class com.wt.dwd.DwdFcjNwrsSellbargainMskDay dwd-1.0-SNAPSHOT.jar
    48. }
    49. }

    6.1.3 将通用的东西封装(重要-可以极快的提高效率):

    代码逻辑如下:

    1. package com.wt.common
    2. import org.apache.spark.internal.Logging
    3. import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
    4. abstract class SparkTool extends Logging{
    5. def main(args: Array[String]): Unit = {
    6. /**
    7. * 获取时间分区
    8. *
    9. */
    10. if(args.length == 0){
    11. logError("请指定分区!!!")
    12. return
    13. }
    14. val ds: String = args.head
    15. //创建spark环境
    16. val spark: SparkSession = SparkSession
    17. .builder()
    18. .enableHiveSupport()
    19. .getOrCreate()
    20. //调用子类实现的抽象方法
    21. this.run(spark,ds)
    22. }
    23. /**
    24. * 抽象方法: 在子类中实现这个方法
    25. * import spark.implicits._
    26. * import org.apache.spark.sql.functions._
    27. *
    28. * @param spark: spark的环境
    29. * @param ds:分区
    30. */
    31. def run(spark: SparkSession,ds: String): Unit
    32. /**
    33. * 传入DataFrame 和 path 就可以保存数据
    34. *
    35. * 其中的format默认值是 csv格式的。
    36. *
    37. */
    38. def save(dataframe:DataFrame,path:String,format:String = "csv"): Unit={
    39. dataframe
    40. .write
    41. .format("csv")
    42. .mode(SaveMode.Overwrite)
    43. .option("sep","\t")
    44. .save(path)
    45. }
    46. }

    理解:子类继承父类,在父类中已经封装好了spark,DataFormat的save 两个环境,需要子类继承SparkTool ,就可以拿到父类已经创建好的环境,减少代码量,提高效率

    在save 中可以设置默认值,如果不传入的话,就使用默认值

    而且该工具还吧 ds 给封装好了,我们在使用的时候可以直接用变量传入即可

    6.1.4 调用方法如下:

    1. package com.wt.dwd
    2. import com.wt.common.SparkTool
    3. import org.apache.spark.sql.{DataFrame, SparkSession}
    4. object DwdGsjRegLegrepreMskDay extends SparkTool{
    5. /**
    6. * 抽象方法: 在子类中实现这个方法
    7. * import spark.implicits._
    8. * import org.apache.spark.sql.functions._
    9. *
    10. * @param spark : spark的环境
    11. * @param ds :分区
    12. */
    13. override def run(spark: SparkSession, ds: String): Unit = {
    14. import spark.implicits._
    15. import org.apache.spark.sql.functions._
    16. /**
    17. * 读取hive中的表
    18. *
    19. */
    20. val legrepre: DataFrame = spark
    21. .table("ods.ods_t_gsj_reg_legrepre")
    22. .where($"ds" === ds)
    23. val resultDF: DataFrame = legrepre
    24. .select(
    25. upper(md5($"id")) as "id",
    26. $"position",
    27. upper(md5($"tel")) as "tel",
    28. $"appounit",
    29. $"accdside",
    30. $"posbrmode",
    31. $"offhfrom",
    32. $"offhto"
    33. )
    34. save(resultDF,s"/daas/motl/dwd/dwd_gsj_reg_legrepre_msk_d/ds=$ds")
    35. }
    36. }

    6.1.5 脚本如下:

    因为在common工具和dwd不在同一个模块中,需要在dwd模块中导入common的jar包

    需要导入依赖,如下:

    脚本如下;

    1. # 分区
    2. ds=$1
    3. # --jars :指定代码需要的其他的包
    4. # 执行任务
    5. spark-submit \
    6. --master yarn-client \
    7. --class com.wt.dwd.DwdGsjRegLegrepreMskDay \
    8. --jars ../lib/common-1.0-SNAPSHOT.jar \
    9. ../target/dwd-1.0-SNAPSHOT.jar \
    10. $ds
    11. # 增加分区
    12. hive -e "alter table dwd.dwd_gsj_reg_legrepre_msk_d add IF NOT EXISTS partition (ds='$ds')"

    将dwd包拖到dwd用户下,执行脚本,最终结果如下

  • 相关阅读:
    (四)进程管理:进程基本概念
    开发板uboot与virtualbox虚拟机、windows11网络互通
    Java — 内部类
    低代码平台选型,你一定要知道以下5点
    第十七章 维护本地数据库(二)
    一命通关双指针
    FreeRTOS开始的宏和任务状态
    【数据结构】串的模式匹配:简单的模式匹配算法,KMP算法
    每日一题:地下城游戏
    抖音排名怎么做的?抖音视频排名规则是什么样的
  • 原文地址:https://blog.csdn.net/weixin_48370579/article/details/126220331