弹性
- 存储的弹性:内存与磁盘的自动切换;
- 容错的弹性:数据丢失可以自动恢复;
- 计算的弹性:计算出错重试机制;
- 分片的弹性:可根据需要重新分片。
分布式:数据存储在大数据集群不同节点上
数据集:RDD 封装了计算逻辑,并不保存数据
数据抽象:RDD 是一个抽象类,需要子类具体实现
不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在 新的 RDD 里面封装计算逻辑
可分区、并行计算
在Spark中创建RDD的创建方式可以分为三种:从集合中创建RDD、从外部存储创建RDD、从其他RDD创建。
- <dependencies>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.11</artifactId>
- <version>2.1.1</version>
- </dependency>
- </dependencies>
- <build>
- <finalName>SparkCoreTest</finalName>
- <plugins>
- <plugin>
- <groupId>net.alchim31.maven</groupId>
- <artifactId>scala-maven-plugin</artifactId>
- <version>3.4.6</version>
- <executions>
- <execution>
- <goals>
- <goal>compile</goal>
- <goal>testCompile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
代码:
- object RddTest {
- def main(args: Array[String]): Unit = {
- //创建SparkConfig
- val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
- //创建SparkContext
- val sc = new SparkContext(sparkConf)
- //创建RDD
- val list = List(1,3,5,7,9)
- //转RDD的第一种方式
- val rdd1:RDD[Int] = sc.parallelize(list)
- //转RDD的第二种方式
- val rdd2:RDD[Int] = sc.makeRDD(list)
- println(s"分区数:${rdd1.partitions.size}")//16
- println(s"分区数:${rdd2.partitions.size}")//16
- }
-
- }
运行结果:
- 22/06/24 19:35:26 INFO storage.BlockManagerMasterEndpoint: Registering block manager 192.168.17.1:49731 with 1918.2 MB RAM, BlockManagerId(driver, 192.168.17.1, 49731, None)
- 22/06/24 19:35:26 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.17.1, 49731, None)
- 22/06/24 19:35:26 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.17.1, 49731, None)
- 22/06/24 19:35:26 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@350b3a17{/metrics/json,null,AVAILABLE,@Spark}
- 分区数:16
- 分区数:16
- 22/06/24 19:35:26 INFO spark.SparkContext: Invoking stop() from shutdown hook
- 22/06/24 19:35:26 INFO server.ServerConnector: Stopped Spark@1c98290c{HTTP/1.1}{0.0.0.0:4040}
由外部存储系统的数据集创建RDD包括:本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、HBase等
1)数据准备
在新建的SparkCoreTest1项目名称上右键=》新建input文件夹=》在input文件夹上右键=》分别新建1.txt和2.txt。每个文件里面准备一些word单词。
2)创建RDD
- import org.apache.spark.rdd.RDD
- import org.apache.spark.{SparkConf, SparkContext}
-
- object createrdd03_file {
-
- def main(args: Array[String]): Unit = {
-
- //1.创建SparkConf并设置App名称
- val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
-
- //2.创建SparkContext,该对象是提交Spark App的入口
- val sc: SparkContext = new SparkContext(conf)
-
- //3.读取文件。如果是集群路径:hdfs://hadoop102:9000/input
- val lineWordRdd: RDD[String] = sc.textFile("input")
-
- //4.打印
- lineWordRdd.foreach(println)
-
- //5.关闭
- sc.stop()
- }
- }
主要是通过一个RDD运算完后,再产生新的RDD。