• 大数据之Spark(二)


    9.4.3、RDD持久化

    RDD之间进行相互迭代计算(Transformation的转换),当执行开启,新RDD的生成代表旧RDD消失。如果有的rdd需要重复使用就需要将rdd缓存,rdd.cache()或rdd.persist()。清理缓存rdd.unpersist()

    在这里插入图片描述

    缓存特点(分散存储):保留rdd之间的血缘关系

    from pyspark import  SparkConf,SparkContext
    from pyspark.storagelevel import StorageLevel
    if __name__ == '__main__':
        conf = SparkConf().setMaster("local[*]").setAppName("test")
        sc = SparkContext(conf=conf)
        #单机对象转分布式对象
        rdd1 = sc.textFile("../../data/input/words.txt")
        rdd2 = rdd1.flatMap(lambda x: x.split(" "))
        rdd3 = rdd2.map(lambda x: (x, 1))
        rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
        print(rdd4.collect())
    
        rdd3.cache()
        #或者使用
        rdd3.persist(StorageLevel.MEMORY_AND_DISK)
        rdd5 = rdd3.groupByKey()
        rdd6 = rdd5.mapValues(lambda x: sum(x))
        print(rdd6.collect())
        rdd3.unpersist()
    

    checkpoint仅支持硬盘存储,设计认为安全不保留血缘关系

    在这里插入图片描述

    checkpoint存储rdd数据是收集各个分区数据在HDFS上进行集中存储,而缓存是分散存储

    from pyspark import  SparkConf,SparkContext
    from pyspark.storagelevel import StorageLevel
    if __name__ == '__main__':
        conf = SparkConf().setMaster("local[*]").setAppName("test")
        sc = SparkContext(conf=conf)
        #单机对象转分布式对象
        sc.setCheckpointDir("hdfs://hadoop100:8020/output/ckp")
        rdd1 = sc.textFile("../../data/input/words.txt")
        rdd2 = rdd1.flatMap(lambda x: x.split(" "))
        rdd3 = rdd2.map(lambda x: (x, 1))
        rdd3.checkpoint()
        rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
        print(rdd4.collect())
    
    
        
        rdd5 = rdd3.groupByKey()
        rdd6 = rdd5.mapValues(lambda x: sum(x))
        print(rdd6.collect())
    
    
    、共享变量
    9.4.4.1、广播变量

    将Driver上的本地变量广播到Excutor分区进行重复使用,广播变量会避免内存浪费和网络IO

    from pyspark import  SparkConf,SparkContext
    from pyspark.storagelevel import StorageLevel
    if __name__ == '__main__':
        conf = SparkConf().setMaster("local[*]").setAppName("test")
        sc = SparkContext(conf=conf)
        stu_info_list=[
            (1,'zhangsan',11),
            (2,'lisi',13),
            (3,'wangwu',12),
            (4,'zhaoliu',11)
        ]
        broadcast = sc.broadcast(stu_info_list)
        rdd = sc.parallelize(
            [(1, "yuwen", 99), (2, 'shuxue', 99), (3, 'yingyu', 99), (4, 'baincheng', 99), (1, "yuwen", 99),
             (2, 'shuxue', 99), (3, 'yingyu', 99), (4, 'baincheng', 99), (1, "yuwen", 99), (2, 'shuxue', 99),
             (3, 'yingyu', 99), (4, 'baincheng', 99), (1, "yuwen", 99), (2, 'shuxue', 99), (3, 'yingyu', 99),
             (4, 'baincheng', 99), ])
        def map_func(data):
            id=data[0]
            for stu_info in broadcast.value:
                stu_id=stu_info[0]
                if id==stu_id:
                    name=stu_info[1]
            return (name ,data[1],data[2])
        print(rdd.map(map_func).collect())
    
    
    9.4.4.2、累加器

    普通变量无法实现在分布式下累加,通过 sc.accumulator()创建累加器

    from pyspark import  SparkConf,SparkContext
    from pyspark.storagelevel import StorageLevel
    if __name__ == '__main__':
        conf = SparkConf().setMaster("local[*]").setAppName("test")
        sc = SparkContext(conf=conf)
        rdd = sc.parallelize([1, 4, 8, 2, 7, 8, 2, 5], 2)
        sc_accumulator = sc.accumulator(0)
        def map_func(data):
            global sc_accumulator
            sc_accumulator+=1
            print(sc_accumulator)
    
        rdd.map(map_func).collect()
        print(sc_accumulator)
    
    9.4.5、Spark内核调度
    9.4.5.1、DAG

    DAG——有向无环图。

    Job与Action

    • Action返回值不是RDD算子,其作用是一个触发开关将rdd链条执行起来
    • 一个应用程序会产生多个Job,每个Job对应一个Action(如:collect ())
      在这里插入图片描述
      每个rdd链条对应一个Job,每个rdd链条(job)可以看做一个DAG,多个job合并为一个应用,每个job又分成多个stage。
      在这里插入图片描述
      结论:1个Action=1个DAG=1个JOB
    9.4.5.2、DAG的宽窄依赖和阶段划分

    在RDD前后之前的关系分为:窄依赖和宽依赖

    窄依赖:父rdd的一个分区将全部数据发给子rdd的一个分区(一对一
    在这里插入图片描述
    宽依赖(shuffle):父rdd的一个分区将数据发给子rdd的多个分区(一对多
    在这里插入图片描述
    宽窄依赖用来划分阶段

    划分依据:从后向前,遇到宽依赖就划分出一个阶段,称为stage。所以在stage内部一定是窄依赖。
    在这里插入图片描述

    9.4.5.3、内存迭代计算

    Spark默认受到全局并行度的限制,不推荐在算子上设置并行度,否则会导致shuffle(产生网络IO),影响性能。

    9.4.5.4、Spark并行度

    同一时间内task同时运行的数量为并行度。在有了6个task并行的前提下,rdd分区就被规划成6个分区。并行度会影响分区。先有并行度后有分区。

    并行度设置

    • 设置全局并行度(推荐)

      修改配置文件:

      conf/spark-defaults.conf中设置
      spark.defaults.parallelism 100
      

      在客户端提交参数

      bin/spark-submit --conf "spark.defaults.parallelism=100"
      

      在代码中设置

      conf=SparkConf()
      conf.set("spark.defaults.parallelism","100")
      
    • 针对rdd单独设置并行度(不推荐)

    并行度规划

    并行度设置为核心数的2-10倍。防止task压力不均衡会导致CPU空闲。将并行度提高,就提高了分区数,将大的任务分解成小任务,最大化利用资源。

    9.4.5.4、Spark任务调度

    Spark的任务由Driver进行调度,包含:

    1. 逻辑DAG产生
    2. 分区DAG产生
    3. Task划分
    4. 将Task分配给Executor并监控其工作

    调度流程如图:
    在这里插入图片描述
    具体流程:

    1. 构建Driver
    2. 构建SparkContext
    3. 基于DAG Scheduler(DAG调度器)构建逻辑Task分配
    4. 基于TaskScheduler(Task调度器)将逻辑Task分配到各个Executor上干活并监控
    5. Worker(Executor)被TaskScheduler管理监控,听从其指令干活并定期汇报进度

    其中,1,2,3,4都是的Driver的工作,5是Worker的工作

    Driver内两个组件:

    • DAG调度器:将逻辑DAG图进行处理得到逻辑上的Task划分
    • Task调度器:基于DAG Scheduler的产出来规划这些逻辑Task应该在哪些物理的Executor上运行,并监控管理它们

    总结
    在这里插入图片描述

    9.5、Spark SQL

    特点:

    1. 无缝集成SQL
    2. 可读写不同数据源
    3. Hive兼容
    4. 标准化连接
    9.5.1、概述

    Spark与Hive对比
    在这里插入图片描述
    数据抽象:RDD、DataFrame(二维表数据结构)

    SparkSession:在rdd中,程序执行入口对象是SparkContext。SparkSession可用于SparkSQL入口对象,也可用于SparkCore中获取SparkContext

    helloworld测试

    txt文件

    1,shuxue,99
    2,shuxue,99
    3,shuxue,9
    4,shuxue,99
    5,shuxue,88
    6,shuxue,77
    1,yuwen,5
    2,yuwen,44
    3,yuwen,6
    4,yuwen,3
    5,yuwen,6
    6,yuwen,3
    

    代码

    from pyspark.sql import SparkSession
    if __name__ == '__main__':
        spark_session = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
        sc = spark_session.sparkContext
        df = spark_session.read.csv("../../data/input/stu_score.txt", sep=",", header=False)
        df2 = df.toDF("id", "name", "score")
        df.printSchema()
        df2.show()
        view = df2.createTempView("score")
        spark_session.sql("select * from score where  id=1").show()
    
    9.5.2、DataFrame
    9.5.2.1、组成

    二维表结构,row行,column列

    结构层面:StructType描述表结构,StructField描述列信息
    在这里插入图片描述

    9.5.2.2、创建
    1. rdd转DataFrame

      txt

      lilei,29
      zhaosi,22
      wangwu,11
      

      方法1:代码

      from pyspark.sql import SparkSession
      if __name__ == '__main__':
          spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
          sc = spark.sparkContext
          rdd = sc.textFile("../../data/input/people.txt").\
              map(lambda  x:x.split(",")).\
              map(lambda x:(x[0],int(x[1])))
          df = spark.createDataFrame(rdd, schema=['name', 'age'])
          df.printSchema()
          df.show()
      

      方法2:代码

      from pyspark.sql import SparkSession
      from pyspark.sql.types import StructType,StringType,IntegerType
      if __name__ == '__main__':
          spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
          sc = spark.sparkContext
          rdd = sc.textFile("../../data/input/people.txt").\
              map(lambda  x:x.split(",")).\
              map(lambda x:(x[0],int(x[1])))
          schema = StructType().add("name", StringType(), nullable=True).add("age", IntegerType(), nullable=False)
          df = spark.createDataFrame(rdd, schema=schema)
          df.printSchema()
          df.show()
      

      方法3:代码

      from pyspark.sql import SparkSession
      from pyspark.sql.types import StructType,StringType,IntegerType
      if __name__ == '__main__':
          spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
          sc = spark.sparkContext
          rdd = sc.textFile("../../data/input/people.txt").\
              map(lambda  x:x.split(",")).\
              map(lambda x:(x[0],int(x[1])))
          #方式1:
          df = rdd.toDF(["name", "age"])
          df.printSchema()
          df.show()
          #方式2
          schema = StructType().add("name", StringType(), nullable=True).add("age", IntegerType(), nullable=False)
          df1 = rdd.toDF(schema=schema)
          df1.printSchema()
          df1.show()
      
      

      方法4:代码

      from pyspark.sql import SparkSession
      from pyspark.sql.types import StructType,StringType,IntegerType
      import pandas as pd
      
      if __name__ == '__main__':
          spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
          sc = spark.sparkContext
          pdf = pd.DataFrame({"id": [1, 2, 3], "name": ["zhangsan", "lisi", "wangwu"], "age": [11, 33, 44]})
          schema = StructType().add("id",IntegerType(), nullable=False).add("name", StringType(), nullable=True).add("age", IntegerType(), nullable=False)
          df = spark.createDataFrame(pdf, schema)
          df.printSchema()
          df.show()
      
    2. 读取外部数据

      通过SparkSQL的统一API进行数据读取构建DataFrame,text数据源文件读取数据将一整行只作为一个列读取,默认列名是value,类型是String

      from pyspark.sql import SparkSession
      from pyspark.sql.types import StructType,StringType,IntegerType
      import pandas as pd
      
      if __name__ == '__main__':
          spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
          sc = spark.sparkContext
          schema = StructType().add("data",StringType(), nullable=False)
          df = spark.read.format("text").schema(schema=schema).load("../../data/input/people.txt")
          df.printSchema()
          df.show()
      

      结果:
      在这里插入图片描述

    3. 读取json数据源,json自带schema信息

      from pyspark.sql import SparkSession
      from pyspark.sql.types import StructType,StringType,IntegerType
      import pandas as pd
      
      if __name__ == '__main__':
          spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
          sc = spark.sparkContext
          df = spark.read.format("json").load("../../data/input/people.json")
          df.printSchema()
          df.show()
      

      读取csv数据源

      from pyspark.sql import SparkSession
      from pyspark.sql.types import StructType,StringType,IntegerType
      import pandas as pd
      
      if __name__ == '__main__':
          spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
          sc = spark.sparkContext
          df = spark.read.format("csv").option("seq",",").option("header",True).option("encoding","UTF-8").schema("name STRING,age INT,job STRING").load("../../data/input/people.csv")
          df.printSchema()
          df.show()
      

      读取parquet数据源,parquet是Spark常用的列存储文件格式

      from pyspark.sql import SparkSession
      from pyspark.sql.types import StructType,StringType,IntegerType
      import pandas as pd
      
      if __name__ == '__main__':
          spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
          sc = spark.sparkContext
          df = spark.read.format("parquet").load("../../data/input/people.parquet")
          df.printSchema()
          df.show()
      
    9.5.2.3、编程

    两种风格:DSL(领域特定语言)、SQL

    DSL

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType,StringType,IntegerType
    import pandas as pd
    
    
    if __name__ == '__main__':
        spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
        sc = spark.sparkContext
        rdd = sc.textFile("../../data/input/people.txt").\
            map(lambda  x:x.split(",")).\
            map(lambda x:(x[0],int(x[1])))
        df = spark.createDataFrame(rdd, schema=['name', 'age'])
        #DSL
        df.select('name').show()
        #过滤 filter=where
        df.filter("age<50").show()
        df.where(df['age']<22).show()
    
    

    SQL

    只要将DataFrame注册成一个视图或表即可

    df.createOrReplaceTempView("people")#临时表,如果存在则替换
    df.createTempView("people")#临时视图
    df.createGlobalTempView("people")#注册全局表
    

    全局表:跨SparkSession对象使用,在一个程序内的多个SparkSession中均可调用,查询带上前缀global_temp.

    临时表:只在当前SparkSession可用

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType,StringType,IntegerType
    import pandas as pd
    if __name__ == '__main__':
        spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
        sc = spark.sparkContext
        rdd = sc.textFile("../../data/input/people.txt").\
            map(lambda  x:x.split(",")).\
            map(lambda x:(x[0],int(x[1])))
        df = spark.createDataFrame(rdd, schema=['name', 'age'])
        df.createTempView("people")  # 临时视图
        df.createOrReplaceTempView("people1")#临时表,如果存在则替换
        df.createGlobalTempView("people2")#注册全局表
        spark.sql("select name from people").show()
        spark.sql("select age from global_temp.people2").show()
    
    

    词频统计

    text

    hello hadoop
    hello spark
    hello spark
    hello flink
    

    代码

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType,StringType,IntegerType
    import pandas as pd
    from pyspark.sql import functions as F
    if __name__ == '__main__':
        spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
        sc = spark.sparkContext
        #SQL
        rdd = sc.textFile("../../data/input/words.txt").\
            flatMap(lambda  x:x.split(" ")).\
            map(lambda x:[x])
        df = rdd.toDF(["word"])
        df.createTempView("words")
        spark.sql("select word,count(*) as cnt from words group by word order by cnt desc").show()
        #DSL
        df = spark.read.format("text").load("../../data/input/words.txt")
        #withColumn方法
        #对已经存在的列进行操作返回新的列,如名字和旧列相同则替换,否则作为新列
        df2 = df.withColumn("value", F.explode(F.split(df['value'], " ")))
        df2.groupBy('value').count().withColumnRenamed("value","word").withColumnRenamed("count","cnt").orderBy("cnt",ascending=False).show()
    

    SparkSQL中当job中产生shuffle时,默认的分区数(spark.sql.shuffle.partitons)为200,项目中要合理设置
    在这里插入图片描述
    异常数据处理

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType,StringType,IntegerType
    import pandas as pd
    
    if __name__ == '__main__':
        spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
        sc = spark.sparkContext
        df = spark.read.format("csv").option("seq",",").option("header",True).option("encoding","UTF-8").load("../../data/input/people.csv")
        #行数据去重
        df.dropDuplicates().show()
        #针对某列去重
        df.dropDuplicates(['age','job']).show()
        #缺失值处理
        #缺失值该行删除
        df.dropna().show()
        #每行三个有效值
        df.dropna(thresh=3).show()
        #参数列中每行要有两个有效值
        df.dropna(thresh=2,subset=['age','job']).show()
        #缺失值填充"kong"
        df.fillna("kong").show()
        #针对列进行填充
        df.fillna("N/A",subset=['job']).show()
        #用字典设置默认填充
        df.fillna({"name":"unkown","age":1,"job":"dev"}).show()
    
    

    DataFrame数据写出统一API

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType,StringType,IntegerType
    import pandas as pd
    from pyspark.sql import functions as F
    if __name__ == '__main__':
        spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
        sc = spark.sparkContext
        df = spark.read.format("csv").option("seq",",").option("header",True).option("encoding","UTF-8").load("../../data/input/people.csv")
        df.show()
        df.select(F.concat("name","age","job")).write.mode("overwrite").format("text").save("../../data/output/text")
        df.write.mode("overwrite").format("csv").option("seq",",").option("header",True).save("../../data/output/csv")
        df.write.mode("overwrite").format("json").save("../../data/output/json")
        #默认写出parqeut格式
        df.write.mode("overwrite").save("../../data/output/parqeut")
    

    写入到MySQL,jdbc会自动建表

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType,StringType,IntegerType
    import pandas as pd
    from pyspark.sql import functions as F
    if __name__ == '__main__':
        spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
        sc = spark.sparkContext
        df = spark.read.format("csv").option("seq",",").option("header",True).option("encoding","UTF-8").load("../../data/input/people.csv")
        df.show()
        df.write.mode("overwrite").format("jdbc")\
            .option("url","jdbc:mysql://127.0.0.1:3306/dalian5?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true&allowPublicKeyRetrieval=true&autoReconnect=true")\
            .option("dbtable","spark")\
            .option("user","root") \
            .option("password", "123")\
            .save()
    

    读取MySQL数据

    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType,StringType,IntegerType
    import pandas as pd
    from pyspark.sql import functions as F
    if __name__ == '__main__':
        spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
        sc = spark.sparkContext
        df = spark.read.format("csv").option("seq",",").option("header",True).option("encoding","UTF-8").load("../../data/input/people.csv")
    
        # df.write.mode("overwrite").format("jdbc")\
        #     .option("url","jdbc:mysql://127.0.0.1:3306/dalian5?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true&allowPublicKeyRetrieval=true&autoReconnect=true")\
        #     .option("dbtable","spark")\
        #     .option("user","root") \
        #     .option("password", "123")\
        #     .save()
        spark.read.format("jdbc")\
            .option("url","jdbc:mysql://127.0.0.1:3306/dalian5?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true&allowPublicKeyRetrieval=true&autoReconnect=true")\
            .option("dbtable","spark")\
            .option("user","root") \
            .option("password", "123")\
            .load().show()
    
    9.5.3、SparkSQL的运行流程
    9.5.3.1、Catalyst优化器

    SparkSQL相比于rdd可以实现自动优化,优化器Catalyst
    在这里插入图片描述

    1. API层接收SQL语句
    2. Catalyst解析SQL并生成执行计划
    3. Catalyst输出是rdd的执行计划
    4. 集群执行

    Catalyst优化器具体流程:
    在这里插入图片描述
    5. 解析SQL并生成AST(抽象语法树)
    6. 在AST中加入元数据信息
    7. 对已经加入元数据的AST输入优化器进行优化
    8. 上述逻辑计划结束后,生成物理计划,从而生成rdd来运行

    9.5.3.2、SparkSQL的执行流程
    1. 提交SparkSQL代码
    2. Catalyst优化
    3. Driver执行环境入口构建(SparkSession)
    4. DAG调度器规划逻辑任务
    5. Task调度器分配逻辑任务到具体的Executor上工作并监控、管理任务
    6. Worker干活

    9.6、Spark on Hive

    Spark借用Metastore服务实现元数据管理的功能
    在这里插入图片描述

  • 相关阅读:
    node开发微信群聊机器人第④章
    Python怎么打印彩色字符串
    苏格拉底告诉我们什么是爱情和婚姻
    力扣每日一题
    学习java的第二十二天。。。(异常)
    [题] 前缀和 (含输入输出的耗时对比)
    Android Native 开发 要点记录
    使用UICollectionView制作轮播图(一)
    【图像去噪】基于matlab PM模型图像降噪【含Matlab源码 2107期】
    从零用VitePress搭建博客教程(6) -– 第三方组件库的使用和VitePress搭建组件库文档
  • 原文地址:https://blog.csdn.net/wslzoooo/article/details/142165460