题目要求:
-
抽取ds_db01库中customer_inf的增量数据进入Hive的ods库中表customer_inf。根据ods.user_info表中modified_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前日期的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.customer_inf命令;
代码实现:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import java.time.LocalDate
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("one")
.set("spark.testing.memory", "2147480000").set("dfs.client.use.datanode.hostname", "true")
System.setProperty("HADOOP_USER_NAME", "root")
val spark = SparkSession.builder()
.config("hive.metastore.uris", "thrift://192.168.23.60:9083")
.config("hive.metastore.warehouse", "hdfs://192.168.23.60://9000/user/hive/warehouse")
.config("spark.sql.storeAssignmentPolicy", "LEGACY")
spark.read.format("jdbc")
.option("url","jdbc:mysql://192.168.23.60:3306/ds_db01??characterEncoding=UTF-8")
.option("driver","com.mysql.jdbc.Driver")
.option("password","123456")
.option("dbtable","customer_inf")
.load().createOrReplaceTempView("v")
spark.sql("select * from v")
val unit = java.time.LocalDate.now().plusYears(-1).plusMonths(-1).plusDays(-1).toString().replace("-", "")
|select customer_inf_id,customer_id,customer_name,identity_card_type,identity_card_no,mobile_phone,
|customer_email,gender,customer_point,register_time,birthday,customer_level,customer_money,
|from_unixtime(unix_timestamp(modified_time,'yyyy-MM-dd'),'yyyyMMdd') as modified_time
|""".stripMargin).createOrReplaceTempView("v1")
|insert overwrite table gh_test.customer_inf
|partition (etl_date="${unit}")
|select * from v where modified_time>"${unit1}"
spark.sql("select count(*) from gh_test.customer_inf").show