• 【SparkStreaming】概述、DS入门、DS创建


    SparkStreaming概述

    0 基本概念

    (1)离线和实时概念

    数据处理的延迟

    • 离线计算

    在计算开始前已知所有输入数据,输入数据不会产生变化,一般计算量级较大,计算时间也较长。例如今天早上一点,把昨天累积的日志,计算出所需结果。最经典的就是Hadoop的MapReduce方式;

    • 实时计算

    输入数据是可以以序列化的方式一个一个地输入并进行处理,也就是说在开始的时候并不需要知道所有的输入数据。与离线计算相比,运行时间短,计算量级相对较小。强调计算过程的时间要短,即所查当下给出结果。

    (2)批量和流式概念

    数据处理的方式

    • 批:处理离线数据,冷数据。单个处理数据量大,处理速度比流慢。
    • 流:在线,实时产生的数据。单次处理的数据量小,但处理速度更快。

    近年来,在Web应用、网络监控、传感监测等领域,兴起了一种新的数据密集型应用——流数据,即数据以大量、快速、时变的流形式持续到达。实例:PM2.5检测、电子商务网站用户点击流。

    流数据具有如下特征:

    • 数据快速持续到达,潜在大小也许是无穷无尽的
    • 数据来源众多,格式复杂
    • 数据量大,但是不十分关注存储,一旦经过处理,要么被丢弃,要么被归档存储

    注重数据的整体价值,不过分关注个别数据

    1 Spark Streaming是什么

    Spark流使得构建可扩展的容错流应用程序变得更加容易。

    Spark Streaming用于流式数据的处理。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。
    在这里插入图片描述

    在 Spark Streaming 中,处理数据的单位是一批而不是单条,而数据采集却是逐条进行的,因此 Spark
    Streaming 系统需要设置间隔使得数据汇总到一定的量后再一并操作,这个间隔就是批处理间隔。批处理间隔是Spark Streaming的核心概念和关键参数,它决定了Spark Streaming提交作业的频率和数据处理的延迟,同时也影响着数据处理的吞吐量和性能。
    在这里插入图片描述

    和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为RDD 存在,而DStream是由这些RDD所组成的序列(因此得名“离散化”)。DStreams可以由来自数据源的输入数据流来创建, 也可以通过在其他的DStreams上应用一些高阶操作来得到。所以简单来讲,DStream就是对RDD在实时数据处理场景的一种封装。

    2 Spark Streaming的特点

    • 易用

    在这里插入图片描述

    • 容错

    在这里插入图片描述

    • 易整合到Spark体系
      在这里插入图片描述

    • 缺点

      Spark Streaming是一种“微量批处理”架构, 和其他基于“一次处理一条记录”架构的系统相比, 它的延迟会相对高一些。

    3 Spark Streaming架构

    (1)架构图

    整体架构图

    在这里插入图片描述

    SparkStreaming架构图

    在这里插入图片描述

    (2)背压机制

    Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,这样虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。

    为了更好的协调数据接收速率与资源处理能力,1.5版本开始Spark Streaming可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即Spark Streaming Backpressure): 根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。

    通过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制,默认值false,即不启用。

    二 Dstream入门

    1 WordCount案例实操

    需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数

    (1)添加依赖

    
        org.apache.spark
        spark-streaming_2.12
        3.0.0
    
    

    (2)编写代码

    object StreamWordCount {
    
      def main(args: Array[String]): Unit = {
    
        //初始化Spark配置信息,Streaming程序执行至少需要两个线程(采集、执行)
        //不能设置为local
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
    
        //初始化SparkStreamingContext,程序执行入口对象
        //Seconds 底层调用 new Duration
        val ssc = new StreamingContext(sparkConf, Seconds(3))
    
        //通过监控端口创建DStream,读进来的数据为一行一行的
        val lineStreams = ssc.socketTextStream("hadoop101", 9999)
    
        //将每一行数据做切分,形成一个个单词
        val wordStreams = lineStreams.flatMap(_.split(" "))
    
        //将单词映射成元组(word,1)
        val wordAndOneStreams = wordStreams.map((_, 1))
    
        //将相同的单词次数做统计
        val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_)
    
        //打印
        wordAndCountStreams.print()
    
        //启动SparkStreamingContext采集器
        ssc.start()
          
        //默认情况下采集器不能关闭,等待采集结束之后,终止程序
        ssc.awaitTermination()
      }
    }
    

    (3)启动程序并通过netcat发送数据

    # 开启SecureCRT
    nc -lk 9999
    输入内容
    

    2 WordCount解析

    Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据。

    在这里插入图片描述

    对数据的操作也是按照RDD为单位来进行的

    在这里插入图片描述

    计算过程由Spark Engine来完成

    在这里插入图片描述

    3 一些注意点

    • 一旦StreamingContext已经启动, 则不能再添加新的 streaming computations
    • 一旦一个StreamingContext已经停止(StreamingContext.stop()), 他也不能再重启
    • 在一个 JVM 内, 同一时间只能启动一个StreamingContext
    • stop() 的方式停止StreamingContext, 也会把SparkContext停掉. 如果仅仅想停止StreamingContext, 则应该这样: stop(false)
    • 一个SparkContext可以重用去创建多个StreamingContext, 前提是以前的StreamingContext已经停掉,并且SparkContext没有被停掉

    三 DStream创建

    1 RDD队列

    (1)用法及说明

    测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理

    (2)案例实操

    需求:循环创建几个RDD,将RDD放入队列。通过SparkStream创建Dstream,计算WordCount

    object RDDStream {
    
      def main(args: Array[String]) {
    
        //1.初始化Spark配置信息
        val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")
    
        //2.初始化SparkStreaming上下文环境对象
        val ssc = new StreamingContext(conf, Seconds(4))
    
        //3.创建RDD队列
        val rddQueue = new mutable.Queue[RDD[Int]]()
    
        //4.创建QueueInputDStream,从队列中采集数据,获取DS
        val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)
    
        //5.处理队列中的RDD数据
        val mappedStream = inputStream.map((_,1))
        val reducedStream = mappedStream.reduceByKey(_ + _)
    
        //6.打印结果
        reducedStream.print()
    
        //7.启动采集器
        ssc.start()
    
        //8.循环创建并向RDD队列中放入RDD
        for (i <- 1 to 5) {
          rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
          Thread.sleep(2000)
        }
    
        ssc.awaitTermination()
      }
    }
    

    结果展示

    -------------------------------------------
    Time: 1662277280000 ms
    -------------------------------------------
    (196,1)
    (296,1)
    (96,1)
    (52,1)
    (4,1)
    (180,1)
    (16,1)
    (156,1)
    (216,1)
    (28,1)
    ...
    
    -------------------------------------------
    Time: 1662277284000 ms
    -------------------------------------------
    (196,2)
    (296,2)
    (96,2)
    (52,2)
    (4,2)
    (180,2)
    (16,2)
    (156,2)
    (216,2)
    (28,2)
    ...
    
    -------------------------------------------
    Time: 1662277288000 ms
    -------------------------------------------
    (196,2)
    (296,2)
    (96,2)
    (52,2)
    (4,2)
    (180,2)
    (16,2)
    (156,2)
    (216,2)
    (28,2)
    ...
    
    -------------------------------------------
    Time: 1662277292000 ms
    -------------------------------------------
    

    2 自定义数据源

    (1)用法及说明

    需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集

    (2)案例实操

    需求:自定义数据源,实现监控某个端口号,获取该端口号内容

    自定义数据源
    //泛型表示读取数据的类型
    class CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
    
      //最初启动的时候,调用该方法,作用为:读数据并将数据发送给Spark
      override def onStart(): Unit = {
        new Thread("Socket Receiver") {
          override def run() {
            receive()
          }
        }.start()
      }
    
      //读数据并将数据发送给Spark,真正处理接收数据的逻辑
      def receive(): Unit = {
    
        //创建一个Socket连接
        var socket: Socket = new Socket(host, port)
    
        //定义一个变量,用来接收端口传过来的数据
        var input: String = null
    
        //创建一个BufferedReader用于按行读取端口传来的数据
        //InputStreamReader 将字节流转换为字符流
        val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
    
        //读取一行数据
        input = reader.readLine()
    
        //当receiver没有关闭并且输入数据不为空,则循环发送数据给Spark
        while (!isStopped() && input != null) {
          store(input)
          input = reader.readLine()
        }
    
        //跳出循环则关闭资源
        reader.close()
        socket.close()
    
        //重启任务
        restart("restart")
      }
    
        override def onStop(): Unit = {
            if(socket != null ){
              socket.close()
              socket = null
            }
        }
    }
    
    使用自定义的数据源采集数据
    object FileStream {
    
      def main(args: Array[String]): Unit = {
    
        //1.初始化Spark配置信息
    val sparkConf = new SparkConf().setMaster("local[*]")
    .setAppName("StreamWordCount")
    
        //2.初始化SparkStreamingContext
        val ssc = new StreamingContext(sparkConf, Seconds(5))
    
        //3.创建自定义数据源创建receiver的Streaming
    val lineStream = ssc.receiverStream(new CustomerReceiver("hadoop101", 9999))
    
        //4.将每一行数据做切分,形成一个个单词
        val wordStream = lineStream.flatMap(_.split("\t"))
    
        //5.将单词映射成元组(word,1)
        val wordAndOneStream = wordStream.map((_, 1))
    
        //6.将相同的单词次数做统计
        val wordAndCountStream = wordAndOneStream.reduceByKey(_ + _)
    
        //7.打印
        wordAndCountStream.print()
    
        //8.启动SparkStreamingContext
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    3 Kafka数据源

    (1)版本选型

    ReceiverAPI:需要一个专门的Executor去接收数据,然后发送给其他的Executor做计算。存在的问题,接收数据的Executor和计算的Executor速度会有所不同,特别在接收数据的Executor速度大于计算的Executor速度,会导致计算数据的节点内存溢出。早期版本中提供此方式,当前版本不适用。默认情况下,offset维护在zk中。

    DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制。默认情况下,offseet维护在checkpoint检查点,需要改变SparkStreamingContext的创建方式;也可以手动指定offset维护位置,为了保证数据的精准一致性,一般维护在有事务的存储上。

    在这里插入图片描述

    (2)Kafka 0-8 Receive模式

    需求

    通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。

    导入依赖
    
        org.apache.spark
        spark-streaming-kafka-0-8_2.11
        2.4.5
    
    
    编写代码
    // 通过ReciverAPI连接kafka数据源,获取数据
    object Spark04_ReceiverAPI {
    
      def main(args: Array[String]): Unit = {
    
        //1.创建SparkConf
        val sparkConf: SparkConf = new SparkConf().setAppName("Spark04_ReceiverAPI").setMaster("local[*]")
    
        //2.创建StreamingContext,第二个参数为采集周期
        val ssc = new StreamingContext(sparkConf, Seconds(3))
    
        //3.使用ReceiverAPI读取Kafka数据创建DStream
        val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(
          //Streaming Context
          ssc,
          //Zookeeper地址
          "hadoop101:2181,hadoop102:2181,hadoop103:2181",
          //groupid,消费者组
          "bigdata",
          //k表示主题的名字,v表示主题的分区数
          Map("mybak" -> 2))
    
        //4.计算WordCount并打印        new KafkaProducer[String,String]().send(new ProducerRecord[]())
        //获取kafka中的消息,只需要v的部分
        val lineDStream: DStream[String] = kafkaDStream.map(_._2)
        val word: DStream[String] = lineDStream.flatMap(_.split(" "))
        val wordToOneDStream: DStream[(String, Int)] = word.map((_, 1))
        val wordToCountDStream: DStream[(String, Int)] = wordToOneDStream.reduceByKey(_ + _)
        wordToCountDStream.print()
    
        //5.开启任务
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    生产数据
    # 启动Zookeeper
    zk start
    # 启动kafka
    kafka start
    # 查看所有主题
    bin/kafka-topics.sh --list --bootstrap-server hadoop101:9092
    # 创建一个和上述代码中相同的主题
    bin/kafka-topics.sh --create --bootstrap-server hadoop101:9092 --topic bigdata --partitions 2 --replication-factor 2
    # 生产者向topic发送消息
    bin/kafka-console-producer.sh --broker-list hadoop101:9092 --topic bigdata
    # 启动上述代码,查看是否可以连接到kafka,并且接收到生产者传来的消息
    # 发送的内容
    hello word
    hello scala
    hello java
    

    0-8 Receive模式,offset维护在zk中,程序停止后,继续生产数据,再次启动程序,仍然可以继续消费。可通过get /consumers/bigdata/offsets/主题名/分区号 查看,注意会存在一些延迟

  • 相关阅读:
    树的存储结构
    OpenCloudOS开源的操作系统
    基于vue-tianditu实现瓦片数据层添加
    Git Commit Message 规范实践
    vcruntime140.dll文件缺失,去哪下载vcruntime140.dll文件
    使用docker搭建mongodb
    JAVA爬虫系列
    【数字电路基础】进制转换:二进制、十进制、八进制、十六进制、反码、补码、原码
    Mysql基础 (二)
    创建线程并启动-创建线程的前三种方式
  • 原文地址:https://blog.csdn.net/weixin_43923463/article/details/126937503