• 大数据之Spark(一)


    九、Spark(PySpark 3.2)

    9.1、基本概念

    定义:用于大规模数据处理的统一分析引擎
    在这里插入图片描述
    运行模式

    • 本地模式(单机):一个独立进程,多个线程模拟spark运行时环境
    • Standalone模式(集群):各个角色独立进程组成集群环境
    • YARN模式(集群):各个角色运行在yarn容器内部组成集群环境
    • k8s(容器集群):各个角色运行在k8s容器内部组成集群环境
    • 云服务

    架构角色

    • 资源管理层
      • Master:集群管理者
      • Worker:单机管理者
    • 任务计算层
      • 单任务管理者Driver:单个任务管理
      • 单任务执行者Excutor:单个任务计算
      • 在这里插入图片描述

    9.2、各种模式部署

    下载spark,地址:https://archive.apache.org/dist/spark/

    下载anaconda,地址https://repo.anaconda.com/archive/
    在这里插入图片描述
    安装anaconda

     sh ./Anaconda3-2021.04-Linux-x86_64.sh
    

    修改环境变量

     vim ~/.condarc
    

    清华源

    channels:
      - defaults
    show_channel_urls: true
    channel_alias: https://mirrors.tuna.tsinghua.edu.cn/anaconda
    default_channels:
      - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main
      - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free
      - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/r
      - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/pro
      - https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/msys2
    custom_channels:
      conda-forge: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
      msys2: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
      bioconda: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
      menpo: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
      pytorch: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
      simpleitk: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
    

    先logout再登录环境才能生效

    创建虚拟环境

     conda create -n pyspark python=3.8
    

    切换虚拟环境

    conda activate pyspark
    

    解压spark

    tar -zxvf spark-3.2.0-bin-hadoop3.2.tgz
    

    配置spark环境变量

    vim /etc/profile
    
    export JAVA_HOME=/usr/lib/jvm/java
    export HADOOP_HOME=/export/server/hadoop-3.3.0
    export SPARK_HOME=/export/soft/spark-3.2.0-bin-hadoop3.2
    export PYSPARK_PYTHON=/export/soft/anaconda3/envs/pyspark/bin/python3.8
    export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
    
    
    vim /root/.bashrc
    
    export JAVA_HOME=/usr/lib/jvm/java
    export PYSPARK_PYTHON=/export/soft/anaconda3/envs/pyspark/bin/python3.8
    
    9.2.1、Local模式

    启动一个JVM Process进程启动多个线程执行任务Task

     cd /export/soft/spark-3.2.0-bin-hadoop3.2/bin
     ./pyspark
    

    在这里插入图片描述
    监控页面WebUI:http://hadoop100:4040

    9.2.2、Standalone模式

    spark自带的集群模式,Driver角色在运行时存在于Master进程内,Executor运行于Worker进程内

    Standalone架构

    • 主节点Master:管理整个集群资源,并托管运行各个任务的Driver
    • 从节点Worker:管理每个节点的资源,分配对应资源来运行Executor(Task)
    • 历史服务器(可选):保存日志到HDFS,启动HistoryServer可查看应用运行相关信息
    • 在这里插入图片描述
      在其他两台服务器进行9.2.1操作

    在主节点编辑/export/soft/spark-3.2.0-bin-hadoop3.2/conf目录下的work(先改名),增加Hadoop100,Hadoop101,Hadoop102节点

    修改spark-env.sh(先改名)

    JAVA_HOME=/usr/lib/jvm/java
    HADOOP_CONF_DIR=/export/server/hadoop-3.3.0/etc/hadoop
    YARN_CONF_DIR=/export/server/hadoop-3.3.0/etc/hadoop
    export SPARK_MASTER_IP=hadoop100
    export SPARK_MASTER_PORT=7077
    export SPARK_WEBUI_PORT=8080
    SPARK_WORKER_CORES=1
    SPARK_WORK_MEMORY=1G
    export SPARK_WORKER_PORT=7078
    SPARK_WORKER_WEBUI_PORT=8081
    export SPARK_HISTORY_OPTS="
    -Dspark.history.fs.logDirectory=hdfs://hadoop100:8020/sparklog/
    -Dspark.history.fs.cleaner.enable=true"
    
    

    创建spark历史文件夹

     hadoop fs -mkdir /sparklog
     hadoop fs -chmod 777 /sparklog
    

    修改spark-defaults.conf

    spark.eventLog.enable true
    spark.eventLog.dir hdfs://hadoop100:8020/sparklog/
    spark.eventLog.compress true
    

    修改log4j.properties,将日志级别修改为WARN

    启动history server

     /export/soft/spark-3.2.0-bin-hadoop3.2/sbin/start-history-server.sh
    
    

    启动角色

    /export/soft/spark-3.2.0-bin-hadoop3.2/sbin/start-all.sh
    

    webUI地址:http://192.168.132.100:8080/
    在这里插入图片描述
    启动集群客户端

     cd /export/soft/spark-3.2.0-bin-hadoop3.2/bin
      ./pyspark --master spark://hadoop100:7077
    

    执行代码

     sc.parallelize([1,2,3,4,5]).map(lambda x:x*10).collect()
    

    在这里插入图片描述
    Spark程序运行层次结构**

    job、state、task区别:

    • 一个spark程序被分成多个子任务job
    • 每个job分成多个阶段state
    • 每个state阶段分成多个task线程

    各个端口区别

    • 4040:一个运行Application运行过程中临时绑定的端口,来查看当前任务状态,程序运行完成后注销
    • 8080:默认Standalone下Master的Web端口,用来查看当前Master集群状态
    • 18080:默认是历史服务器的端口
    9.2.3、yarn模式

    无需部署spark 集群,只要找一台服务器当spark客户端将spark任务提交到集群

    master由yarn的ResourceManager担任,work由yarn的NodeManager担任,

    Driver角色运行在yarn容器内或提交任务的客户端进程中,Excutor运行在yarn容器中
    在这里插入图片描述
    注意spark-env.sh中要有如下配置:

    HADOOP_CONF_DIR=/export/server/hadoop-3.3.0/etc/hadoop
    YARN_CONF_DIR=/export/server/hadoop-3.3.0/etc/hadoop
    

    运行命令

    bin/pyspark --master yarn
    

    在这里插入图片描述
    在这里插入图片描述
    spark on yarn有两种运行模式,一种是cluster模式,一种是client模式,两个模式区别在于Driver运行的位置

    • cluster:Driver运行在yarn容器内部,和ApplicationMaster在一个容器(查看输入需要进入容器内部,比较麻烦)

      在这里插入图片描述

      在这里插入图片描述

    • client(默认):Driver运行在客户端进程中(通信成本较高,但查看输出、测试方便)

      在这里插入图片描述

      在这里插入图片描述

    两种模式对照

    clusterclient
    Driver运行位置yarn容器内客户端进程内
    通讯效率
    日志查看日志输出在容器内,查看不方便日志输出在客户端的标准输出流中,方便查看
    生产环境推荐不推荐
    稳定性稳定受到客户端进程影响

    client模式命令

     bin/spark-submit --master yarn --deploy-mode client --driver-memory 512m --executor-memory 512m --num-executors 3 --total-executor-cores 3 /export/soft/spark-3.2.0-bin-hadoop3.2/examples/src/main/python/pi.py 100
    
    

    控制台输出:Pi is roughly 3.152320

    cluster模式命令

     bin/spark-submit --master yarn --deploy-mode cluster --driver-memory 512m --executor-memory 512m --num-executors 3 --total-executor-cores 3 /export/soft/spark-3.2.0-bin-hadoop3.2/examples/src/main/python/pi.py 100
    

    控制台不会输出,可通过查看yarn日志查看输出
    在这里插入图片描述
    客户端模式提交流程:
    在这里插入图片描述
    集群模式提交流程:
    在这里插入图片描述

    9.3、Spark编程基础

    程序入口为SparkContext构建步骤:1、创建SaprkConf对象 2、基于SaprkConf创建SparkContext

    测试wordcount

    from pyspark import  SparkConf,SparkContext
    
    if __name__ == '__main__':
        conf = SparkConf().setMaster("local[*]").setAppName("WordCount")
        sc = SparkContext(conf=conf)
        file_rdd = sc.textFile("hdfs://hadoop100:8020/input/words.txt")
        rdd_flat_map = file_rdd.flatMap(lambda x: x.split(" "))
        withOne = rdd_flat_map.map(lambda x: (x, 1))
        reslut = withOne.reduceByKey(lambda a, b: a + b)
        print(reslut.collect())
    

    wordcount原理
    在这里插入图片描述
    Python on spark执行原理
    在这里插入图片描述
    在这里插入图片描述

    9.4、Spark Core

    9.4.1、RDD定义

    RDD-弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合

    RDD特性

    1. RDD有分区

      RDD的分区是RDD数据存储的最小单位,一份RDD数据本质上分割成多个分区(分区是物理概念

    2. RDD的方法会作用在其所有分区上

    3. RDD之间有依赖关系

    4. kv型的RDD可以有分区器(可选)

    5. RDD的分区规划会尽量靠近数据所在的服务器

      尽量走本地读取,避免网络读取
      在这里插入图片描述

    9.4.2、RDD编程
    9.4.2.1、RDD创建

    1、通过并行化集合创建(本地对象转分布式RDD)

    from pyspark import  SparkConf,SparkContext
    if __name__ == '__main__':
        conf = SparkConf().setMaster("local[*]").setAppName("test")
        sc = SparkContext(conf=conf)
        #单机对象转分布式对象
        rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
        print("默认分区数是硬件核心数",rdd.getNumPartitions())
        rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 2)
        print("指定分区数", rdd.getNumPartitions())
        #collect收集数据,将分布式对象收集成本地对象,分区数据发送到Driver
        print("rdd内容",rdd.collect())
    
    

    2、读取外部数据源(读取文件)

    from pyspark import  SparkConf,SparkContext
    if __name__ == '__main__':
        conf = SparkConf().setMaster("local[*]").setAppName("test")
        sc = SparkContext(conf=conf)
    
        rdd = sc.textFile("/tmp/pycharm_project_337/data/input/words.txt")
        print("默认分区数", rdd.getNumPartitions())
        print("rdd内容", rdd.collect())
    

    默认分区数与核心数无关

    wholeTextFile小文件专用

    9.4.2.2、RDD算子

    分布式集合对象的API称为算子

    分类:

    • transformation算子

      返回值仍然是RDD称为转换算子,懒加载,没有action算子时,transformation算子不生效

    • action算子

      返回值不是rdd的算子
      在这里插入图片描述
      transformation算子**

    1. map算子

      将RDD数据一条条处理

      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          rdd = sc.parallelize([1, 2, 3, 4, 5, 6])
          #写法1:
          def add(data):
              return data*10
          rdd_map = rdd.map(add)
          print(rdd_map.collect())
          # 写法2:
          print(rdd.map(lambda x:x*5).collect())
      
    2. flatmap算子

      对rdd执行map操作,然后进行解除嵌套操作

      list=[[1,2,3],[4,5,6],[7,8,9]]
      #解除嵌套后
      list=[1,2,3,4,5,6,7,8,9]
      

      使用map获得嵌套的结果

      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          #单机对象转分布式对象
          rdd = sc.parallelize(["hadoop spark hadoop","spark hadoop hadoop","flink spark hadoop"])
          print(rdd.map(lambda x: x.split(" ")).collect())
      

      [[‘hadoop’, ‘spark’, ‘hadoop’], [‘spark’, ‘hadoop’, ‘hadoop’], [‘flink’, ‘spark’, ‘hadoop’]]

      使用flatMap解除嵌套,无需传参

      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          #单机对象转分布式对象
          rdd = sc.parallelize(["hadoop spark hadoop","spark hadoop hadoop","flink spark hadoop"])
          print(rdd.flatMap(lambda x: x.split(" ")).collect())
      

      [‘hadoop’, ‘spark’, ‘hadoop’, ‘spark’, ‘hadoop’, ‘hadoop’, ‘flink’, ‘spark’, ‘hadoop’]

    3. reduceByKey

      针对kv型rdd,按照key分组聚合

      rdd.reduceByKey(func)
      #func:(V,V)->V
      #接受2个参数(类型要一致),返回一个返回值,类型和传入要求一致
      
      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          #单机对象转分布式对象
          rdd = sc.parallelize([('a',1),('a',2),('b',2),('c',1)])
          # reduceByKey的参数为迭代器,a为迭代结果,b为每条相同key的值
          print(rdd.reduceByKey(lambda a,b:a+b).collect())
      
    4. mapValues

      rdd.mapValues(func)
      #func:(V)->U
      #传入参数是二元元组的value,这个方法只对value处理
      
      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          #单机对象转分布式对象
          rdd = sc.parallelize([('a',1),('a',2),('b',2),('c',1)])
          # reduceByKey的参数为迭代器,a为迭代结果,b为每条相同key的值
          print(rdd.mapValues(lambda x:x*10).collect())
      
    5. groupBy

      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          #单机对象转分布式对象
          rdd = sc.parallelize([('a',1),('a',2),('b',2),('c',1)])
          # groupBy参数:按谁分组返回谁
          rdd_group_by = rdd.groupBy(lambda x: x[0])
          print(rdd_group_by.map(lambda x: (x[0], list(x[1]))).collect())
      
      
    6. filter

      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          #单机对象转分布式对象
          rdd = sc.parallelize([1,2,3,4,5])
          # filter参数:过滤方法返回bool,true保留,false丢弃
          print(rdd.filter(lambda x: x%2==0).collect())
      
      
    7. distinct

      通过map、reduce去重

          def distinct(self, numPartitions=None):
              """
              Return a new RDD containing the distinct elements in this RDD.
      
              Examples
              --------
              >>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
              [1, 2, 3]
              """
              return self.map(lambda x: (x, None)) \
                         .reduceByKey(lambda x, _: x, numPartitions) \
                         .map(lambda x: x[0])
      

      测试

      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          #单机对象转分布式对象
          rdd = sc.parallelize([1,2,4,5,2,3,4,5])
          # filter参数:过滤方法返回bool,true保留,false丢弃
          print(rdd.distinct().collect())
      
      
    8. union

      合并rdd但不去重

      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          #单机对象转分布式对象
          rdd = sc.parallelize([1,2,4,5,'a','c'])
        
          print(rdd.union(sc.parallelize([13,5,7,"2"])).collect())
      
      
    9. join

      只能用于二元元组

      rdd.join(other_rdd)#内连接
      rdd.leftOutJoin(other_rdd)#左外连接
      rdd.rightOutJoin(other_rdd)#右外连接
      
      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          #单机对象转分布式对象
          rdd1 = sc.parallelize([('a',1),('d',2),('b',2),('c',1)])
          rdd2 = sc.parallelize([('a', 'aa'), ('d', 'dd')])
          # 只能按照key关联,不能通过value关联
          print(rdd1.join(rdd2).collect())
          print(rdd1.leftOuterJoin(rdd2).collect())
      
      
    10. intersection

    求两个rdd的交集

    from pyspark import  SparkConf,SparkContext
    if __name__ == '__main__':
     conf = SparkConf().setMaster("local[*]").setAppName("test")
     sc = SparkContext(conf=conf)
     #单机对象转分布式对象
     rdd = sc.parallelize([1,2,4,5,'a','c'])
    
     print(rdd.intersection(sc.parallelize([13, 5, 7, "2"])).collect())
    
    
    
    1. glom

      按照分区将数据嵌套

      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          #单机对象转分布式对象
          rdd = sc.parallelize([1,2,4,5,'a','c'])
      
          print(rdd.glom().collect())
      
      
      
    2. groupByKey

      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          #单机对象转分布式对象
          rdd = sc.parallelize([('a',1),('a',2),('b',2),('c',1)])
      
          rdd_group_by = rdd.groupByKey()
          print(rdd_group_by.map(lambda x: (x[0], list(x[1]))).collect())
      
      
      
    3. sortBy

      对rdd进行排序,只保证分区内有序,分区数设为1则全局有序

      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          #单机对象转分布式对象
          rdd = sc.parallelize([('a',1),('d',2),('b',2),('c',1),('e',3),('f',5)])
          #对键排序
          print(rdd.sortBy(lambda x: x[0], True,3).collect())
          #对值排序
          print(rdd.sortBy(lambda x: x[1], True, 3).collect())
      
      
      
    4. sortByKey

      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          #单机对象转分布式对象
          rdd = sc.parallelize([('a',1),('d',2),('b',2),('c',1),('e',3),('f',5)])
          #对键排序
          print(rdd.sortByKey().collect())
      
      

    action算子

    1. countBykey

      按key计数

      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          file_rdd = sc.textFile("../../data/input/words.txt")
          rdd = file_rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1))
          print(rdd.countByKey().items())
      
    2. collect

      将各个分区的数据都拉取到Driver形成list,会占用Driver内容,所以数据集不能太大

    3. reduce

      对rdd数据按照自定义逻辑进行聚合

      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          #单机对象转分布式对象
          rdd = sc.parallelize([1,2,3,4,5])
          print(rdd.reduce(lambda a,b:a+b))
      
    4. fold

      和reduce一样,只不过聚合带有初始值

      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          #单机对象转分布式对象
          rdd = sc.parallelize([1,2,3,4,5,6],3)
          print(rdd.fold(10,lambda a,b:a+b))
      

      分区内有初始值10,分区间也有初始值10

    5. first、take、top、count

      first取出rdd第一个元素

      take(n)取rdd的前n个元素

      top对rdd降序取前n个(只能降序)

      count返回rdd有多少个元素

    6. takeSample

      参数1:取的数据是否可以重复,参数2:取几个数据,参数3:随机数种子(默认不写)

      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          #单机对象转分布式对象
          rdd = sc.parallelize([1,2,3,4,5,6])
          print(rdd.takeSample(True,3))
      
      
    7. takeOrdered

      对rdd进行排序取前n个

      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          #单机对象转分布式对象
          rdd = sc.parallelize([1,4,8,2,7,8,2,5])
          print(rdd.takeOrdered(3))
          print(rdd.takeOrdered(3,lambda x:-x))
      
      
    8. foreach

      对rdd每个元素执行逻辑操作,相比map没有返回值。由Excutor直接执行,不需要经过Driver,效率高

      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          #单机对象转分布式对象
          rdd = sc.parallelize([1,4,8,2,7,8,2,5],1)
          rdd.foreach(lambda x:print(x*10))
      
    9. saveAsTextFile

      将rdd数据写入到文件,支持本地和hdfs写出。由Excutor直接执行,不需要经过Driver,效率高

    分区操作算子

    1. mapPartitions1.

      在这里插入图片描述

      mapPartitions一次传递的是一整个分区的数据,作为一个迭代器(一次性list)对象传入过来。一次操作一整个分区,map一次操作一条数据

      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          #单机对象转分布式对象
          rdd = sc.parallelize([1,4,8,2,7,8,2,5],3)
          def process(iter):
              result=list()
              for i in iter:
                  result.append(i*10)
              return  result
          print(rdd.mapPartitions(process).collect())
      
      
      
    2. foreachPartition

      类似mapPartitions

      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          #单机对象转分布式对象
          rdd = sc.parallelize([1,4,8,2,7,8,2,5],3)
          def process(iter):
              result=list()
              for i in iter:
                  result.append(i*10)
              print(result)
          rdd.foreachPartition(process)
      
      
    3. partitionBy

      对RDD进行自定义分区

      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          #单机对象转分布式对象
          rdd = sc.parallelize([('a',1),('d',2),('b',2),('c',1),('e',3),('f',5)])
          def process(k):
              if'a'==k or 'b'==k:return 0
              if 'c'==k:return 1
              return 2
      
      
          print(rdd.partitionBy(3, process).glom().collect())
      
      
      
    4. repartiton(慎重使用)

      对rdd分区执行重分区。修改分区会影响并行计算,分区增加很可能导致shuffle。尽量减少不增加分区。

      from pyspark import  SparkConf,SparkContext
      if __name__ == '__main__':
          conf = SparkConf().setMaster("local[*]").setAppName("test")
          sc = SparkContext(conf=conf)
          #单机对象转分布式对象
          rdd = sc.parallelize([1,4,8,2,7,8,2,5],3)
          print(rdd.repartition(1).getNumPartitions())
          print(rdd.repartition(5).getNumPartitions())
          #建议使用
          print(rdd.coalesce(1).getNumPartitions())
          print(rdd.coalesce(5).getNumPartitions())
          print(rdd.coalesce(5, True).getNumPartitions())
      
      
      
  • 相关阅读:
    常见排序算法及其使用场景
    敏感词过滤--golang
    2022年最新四川水利水电施工安全员模拟试题题库及答案
    origin 上下图标
    Rancher的主要功能有哪些?
    electron初学
    C++知识点大全(第二版)
    网络限速导致的服务器访问https异常得处理过程
    【牛客 - 剑指offer】JZ64 求1+2+3+...+n Java实现
    C语言趣味代码(一)
  • 原文地址:https://blog.csdn.net/wslzoooo/article/details/142164713