• Spark上使用pandas API快速入门


    文章最前: 我是Octopus,这个名字来源于我的中文名--章鱼;我热爱编程、热爱算法、热爱开源。所有源码在我的个人github ;这博客是记录我学习的点点滴滴,如果您对 Python、Java、AI、算法有兴趣,可以关注我的动态,一起学习,共同进步。

      相关文章:

    1. PySpark 概述
    2. Spark连接快速入门
    3. Spark上使用pandas API快速入门

    这是 Spark 上的 pandas API 的简短介绍,主要面向新用户。本笔记本向您展示 pandas 和 Spark 上的 pandas API 之间的一些关键区别。您可以在快速入门页面的“Live Notebook:Spark 上的 pandas API”中自行运行此示例。

    习惯上,我们在Spark上导入pandas API如下:

    1. import pandas as pd
    2. import numpy as np
    3. import pyspark.pandas as ps
    4. from pyspark.sql import SparkSession

    对象创建

    通过传递值列表来创建 pandas-on-Spark 系列,让 Spark 上的 pandas API 创建默认整数索引:

    1. s = ps.Series([1, 3, 5, np.nan, 6, 8])
    2. s
    0    1.0
    1    3.0
    2    5.0
    3    NaN
    4    6.0
    5    8.0
    dtype: float64

    通过传递可转换为类似系列的对象字典来创建 pandas-on-Spark DataFrame。

    1. psdf = ps.DataFrame(
    2. {'a': [1, 2, 3, 4, 5, 6],
    3. 'b': [100, 200, 300, 400, 500, 600],
    4. 'c': ["one", "two", "three", "four", "five", "six"]},
    5. index=[10, 20, 30, 40, 50, 60])
    6. psdf
    abc
    101100one
    202200two
    303300three
    404400four
    505500five
    606600six

    创建pandas DataFrame通过numpyt array, 用datetime 作为索引,label列

    1. dates = pd.date_range('20130101', periods=6)
    2. dates

      DatetimeIndex(['2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04', '2013-01-05', '2013-01-06'], dtype='datetime64[ns]', freq='D')

    1. pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))
    2. pdf
    ABCD
    2013-01-010.912558-0.795645-0.2891150.187606
    2013-01-02-0.059703-1.2338970.316625-1.226828
    2013-01-030.332871-1.262010-0.434844-0.579920
    2013-01-040.924016-1.022019-0.405249-1.036021
    2013-01-05-0.772209-1.2280990.0689010.896679
    2013-01-061.485582-0.709306-0.202637-0.248766

    现在,dataframe能够转换成pandas 在spark上运行

    1. psdf = ps.from_pandas(pdf)
    2. type(psdf)

     pyspark.pandas.frame.DataFrame

    看上去和dataframe一样的使用

    psdf
    ABCD
    2013-01-010.912558-0.795645-0.2891150.187606
    2013-01-02-0.059703-1.2338970.316625-1.226828
    2013-01-030.332871-1.262010-0.434844-0.579920
    2013-01-040.924016-1.022019-0.405249-1.036021
    2013-01-05-0.772209-1.2280990.0689010.896679
    2013-01-061.485582-0.709306-0.202637-0.248766

    当然通过spark pandas dataframe创建pandas  on spark dataframe 非常容易

    1. spark = SparkSession.builder.getOrCreate()
    2. sdf = spark.createDataFrame(pdf)
    3. sdf.show()

     +--------------------+-------------------+--------------------+--------------------+ | A| B| C| D| +--------------------+-------------------+--------------------+--------------------+ | 0.91255803205208|-0.7956452608556638|-0.28911463069772175| 0.18760566615081622| |-0.05970271470242...| -1.233896949308984| 0.3166246451758431| -1.2268284000402265| | 0.33287106947536615|-1.2620100816441786| -0.4348444277082644| -0.5799199651437185| | 0.9240158461589916|-1.0220190956326003| -0.4052488880650239| -1.0360212104348547| | -0.7722090016558953|-1.2280986385313222| 0.0689011451939635| 0.8966790729426755| | 1.4855822995785612|-0.7093056426018517| -0.2026366848847041|-0.24876619876451092| +--------------------+-------------------+--------------------+--------------------+

    从 Spark DataFrame 创建 pandas-on-Spark DataFrame。

    1. psdf = sdf.pandas_api()
    2. psdf
    ABCD
    00.912558-0.795645-0.2891150.187606
    1-0.059703-1.2338970.316625-1.226828
    20.332871-1.262010-0.434844-0.579920
    30.924016-1.022019-0.405249-1.036021
    4-0.772209-1.2280990.0689010.896679
    51.485582-0.709306-0.202637-0.248766

    具有特定的dtypes。目前支持 Spark 和 pandas 通用的类型。

    psdf.dtypes
    
    A    float64
    B    float64
    C    float64
    D    float64
    dtype: object
    

    以下是如何显示下面框架中的顶行。

    请注意,Spark 数据帧中的数据默认不保留自然顺序。可以通过设置compute.ordered_head选项来保留自然顺序,但它会导致内部排序的性能开销。

    psdf.head()
    
    ABCD
    00.912558-0.795645-0.2891150.187606
    1-0.059703-1.2338970.316625-1.226828
    20.332871-1.262010-0.434844-0.579920
    30.924016-1.022019-0.405249-1.036021
    4-0.772209-1.2280990.0689010.896679

     展示index和columns 通过numpy 数据

    psdf.index
    
    Int64Index([0, 1, 2, 3, 4, 5], dtype='int64')
    
    psdf.columns
    
    Index(['A', 'B', 'C', 'D'], dtype='object')
    
    psdf.to_numpy()
    
    array([[ 0.91255803, -0.79564526, -0.28911463,  0.18760567],
           [-0.05970271, -1.23389695,  0.31662465, -1.2268284 ],
           [ 0.33287107, -1.26201008, -0.43484443, -0.57991997],
           [ 0.92401585, -1.0220191 , -0.40524889, -1.03602121],
           [-0.772209  , -1.22809864,  0.06890115,  0.89667907],
           [ 1.4855823 , -0.70930564, -0.20263668, -0.2487662 ]])

    通过简单统计展示你的数据:

    psdf.describe()
    
    ABCD
    count6.0000006.0000006.0000006.000000
    mean0.470519-1.041829-0.157720-0.334542
    std0.8094280.2415110.2945200.793014
    min-0.772209-1.262010-0.434844-1.226828
    25%-0.059703-1.233897-0.405249-1.036021
    50%0.332871-1.228099-0.289115-0.579920
    75%0.924016-0.7956450.0689010.187606
    max1.485582-0.7093060.3166250.896679

    转置你的数据:

    psdf.T
    012345
    A0.912558-0.0597030.3328710.924016-0.7722091.485582
    B-0.795645-1.233897-1.262010-1.022019-1.228099-0.709306
    C-0.2891150.316625-0.434844-0.4052490.068901-0.202637
    D0.187606-1.226828-0.579920-1.0360210.896679-0.248766

    通过index进行排序:

    psdf.sort_index(ascending=False)
    
    ABCD
    51.485582-0.709306-0.202637-0.248766
    4-0.772209-1.2280990.0689010.896679
    30.924016-1.022019-0.405249-1.036021
    20.332871-1.262010-0.434844-0.579920
    1-0.059703-1.2338970.316625-1.226828
    00.912558-0.795645-0.2891150.187606

    按照值排序:

    psdf.sort_values(by='B')
    
    ABCD
    20.332871-1.262010-0.434844-0.579920
    1-0.059703-1.2338970.316625-1.226828
    4-0.772209-1.2280990.0689010.896679
    30.924016-1.022019-0.405249-1.036021
    00.912558-0.795645-0.2891150.187606
    51.485582-0.709306-0.202637-0.248766

     缺失数据

    Spark 上的 Pandas API 主要使用该值np.nan来表示缺失的数据。默认情况下,它不包含在计算中。

    1. pdf1 = pdf.reindex(index=dates[0:4], columns=list(pdf.columns) + ['E'])
    2. pdf1.loc[dates[0]:dates[1], 'E'] = 1
    3. psdf1 = ps.from_pandas(pdf1)
    4. psdf1

    A
    CD
    2013-01-010.912558-0.795645-0.2891150.1876061.0
    2013-01-02-0.059703-1.2338970.316625-1.2268281.0
    2013-01-030.332871-1.262010-0.434844-0.579920
    2013-01-040.924016-1.022019
     

    删除任何缺少数据的行。

    psdf1.dropna(how='any')
    
    ABCDE
    2013-01-010.912558-0.795645-0.2891150.1876061.0
    2013-01-02-0.059703-1.2338970.316625-1.226828
    psdf1.fillna(value=5)
    ABCDE
    2013-01-010.912558-0.795645-0.2891150.1876061.0
    2013-01-02-0.059703-1.2338970.316625-1.2268281.0
    2013-01-030.332871-1.262010-0.434844-0.5799205.0
    2013-01-040.924016-1.022019-0.405249-1.0360215

    操作 

    统计数据

    执行描述性统计:

    psdf.mean()
    

    A    0.470519
    B   -1.041829
    C   -0.157720
    D   -0.334542
    dtype: float64


    Spark配置

    PySpark 中的各种配置可以在 Spark 上的 pandas API 内部应用。例如,您可以启用 Arrow 优化来极大地加快内部 pandas 转换。另请参阅 PySpark 文档中的 Pandas 使用 Apache Arrow 的 PySpark 使用指南。

    1. prev = spark.conf.get("spark.sql.execution.arrow.pyspark.enabled") # Keep its default value.
    2. ps.set_option("compute.default_index_type", "distributed") # Use default index prevent overhead.
    3. import warnings
    4. warnings.filterwarnings("ignore") # Ignore warnings coming from Arrow optimizations.
    1. spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
    2. %timeit ps.range(300000).to_pandas()
    每个循环 900 ms ± 186 ms(7 次运行的平均值 ± 标准差,每次 1 个循环)
    
    1. spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", False)
    2. %timeit ps.range(300000).to_pandas()
    每个循环 3.08 s ± 227 ms(7 次运行的平均值 ± 标准差,每次 1 个循环)
    
    1. ps.reset_option("compute.default_index_type")
    2. spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", prev) # Set its default value back.

    分组

    我们所说的“分组依据”是指涉及以下一个或多个步骤的过程:

    • 根据某些标准将数据分组

    • 独立地将函数应用于每个组

    • 将结果组合成数据结构

    1. psdf = ps.DataFrame({'A': ['foo', 'bar', 'foo', 'bar',
    2. 'foo', 'bar', 'foo', 'foo'],
    3. 'B': ['one', 'one', 'two', 'three',
    4. 'two', 'two', 'one', 'three'],
    5. 'C': np.random.randn(8),
    6. 'D': np.random.randn(8)})
    7. psdf
    ABCD
    0fooone1.039632-0.571950
    1barone0.9720891.085353
    2footwo-1.931621-2.579164
    3barthree-0.654371-0.340704
    4footwo-0.1570800.893736
    5bartwo0.8827950.024978
    6fooone-0.1493840.201667
    7foothree-1.3551360.693883
    分组后求和:
    psdf.groupby('A').sum()
    
    CD
    A
    bar1.2005130.769627
    foo-2.553589-1.361828

    按多列分组形成分层索引,我们可以再次应用 sum 函数。

    psdf.groupby(['A', 'B']).sum()
    
    CD
    AB
    fooone0.890248-0.370283
    two-2.088701-1.685428
    barthree-0.654371-0.340704
    foothree-1.3551360.693883
    bartwo0.8827950.024978
    one0.9720891.085353

     绘图

    1. pser = pd.Series(np.random.randn(1000),
    2. index=pd.date_range('1/1/2000', periods=1000))
    3. psser = ps.Series(pser)
    4. psser = psser.cummax()
    5. psser.plot()

    在 DataFrame 上,plot()方法可以方便地绘制带有标签的所有列:

    1. pdf = pd.DataFrame(np.random.randn(1000, 4), index=pser.index,
    2. columns=['A', 'B', 'C', 'D'])
    3. psdf = ps.from_pandas(pdf)
    4. psdf = psdf.cummax()
    5. psdf.plot()

     有关更多详细信息,请参阅绘图文档。

    获取数据输入/输出

    CSV

    CSV 简单且易于使用。请参阅此处写入 CSV 文件,并参阅此处读取 CSV 文件。

    1. psdf.to_csv('foo.csv')
    2. ps.read_csv('foo.csv').head(10)
    ABCD
    0-1.187097-0.1346450.377094-0.627217
    10.3317410.1662180.377094-0.627217
    20.3317410.4394500.3770940.365970
    30.6216200.4394501.1901800.365970
    40.6216200.4394501.1901800.365970
    52.1691981.0691831.3956420.365970
    62.7557381.0691831.3956421.045868
    72.7557381.0691831.3956421.045868
    82.7557381.0691831.3956421.045868
    92.7557381.5087321.395642
     

    Parquet

    parquet是高效和压缩的数据格式,支持快速的读写;下图是它读写的例子。

    1. psdf.to_parquet('bar.parquet')
    2. ps.read_parquet('bar.parquet').head(10)
    ABCD
    0-1.187097-0.1346450.377094-0.627217
    10.3317410.1662180.377094-0.627217
    20.3317410.4394500.3770940.365970
    30.6216200.4394501.1901800.365970
    40.6216200.4394501.1901800.365970
    52.1691981.0691831.3956420.365970
    62.7557381.0691831.3956421.045868
    72.7557381.0691831.3956421.045868
    82.7557381.0691831.3956421.045868
    92.7557381.5087321.3956421.556933

    Spark IO 

    另外,pandas API能很好支持Spark多种数据源结构,例如ORC和其他的数据源,这里看他写入的数据源和读取数据源。

    1. psdf.to_spark_io('zoo.orc', format="orc")
    2. ps.read_spark_io('zoo.orc', format="orc").head(10)
    ABCD
    0-1.187097-0.1346450.377094-0.627217
    10.3317410.1662180.377094-0.627217
    20.3317410.4394500.3770940.365970
    30.6216200.4394501.1901800.365970
    40.6216200.4394501.1901800.365970
    52.1691981.0691831.3956420.365970
    62.7557381.0691831.3956421.045868
    72.7557381.0691831.3956421.045868
    82.7557381.0691831.3956421.045868
    92.7557381.5087321.3956421.556933

    这里看输入和输出文档获取更多的细节。

    Input/Output — PySpark 3.5.0 documentation

  • 相关阅读:
    【Java进阶】字符串和常见集合
    aj-report 报表设计器如何添加组件
    【云原生之Docker实战】使用Docker部署Homepage应用程序仪表盘
    JUC中的AQS底层详细超详解
    WPF元素绑定
    Win10卸载KB5014699补丁教程
    Flink之Watermark策略代码模板
    [附源码]Python计算机毕业设计Django电影推荐网站
    ARTS 打卡第一周
    分布式事务-TCC异常-空回滚
  • 原文地址:https://blog.csdn.net/zy345293721/article/details/133815364