6. 从ods(贴源层)到 dwd(数据明细层)的两种处理方式(spark)
6.1 使用spark dsl 方式处理
6.1.1 注意事项
- # 开启hive元数据支持,开启之后在spark中可以直接读取hive中的表,但是开启之后就不能再本地云心的了
- .enableHiveSupport()
-
- # 这下脚本都是作用在dwd层,所以必须在dwd的用户下执行,可能会报权限不够,需要我们申请权限
6.1.2 项目结构如下:

1.是脚本,内容如下:
- # 分区
- ds=$1
-
- # 执行任务
- spark-submit \
- --master yarn-client \
- --class com.wt.dwd.DwdFcjNwrsSellbargainMskDay \
- ../target/dwd-1.0-SNAPSHOT.jar \
- $ds
- # 增加分区
- hive -e "alter table dwd.dwd_gsj_reg_investor_msk_d add IF NOT EXISTS partition (ds='$ds')"
-
- # 注意:
- 1.如果换行的话,后面必须加上 \
- 2.因为jar包如果不指定路径的话会找不到
- 3.可以在最后面动态的增加分区,然后再动态的传入变量
2.是在dwd中运行的hive建表语句
- -- hive建表语句
- -- hive建表语句
- CREATE external TABLE IF NOT EXISTS dwd.dwd_fcj_nwrs_sellbargain_msl_d(
- id STRING comment '身份证号码',
- r_fwzl STRING comment '房产地址',
- htydjzmj STRING comment '合同中约定房子面积',
- tntjzmj STRING comment '房子内建筑面积',
- ftmj STRING comment '房子分摊建筑面积',
- time_tjba STRING comment '商品房备案时间',
- htzj STRING comment '合同总价'
- )PARTITIONED BY
- (
- ds STRING
- )
- ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
- STORED AS textfile
- location '/daas/motl/dwd/dwd_fcj_nwrs_sellbargain_msl_d/';
3,是代码运行的逻辑,主要是处理数据
- package com.wt.dwd
- import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
-
- object DwdFcjNwrsSellbargainMskDay {
- def main(args: Array[String]): Unit = {
-
- /**
- * 1. 创建spark环境
- *
- */
- val spark: SparkSession = SparkSession
- .builder
- //.master("local")
- .enableHiveSupport() //开启hive元数据支持,开启之后在spark中可以直接读取hive中的表,但是开启之后就不能再本地云心的了
- .getOrCreate()
-
- import spark.implicits._
- import org.apache.spark.sql.functions._
-
- /**
- * 获取时间分区的字段
- *
- */
- val ds: String = args.head
- /**
- * 2. 读取获取购房合同中的表
- * 必须带上库名,否则读不到
- *
- * 不可能读取所有的数据,我们只需要读取每一天的数据
- *
- */
- val sellbargain: DataFrame = spark
- .table("ods.ods_t_fcj_nwrs_sellbargain")
- .where($"ds" === ds)
-
- //对原始的数据进行托名 对id进行脱敏,然后将r_fwzl中的数字变成 * 号(通过正则表达式替换)
- val resultDF: DataFrame = sellbargain.select(
- upper(md5($"id")) as "id",
- regexp_replace($"r_fwzl", "\\d", "*") as "r_fwzl",
- $"htydjzmj",
- $"tntjzmj",
- $"ftmj",
- $"time_tjba",
- $"htzj"
- )
-
- resultDF
- .write
- .format("csv")
- .mode(SaveMode.Overwrite)
- .option("sep","\t")
- .save(s"/daas/motl/dwd/dwd_fcj_nwrs_sellbargain_msl_d/ds=$ds")
-
- //提交到集群运行 spark-submit --master yarn-client --class com.wt.dwd.DwdFcjNwrsSellbargainMskDay dwd-1.0-SNAPSHOT.jar
- }
- }
6.1.3 将通用的东西封装(重要-可以极快的提高效率):

代码逻辑如下:
- package com.wt.common
- import org.apache.spark.internal.Logging
- import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
- abstract class SparkTool extends Logging{
- def main(args: Array[String]): Unit = {
-
- /**
- * 获取时间分区
- *
- */
- if(args.length == 0){
- logError("请指定分区!!!")
- return
- }
- val ds: String = args.head
-
- //创建spark环境
- val spark: SparkSession = SparkSession
- .builder()
- .enableHiveSupport()
- .getOrCreate()
-
- //调用子类实现的抽象方法
- this.run(spark,ds)
- }
-
- /**
- * 抽象方法: 在子类中实现这个方法
- * import spark.implicits._
- * import org.apache.spark.sql.functions._
- *
- * @param spark: spark的环境
- * @param ds:分区
- */
- def run(spark: SparkSession,ds: String): Unit
-
- /**
- * 传入DataFrame 和 path 就可以保存数据
- *
- * 其中的format默认值是 csv格式的。
- *
- */
-
- def save(dataframe:DataFrame,path:String,format:String = "csv"): Unit={
- dataframe
- .write
- .format("csv")
- .mode(SaveMode.Overwrite)
- .option("sep","\t")
- .save(path)
- }
- }
理解:子类继承父类,在父类中已经封装好了spark,DataFormat的save 两个环境,需要子类继承SparkTool ,就可以拿到父类已经创建好的环境,减少代码量,提高效率
在save 中可以设置默认值,如果不传入的话,就使用默认值
而且该工具还吧 ds 给封装好了,我们在使用的时候可以直接用变量传入即可
6.1.4 调用方法如下:
- package com.wt.dwd
- import com.wt.common.SparkTool
- import org.apache.spark.sql.{DataFrame, SparkSession}
-
- object DwdGsjRegLegrepreMskDay extends SparkTool{
- /**
- * 抽象方法: 在子类中实现这个方法
- * import spark.implicits._
- * import org.apache.spark.sql.functions._
- *
- * @param spark : spark的环境
- * @param ds :分区
- */
- override def run(spark: SparkSession, ds: String): Unit = {
- import spark.implicits._
- import org.apache.spark.sql.functions._
-
- /**
- * 读取hive中的表
- *
- */
- val legrepre: DataFrame = spark
- .table("ods.ods_t_gsj_reg_legrepre")
- .where($"ds" === ds)
-
- val resultDF: DataFrame = legrepre
- .select(
- upper(md5($"id")) as "id",
- $"position",
- upper(md5($"tel")) as "tel",
- $"appounit",
- $"accdside",
- $"posbrmode",
- $"offhfrom",
- $"offhto"
- )
-
- save(resultDF,s"/daas/motl/dwd/dwd_gsj_reg_legrepre_msk_d/ds=$ds")
-
- }
- }
6.1.5 脚本如下:
因为在common工具和dwd不在同一个模块中,需要在dwd模块中导入common的jar包

需要导入依赖,如下:

脚本如下;
- # 分区
- ds=$1
-
- # --jars :指定代码需要的其他的包
-
- # 执行任务
- spark-submit \
- --master yarn-client \
- --class com.wt.dwd.DwdGsjRegLegrepreMskDay \
- --jars ../lib/common-1.0-SNAPSHOT.jar \
- ../target/dwd-1.0-SNAPSHOT.jar \
- $ds
- # 增加分区
- hive -e "alter table dwd.dwd_gsj_reg_legrepre_msk_d add IF NOT EXISTS partition (ds='$ds')"
将dwd包拖到dwd用户下,执行脚本,最终结果如下

