- RDD(Resilient Distributed Dataset)弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。它用于囊括所有内存中和磁盘中的分布式数据实体。
- RDD与数组的对比:
对比项 | 数组 | RDD |
---|
概念类型 | 数据结构实体 | 数据模型抽象 |
数据跨度 | 单机进程内 | 跨进程、跨计算节点 |
数据构成 | 数组元素 | 数据分片 |
数据定位 | 数据下标、索引 | 数据分片索引 |
RDD的特性
- RDD是有分区的:RDD分区是RDD数据存储的最小单位,一份RDD数据本质上是分成了多个分区。
- RDD所有的方法都会作用在其所有分区上。
- RDD之间是有依赖关系的。
- KV型的RDD可以有分区器(默认是Hash分区,也可以手动设置。这个特性不是绝对的)
- RDD分区规划会尽量靠近数据所在的服务器(避免过多的网络读取)。
RDD的属性
- Partitions:数据分片
每个阶段所产生的的具体实物。 - Partitioner:分片切割规则
按规则划分,数据分片的分布,是由RDD的Partitioner决定的。 - Dependencies:RDD 依赖
每个RDD都会通过Dependencies属性来记录它所依赖的前一个、或是前多个RDD,简称父RDD。 - Compute:转换函数
记录从父RDD到当前RDD的转换操作。
RDD的创建
并行化创建
- 概念:并行化创建是指将本地集合转向分布式RDD,这是分布式的开端。代码如下图所示:
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setMaster('local[*]').setAppName('test')
sc = SparkContext(conf = conf)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9], 3)
print(f"分区数是 {rdd.getNumPartitions()}")
print(f"RDD内容是 {rdd.collect()}")
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 获取RDD分区数:getNumPartitions API,获取RDD分区数量,返回值为INt数字
用法:rdd.getNumPartitions()
读取文件创建
- textFile API,这个API可以读取本地数据也可以读取HDFS,还包括网络存储数据。
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setMaster('local[*]').setAppName('test')
sc = SparkContext(conf = conf)
rdd_file = sc.textFile("../Data/input/words.txt")
print(f"rdd_file1分区数是 {rdd_file.getNumPartitions()}")
rdd_file1 = sc.textFile("../Data/input/words.txt", 3)
rdd_file2 = sc.textFile("../Data/input/words.txt", 100)
print(f"rdd_file1分区数是 {rdd_file1.getNumPartitions()}")
print(f"rdd_file2分区数是 {rdd_file2.getNumPartitions()}")
rdd_file3 = sc.textFile("hdfs://node1:8020/Test/WordCount.txt")
print(f"rdd_file3分区数是 {rdd_file3.getNumPartitions()}")
print(f"RDD内容是 {rdd_file.collect()}")
print(f"RDD内容是 {rdd_file1.collect()}")
print(f"RDD内容是 {rdd_file2.collect()}")
print(f"RDD内容是 {rdd_file3.collect()}")
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- wholeTextFile API,适合读取一堆小文件,代码如下:
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setMaster('local[*]').setAppName('test')
sc = SparkContext(conf = conf)
rdd_files = sc.wholeTextFiles("../Data/input/tiny_files")
rdd_words = rdd_files.map(lambda x: x[1])
rdd_result = rdd_words.collect()
print(f"内容是:{rdd_result}")