• spark学习笔记(十)——sparkSQL核心编程-自定义函数UDF、UDAF/读取保存数据/五大数据类型


    目录

    用到的全部依赖

    用户自定义函数

    UDF 

    UDAF

    弱类型

    强类型

    数据的读取与保存

    通用的方式

    数据类型

    (1)JSON

    (2)CSV

    (3)Parquet

    (4)MySQL

    (5)hive 


    用到的全部依赖

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.apache.sparkgroupId>
    4. <artifactId>spark-core_2.12artifactId>
    5. <version>3.0.0version>
    6. dependency>
    7. <dependency>
    8. <groupId>org.apache.sparkgroupId>
    9. <artifactId>spark-sql_2.12artifactId>
    10. <version>3.0.0version>
    11. dependency>
    12. <dependency>
    13. <groupId>mysqlgroupId>
    14. <artifactId>mysql-connector-javaartifactId>
    15. <version>5.1.27version>
    16. dependency>
    17. <dependency>
    18. <groupId>org.apache.sparkgroupId>
    19. <artifactId>spark-hive_2.12artifactId>
    20. <version>3.0.0version>
    21. dependency>
    22. <dependency>
    23. <groupId>org.apache.hivegroupId>
    24. <artifactId>hive-execartifactId>
    25. <version>1.2.1version>
    26. dependency>
    27. dependencies>
    28. <build>
    29. <plugins>
    30. <plugin>
    31. <groupId>net.alchim31.mavengroupId>
    32. <artifactId>scala-maven-pluginartifactId>
    33. <version>3.2.2version>
    34. <executions>
    35. <execution>
    36. <goals>
    37. <goal>testCompilegoal>
    38. goals>
    39. execution>
    40. executions>
    41. plugin>
    42. <plugin>
    43. <groupId>org.apache.maven.pluginsgroupId>
    44. <artifactId>maven-assembly-pluginartifactId>
    45. <version>3.1.0version>
    46. <configuration>
    47. <descriptorRefs>
    48. <descriptorRef>jar-with-dependenciesdescriptorRef>
    49. descriptorRefs>
    50. configuration>
    51. <executions>
    52. <execution>
    53. <id>make-assemblyid>
    54. <phase>packagephase>
    55. <goals>
    56. <goal>singlegoal>
    57. goals>
    58. execution>
    59. executions>
    60. plugin>
    61. plugins>
    62. build>

    用户自定义函数

    用户可以通过spark.udf功能添加自定义函数,实现自定义功能。

    UDF 

    (1)创建DataFrame

    spark的bin目录下创建input文件夹,在input里创建user.json文件,user.json内容如下:

    {"username":"zj","age":20}
    {"username":"xx","age":21}
    {"username":"yy","age":22}

    val df = spark.read.json("input/user.json")

    (2)注册UDF

     spark.udf.register("addName",(x:String)=> "Name:"+x)

    (3)创建临时表

     df.createOrReplaceTempView("people")

    (4)应用UDF

    spark.sql("Select addName(username),age from people").show()

    UDAF

    强类型的Dataset和弱类型的DataFrame都提供了相关的聚合函数max()min(),count(),avg()等等。

    用户可以设定自定义聚合函数,通过继承UserDefinedAggregateFunction来实现用户自定义弱类型聚合函数。Spark3.0版本推荐使用强类型聚合函数Aggregator。

    在datas目录下新建user.json文件,内容为:

    {"username": "zj","age": 25}
    {"username": "qq","age": 32}
    {"username": "ww","age": 43}

    弱类型

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
    3. import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}
    4. import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession, types}
    5. object sparkSQL_UDAF {
    6. def main(args: Array[String]): Unit = {
    7. //TODO 创建sparkSQL运行环境
    8. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    9. val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    10. //TODO 执行逻辑操作 计算平均年龄
    11. //创建DataFrame
    12. val df = spark.read.json("datas/user.json")
    13. //创建临时表
    14. df.createOrReplaceTempView("user")
    15. //自定义
    16. spark.udf.register("ageAvg",new AvgUDAF())
    17. spark.sql("select ageAvg(age) from user").show()
    18. //TODO 关闭环境
    19. spark.stop()
    20. }
    21. /*
    22. 自定义函数:计算平均值
    23. 1.继承
    24. 2.重写方法
    25. */
    26. class AvgUDAF extends UserDefinedAggregateFunction{
    27. //输入数据的结构
    28. override def inputSchema: StructType = {
    29. StructType(
    30. Array(
    31. StructField("age",LongType)
    32. )
    33. )
    34. }
    35. //缓冲区数据的结构
    36. override def bufferSchema: StructType = {
    37. StructType(
    38. Array(
    39. StructField("total",LongType),
    40. StructField("count",LongType)
    41. )
    42. )
    43. }
    44. //函数计算结果数据类型
    45. override def dataType: DataType = LongType
    46. //函数稳定性
    47. override def deterministic: Boolean = true
    48. //缓冲区初始化
    49. override def initialize(buffer: MutableAggregationBuffer): Unit = {
    50. buffer.update(0,0L)
    51. buffer.update(1,0L)
    52. }
    53. //根据输入的值更新缓冲区数据
    54. override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    55. buffer.update(0,buffer.getLong(0)+input.getLong(0))
    56. buffer.update(1,buffer.getLong(1)+1)
    57. }
    58. //缓冲区数据合并
    59. override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    60. buffer1.update(0,buffer1.getLong(0) + buffer2.getLong(0))
    61. buffer1.update(1,buffer1.getLong(1) + buffer2.getLong(1))
    62. }
    63. //计算平均值
    64. override def evaluate(buffer: Row): Any = {
    65. buffer.getLong(0)/buffer.getLong(1)
    66. }
    67. }
    68. }

    结果

    强类型

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.sql.expressions.Aggregator
    3. import org.apache.spark.sql.{Encoder, Encoders, Row, SparkSession, functions}
    4. object sparkSQL_UDAF02 {
    5. def main(args: Array[String]): Unit = {
    6. //TODO 创建sparkSQL运行环境
    7. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    8. val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    9. //TODO 执行逻辑操作 计算平均年龄
    10. //创建DataFrame
    11. val df = spark.read.json("datas/user.json")
    12. //创建临时表
    13. df.createOrReplaceTempView("user")
    14. //自定义操作
    15. spark.udf.register("ageAvg",functions.udaf(new AvgUDAF()))
    16. spark.sql("select ageAvg(age) from user").show()
    17. //TODO 关闭环境
    18. spark.stop()
    19. }
    20. /*
    21. 自定义函数:计算平均值
    22. 1.继承Aggregator
    23. IN:输入的数据类型Long
    24. BUF:缓冲区的数据类型Buff
    25. OUT:输出的数据类型Long
    26. 2.重写方法
    27. */
    28. case class Buff(var total:Long,var count:Long)
    29. class AvgUDAF extends Aggregator[Long,Buff,Long]{
    30. //初始值/零值 缓冲区的初始化
    31. override def zero: Buff = {
    32. Buff(0L,0L)
    33. }
    34. //根据输入的数据更新缓冲区的数据
    35. override def reduce(buff: Buff, in: Long): Buff = {
    36. buff.total = buff.total + in
    37. buff.count = buff.count + 1
    38. buff
    39. }
    40. //合并缓冲区
    41. override def merge(buff1: Buff, buff2: Buff): Buff = {
    42. buff1.total = buff1.total + buff2.total
    43. buff1.count = buff1.count + buff2.count
    44. buff1
    45. }
    46. //计算结果
    47. override def finish(buff: Buff): Long = {
    48. buff.total / buff.count
    49. }
    50. //缓冲区的编码操作
    51. override def bufferEncoder: Encoder[Buff] = Encoders.product
    52. //输出的编码操作
    53. override def outputEncoder: Encoder[Long] = Encoders.scalaLong
    54. }
    55. }

    结果

    数据的读取与保存

    通用的方式

    SparkSQL提供了通用的保存数据和读取数据的方式;通用指的是使用相同的API根据不同的参数读取和保存不同格式的数据,SparkSQL默认读取和保存的文件格式是parquet。

    (1)读取数据

    读取数据的通用方法:spark.read.load

    数据类型:csv、format、jdbc、json、load、option、options、orc、parquet、schema、table、text、textFile

    读取不同格式的数据要对不同的数据格式进行设定:

    spark.read.format("…")[.option("…")].load("…")

    format("…"):指定加载的数据类型:csv、jdbc。json、orc、parquet、textFile;

    load("…"):在csv、jdbc、json、orc、parquet、textFile格式下传入数据的路径;

    option("…"):在jdbc格式下需要传入JDBC相应参数:url、user、password、dbtable;

    注:直接在文件上进行查询:文件格式 ‘文件路径’

    spark.sql("select * from json.`datas/user.json`").show

    (2)保存数据

    保存数据的通用方法:df.write.save

    保存不同格式的数据,可以对不同的数据格式进行设定:

    df.write.format("…")[.option("…")].save("…")

    format("…")指定保存的数据类型:csvjdbc、jsonorcparquet、textFile;

    save ("…"):在csvorcparquet、textFile格式下保存数据的路径;

    option("…"):在jdbc格式下需要传入JDBC相应参数:urluserpassword、dbtable;

    保存操作可以使用SaveMode用来指明如何处理数据,使用mode()方法来设置;有一点很重要这些 SaveMode都是没有加锁的, 也不是原子操作。

    Scala/Javaany languagemeaning
    SaveMode ErrorIfExists(默认)“error”如果文件已经存在则异常
    SaveMode Append“append”如果文件已经存在则追加
    SaveMode Overwrite“overwrite”

    如果文件已经存在则覆盖

    SaveMode Ignore“ignore”如果文件已经存在则忽略
    df.write.mode("append").json("/data/output")

    数据类型

    (1)JSON

    SparkSQL能够自动推测JSON数据集的结构,并将它加载为一个Dataset[Row],可以通过 SparkSession.read.json()去加载JSON文件,且读取的JSON文件不是传统的JSON文件,每一行都应该是一个JSON串。

    如:

    {"username": "zj","age": 25}
    {"username": "qq","age": 32}
    {"username": "ww","age": 43}
    1. //导入隐式转换
    2. import spark.implicits._
    3. //加载JSON文件
    4. val path = "input/user.json"
    5. val df = spark.read.json(path)
    6. //创建临时表
    7. df.createOrReplaceTempView("user")
    8. //数据查询
    9. val userDF = spark.sql("select name from user where age between 20 and 40")
    10. userDF.show()

    (2)CSV

    SparkSQL可以配置CSV文件的列表信息、读取CSV文件;CSV文件的第一行设置为数据列。

    spark.read.format("csv").option("sep", ";").option("inferSchema","true").option("header", "true").load("input/user.csv").show

    (3)Parquet

    SparkSQL的默认数据源为Parquet格式;

    Parquet是一种能够有效存储嵌套数据的列式存储格式;

    数据源为Parquet文件时,SparkSQL可以方便的执行所有的操作,不需要使用format;

    修改配置项spark.sql.sources.default可修改默认数据源格式。

    1. //加载数据
    2. val df = spark.read.load("/input/users.parquet")
    3. df.show
    4. //保存数据
    5. df.write.mode("append").save("/output")

    (4)MySQL

    SparkSQL可以通过JDBC从关系型数据库中以读取数据的方式创建DataFrame,通过对 DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。

    使用spark-shell操作可在启动shell时指定相关的数据库驱动路径或者将相关的数据库驱动放到spark的类路径下。

    1)导入依赖

    1. <dependency>
    2. <groupId>mysqlgroupId>
    3. <artifactId>mysql-connector-javaartifactId>
    4. <version>5.1.27version>
    5. dependency>

    2)读取数据/写入数据

    创建数据库

    1. create database spaek;
    2. use spark;

    创建表

    1. CREATE TABLE IF NOT EXISTS Produce
    2. (
    3. id int NOT NULL,
    4. name varchar(45) NOT NULL,
    5. age INT NULL,
    6. PRIMARY KEY (id)
    7. )
    8. ENGINE = innodb;

    添加mysql数据

    insert into user values (1,'zj',24),(2,'zjj',34),(7,'zjjj',42);

    spark代码:

    1. import java.util.Properties
    2. import org.apache.spark.SparkConf
    3. import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}
    4. object sparkSQL_JDBC {
    5. def main(args: Array[String]): Unit = {
    6. //TODO 创建sparkSQL运行环境
    7. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    8. val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    9. import spark.implicits._
    10. //TODO 执行逻辑操作
    11. //读取MySQL数据 方式一
    12. val df = spark.read
    13. .format("jdbc")
    14. .option("url","jdbc:mysql://hadoop01:3306/spark")
    15. .option("driver","com.mysql.jdbc.Driver")
    16. .option("user","root")
    17. .option("password","123456")
    18. .option("dbtable","user")
    19. .load()
    20. df.show()
    21. //读取MySQL数据 方式二
    22. spark.read.format("jdbc")
    23. .options(Map("url"->"jdbc:mysql://hadoop01:3306/spark?user=root&password=123456","dbtable"->"user","driver"->"com.mysql.jdbc.Driver")).load().show
    24. //读取MySQL数据 方式三
    25. val props: Properties = new Properties()
    26. props.setProperty("user", "root")
    27. props.setProperty("password", "123456")
    28. val dff: DataFrame = spark.read.jdbc("jdbc:mysql://hadoop01:3306/spark",
    29. "user", props)
    30. dff.show
    31. //保存数据
    32. df.write
    33. .format("jdbc")
    34. .option("url","jdbc:mysql://hadoop01:3306/spark")
    35. .option("driver","com.mysql.jdbc.Driver")
    36. .option("user","root")
    37. .option("password","123456")
    38. .option("dbtable","user1")
    39. .mode(SaveMode.Append)
    40. .save()
    41. //TODO 关闭环境
    42. spark.stop()
    43. }
    44. }

     运行结果:

      

    (5)hive 

    1)Apache Hive是Hadoop上的SQL引擎,SparkSQL包含Hive支持,支持Hive表访问、UDF、Hive查询语言HQL等。

    2)在Spark SQL中包含Hive库,并不需要事先安装Hive。最好还是在编译Spark SQL时引入Hive支持,这样就可以使用这些特性了。如果下载的是二进制版本的Spark,应该已经在编译时添加了Hive支持。

    3)若要把Spark SQL连接到一个部署好的Hive上,必须把hive-site.xml复制到Spark的配置文件目录中($SPARK_HOME/conf)

    4)没有部署好HiveSpark SQL也可以运行。 需要注意的是,如果没有部署好HiveSpark SQL会在当前的工作目录中创建出自己的Hive元数据仓库metastore_db。尝试使用HQL中的CREATE TABLE语句来创建表,这些表会被放在你默认的文件系统中的 /user/hive/warehouse目录中

    注:如果classpath中有配好的hdfs-site.xml,默认的文件系统就是HDFS,否则就是本地文件系统,spark-shell默认Hive支持;代码中默认不支持,需要手动指定)。

    5)连接外部已经部署好的Hive,需要几个步骤:

    Spark要接管Hive需要把hive-site.xml拷贝到conf/目录下;

    Mysql的驱动拷贝jars/目录下;

    如果访问不到hdfs,则需要把core-site.xmlhdfs-site.xml拷贝到conf/目录下;

    重启spark-shell。

    6)Spark SQL CLI 可以在本地运行Hive元数据服务以及从命令行执行查询任务。在Spark目录下执行命令bin/spark-sql启动Spark SQL CLI,直接执行SQL语句,类似Hive窗口。

    7)Spark Thrift Server是Spark基于HiveServer2实现的一个Thrift服务,无缝兼容HiveServer2;Spark Thrift Server的接口和协议都和HiveServer2完全一致,因此我们部署好Spark Thrift Server后,可以直接使用hivebeeline访问Spark Thrift Server执行相关语句。Spark Thrift Server的目的只是取代HiveServer2,它依旧可以和Hive Metastore进行交互,获取到 hive 的元数据。

    连接Thrift Server,需要几个步骤:

    Spark要接管Hive需要把hive-site.xml拷贝到conf/目录下;

    Mysql的驱动拷贝jars/目录下;

    如果访问不到hdfs,则需要把core-site.xmlhdfs-site.xml拷贝到conf/目录下;

    启动Thrift Server。

    注:sbin/start-thriftserver.sh

    使用beeline连接Thrift Server:bin/beeline -u jdbc:hive2://hadoop01:10000 -n root

    8)导入依赖

    1. <dependency>
    2. <groupId>org.apache.sparkgroupId>
    3. <artifactId>spark-hive_2.12artifactId>
    4. <version>3.0.0version>
    5. dependency>
    6. <dependency>
    7. <groupId>org.apache.hivegroupId>
    8. <artifactId>hive-execartifactId>
    9. <version>1.2.1version>
    10. dependency>

    9)将hive-site.xml文件拷贝到项目的resources目录中,代码实现:

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.sql.{SparkSession}
    3. object sparkSQL_hive {
    4. def main(args: Array[String]): Unit = {
    5. //TODO 创建sparkSQL运行环境
    6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSQL")
    7. val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    8. //TODO 执行逻辑操作
    9. //使用sparkSQL连接hive
    10. //1.拷贝hive-site.xml文件到classpath下
    11. //2.启用hive支持
    12. //3.增加依赖
    13. spark.sql("show tables").show()
    14. //TODO 关闭环境
    15. spark.stop()
    16. }
    17. }

    本文仅仅是学习笔记的记录!!

  • 相关阅读:
    微信小程序的五种传值方式
    Mysql使用中的性能优化——索引数对插入操作性能的影响
    编程团体赛
    Bika LIMS 开源LIMS集——实验室检验流程概述及主页、面板
    「小白学Python」Windows安装Python
    Win11快捷复制粘贴不能用怎么办?Win11快捷复制粘贴不能用
    详解:npm升级到pnpm对比优化点!!
    养老院管理系统(Java+Web+MySQL)
    【JVM】运行时数据区、程序计数器
    [手写spring](2)初始化BeanDefinitionMap
  • 原文地址:https://blog.csdn.net/qq_55906442/article/details/126241967