• DataFrame API入门操作及代码展示


    DataFrame风格编程

    • DataFrame支持两种风格进行编程
      • DSL风格
      • SQL风格
    • DSL称之为领域特定语言,其实就是指DataFrame特有的API,DSL风格意思就是以调用API的方式来处理Data。
    • SQL风格就是使用SQL语句处理DataFrame的数据。

    DSL风格编程代码示例

    相关API

    • show()方法

      • 功能:展示DataFrame中的数据。
      • 语法:df.show(参数1, 参数2)
        • 参数1: 默认是20, 控制展示多少条。
        • 参数2: 是否阶段列, 默认只输出20个字符的长度, 过长不显示, 显示请用truncate = True。
    • printSchema()方法

      • 功能:打印输出df的schema信息。
      • 语法:df.printSchema()
    • select()方法

      • 功能:选择DataFrame中的指定列(通过传入参数进行指定)。
      • 语法:df.select()
        • 参数传递1:可变参数的cols对象,cols对象可以是Column对象来指定列或者字符串列名来指定列。
        • 参数传递2:List[Column]对象或者List[str]对象, 用来选择多个列。
    • filter()与where()方法

      • 功能:过滤DataFrame内的数据,返回一个过滤后的DataFrame,两者方法是等价的。
      • 语法:df.filter()、df.where()
    • groupBy()方法

      • 功能:按照指定的列进行数据的分组, 返回值是GroupedData对象。
      • 语法:df.groupBy()
    • agg()方法

      • 功能:GroupData的一种方法,作用就是在里面可以写多个聚合。
      • 语法:
    df.groupBy().agg(
    	聚合方法1,
    	聚合方法2
    )
    
    • 1
    • 2
    • 3
    • 4

    相关代码示例

    # coding : utf8
    from pyspark.sql import SparkSession
    
    if __name__ == '__main__':
        ss = SparkSession.builder \
            .appName("test") \
            .master("local[*]") \
            .getOrCreate()
        sc = ss.sparkContext
    
        df = ss.read.format("csv") \
            .option("sep", ",") \
            .schema("id INT, subject STRING, score INT") \
            .load("../Data/input/stu_score.txt")
    
        # DSL风格展示
        # Column对象获取
        id_col = df["id"]
        subject_col = df["subject"]
        score_col = df["score"]
    
        # list形式
        df.select(["id", "subject"]).limit(10).show()
        # 可变参数形式
        df.select("id", "score").limit(10).show()
        # Column对象形式
        df.select(id_col, subject_col).limit(10).show()
    
        # filter API
        df.filter("score < 99").show()
        df.filter(df["score"] < 99).show()
    
        # where API
        df.where("score < 99").show()
        df.where(df["score"] < 99).show()
    
        # groupby API
        df.groupBy("subject").count().show()
        df.groupBy(subject_col).count().show()
    
        # groupby返回值是GroupedData,不是DataFrame,是一个有分组的数据集合,后面只能是聚合函数
        print(type(df.groupBy(df["subject"])))
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    SQL风格编程代码示例

    相关API

    • DataFrame的一个强大之处就是我们可以将它看作是一个关系型数据表,然后可以通过在程序中使用spark.sql() 来执行SQL语句查询,结果返回一个DataFrame。如果想使用SQL风格的语法,需要将DataFrame注册成表。

    • df.createTempView():注册临时的视图表。

    • df.createOrReplaceTempView():注册或者替换临时的视图表。

    • df.createGlobalTempView():注册全局临时的视图表。

    • 全局表与临时表

      • 全局表:跨SparkSession对象使用,在一个程序的多个SparkSession中均可调用,查询时需要添加前缀:global_temp。
      • 临时表:只在当前的SparkSession中可用。
    • 使用SQL查询我们需要调用SparkSession.sql(“SQL语句”)执行查询,返回值是一个新的DataFrame。

    相关代码

    # coding : utf8
    from pyspark.sql import SparkSession
    
    if __name__ == '__main__':
        ss = SparkSession.builder \
            .appName("test") \
            .master("local[*]") \
            .getOrCreate()
        sc = ss.sparkContext
    
        df = ss.read.format("csv") \
            .option("sep", ",") \
            .schema("id INT, subject STRING, score INT") \
            .load("../Data/input/stu_score.txt")
    
        # 注册成临时表
        df.createTempView("score1")
        df.createOrReplaceTempView("score2") # 创建或替换临时视图表
        df.createGlobalTempView("score3")  # 创建全局临时视图表
    
        ss.sql("SELECT subject, AVG(score) AS  avg_score FROM score1 GROUP BY subject").show()
        ss.sql("SELECT subject, COUNT(*) AS  cnt FROM score2 GROUP BY subject").show()
        ss.sql("SELECT subject, MAX(score) AS max_score , MIN(score) AS  min_score FROM global_temp.score3 GROUP BY subject").show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    Fucntions包

    • PySpark提供了一个函数包:pyspark.sql.functions,这个包里提供了一些列的计算函数供SparkSQL使用最常见的有我们所熟悉的split和explode方法。
    • 导入这个包我们可以通过以下代码来实现:
      from pyspark.sql import functions as F
    
    • 1
    • 这些功能函数的返回值多数都是column对象。

    基于SparkSQL的WordCount代码编写

    # coding : utf8
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    
    if __name__ == '__main__':
        ss = SparkSession.builder \
            .appName("test") \
            .master("local[*]") \
            .getOrCreate()
        sc = ss.sparkContext
    
        # TODO: 1 SQL风格处理
        rdd = sc.textFile("hdfs://node1:8020/Test/WordCount.txt") \
            .flatMap(lambda line: line.split(" ")) \
            .map(lambda x: [x])
    
        df1 = rdd.toDF(["words"])
        df1.createOrReplaceTempView("words")
    
        ss.sql("SELECT words, COUNT(*) AS cnt FROM words GROUP  BY words ORDER  BY cnt DESC").show()
    
    
        # TODO: 2 DSL风格处理
        df2 = ss.read.format("text") \
            .load("hdfs://node1:8020/Test/WordCount.txt")
    
        # withColumn方法
        # 方法功能: 对已存在的列进行操作, 返回一个新的列, 如果名字和老列相同, 那么替换, 否则作为新列存在
        df3 = df2.withColumn("value", F.explode(F.split(df2["value"], " ")))
        df3.groupBy("value").count() \
            .withColumnRenamed("value", "words") \
            .withColumnRenamed("count", "cnt") \
            .orderBy("cnt", ascending=False).show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 结果展示:
      在这里插入图片描述
  • 相关阅读:
    Jetpack Compose 中的状态管理
    【ASM】字节码操作 转换已有的类 记录方法运行时间
    goroutine学习
    【Java】List、Set、数据结构、Collections
    SQL获取正数第N个或倒数第N个数据
    Understanding the Users and Videos by Mining a Novel Danmu Dataset
    半导体二极管
    【ROS进阶篇】第一讲 常用API介绍
    Visual Studio 调试时加载符号慢
    【二叉树】链式结构的一些操作实现
  • 原文地址:https://blog.csdn.net/sinat_31854967/article/details/128132819