• Spark Core之RDD


    一、什么是 RDD

    弹性

    • 存储的弹性:内存与磁盘的自动切换;
    • 容错的弹性:数据丢失可以自动恢复;
    • 计算的弹性:计算出错重试机制;
    • 分片的弹性:可根据需要重新分片。

    分布式:数据存储在大数据集群不同节点上

    数据集:RDD 封装了计算逻辑,并不保存数据

    数据抽象:RDD 是一个抽象类,需要子类具体实现

    不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在 新的 RDD 里面封装计算逻辑

    可分区、并行计算

    二、创建RDD

     在Spark中创建RDD的创建方式可以分为三种:从集合中创建RDD、从外部存储创建RDD、从其他RDD创建。

    创建maven项目

    编写pom.xml

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.apache.spark</groupId>
    4. <artifactId>spark-core_2.11</artifactId>
    5. <version>2.1.1</version>
    6. </dependency>
    7. </dependencies>
    8. <build>
    9. <finalName>SparkCoreTest</finalName>
    10. <plugins>
    11. <plugin>
    12. <groupId>net.alchim31.maven</groupId>
    13. <artifactId>scala-maven-plugin</artifactId>
    14. <version>3.4.6</version>
    15. <executions>
    16. <execution>
    17. <goals>
    18. <goal>compile</goal>
    19. <goal>testCompile</goal>
    20. </goals>
    21. </execution>
    22. </executions>
    23. </plugin>
    24. </plugins>
    25. </build>

    1.从集合中创建

    代码:

    1. object RddTest {
    2. def main(args: Array[String]): Unit = {
    3. //创建SparkConfig
    4. val sparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
    5. //创建SparkContext
    6. val sc = new SparkContext(sparkConf)
    7. //创建RDD
    8. val list = List(1,3,5,7,9)
    9. //转RDD的第一种方式
    10. val rdd1:RDD[Int] = sc.parallelize(list)
    11. //转RDD的第二种方式
    12. val rdd2:RDD[Int] = sc.makeRDD(list)
    13. println(s"分区数:${rdd1.partitions.size}")//16
    14. println(s"分区数:${rdd2.partitions.size}")//16
    15. }
    16. }

    运行结果:

    1. 22/06/24 19:35:26 INFO storage.BlockManagerMasterEndpoint: Registering block manager 192.168.17.1:49731 with 1918.2 MB RAM, BlockManagerId(driver, 192.168.17.1, 49731, None)
    2. 22/06/24 19:35:26 INFO storage.BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.17.1, 49731, None)
    3. 22/06/24 19:35:26 INFO storage.BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.17.1, 49731, None)
    4. 22/06/24 19:35:26 INFO handler.ContextHandler: Started o.s.j.s.ServletContextHandler@350b3a17{/metrics/json,null,AVAILABLE,@Spark}
    5. 分区数:16
    6. 分区数:16
    7. 22/06/24 19:35:26 INFO spark.SparkContext: Invoking stop() from shutdown hook
    8. 22/06/24 19:35:26 INFO server.ServerConnector: Stopped Spark@1c98290c{HTTP/1.1}{0.0.0.0:4040}

    2.从外部存储系统的数据集创建

    由外部存储系统的数据集创建RDD包括:本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、HBase等
    1)数据准备
    在新建的SparkCoreTest1项目名称上右键=》新建input文件夹=》在input文件夹上右键=》分别新建1.txt和2.txt。每个文件里面准备一些word单词。
    2)创建RDD

    1. import org.apache.spark.rdd.RDD
    2. import org.apache.spark.{SparkConf, SparkContext}
    3. object createrdd03_file {
    4. def main(args: Array[String]): Unit = {
    5. //1.创建SparkConf并设置App名称
    6. val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    7. //2.创建SparkContext,该对象是提交Spark App的入口
    8. val sc: SparkContext = new SparkContext(conf)
    9. //3.读取文件。如果是集群路径:hdfs://hadoop102:9000/input
    10. val lineWordRdd: RDD[String] = sc.textFile("input")
    11. //4.打印
    12. lineWordRdd.foreach(println)
    13. //5.关闭
    14. sc.stop()
    15. }
    16. }

    3.从其他RDD创建

    主要是通过一个RDD运算完后,再产生新的RDD。

  • 相关阅读:
    网页html产生随机MAC地址
    mulesoft Module 2 quiz 解析
    高防服务器如何对异常流量进行识别
    Maven进阶
    基于SSM的农产品仓库管理系统设计与实现
    【matplotlib基础】--几何图形
    java---卡特兰数---满足条件的01序列(每日一道算法2022.9.29)
    深究数据库E-R模型设计
    在 TIME_WAIT 状态的 TCP 连接,收到 SYN 后会发生什么?
    【C++】匿名对象 ② ( 将 “ 匿名对象 “ 初始化给变量 | 将 “ 匿名对象 “ 赋值给变量 )
  • 原文地址:https://blog.csdn.net/m0_55834564/article/details/125451131