• 大数据编程实验三:SparkSQL编程


    大数据编程实验三:SparkSQL编程

    一、前言

    二、实验目的与要求

    1. 通过实验掌握Spark SQL的基本编程方法
    2. 熟悉RDD到DataFrame的转化方法
    3. 熟悉利用Spark SQL管理来自不同数据源的数据

    三、实验内容

    1. Spark SQL基本操作

      将下列JSON格式数据复制到Linux系统中,并保存命名为employee.json。

      { "id":1 , "name":" Ella" , "age":36 }
      { "id":2, "name":"Bob","age":29 }
      { "id":3 , "name":"Jack","age":29 }
      { "id":4 , "name":"Jim","age":28 }
      { "id":4 , "name":"Jim","age":28 }
      { "id":5 , "name":"Damon" }
      { "id":5 , "name":"Damon" }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7

      为employee.json创建DataFrame,并写出Python语句完成下列操作:

      (1) 查询所有数据;

      (2) 查询所有数据,并去除重复的数据;

      (3) 查询所有数据,打印时去除id字段;

      (4) 筛选出age>30的记录;

      (5) 将数据按age分组;

      (6) 将数据按name升序排列

      (7) 取出前3行数据;

      (8) 查询所有记录的name列,并为其取别名为username;

      (9) 查询年龄age的平均值;

      (10) 查询年龄age的最小值。

    2. 编程实现将RDD转换为DataFrame

      源文件内容如下(包含id,name,age):

      1,Ella,36
      2,Bob,29
      3,Jack,29
      
      • 1
      • 2
      • 3

      请将数据复制保存到Linux系统中,命名为employee.txt,实现从RDD转换得到DataFrame,并按“id:1,name:Ella,age:36”的格式打印出DataFrame的所有数据。请写出程序代码。

    3. 编程实现利用DataFrame读写MySQL的数据

      (1)在MySQL数据库中新建数据库sparktest,再创建表employee,包含如下表所示的两行数据。

    在这里插入图片描述

    (2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入如表5-3所示的两行数据到MySQL中,最后打印出age的最大值和age的总和。

    在这里插入图片描述

    四、实验步骤

    1、Spark SQL基本操作

    我们在之前创建的sparkdata目录下创建该json文件并将上面信息复制进去并保存命名为employee.json:

    cd /usr/local/spark/sparkdata
    vim employee.json
    
    • 1
    • 2

    在这里插入图片描述

    然后我们进入到pyspark中,开始做题。

    首先我们创建一个DataFrame:

    >>> sp=SparkSession.builder.getOrCreate()
    >>> df=sp.read.json("file:///usr/local/spark/sparkdata/employee.json")
    
    • 1
    • 2

    (1)查询DataFrame的所有数据

    >>> df.show()
    
    • 1

    在这里插入图片描述

    (2)查询所有数据,并去除重复的数据

    >>> df.distinct().show()
    
    • 1

    在这里插入图片描述

    (3)查询所有数据,打印时去除id字段

    >>> df.drop("id").show()
    
    • 1

    在这里插入图片描述

    (4)筛选age>30的记录

     df.filter(df.age>30).show()
    
    • 1

    在这里插入图片描述

    (5) 将数据按age分组

    >>> df.groupBy("age").count().show()
    
    • 1

    在这里插入图片描述

    (6) 将数据按name升序排列

    >>> df.sort(df.name.asc()).show()
    
    • 1

    在这里插入图片描述

    (7) 取出前3行数据

    >>> df.take(3)
    
    • 1

    在这里插入图片描述

    (8) 查询所有记录的name列,并为其取别名为username

    >>> df.select(df.name.alias("username")).show()
    
    • 1

    在这里插入图片描述

    (9) 查询年龄age的平均值

    >>> df.agg({"age":"mean"}).show()
    
    • 1

    在这里插入图片描述

    (10) 查询年龄age的最小值

    >>> df.agg({"age":"min"}).show()
    
    • 1

    在这里插入图片描述

    2、编程实现将RDD转换为DataFrame

    首先我们仍然在sparkdata目录下创建我们需要的文件并命令为employee.txt,然后写入信息:

    vim employee.txt
    
    • 1

    在这里插入图片描述

    然后我们还是在该目录下新建一个py文件命名为rddTodf.py,然后写入如下py程序:

    from pyspark.conf import SparkConf
    from pyspark.sql.session import SparkSession
    from pyspark import SparkContext
    from pyspark.sql.types import Row
    from pyspark.sql import SQLContext
    if __name__ == "__main__":
            sc = SparkContext("local","Simple App")
            spark=SparkSession(sc)
            peopleRDD = spark.sparkContext.textFile("file:///usr/local/spark/sparkdata/employee.txt")
            rowRDD = peopleRDD.map(lambda line : line.split(",")).map(lambda attributes : Row(int(attributes[0]),attributes[1],int(attributes[2]))).toDF()
            rowRDD.createOrReplaceTempView("employee")
            personsDF = spark.sql("select * from employee")
            personsDF.rdd.map(lambda t : "id:"+str(t[0])+","+"Name:"+t[1]+","+"age:"+str(t[2])).foreach(print)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    在这里插入图片描述

    然后我们运行该程序:

    python3 rddTodf.py
    
    • 1

    在这里插入图片描述

    出现这个结果证明成功。

    3、编程实现利用DataFrame读写MySQL的数据

    我们首先启动mysql服务并进入到mysql数据库中:

    systemctl start mysqld.service
    mysql -u root -p
    
    • 1
    • 2

    然后开始接下来的操作。

    (1)在MySQL数据库中新建数据库sparktest,再创建表employee,并写入题目中的原始数据

    mysql> create database sparktest;
    mysql> use sparktest;
    mysql> create table employee (id int(4),name char(20),gender char(4),age int(4));
    mysql> insert into employee values(1,'Alice','F',22);
    mysql> insert into employee values(2,'John','M',25);
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述

    (2)配置Spark通过JDBC连接数据库MySQL,编程实现利用DataFrame插入下列数据到MySQL,最后打印出age的最大值和age的总和

    我们仍然在sparkdata目录下面新建一个py程序并命名为mysqlTest.py

    cd /usr/local/spark/sparkdata
    vim mysqlTest.py
    
    • 1
    • 2

    然后写入如下py程序:

    from pyspark import SparkContext
    from pyspark.sql import SQLContext
    from pyspark.sql.types import Row
    from pyspark.sql.types import StructType
    from pyspark.sql.types import StructField
    from pyspark.sql.types import StringType
    from pyspark.sql.types import IntegerType
    if __name__ == "__main__":
    
        sc = SparkContext( 'local', 'test')
        spark=SQLContext(sc)
        jdbcDF=spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/sparktest").option("driver","com.mysql.jdbc.Driver").option("dbtable","employee").option("user", "root").option("password", "MYsql123!").load()
        jdbcDF.filter(jdbcDF.age>20).collect()      # 检测是否连接成功
        studentRDD = sc.parallelize(["3 Mary F 26","4 Tom M 23"]).map(lambda line : line.split(" "))
        schema = StructType([StructField("id",IntegerType(),True),StructField("name", StringType(), True),StructField("gender", StringType(), True),StructField("age",IntegerType(), True)])
        rowRDD = studentRDD.map(lambda p : Row(int(p[0]),p[1].strip(), p[2].strip(),int(p[3])))
        employeeDF = spark.createDataFrame(rowRDD, schema)
        prop = {}
        prop['user'] = 'root'
        prop['password'] = 'MYsql123!'
        prop['driver'] = "com.mysql.jdbc.Driver"
        employeeDF.write.jdbc("jdbc:mysql://localhost:3306/sparktest",'employee','append', prop)
        jdbcDF.collect()
        jdbcDF.agg({"age": "max"}).show()
        jdbcDF.agg({"age": "sum"}).show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    在这里插入图片描述

    然后直接运行该py程序即可得到结果:

    python3 mysqlTest.py
    
    • 1

    在这里插入图片描述

    在这里插入图片描述

    五、最后我想说

    本次实验的话,难度主要在后面两个题目中,在第二题中我遇见了两个错误:

    1. PipelinedRDD’ object has no attribute ‘toDF’
    2. ‘SparkSession’ object has no attribute ‘textFile’

    第一个错误我是通过如下解决的:

    spark = SparkSession(sc)
    
    • 1

    解决第一个错误之后,我再次运行的时候就开始报第二个错误了,第二个错误我是这样解决的:

    from pyspark.sql import SQLContext
    spark.sparkContext.textFile('filepath')
    
    • 1
    • 2

    具体可以看我们上面对于的代码就可以明白了。

    另外,很明显可以看见第三题第二问后面抛出了异常:

    ** BEGIN NESTED EXCEPTION ** 
    
    javax.net.ssl.SSLException
    MESSAGE: closing inbound before receiving peer's close_notify
    
    STACKTRACE:
    
    javax.net.ssl.SSLException: closing inbound before receiving peer's close_notify
    	at sun.security.ssl.Alert.createSSLException(Alert.java:133)
    	at sun.security.ssl.Alert.createSSLException(Alert.java:117)
    	at sun.security.ssl.TransportContext.fatal(TransportContext.java:340)
    	at sun.security.ssl.TransportContext.fatal(TransportContext.java:296)
    	at sun.security.ssl.TransportContext.fatal(TransportContext.java:287)
    	at sun.security.ssl.SSLSocketImpl.shutdownInput(SSLSocketImpl.java:737)
    	at sun.security.ssl.SSLSocketImpl.shutdownInput(SSLSocketImpl.java:716)
    	at com.mysql.jdbc.MysqlIO.quit(MysqlIO.java:2239)
    	at com.mysql.jdbc.ConnectionImpl.realClose(ConnectionImpl.java:4267)
    	at com.mysql.jdbc.ConnectionImpl.close(ConnectionImpl.java:1531)
    	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.org$apache$spark$sql$execution$datasources$jdbc$JDBCRDD$$close$1(JDBCRDD.scala:259)
    	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$$anonfun$compute$1.apply$mcV$sp(JDBCRDD.scala:308)
    	at org.apache.spark.util.CompletionIterator$$anon$1.completion(CompletionIterator.scala:44)
    	at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:33)
    	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
    	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    	at org.apache.spark.scheduler.Task.run(Task.scala:121)
    	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    这是因为与MySQL数据库的SSL连接失败了,我们只需要将数据源的URL后面添加**?useSSL=false**就可以解决,也就是禁用SSL:

    在这里插入图片描述

    在这里插入图片描述

    但是它还是抛出了异常,只是异常没有之前那么多了,我上网查阅了一下相关错误,好像这样添加不能完全禁用SSL,具体原因我也不知道,可能跟底层C语言有关,这个我不了解,所以就先这样了。

    本次实验到这里就结束了,谢谢你们的阅读!

  • 相关阅读:
    【自动化测试】基于Selenium + Python的web自动化框架
    MySQL再总结
    JDBC学习笔记
    你看过字符画吗?用 Python 自己实现一个吧
    List去重的五种方法
    Java实现Csv文件导入导出
    Unity 设置Inspect上问号的跳转链接
    Dubbo捕获自定义异常问题
    【SwiftUI模块】0003、SwiftUI搭建瀑布流-交错网格
    【LeetCode动态规划#14】子序列系列题(最长递增子序列、最长连续递增序列、最长重复子数组、最长公共子序列)
  • 原文地址:https://blog.csdn.net/qq_52417436/article/details/127818783