• Pyspark的使用语法


    背景

    Pyspark可以主要被用来处理数据。

    相关操作

    配置

    config("spark.executor.instances", "10") #使用多少个执行器
    config("spark.sql.shuffle.partitions","1000") #讲数据shuffle多少份
    
    • 1
    • 2

    基本案例

    建立一个RDD

    from gettext import find
    import findspark
    findspark.init("/home/ztw/spark")
    from pyspark import SparkConf, SparkContext
    #setMaster设置了Spark Master的方式为local[*]
    conf = SparkConf().setAppName("WordCount").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    
    rdd = sc.parallelize(["hello world", "hello spark"])
    rdd2 = rdd.flatMap(lambda line:line.split(" "))
    rdd3 = rdd2.map(lambda word:(word, 1))
    #collect可以将RDD类型的数据转化为数组
    rdd5 = rdd3.reduceByKey(lambda a,b: a+b)
    
    print(rdd5.collect())
    sc.stop()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    去重

    from gettext import find
    import findspark
    findspark.init("/home/ztw/spark")
    from pyspark import SparkConf, SparkContext
    conf = SparkConf().setAppName("WordCount").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    
    rdd = sc.parallelize(["a","a","c"])
    print("rdd.distinct().collect()",rdd.distinct().collect())
    sc.stop()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    对每个字符串加一个值并返回

    from gettext import find
    import findspark
    findspark.init("/home/ztw/spark")
    from pyspark import SparkConf, SparkContext
    conf = SparkConf().setAppName("WordCount").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    
    ress = []
    def fun(x):
        return [x+"bb"]
    
    rdd = sc.parallelize(["a","a","c"])
    res = rdd.flatMap(fun)
    
    print("res.collect()",res.collect())
    
    sc.stop()
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    agg
    df.groupBy('Job','Country').agg(fn.max('salary'), fn.max('seniority')).show()
    
    • 1

    https://blog.csdn.net/qq_42363032/article/details/115614675

    alias

    alias:返回一个设置别名的新DataFrame

    subset

    使用subse得到对应的列名称。

    withColumns

    添加或更新列,第一个参数表示的是列名

    apply

    https://cloud.tencent.com/developer/article/1847068

    按照定义的函数进行相关运算

    将rdd转化为dataframe

    from gettext import find
    import findspark
    findspark.init("/home/ztw/spark")
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.session import SparkSession
    from pyspark.sql.types import *
    
    conf = SparkConf().setAppName("WordCount").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)
    
    data = [('Alex','male',3),('Nancy','female',6),['Jack','male',9]] # mixed
    rdd_ =  spark.sparkContext.parallelize(data)
    # schema
    schema = StructType([
            # true代表不为空
            StructField("name", StringType(), True),
            StructField("gender", StringType(), True),
            StructField("num", StringType(), True)
        ])
    df = spark.createDataFrame(rdd_, schema=schema)  # working when the struct of data is same.
    print(df.show())
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    转化为dataframe后实现sql查询
    from gettext import find
    import findspark
    findspark.init("/home/ztw/spark")
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.session import SparkSession
    from pyspark.sql.types import *
    
    
    conf = SparkConf().setAppName("WordCount").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)
    
    data = [('Alex','male',3),('Nancy','female',6),['Jack','male',9]] # mixed
    rdd_ =  spark.sparkContext.parallelize(data)
    # schema
    schema = StructType([
            # true代表不为空
            StructField("name", StringType(), True),
            StructField("gender", StringType(), True),
            StructField("num", StringType(), True)
        ])
    df = spark.createDataFrame(rdd_, schema=schema)  # working when the struct of data is same.
    
    df.createOrReplaceTempView("df")
    spark.sql("select * from df where name='Alex' ").show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    转化为dataframe后做差集
    from gettext import find
    import findspark
    findspark.init("/home/ztw/spark")
    from pyspark import SparkConf, SparkContext
    from pyspark.sql.session import SparkSession
    from pyspark.sql.types import *
    
    conf = SparkConf().setAppName("WordCount").setMaster("local[*]")
    sc = SparkContext(conf=conf)
    spark = SparkSession(sc)
    
    dataa = [('Alex','male',3),('Nancy','female',6),['Jack','male',9]] 
    rdda =  spark.sparkContext.parallelize(dataa)
    # schema
    schema = StructType([
            # true代表不为空
            StructField("name", StringType(), True),
            StructField("gender", StringType(), True),
            StructField("num", StringType(), True)
        ])
    dfa = spark.createDataFrame(rdda, schema=schema)  # working when the struct of data is same.
    
    datab = [('Alex','male',4),('Nancy','female',1),['Jack','male',9]] 
    rddb =  spark.sparkContext.parallelize(datab)
    dfb = spark.createDataFrame(rddb, schema=schema)  # working when the struct of data is same.
    
    newDF = dfa.select("num").subtract(dfb.select("num"))
    #newDF = dfa.select("name","num","gender").subtract(dfb.select("name","num","gender"))
    newDF.show()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    join left等

    https://blog.csdn.net/u012762410/article/details/118944914

    inner求交集

    对应的列填充固定值
    df.fillna(0, subset=['a', 'b'])
    
    • 1

    limit

    直接从spark获取前n行

    limit(n)
    
    • 1
    给数据添加字段
    datga.withColumn('a_name', func.lit('字段值'))
    
    • 1
    assign

    可以在同一个 assign 中分配多个列,但不能引用新创建或修改的列。 Python 3.6 及更高版本的 pandas 支持此函数,但 pandas-on-Spark 不支持此函数。在pandas-on-Spark 中,首先计算所有项目,然后分配。

    http://www.manongjc.com/detail/31-atwzhtjqonqyiey.html

    问题

    为什么要将rdd的数据转化为dataframe

    因为这样的话,我们就可以直接针对HDFS等任何可以构建为RDD的数据,使用Spark SQL进行SQL查询了。这个功能是无比强大的。
    想象一下,针对HDFS中的数据,直接就可以使用SQL进行查询。

    参考资料

    https://leetcode.cn/problems/biao-shi-shu-zhi-de-zi-fu-chuan-lcof/solution/mian-shi-ti-20-biao-shi-shu-zhi-de-zi-fu-chuan-y-2/
    http://www.xueai8.com/course/286/article
    https://zhuanlan.zhihu.com/p/446201510 (rdd与dataframe的转换)
    https://www.cnblogs.com/qi-yuan-008/p/12504882.html (rdd与dataframe的转换)
    https://www.jianshu.com/p/2f9a407ba950 (做差集并集去重等)

  • 相关阅读:
    如何判断自己的qt版本呢?
    内核中的信号量
    黑马瑞吉外卖之菜品的启售停售
    猿创征文| Unity之C#高级开发①
    净亏损2.9亿元,财务业绩陷入困境后,逸仙电商盈利仍遥遥无期
    torch.manual_seed()解析
    leetcode 42. 接雨水(困难、单调栈的应用)
    bootstrap下拉菜单学习(五)
    独立接软件外包遇到的问题总结,希望能帮到你
    GZ038 物联网应用开发赛题第7套
  • 原文地址:https://blog.csdn.net/doswynkfsw/article/details/125266608