如果不设置reduce,系统会默认给定一个reduce,该reduce不进行任何数据处理,生成的文件数量等于MapTask的数量。如果不需要此默认reduce,可以将reduce数量设置为0.
conf.setNumReduceTasks(0)
当将jar程序提交到hadoop集群后,会向yarn发送资源请求,yarn分配container资源后,在指定机器上启动AM,driver也运行在该台机器。然后对driver程序进行解析执行,先根据InputPath读取输入文件,然后根据文件大小,以及hdfs块大小、设定的最大、最小块大小确定划分,划分完成一块分片对应于一个MapTask。然后根据本地化原则,在最优的机器上启动对应的MapTask,MapTask根据分配的FileSplit信息,去hdfs读取对应于当前MapTask的数据。然后逐行调用map函数进行处理。
map函数每处理完成一个数据,通过context收集结果。数据会先缓冲到kvbuffer中,当到达kvbuffer的设定比例时,开始触发溢写操作,先对kvbuffer进行分区、排序,都写入一个文件中,然后将同一分区放在连续的空间上,并使用一个单独的索引文件记录每个分区在文件中的起始和终止位置,方便在MapTask完成所有任务后进行多个spill溢写文件分区合并。当MapTask完成所有任务后,就会生成n个文件,n是触发溢写的次数,每个文件包含多个分区的数据,为了进行后续的reduce阶段,需要将这些文件的数据按照分区重新整理到对应的文件中,每个文件发往对应的结点,给reduce使用。由于分区内是有序的,只需要根据之前存储的索引文件查询对应分区,然后进行归并排序。
Reduce在某个MapTask完成计算后,就开始拉取其计算结果到对应结点,此过程就是Reduce Read。从多个MapTask上读取的数据需要合并到一起,再逐个传递给reduce函数进行处理,同一个key值的数据,会被同一个reduce函数进行处理,再通过context收集每个key处理的结果。
由于计算需要对应的数据和计算逻辑,可以通过移动计算的逻辑到对应结点,避免数据在节点之间的传输。
数据本地性有几个级别:
1当mapTask和数据在同一个结点时,称为本地级别的数据本地性,是最好的场景。
2机架内的数据本地性。由于本地级别并不是总能实现,数据迁移是不可避免的。在数据迁移时,尽量让数据和计算在同一个机架上。
3跨机架,是最差的情况。
内存缓冲区同时存储了键值对数据及其索引数据kvMeta,kvMeta是一个四元组,(key起始位置,value起始位置,value长度,partition),两者在圆形缓冲区内对向增长,当完成一次溢写。再取两者索引的终点为分割线,并掉头增长。partition、sort过程本质是对索引进行处理。
如果设置了Combine类,就会在spill之前进行combine操作,将同key值的数据提前进行规约,减少数据的条数。
1先请求NameNode写入文件,NameNode检查写入权限等合法性,如果合法就将操作的元信息写入缓存,当触发刷盘时写入edit_log文件。
2NN返回同意请求及可用的DN地址列表,
3client按照block大小切分文件,将切分好的数据与DN地址列表一起发送给最近的DN,以packet=64kb发送给DN。由该DN与地址列表中的多个DN形成pipline管道,此后,client每向第一个DN写入一个packet,DN就会将这个packet传递个后续的DN,并逐级返回ack信息。
4写入完毕,关闭连接。发送完成信号给NN。
调优主要从优化rdd 优化,内存优化方面,参数调优。
资源参数调优
–driver-mrmory
–num-executors
–executor-memory
–executor-cores
1持久化策略达到性能最优:如果一个rdd的计算时间很长或很复杂,都将这个RDD保存到HDFS上,这样更加安全。
2使用shuffle时,尽量使用带combiner的算子,例如reduceByKey,aggregateByKey,combineByKey,减少map spill磁盘的数据量,以及reduce copy的数据量。
3优化小表join:使用map join。
4使用kryo序列化减少序列化对象空间
5数据本地化:合适的本地化等待时间,可能影响执行时间。
1.spark.memory.fraction,存储内存和运行内存的比例
2.堆外内存,spark底层shuffle使用netty,netty会使用堆外内存
1.重分区 : repartition , coalesce
2.byKey: reduceByKey,aggregateByKey,combineByKey sortByKey
3.两表join、集合操作:intersection,subtract,join
left join,right join,inner join,full outer join,semi join,cross join
聚合:sum,count()
排序:row_number,dense_rank(),rank()
取行:lead(),lag()
over(partition by order by row between 2 rows precedding and 3 rows following)
笛卡尔积不判断join条件,是左边任意一个与右边所有相乘。