• 如何基于RDD方式完成DataFrame的代码构建?


    DataFrame对象可以从RDD转换而来,都是分布式数据集 其实就是转换一下内部存储的结构,转换为二维表结构。

    将RDD转换为DataFrame方式1:

    调用spark

    # 首先构建一个RDD rdd[(name, age), ()]
    rdd = sc.textFile("../data/sql/people.txt").\
      map(lambda x: x.split(',')).\
      map(lambda x: [x[0], int(x[1])])               # 需要做类型转换, 因为类型从RDD中探测
    # 构建DF方式1
    df = spark.createDataFrame(rdd, schema = ['name', 'age'])
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    通过SparkSession对象的createDataFrame方法来将RDD转换为DataFrame,这里只传入列名称,类型从RDD中进行推断,是否允许为空默认为允许(True)。

    # coding:utf8
    # 演示DataFrame创建的三种方式
    from pyspark.sql import SparkSession
    if __name__ == '__main__':
        spark = SparkSession.builder.\
           appName("create df").\
    master("local[*]").\
    getOrCreate()
    
    sc = spark.sparkContext
    # 首先构建一个RDD rdd[(name, age), ()]
    rdd = sc.textFile("../data/sql/people.txt").\
    map(lambda x: x.split(',')).\
    map(lambda x: [x[0], int(x[1])]) # 需要做类型转换, 因为类型从RDD中探测
    # 构建DF方式1
    df = spark.createDataFrame(rdd, schema = ['name', 'age'])
    # 打印表结构
    df.printSchema()
    # 打印20行数据
    df.show()
    df.createTempView("ttt")
    spark.sql("select * from ttt where age< 30").show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    将RDD转换为DataFrame方式2:
    通过StructType对象来定义DataFrame的“表结构”转换RDD

    # 创建DF , 首先创建RDD 将RDD转DF
    rdd = sc.textFile("../data/sql/stu_score.txt").\
      map(lambda x:x.split(',')).\
      map(lambda x:(int(x[0]), x[1], int(x[2])))
    
    # StructType 类
    # 这个类 可以定义整个DataFrame中的Schema
    schema = StructType().\
      add("id", IntegerType(), nullable=False).\
      add("name", StringType(), nullable=True).\
      add("score", IntegerType(), nullable=False)
    # 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add, 每一个add代表一个StructField
    # add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
    df = spark.createDataFrame(rdd, schema)
    # coding:utf8
    # 需求: 基于StructType的方式构建DataFrame 同样是RDD转DF
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StringType, IntegerType
    if __name__ == '__main__':
      spark = SparkSession.builder.\
        appName("create_df"). \
        config("spark.sql.shuffle.partitions", "4"). \
        getOrCreate()
      # SparkSession对象也可以获取 SparkContext
      sc = spark.sparkContext
      # 创建DF , 首先创建RDD 将RDD转DF
      rdd = sc.textFile("../data/sql/stu_score.txt").\
        map(lambda x:x.split(',')).\
        map(lambda x:(int(x[0]), x[1], int(x[2])))
      # StructType 类
      # 这个类 可以定义整个DataFrame中的Schema
      schema = StructType().\
        add("id", IntegerType(), nullable=False).\
        add("name", StringType(), nullable=True).\
        add("score", IntegerType(), nullable=False)
      # 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add
      # add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
      df = spark.createDataFrame(rdd, schema)
      df.printSchema()
      df.show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    将RDD转换为DataFrame方式3:

    使用RDD的toDF方法转换RDD

    # StructType 类
    # 这个类 可以定义整个DataFrame中的Schema
    schema = StructType().\
      add("id", IntegerType(), nullable=False).\
      add("name", StringType(), nullable=True).\
      add("score", IntegerType(), nullable=False)
    # 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add
    # add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
    
    # 方式1: 只传列名, 类型靠推断, 是否允许为空是true
    df = rdd.toDF(['id', 'subject', 'score'])
    df.printSchema()
    df.show()
    
    # 方式2: 传入完整的Schema描述对象StructType
    df = rdd.toDF(schema)
    df.printSchema()
    df.show()
    # coding:utf8
    # 需求: 使用toDF方法将RDD转换为DF
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StringType, IntegerType
    if __name__ == '__main__':
        spark = SparkSession.builder.\
          appName("create_df"). \
          config("spark.sql.shuffle.partitions", "4"). \
          getOrCreate()
        # SparkSession对象也可以获取 SparkContext
        sc = spark.sparkContext
        # 创建DF , 首先创建RDD 将RDD转DF
        rdd = sc.textFile("../data/sql/stu_score.txt").\
          map(lambda x:x.split(',')).\
          map(lambda x:(int(x[0]), x[1], int(x[2])))
        # StructType 类
        # 这个类 可以定义整个DataFrame中的Schema
        schema = StructType().\
           add("id", IntegerType(), nullable=False).\
           add("name", StringType(), nullable=True).\
           add("score", IntegerType(), nullable=False)
        # 一个add方法 定义一个列的信息, 如果有3个列, 就写三个add
        # add方法: 参数1: 列名称, 参数2: 列类型, 参数3: 是否允许为空
        # 方式1: 只传列名, 类型靠推断, 是否允许为空是true
        df = rdd.toDF(['id', 'subject', 'score'])
        df.printSchema()
        df.show()
        # 方式2: 传入完整的Schema描述对象StructType
        df = rdd.toDF(schema)
        df.printSchema()
        df.show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
  • 相关阅读:
    9.适配器模式
    kubeadm部署v1.26.1
    141 环形链表
    六大科研工具推荐,外文文献阅读管理全都搞定!
    第十一章 : 如何使用Swagger2构建强大的API文档
    2023年厦门市高等职业院校技能竞赛软件测试竞赛规程
    音频基础学习二——声音的波形
    Count-based exploration with neural density models论文笔记
    MySQL - 慢查询优化
    能源路由器入门必读:面向能源互联网的架构和功能
  • 原文地址:https://blog.csdn.net/cz_00001/article/details/133173569