• Spark【Spark SQL(二)RDD转换DataFrame、Spark SQL读写数据库 】


    RDD 转换得到 DataFrame

    Saprk 提供了两种方法来实现从 RDD 转换得到 DataFrame:

    1. 利用反射机制推断 RDD 模式
    2. 使用编程方式定义 RDD 模式

    下面使用到的数据 people.txt :

    1. Tom, 21
    2. Mike, 25
    3. Andy, 18

    1、利用反射机制推断 RDD 模式

            在利用反射机制推断 RDD 模式的过程时,需要先定义一个 case 类,因为只有 case 类才能被 Spark 隐式地转换为DataFrame对象。

    1. object Tese{
    2. // 反射机制推断必须使用 case 类,case class 必须放到main方法之外
    3. case class Person(name: String,age: Long) //定义一个case类
    4. def main(args: Array[String]): Unit = {
    5. val spark = SparkSession.builder()
    6. .master("local[*]")
    7. .appName("rdd to df 1")
    8. .getOrCreate()
    9. import spark.implicits._ //这里的spark不是org.apache.spark这个包 而是我们创建的SparkSession对象 它支持把一个RDD隐式地转换为一个 DataFrame对象
    10. val rdd: RDD[Person] = spark.sparkContext
    11. .textFile("data/sql/people.txt")
    12. .map(line => line.split(","))
    13. .map(t => Person(t(0), t(1).trim.toInt))
    14. // 将RDD对象转为DataFrame对象
    15. val df: DataFrame = rdd.toDF()
    16. df.createOrReplaceTempView("people")
    17. spark.sql("SELECT * FROM people WHERE age > 20").show()
    18. spark.stop()
    19. }
    20. }

    注意事项1:

    case 类必须放到伴生对象下,main方法之外,因为在隐式转换的时候它会自动通过 伴生对象名.case类名 来调用case类,如果放到main下面就找不到了。

    注意事项2:

    import spark.implicits._
    这里的spark不是org.apache.spark这个包 而是我们上面创建的SparkSession对象 它支持把一个RDD隐式地转换为一个 DataFrame对象
    

    2、使用编程方式定义 RDD 模式

            反射机制推断时需要定义 case class,但当无法定义 case 类时,就需要采用编程式来定义 RDD 模式了。这种方法看起来比较繁琐,但是很好用,不容易报错。

            我们现在同样加载 people.txt 中的数据,生成 RDD 对象,再把RDD对象转为DataFrame对象,进行SparkSQL 查询。主要包括三个步骤:

    1. 制作表头 schema: StructType
    2. 制作表中记录 rowRDD: RDD[Row]
    3. 合并表头和记录 df:DataFramw
    1. def main(args: Array[String]): Unit = {
    2. val spark = SparkSession.builder()
    3. .master("local[*]")
    4. .appName("rdd to df 2")
    5. .getOrCreate()
    6. //1.制作表头-也就是定义表的模式
    7. val schema: StructType = StructType(Array(StructField("name", StringType, true),
    8. StructField("age", IntegerType, true)))
    9. //2.加载表中的记录-也就是读取文件生成RDD
    10. val rowRdd: RDD[Row] = spark.sparkContext
    11. .textFile("data/sql/people.txt")
    12. .map(_.split(","))
    13. .map(attr => Row(attr(0), attr(1).trim.toInt))
    14. //3.把表头和记录拼接在一起
    15. val peopleDF: DataFrame = spark.createDataFrame(rowRdd, schema)
    16. peopleDF.createOrReplaceTempView("people")
    17. spark.sql("SELECT * FROM people WHERE age > 20").show()
    18. spark.stop()
    19. }

    运行结果:

    1. +----+---+
    2. |name|age|
    3. +----+---+
    4. | Tom| 21|
    5. |Mike| 25|
    6. +----+---+

    Spark SQL读取数据库

    导入依赖

    根据自己本地的MySQL版本导入对应的驱动。

    注意:mysql8.0版本在JDBC中的url是:" com.mysql.cj.jdbc.Driver "

    1. <dependency>
    2. <groupId>mysqlgroupId>
    3. <artifactId>mysql-connector-javaartifactId>
    4. <version>8.0.31version>
    5. dependency>

    读取 MySQL 中的数据

    1. def main(args: Array[String]): Unit = {
    2. val spark = SparkSession.builder()
    3. .master("local[*]")
    4. .appName("jdbc spark sql")
    5. .getOrCreate()
    6. val mysql: DataFrame = spark.read.format("jdbc")
    7. .option("url", "jdbc:mysql://localhost:3306/spark")
    8. .option("driver", "com.mysql.cj.jdbc.Driver")
    9. .option("dbtable", "student")
    10. .option("user", "root")
    11. .option("password", "Yan1029.")
    12. .load()
    13. mysql.show()
    14. spark.stop()
    15. }

    运行结果:

    默认显示整张表

    1. +---+----+---+---+
    2. | id|name|age|sex|
    3. +---+----+---+---+
    4. | 1| Tom| 21| 男|
    5. | 2|Andy| 20| 女|
    6. +---+----+---+---+

    向 MySQL 写入数据

    1. def main(args: Array[String]): Unit = {
    2. val spark = SparkSession.builder()
    3. .master("local[*]")
    4. .appName("jdbc spark sql")
    5. .getOrCreate()
    6. //导入两条student信息
    7. val rdd: RDD[Array[String]] = spark.sparkContext
    8. .parallelize(Array("3 Mike 22 男", "4 Cindy 23 女"))
    9. .map(_.split(" "))
    10. //设置模式信息-创建表头
    11. val schema: StructType = StructType(Array(StructField("id", IntegerType, true),
    12. StructField("name", StringType, true),
    13. StructField("age", IntegerType, true),
    14. StructField("sex", StringType, true)))
    15. //创建Row对象 每个 Row对象都是表中的一行-创建记录
    16. val rowRDD = rdd.map(stu => Row(stu(0).toInt, stu(1), stu(2).toInt, stu(3)))
    17. //创建DataFrame对象 拼接表头和记录
    18. val df = spark.createDataFrame(rowRDD, schema)
    19. //创建一个 prop 变量 用来保存 JDBC 连接参数
    20. val prop = new Properties()
    21. prop.put("user","root")
    22. prop.put("password","Yan1029.")
    23. prop.put("driver","com.mysql.cj.jdbc.Driver")
    24. //写入数据 采用 append 模式追加
    25. df.write.mode("append").jdbc("jdbc:mysql://localhost:3306/spark","spark.student",prop)
    26. spark.stop()
    27. }

    运行结果:


    总结

            今天上午就学到这里,本想着今天专门看看StructType、StructField和Row这三个类的,没想到就在这节课。这一篇主要学了RDD对象向DataFrame对象的转换以及Spark SQL如何读取数据库、写入数据库。

            下午学完这一章最后的DataSet。

  • 相关阅读:
    络达开发---自定义BLE服务(二):功能实现
    一分钟带你了解易货:来龙去脉,古往今来
    flume+es+kibana日志系统
    [ JavaScript ] JSON方法
    网络运维Day01
    Matlab(数值微积分)
    Java的基础框架之SpringMvc第二讲(SSM框架整合和拦截器)
    (二)Java网络编程之爆肝HTTP、HTTPS、TLS协议及对称与非对称加密原理!
    [论文笔记] SecPod:一个基于虚拟化安全系统的框架
    【小白专用】微信小程序个人中心、我的界面(示例一)23.11.04
  • 原文地址:https://blog.csdn.net/m0_64261982/article/details/132787800