Spark可以将Hadoop集群中的应用在内存中的运行速度提升100倍,甚至能够将应用在磁盘上的运行速度提升10倍。除了Map和Reduce操作之外,Spark还支持SQL查询,流数据,机器学习和图表数据处理。开发者可以在一个数据管道用例中单独使用某一能力或者将这些能力结合在一起使用。
Driver:是作业的主进程,是程序入口,Driver进行启动后,向master发送请求,进行注册,申请资源;
Master:进行管理所有worker节点,分配任务,收集运行信息,监控worker存活状态;
worker:与master节点通信,管理spark任务,启动Executor
Executor:一个Executor执行多个task
task:task是一个线程,具体的spark任务就是在task上,有多少分区就有多少task
(1) 构建Spark Application运行环境,SparkContext向资源管理器注册,并申请Executor,资源管理器分配并启动Executor
(2) Executor发送心跳到资源管理器,进行注册;然后SparkContext构建DAG图,DAGScheduler将DAG分解成Satge和task,并发送给TaskScheduler
(3) Executor向sparkContext申请task,TaskScheduler将task发送给Executor运行,同时sparkContext将应用程序发送给Executor,运行完毕并释放资源。
聊一聊Spark实现TopN的几种方式 - 知乎 (zhihu.com)
(1)将数据使用reparation或coalesce分区成多个RDD;
(2)每个分区数据加载到内存使用局部排序算法在每个分区获取topn;
(3)将每个分区topn收集到驱动程序,并合并成一个RDD;
(4)对该RDD进行全局排序,获取最终topn;
(5)排序算法可以使用takeordered或sortby等操作;
groupbykey:根据key对RDD进行分组,直接进行shuffle操作,这样方法会将RDD所有的元素打乱重新分组,处理大数据时可能会导致性能问题。
reducebykey方法先将相同key元素进行合并,然后对value进行聚合操作,最终返回一个新的RDD。这样在本地聚合,在进行shuffle操作可以减少shuffle操作数量。
// 最终结果
("a", 3), ("b", 7), ("c", 5)
// 最终结果
("a", Iterable(1,2)), ("b", Iterable(3,4)), ("c", Iterable(5))
hash join : 依次读取 小表的数据,对于每一行数据根据 join key 进行 hash,hash 到对应的 bucket, 生成 Hash Table 中一条记录。数据缓存在内存里,如果内存放不下,需dump 到外存。然后再次扫描 大表的数据,使用相同的 hash 函数映射 HashTable 中的记录,映射成功之后再检查 join 的条件,如果匹配成功就可以将二者 join 在一起。
broadcast hash join:broadcast阶段: 将小表广播分发到大表所在的所有主机;hash join阶段: 在每个executor上执行单机版hash join,小表映射,大表试探
shuffle hash join:在大数据条件下如果一张表很小,执行join操作最优的选择无疑是broadcast hash join,效率最高。但是一旦小表数据量增大,广播所需内存、带宽等资源必然就会太大,broadcast hash join就不再是最优方案。此时先在shufle阶段: 分别将两个表按照join key进行分区,将相同join key的记录重分布到同一节点,两张表的数据会被重分布到集群中所有节点。其次hash join阶段: 每个分区节点上的数据单独执行单机hash join算法
sort-merge join:首先,shufle阶段: 将两张大表根据join key进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理,
第二阶段,sort阶段: 对单个分区节点的两表数据,分别进行排序。最后,merge阶段: 对排好序的两张分区表数据执行join操作。join操作很简单,分别遍历两个有序序列,碰到相同ioin key就merge输出,否则取更小一边。比如按升序排列,某个值明显比它大了,后面肯定就不会有相等的,就不用继续比较了,节省了时间和内存。
Spark是一个分布式计算框架,它有很多参数可以用来调优性能和资源利用。根据不同的场景和需求,可以选择合适的参数来优化Spark任务的执行效率。一般来说,Spark的参数可以分为以下几类:
spark.executor.cores
用来设置每个Executor的核数,spark.executor.memory
用来设置每个Executor的内存大小,spark.default.parallelism
用来设置默认的并行度等。spark.shuffle.file.buffer
用来设置Shuffle文件写入磁盘时的缓冲区大小,spark.shuffle.compress
用来设置是否对Shuffle文件进行压缩,spark.sql.shuffle.partitions
用来设置Shuffle操作产生的分区数等。spark.sql.adaptive.enabled
用来开启自适应查询执行(AQE),它可以根据运行时的统计信息动态调整Shuffle分区数和Join策略等,spark.sql.broadcastTimeout
用来设置广播Join的超时时间,spark.sql.autoBroadcastJoinThreshold
用来设置广播Join的阈值等。https://blog.csdn.net/zxl55/article/details/79572475
数据倾斜的原理很简单:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。此时第一个task的运行时间可能是另外两个task的7倍,而整个stage的运行速度也由运行最慢的那个task所决定。
可能会触发shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出现数据倾斜时,可能就是你的代码中使用了这些算子中的某一个所导致的。
Spark Web UI上深入看一下当前这个stage各个task分配的数据量,从而进一步确定是不是task分配的数据不均匀导致了数据倾斜。
假设我们有一个spark作业,需要对两个Hive表进行join操作,然后进行一些聚合和分析。
避免创建重复的RDD:之前对于某一份数据已经创建过一个RDD了,从而导致对于同一份数据,创建了多个RDD,进而增加了作业的性能开销。
尽可能复用同一个RDD:两个RDD的value数据是完全一样的,那么此时我们可以只使用key-value类型的那个RDD。
对多次使用的RDD进行持久化:persist(StorageLevel.MEMORY_AND_DISK_SER)
避免使用shuffle类算子:shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。磁盘IO和网络数据传输也是shuffle性能较差的主要原因。
因此可以使用Broadcast与map进行join:
使用map-side预聚合的shuffle操作:map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。通常来说,在可能的情况下,建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子。
使用高性能的算子:
广播大变量:在算子函数中,使用广播变量时,首先会判断当前task所在Executor内存中,是否有变量副本。 如果有则直接使用;如果没有则从Driver或者其他Executor节点上远程拉取一份放到本地Executor内存中。每个Executor内存中,就只会驻留一份广播变量副本。
使用Kryo优化序列化性能:1、在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。2、将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。3、使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。
未经优化的HashShuffleManager:shuffle write阶段,主要就是在一个stage结束计算之后,为了下一个stage可以执行shuffle类的算子(比如reduceByKey),而将每个task处理的数据按key进行“分类”。所谓“分类”,就是对相同的key执行hash算法,从而将相同key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游stage的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。下一个stage的task有多少个,当前stage的每个task就要创建多少份磁盘文件。shuffle write的过程中,task给下游stage的每个task都创建了一个磁盘文件,因此shuffle read的过程中,每个task只要从上游stage的所有task所在节点上,拉取属于自己的那一个磁盘文件即可。
优化后的HashShuffleManager:开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。**当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。**也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。
SortShuffleManager的普通运行机制:在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。
SortShuffleManager的bypass运行机制:此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行 排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也 就节省掉了这部分的性能开销。
spark 与 hadoop 最大的区别在于迭代式计算模型。基于 mapreduce 框架的 Hadoop 主要分为 map 和 reduce 两个阶段,两个阶段完了就结束了,所以在一个 job 里面能做的处理很有限;spark 计算模型是基于内存的迭代式计算模型,可以分为 n 个阶段,根据用户编写的 RDD 算子和程序,在处理完一个阶段后可以继续往下处理很多个阶段,而不只是两个阶段。所以 spark 相较于 mapreduce,计算模型更加灵活,可以提供更强大的功能。
但是 spark 也有劣势,由于 spark 基于内存进行计算,虽然开发容易,但是真正面对大数据的时候,在没有进行调优的情况下,可能会出现各种各样的问题,比如 OOM 内存溢出等情况,导致 spark 程序可能无法运行起来,而 mapreduce 虽然运行缓慢,但是至少可以慢慢运行完。
Spark SQL是Spark系统的核心组件,它可以将用户编写的SQL语句或者DataFrame/Dataset API转换成Spark Core的RDD操作,从而实现对结构化或者半结构化数据的高效处理。Spark SQL的执行流程主要包括以下几个步骤:
cache()默认将RDD持久化到内存中,如果内存不足,可以将数据写入磁盘。
persist0可以让用户指定数据持久化的级别,可以将RDD持久化到内存、磁盘或者是内存和磁盘的组合中。此外,persist0还可以让用户指定数据持久化的序列化方式,这对于一些需要自定义序列化方式的对象非常有用。
checkpoint0方法则可以将RDD的数据写入到磁盘上,从而避免OOM的问题。此外,checkpoint方法也可以优化RDD的依赖关系,减少宽依赖的产生,提高任务的并行度,从而提高任务的执行效率。checkpoint方法会触发一个新的job来计算RDD,并将计算结果写入磁盘。因此,使用checkpoint0方法会比cache和persist方法产生更多的开销。checkpoint方法一般用于对一些重要的、频繁使用的RDD进行持久化,以提高程序的健壮性和执行效率。
根据算子是否Action算子,来划分job
根据算子是否需要shuffle,来划分stage
根据rdd分区数,划分task
Action操作触发一个job,jon交给DAGsheduler分解成stage
RDD 是 Spark 提供的最重要的抽象概念,它是一种有容错机制的特殊数据集合,可以分布在集群的结点上,以函数式操作集合的方式进行各种并行操作。可以将 RDD 理解为一个分布式对象集合,本质上是一个只读的分区记录集合。每个 RDD 可以分成多个分区,每个分区就是一个数据集片段。一个 RDD 的不同分区可以保存到集群中的不同结点上,从而可以在集群中的不同结点上进行并行计算。
- 即如果某个结点上的 RDD partition 因为节点故障,导致数据丢失,那么 RDD 可以通过自己的数据来源重新计算该 partition。这一切对使用者都是透明的。
- RDD 的弹性体现在于 RDD 上自动进行内存和磁盘之间权衡和切换的机制。
一般来说,如果你的数据是结构化或半结构化的,比如表格、JSON、CSV等,那么使用DataFrame会更方便,因为它可以提供更高层次的API,支持SQL查询,以及通过Catalyst优化器进行性能优化。而如果你的数据是非结构化的,比如文本、图像、二进制等,那么使用RDD会更灵活,因为它可以让你自定义转换和处理函数,以及控制数据分区和缓存等底层细节。
RDD 是 Spark 的基础数据抽象,它可以处理各种类型的数据,包括结构化、半结构化和非结构化数据。DataFrame 是 Spark SQL 模块提供的一种高层数据抽象,它只针对结构化或半结构化数据,需要指定数据的 schema(结构信息)。
RDD 是分布式的 Java 对象的集合,每个对象可以是任意类型,Spark 不关心对象的内部结构。DataFrame 是分布式的 Row 对象的集合,每个 Row 对象包含多个列,每列有名称和类型,Spark 可以根据 schema 优化数据的存储和计算。
RDD 提供了 low-level 的转换和行动操作,可以用函数式编程的风格来操作数据,但是不支持 SQL 语言和 DSL(特定领域语言)。DataFrame 提供了 high-level 的转换和行动操作,可以用 SQL 语言和 DSL 来操作数据,比如 select, groupby 等。
RDD 的优点是编译时类型安全,可以在编译时发现错误,而且具有面向对象编程的风格。DataFrame 的优点是利用 schema 信息来提升执行效率、减少数据读取以及执行计划的优化,比如 filter 下推、裁剪等。
RDD 的缺点是构建大量的 Java 对象占用了大量的堆内存空间,导致频繁的垃圾回收(GC),影响程序执行效率。而且数据的序列化和反序列化性能开销很大。DataFrame 的缺点是编译时类型不安全,只能在运行时发现错误,而且不具有面向对象编程的风格。
使用程序中的集合创建RDD
如果要通过并行化集合来创建RDD,需要针对程序中的集合,调用**SparkContext的paralleize()**方法。Spark会将集合中的数据拷贝到集群上去,形成一个分布式的数据集合,也就是一个RDD。
//案例:1到10累加求和(scala)
val arr = Array(1,2,3,4,5,6,7,8,9,10)
val rdd = sc.parallelize(arr)
val sum = rdd.reduce(_+_)
使用本地的文件创建RDD
使用HDFS来创建RDD
通过调用SparkContext的textFile()方法,可以针对本地文件或HDFS文件创建RDD。
//案例:文件字数统计
val rdd = sc.textFile("data.txt")
val wordcount = rdd.map(line => line.length).reduce(_+_)
RDD 持久化是 Spark 的一个重要特性,它可以将 RDD 的数据保存在内存或磁盘中,避免重复计算。RDD 持久化的实现主要有以下几个步骤:
Spark streaming 是 spark core API 的一种扩展,可以用于进行大规模、高吞吐量、容错的实时数据流的处理。
它支持从多种数据源读取数据,比如 Kafka、Flume、Twitter 和 TCP Socket,并且能够使用算子比如 map、reduce、join 和 window 等来处理数据,处理后的数据可以保存到文件系统、数据库等存储中。
Spark streaming 内部的基本工作原理是:接受实时输入数据流,然后将数据拆分成 batch,比如每收集一秒的数据封装成一个 batch,然后将每个 batch 交给 spark 的计算引擎进行处理,最后会生产处一个结果数据流,其中的数据也是一个一个的 batch 组成的。
https://zhuanlan.zhihu.com/p/495217561
Kafka 本质上是一个 MQ(Message Queue),使用消息队列的好处?
解耦:允许我们独立的扩展或修改队列两边的处理过程。
可恢复性:即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
缓冲:有助于解决生产消息和消费消息的处理速度不一致的情况。
灵活性&峰值处理能力:不会因为突发的超负荷的请求而完全崩溃,消息队列能够使关键组件顶住突发的访问压力。
异步通信:消息队列允许用户把消息放入队列但不立即处理它。
生产者(Producer)向Kafka集群发送消息,消息被封装成一个ProducerRecord对象,该对象包含了要发送的主题(Topic)、分区(Partition)、键(Key)、值(Value)等信息。
Kafka集群根据ProducerRecord对象的信息,将消息存储到相应的主题和分区中。主题是逻辑上的消息分类单位,分区是物理上的消息存储单位,每个分区对应一个日志文件(Log),日志文件中的消息都有一个唯一的偏移量(Offset)。
消费者(Consumer)从Kafka集群订阅主题,并从指定的分区中拉取消息。消费者属于某个消费者组(Consumer Group),同一个消费者组内的消费者可以消费同一个主题的不同分区,同一个分区只能被同一个消费者组内的某个消费者消费,以避免重复消费。消费者会记录自己消费到了哪个分区的哪个偏移量,以便出错恢复时继续消费。
Kafka集群依赖于Zookeeper来保存和管理集群元数据,例如主题、分区、副本、偏移量等信息。Kafka集群中的每个节点都是一个Broker,每个Broker可以容纳多个主题和分区。每个分区都有多个副本,其中一个为主副本(Leader),负责处理读写请求,其余为从副本(Follower),负责同步主副本的数据。当主副本发生故障时,会从从副本中选举出一个新的主副本。
topic 是逻辑上的概念,而 partition 是物理上的概念,每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 Producer 生产的数据。Producer 生产的数据会不断追加到该 log 文件末端,且每条数据都有自己的 offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费
列出现有的主题
[root@node1 ~]# kafka-topics.sh --list --zookeeper localhost:2181/myKafka
创建主题,该主题包含一个分区,该分区为Leader分区,它没有Follower分区副本。
[root@node1 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --create --topic topic_1 --partitions 1 --replication-factor 1
查看分区信息
[root@node1 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --list
查看指定主题的详细信息
[root@node1 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --describe --topic topic_1
删除指定主题
[root@node1 ~]# kafka-topics.sh --zookeeper localhost:2181/myKafka --delete --topic topic_1
开启生产者
[root@node1 ~]# kafka-console-producer.sh --topic topic_1 --broker-list localhost:9020
开启消费者
[root@node1 ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_1
开启消费者方式二,从头消费,不按照偏移量消费
[root@node1 ~]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic_1 --from-beginning