• Spark RDD弹性分布式数据集


    目录

           RDD简介

    一、RDD的定义

    二、RDD的5大特征

           RDD的创建方式

    一、 从文件系统加载数据创建RDD

    二、从HDFS中加载数据创建RDD

    三、通过并行集合创建RDD


     

    MapReduce VS RDD:

    传统的MapReduce虽然具有自动容错、平衡负载可拓展性强的优点,但最大的缺点采用非循环式的数据流模型,使得在迭代计算时要进行大量的磁盘I/O操作

     

    RDD是Spark提供的最重要的抽象概念,可以将RDD理解成一个分布式存储在集群中的大型数据集合不同RDD之间可以通过转换操作形成依赖关系实现管道化,从而避免了中间结果的I/O操作,提供了数据处理的速度和性能。

     

           RDD简介

    一、RDD的定义

    RDD(Resilient Distributted Dataset,弹性分布式数据集),是一个容错的、并行的数据结构,可以让用户显式的将数据存储到磁盘和内存中,并且含能够控制数据的分区。对于迭代式计算和交互式数据挖掘,RDD可以将中间计算的数据结果保存在内存中,需要计算时,则可以直接从内存中读取,从而极大地提高计算速度。

     

    二、RDD的5大特征

    1.分区列表:每个RDD被分为多个分区,这些分区运行在集群中的不同节点,每个分区都会被一个计算任务处理,分区数决定了并行计算的数量,创建RDD时可以指定RDD分区的个数。

     

    2.每个分区都有一个计算函数:Spark的RDD计算函数是以分片为基本单位的,每个RDD都会实现compute函数,对具体的分片进行计算。

     

    3.依赖于其他RDD:RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过对这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。

     

    4.(Key,Value)数据类型的RDD分区器:当前Spark中实现了两种类型的分区函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于(Key,Value)的RDD,才会有Partitioner(分区),非(Key,Value)的RDD的Partitioner(分区)的值为None。

     

    5.每个分区都有一个优先位置列表:优先位置列表会存储每个Partitioner的优先位置,对于一个HDFS文件来说,就是每个Partitioner块的位置。按照”移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到要处理数据块的存储位置。

           RDD的创建方式

    Spark提供2种创建RDD的方式,分别是从文件系统(本地和HDFS)中加载数据创建RDD和通过并行集合创建RDD。

    一、 从文件系统加载数据创建RDD

    Spark可以从Hadoop支持的任何存储源中加载数据去创建RDD,包括本地文件系统和HDFS等文件系统。

    通过Spark中的SparkContext对象调用textFile()方法加载数据从创建RDD。

     

    ①从Linux本地文件系统加载数据创建RDD

    1. 在Liunx本地系统创建一个test.txt文件

    创建test.txt文件:touch test.txt

    并对test.txt写入数据:vi test.txt

    d7effc3dad44459b9cdca018fe8d3035.png

     

     

    1. 对test.txt写入如下内容
    1. 1 hadoop spark
    2. 2 itcast heima
    3. 3 scala spark
    4. 4 spark itcast
    5. 5 itcast hadoop

    注:中间用空格分隔!!!

     

    1. 启动spark集群服务

     

    启动所有服务:start-all.sh

    启动zookeeper服务,slave1和slave2也需要启动

    zkServer.sh start

    23ed2bb395744bdd97ce352606e3dca1.png

    e1fec64e749141dc888730cf79956a9a.png

    d9ee87aa744445b1800843e9fd86556a.png

     

     

     

    启动Spark集群服务(进入到spark目录下启动)

    cd /opt/module/spark/

    sbin/start-all.sh

    93d84786c6b9462e852be7b4700a0e1d.png

     

     

    1. 启动Spark-Shell交互页面

    bin/spark-shell --master local[2]    #local[2]:在本地开启2个工作线程

    ceabb050879548519d48213cd7d28638.png

     

    1. 在linux本地系统读取test.txt文件数据创建RDD

    val test = sc.textFile(“file:///opt/test.txt”)

    注:此处的路径为test.txt创建的路径

    84fc1bbc8007494c8f7710339935db40.png

     

    二、从HDFS中加载数据创建RDD

    新建一个master窗口如下

    e70c80d6f9d94f84b934e66896c9087b.png

     

     

    在HDFS上创建/data目录,并将test.txt文件上传至/data目录下

    切换到hadoop用户:su hadoop

    创建/data目录:hadoop fs -mkdir /data

    切换到test.txt目录/opt下进行上传:hadoop fs -put test.txt /data

    1c0bdbf64f384ea98e0f9d7b5db64380.png

     

    回到Spark-Shell交互界面下:bin/spark-shell --master local[2]

    加载HDFS中的数据创建RDD:val testRDD = sc.textFile(‘/data/test.txt’)

    注:testFile()中的参数,路径没有指定file时,则默认为HDFS路径

    5d42c695cfb3433390343f610e20bed6.png

     

    三、通过并行集合创建RDD

    要创建RDD,需要先创建一个数组:val array = Array(1,2,3,4,5)

    执行parallelize()方法实现:val arrRDD = sc.parallelize(array)

    c2411d7c55624afc841312e17315397d.png

     

     

     

  • 相关阅读:
    Collectors类作用:
    线程池指令系统
    IO流原理及流的分类
    微服务06-Dockerfile自定义镜像+DockerCompose部署多个镜像
    安装Milvus的可视化工具Attu教程
    python安装第三方模块方法
    南卡和FIIL蓝牙耳机深度大pk,南卡和FIIL蓝牙耳机哪个更好?
    进程和线程的区别&&run和start区别与联系
    【随笔】Git 高级篇 -- 分离 HEAD(十一)
    算法-递推
  • 原文地址:https://blog.csdn.net/m0_57781407/article/details/126328639