• 离线数据处理——子任务一:数据抽取


    目录

    子任务一:数据抽取

    实现代码

    (1)定义工具类

    (2)定义工作类


    子任务一:数据抽取

    编写Scala代码,使用Spark将MySQL的shtd_store库中表user_info、sku_info、base_province、base_region、order_info、order_detail的数据增量抽取到Hive的ods库中对应表user_info、sku_info、base_province、base_region、order_info、order_detail中。

    1.抽取shtd_store库中user_info的增量数据进入Hive的ods库中表user_info。根据ods.user_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这两个字段中较大的时间进行比较),只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.user_info命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;

    2.抽取shtd_store库中sku_info的增量数据进入Hive的ods库中表sku_info。根据ods.sku_info表中create_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.sku_info命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;

    3.抽取shtd_store库中base_province的增量数据进入Hive的ods库中表base_province。根据ods.base_province表中id作为增量字段,只将新增的数据抽入,字段名称、类型不变并添加字段create_time取当前时间,同时添加静态分区,分区字段类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.base_province命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;

    4.抽取shtd_store库中base_region的增量数据进入Hive的ods库中表base_region。根据ods.base_region表中id作为增量字段,只将新增的数据抽入,字段名称、类型不变并添加字段create_time取当前时间,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.base_region命令,将结果截图粘贴至对应报告中;

    5.抽取shtd_store库中order_info的增量数据进入Hive的ods库中表order_info,根据ods.order_info表中operate_time或create_time作为增量字段(即MySQL中每条数据取这两个时间中较大的那个时间作为增量字段去和ods里的这两个字段中较大的时间进行比较),只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.order_info命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下;

    6.抽取shtd_store库中order_detail的增量数据进入Hive的ods库中表order_detail,根据ods.order_detail表中create_time作为增量字段,只将新增的数据抽入,字段名称、类型不变,同时添加静态分区,分区字段为etl_date,类型为String,且值为当前比赛日的前一天日期(分区字段格式为yyyyMMdd)。使用hive cli执行show partitions ods.order_detail命令,将结果截图粘贴至客户端桌面【Release\任务B提交结果.docx】中对应的任务序号下。

    实现代码

    (1)定义工具类

    该工具类包含两个方法。loadJDBC方法负责加载mysql中的数据,appendHive方法负责将增量抽取的数据追加写入到hive的相关表中。

    1. package com.hbzy.GZ03
    2. import org.apache.orc.OrcProto.ColumnEncoding
    3. import org.apache.spark.sql.{DataFrame, SparkSession}
    4. // 负责mysql的链接和hive的写入
    5. object gz03utils {
    6. // 加载jdbc
    7. def loadJDBC(sparkSession: SparkSession, jdbcMap:Map[String, String]):DataFrame ={
    8. val dataframe: DataFrame = sparkSession.read.format("jdbc").options(jdbcMap).load()
    9. dataframe
    10. }
    11. // 增量写入hive
    12. def appendHive(sparkSession: SparkSession, dataFrame: DataFrame, hiveMap:Map[String, String]):Unit = {
    13. val db = hiveMap("db") // 确定数据库
    14. val tb = hiveMap("tb") // 确定表
    15. val partitionColumn = hiveMap.get("partitionColumn") // 确定分区列
    16. sparkSession.sql(s"use ${db}") // 使用插值法填充
    17. // 有的表需要分区,有的不需要。这里使用模式匹配来分别处理
    18. partitionColumn match {
    19. case Some(column) => dataFrame
    20. .write
    21. .format("parquet") // 这里的关键是写入格式问题,不能用hive,要用parquet
    22. .mode("append")
    23. .partitionBy(column)
    24. .saveAsTable(tb)
    25. case None => dataFrame
    26. .write
    27. .format("parquet")
    28. .mode("append")
    29. .saveAsTable(tb)
    30. }
    31. }
    32. }

    (2)定义工作类

    本题需要增量抽取相同的数据库的6张不同的表到hive数据库中。所以定义了6个方法,分别对应了6张表的抽取。user_info、sku_info、base_province、base_region、order_info、order_detail。抽取的具体方法和逻辑大同小异。首先需要编写hiveSQL的查询语句,查询出数据库中最大的时间或者id。接着根据时间或者id在mysql中进行查找。然后将查找到的结果再追加写入到hive的表中,同时增加相关的列。

    1. package com.hbzy.GZ03
    2. import com.hbzy.GZ03.gz03utils.{appendHive, loadJDBC}
    3. import org.apache.spark.SparkConf
    4. import org.apache.spark.sql.functions.{current_timestamp, lit}
    5. import org.apache.spark.sql.{DataFrame, SparkSession}
    6. import java.sql.Timestamp
    7. object gz03job {
    8. def main(args: Array[String]): Unit = {
    9. val SparkConf = new SparkConf().setMaster("local[*]").setAppName("gz03job")
    10. val session: SparkSession = SparkSession
    11. .builder() // 环境
    12. .config(SparkConf) // 各种配置项
    13. .config("hive.metastore.uris", "thrift://192.168.79.132:9083") // 加载元数据
    14. // 打开hive动态分区
    15. .config("hive.exec.dynamic.partition", "true")
    16. .config("hive.exec.dynamic.partition.mode", "nonstrict")
    17. // 需要根据分区值,覆盖原来的分区时,需要配置的参数
    18. // .config("spark.sql.source.partitionOverwriteMode", "dynamic")
    19. .enableHiveSupport() // 获得hive支持
    20. .getOrCreate() // 创建
    21. // 增量抽取user_info
    22. job_user_info(session)
    23. // 增量抽取sku_info
    24. job_sku_info(session)
    25. // 增量抽取base_province
    26. job_base_province(session)
    27. // 增量抽取base_region
    28. job_base_region(session)
    29. // 增量抽取order_info
    30. job_order_info(session)
    31. // 增量抽取order_detail
    32. job_order_detail(session)
    33. }
    34. // 增量抽取user_info operate_time或create_time
    35. def job_user_info(sparkSession: SparkSession):Unit = {
    36. // 首先计算出user_info存量数据的最大时间
    37. val hive_max_time = "select greatest(max(operate_time), max(create_time)) max_time from ds_ods.user_info"
    38. val df: DataFrame = sparkSession.sql(hive_max_time)
    39. // df.show()
    40. val max_time = df.first().getTimestamp(0)
    41. // println(max_time)
    42. // ---------查询mysql
    43. // 定义mysql查询语句
    44. val querySQL = s"select * from user_info where operate_time > '${max_time}' or create_time > '${max_time}'"
    45. // 将查询mysql的options参数定义为jdbcMap
    46. val jdbcMap = Map(
    47. "driver" -> "com.mysql.jdbc.Driver",
    48. "url" -> "jdbc:mysql://192.168.79.132:3306/shtd_store?useSSL=false",
    49. "query" -> querySQL,
    50. "user" -> "root", // 致命性小错误,这里的参数是user而不是username
    51. "password" -> "admin"
    52. )
    53. // 查询
    54. val frame: DataFrame = loadJDBC(sparkSession, jdbcMap)
    55. frame.show()
    56. // 增加静态分区
    57. val frame1: DataFrame = frame.withColumn(colName = "etl_date", lit("20230820"))
    58. // 定义hiveMap,写入hive访问的基本信息
    59. val hiveMap = Map(
    60. "db" -> "ds_ods",
    61. "tb" -> "user_info",
    62. "partitionColumn" -> "etl_date"
    63. )
    64. // 将分区后的数据增量写入hive中
    65. appendHive(sparkSession, frame1, hiveMap)
    66. }
    67. // 抽取sku_info 条件create_time
    68. def job_sku_info(sparkSession: SparkSession):Unit = {
    69. // 计算出sku_info中的最大时间
    70. // 编写hive查询语句
    71. val hive_max_time = "select max(create_time) max_time from ds_ods.sku_info"
    72. // 运行查询语句
    73. val df: DataFrame = sparkSession.sql(hive_max_time)
    74. df.show()
    75. val max_time: Timestamp = df.first().getTimestamp(0)
    76. println(max_time)
    77. // 定义mysql查询语句
    78. val querySQL = s"select * from sku_info where create_time > '${max_time}'"
    79. // 定义jdbcMap
    80. val jdbcMap = Map(
    81. "driver" -> "com.mysql.jdbc.Driver",
    82. "url" -> "jdbc:mysql://192.168.79.132:3306/shtd_store?useSSL",
    83. "query" -> querySQL,
    84. "user" -> "root",
    85. "password" -> "admin"
    86. )
    87. // 执行查询
    88. val frame: DataFrame = loadJDBC(sparkSession, jdbcMap)
    89. frame.show()
    90. // 增加静态分区
    91. val frame1: DataFrame = frame.withColumn("etl_date", lit("20230820"))
    92. // 定义hiveMap
    93. val hiveMap = Map(
    94. "db" -> "ds_ods",
    95. "tb" -> "sku_info",
    96. "partitionColumn" -> "etl_date"
    97. )
    98. // 写入
    99. appendHive(sparkSession, frame1, hiveMap)
    100. }
    101. // 抽取base_province 条件id
    102. def job_base_province(sparkSession: SparkSession):Unit={
    103. // 编写hive查询语句,计算最大的id
    104. val hive_max_id = "select max(id) max_id from ds_ods.base_province"
    105. // 执行查询
    106. val df: DataFrame = sparkSession.sql(hive_max_id)
    107. // 数据转换
    108. val max_id = df.first().getLong(0)
    109. // 通过id来查询mysql
    110. // 定义mysql查询语句
    111. val querySQL = s"select * from base_province where id > '${max_id}'"
    112. // 定义jdbcMap
    113. val jdbcMap = Map(
    114. "driver" -> "com.mysql.jdbc.Driver",
    115. "url" -> "jdbc:mysql://192.168.79.132:3306/shtd_store?useSSL=false",
    116. "query" -> querySQL,
    117. "user" -> "root",
    118. "password" -> "admin"
    119. )
    120. // 通过工具方法执行
    121. val frame: DataFrame = loadJDBC(sparkSession, jdbcMap)
    122. frame.show()
    123. // 增加create_time以及etl_date分区
    124. val frame1: DataFrame = frame
    125. .withColumn("create_time", current_timestamp())
    126. .withColumn("etl_date", lit("20230820"))
    127. // 定义hiveMap
    128. val hiveMap = Map(
    129. "db" -> "ds_ods",
    130. "tb" -> "base_province",
    131. "partitionColumn" -> "etl_date"
    132. )
    133. // 执行增量抽取
    134. appendHive(sparkSession, frame1, hiveMap)
    135. }
    136. // 增量抽取base_region 条件id
    137. def job_base_region(sparkSession: SparkSession):Unit={
    138. // 查询最大id
    139. val hive_max_id = "select max(id) from ds_ods.base_region"
    140. // 执行
    141. val df: DataFrame = sparkSession.sql(hive_max_id)
    142. // 转换 // base_region中的这张表的id类型为varchar导致抽取到的类型为string类型,string类型不能直接转换long类型
    143. val max_id: Long = df.first().getString(0).toLong
    144. println(max_id)
    145. // 根据id来查询
    146. val querySQL = s"select * from base_region where id > '${max_id}'"
    147. // 编写jdbcMap
    148. val jdbcMap = Map(
    149. "driver" -> "com.mysql.jdbc.Driver",
    150. "url" -> "jdbc:mysql://192.168.79.132/shtd_store?useSSL=false",
    151. "query" -> querySQL,
    152. "user" -> "root",
    153. "password" -> "admin"
    154. )
    155. // 执行查询
    156. val frame: DataFrame = loadJDBC(sparkSession, jdbcMap)
    157. frame.show()
    158. // 增加create_time和etl_date分区
    159. val frame1: DataFrame = frame
    160. .withColumn("create_time", current_timestamp())
    161. .withColumn("etl_date", lit("20230820"))
    162. // 编写hiveMap
    163. val hiveMap = Map(
    164. "db" -> "ds_ods",
    165. "tb" -> "base_region",
    166. "partitionColumn" -> "etl_date"
    167. )
    168. appendHive(sparkSession, frame1, hiveMap)
    169. }
    170. // 增量抽取order_info 条件operate_time和create_time
    171. def job_order_info(sparkSession: SparkSession):Unit={
    172. // 查询hive中的最大时间
    173. val hive_max_time = "select greatest(max(create_time), max(operate_time)) max_time from ds_ods.order_info"
    174. // 执行
    175. val df: DataFrame = sparkSession.sql(hive_max_time)
    176. // 转换
    177. val max_time: Timestamp = df.first().getTimestamp(0)
    178. println(max_time)
    179. // 根据使时间查询
    180. val querySQL = s"select * from order_info where operate_time > '${max_time}' or create_time > '${max_time}'"
    181. // 定义jdbcMap
    182. val jdbcMap = Map(
    183. "driver" -> "com.mysql.jdbc.Driver",
    184. "url" -> "jdbc:mysql://192.168.79.132/shtd_store?useSSL",
    185. "query" -> querySQL,
    186. "user" -> "root",
    187. "password" -> "admin"
    188. )
    189. val frame: DataFrame = loadJDBC(sparkSession, jdbcMap)
    190. frame.show()
    191. // 增加列
    192. val frame1: DataFrame = frame.withColumn("etl_date", lit("20230820"))
    193. // 编写hiveMap
    194. val hiveMap = Map(
    195. "db" -> "ds_ods",
    196. "tb" -> "order_info",
    197. "partitionColumn" -> "etl_date"
    198. )
    199. // 执行
    200. appendHive(sparkSession, frame1, hiveMap)
    201. }
    202. // 增量抽取order_detail 条件create_time 2020-04-25 18:47:14.0
    203. def job_order_detail(sparkSession: SparkSession):Unit={
    204. // 查询hive中的最大时间
    205. val hive_max_time = "select max(create_time) from ds_ods.order_detail"
    206. // 执行查询
    207. val df: DataFrame = sparkSession.sql(hive_max_time)
    208. // 转换
    209. val max_time = df.first().getTimestamp(0)
    210. println(max_time)
    211. // 根据时间查询mysql
    212. val querySQL = s"select * from order_detail where create_time > '${max_time}'"
    213. // 编写jdbcMap
    214. val jdbcMap = Map(
    215. "driver" -> "com.mysql.jdbc.Driver",
    216. "url" -> "jdbc:mysql://192.168.79.132:3306/shtd_store?useSSL",
    217. "query" -> querySQL,
    218. "user" -> "root",
    219. "password" -> "admin"
    220. )
    221. // 查询
    222. val frame: DataFrame = loadJDBC(sparkSession, jdbcMap)
    223. frame.show()
    224. // 增加分区
    225. val frame1: DataFrame = frame.withColumn("etl_date", lit("20230820"))
    226. // 编写hiveMap
    227. val hiveMap = Map(
    228. "db" -> "ds_ods",
    229. "tb" -> "order_detail",
    230. "partitionColumn" -> "etl_date"
    231. )
    232. appendHive(sparkSession, frame1, hiveMap)
    233. }
    234. }

  • 相关阅读:
    看完这份SpringBoot神级文档,面试真的可以为所欲为
    [MAUI]模仿网易云音乐黑胶唱片的交互实现
    基于Spring Boot的ERP仓储管理信息系统设计与实现毕业设计源码150958
    P1208 [USACO1.3] 混合牛奶 Mixing Milk
    miRNA测序数据生信分析——第四讲,未知物种的生信分析实例
    如何提高自己的软件测试水平之bug定位
    自动驾驶研究生就业如何,自动驾驶的研究方向
    Python的基础语法知识:循环语句、字符串格式化、运算符的优先级
    如何开发一个扩展性高、维护性好的软件系统?(一个程序员最基本的修养)
    OData WebAPI实践-与ABP vNext集成
  • 原文地址:https://blog.csdn.net/m0_69535058/article/details/132622197