• SparkStreaming—入门概述


    一、基本概念

    1.什么是SparkStreaming

    定义:

    Spark Streaming 用于流式数据的处理。Spark Streaming 支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。

    SparkStreaming 准实时(延迟时间:秒,分钟为单位),微批次(设置一段时间进行处理数据 如:3s、5s等)的数据处理框架。

    在这里插入图片描述

    和Spark 基于 RDD 的概念很相似,Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作 DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。所以简单来将,DStream 就是对 RDD 在实时数据处理场景的一种封装

    Discretized Stream:

    Discretized Stream 是 Spark Streaming 的基础抽象,代表持续性的数据流和经过各种 Spark 原语操作后的结果数据流。在内部实现上,DStream 是一系列连续的 RDD 来表示。每个 RDD 含有一段时间间隔内的数据。

    SparkStreaming特点:

    1. 易用
    2. 容错
    3. 易整合到SparkStreaming中

    2.快速入门

    案例说明:使用 netcat 工具向 9999 端口不断的发送数据,通过 SparkStreaming 读取端口数据并
    统计不同单词出现的次数

    netcat 工具解压后进入该文件夹,输入cmd,在cmd中输入:nc -lp 9999

    环境配置

    <dependency>
    	<groupId>org.apache.spark</groupId>
    	<artifactId>spark-streaming_2.12</artifactId>
    	<version>3.0.0</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    代码

    package com.bigdata.SparkStreaming
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    /**
     * @author wangbo
     * @version 1.0
     */
    object SparkStreaming_01_wordcount {
      def main(args: Array[String]): Unit = {
        //TODO 创建环境对象
        //StreamingContext创建时,需要传递两个参数
        //第一个参数表示环境配置
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
        //第二个参数表示批量处理的周期(采集周期)
        val ssc = new StreamingContext(sparkConf, Seconds(3)) //这里的时间参数以秒为单位
    
        //TODO 逻辑处理
        //获取端口数据
        //注意这里需要到 F:\尚硅谷-大数据\spark3.0\2.资料\netcat-win32-1.12 输入 cmd ,然后在cmd中输入 nc -lp 9999
        val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
        val wordToCount: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        wordToCount.print()
    
        /*
        由于SparkStreaming采集器是长期执行的任务,所以不能直接关闭
        如果main方法执行完毕,应用程序也会自动结束,所以不能让mian执行完毕
         */
    
        //启动采集器
        ssc.start()
        //等待采集器的关闭
        ssc.awaitTermination()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    3.DStream 创建

    DStream的创建有很多,主要有以下三种方式:

    1. RDD队列的方式
    2. 自定义数据源
    3. Kafka 数据源(面试、开发重点

    (1)RDD队列的方式

    用法及其说明:可以通过使用 ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的 RDD,都会作为一个 DStream 处理

    package com.bigdata.SparkStreaming
    
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    import scala.collection.mutable
    
    /**
     * @author wangbo
     * @version 1.0
     */
    object SparkStreaming_02_Queue {
      def main(args: Array[String]): Unit = {
        //TODO 创建环境对象
        //StreamingContext创建时,需要传递两个参数
        //第一个参数表示环境配置
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
        //第二个参数表示批量处理的周期(采集周期)
        val ssc = new StreamingContext(sparkConf, Seconds(3)) //这里的时间参数以秒为单位
    
        //3.创建 RDD 队列
        val rddQueue = new mutable.Queue[RDD[Int]]()
    
        //4.创建 QueueInputDStream
        val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)
    
        //5.处理队列中的 RDD 数据
        val mappedStream = inputStream.map((_,1))
        val reducedStream = mappedStream.reduceByKey(_ + _)
    
        //6.打印结果
        reducedStream.print()
    
        //7.启动采集器
        ssc.start()
    
        //8.循环创建并向 RDD 队列中放入 RDD
        for (i <- 1 to 5) {
          rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
          Thread.sleep(2000)
        }
    
        //8.等待采集器的关闭
        ssc.awaitTermination()
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    (2)自定义数据源的方式

    用法及其说明:需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。

    需求:自定义数据源,实现监控某个端口号,获取该端口号内容

    package com.bigdata.SparkStreaming
    
    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.dstream.ReceiverInputDStream
    import org.apache.spark.streaming.receiver.Receiver
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    import scala.collection.mutable
    import scala.util.Random
    
    /**
     * @author wangbo
     * @version 1.0
     */
    
    /**
     * 自定义数据源的方式创建DStream
     */
    object SparkStreaming_02_DIY {
      def main(args: Array[String]): Unit = {
        //TODO 创建环境对象
        //StreamingContext创建时,需要传递两个参数
        //第一个参数表示环境配置
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
        //第二个参数表示批量处理的周期(采集周期)
        val ssc = new StreamingContext(sparkConf, Seconds(3)) //这里的时间参数以秒为单位
    
        val messageDStream: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver)
    
        messageDStream.print()
    
        //7.启动采集器
        ssc.start()
    
        //8.等待采集器的关闭
        ssc.awaitTermination()
      }
    
      /*
        自定义数据采集器
        1. 继承Receiver,定义泛型,传递参数
        2. 重写方法
       */
      class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY){
        private var flag = true
    
        override def onStart(): Unit = {
          new Thread(new Runnable {   //随机生成数
            override def run(): Unit = {
              while (true){
                val message: String = "采集的数据为:" + new Random().nextInt(10).toString
                store(message)
                Thread.sleep(500)
              }
            }
          })
    
        }
    
        override def onStop(): Unit = {
          flag = false
        }
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66

    (3)Kafka数据源的方式

    版本:DirectAPI 是由计算的 Executor 来主动消费 Kafka 的数据,速度由自身控制。

    需求:通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简单计算,最终打印到控制台,Kafka 0-10 Direct 模式

    注意:zookeeper和kafka要启动

    添加依赖

    <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
     <version>3.0.0</version>
    </dependency>
    <dependency>
     <groupId>com.fasterxml.jackson.core</groupId>
     <artifactId>jackson-core</artifactId>
     <version>2.10.1</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    代码部分

    package com.bigdata.SparkStreaming
    
    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.InputDStream
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    import scala.util.Random
    
    /**
     * @author wangbo
     * @version 1.0
     */
    
    /**
     * 自定义数据源的方式创建DStream
     */
    object SparkStreaming_04_Kafka {
      def main(args: Array[String]): Unit = {
        //TODO 创建环境对象
        //第一个参数表示环境配置
        val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
        //第二个参数表示批量处理的周期(采集周期)
        val ssc = new StreamingContext(sparkConf, Seconds(3)) //这里的时间参数以秒为单位
    
        //定义kafka参数
        val kafkaPara: Map[String, Object] = Map[String, Object](
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->
            "hadoop100:9092,hadoop102:9092,hadoop103:9092",
          ConsumerConfig.GROUP_ID_CONFIG -> "kafka",
          "key.deserializer" ->
            "org.apache.kafka.common.serialization.StringDeserializer",
          "value.deserializer" ->
            "org.apache.kafka.common.serialization.StringDeserializer"
        )
    
    
        val kafkaData: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Set("kafka"), kafkaPara) //Set("kafka")主题为kafka,kafkaPara为kafka的配置
        )
    
        kafkaData.map(_.value()).print()
        
        //7.启动采集器
        ssc.start()
    
        //8.等待采集器的关闭
        ssc.awaitTermination()
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
  • 相关阅读:
    C# +.Net C/S架构,在二甲医院全面实际使用三年的LIS系统源码
    亳州市的自然风光与旅游资源:欣赏安徽省中部的壮丽景色
    信钰证券:6G概念强势拉升,通宇通讯、世嘉科技涨停,硕贝德等走高
    基于zk的分布式锁使用及原理分析
    无服务器学习01:基本概念+优点+面临的挑战
    IDENTITY_INSERT 设置为 OFF 时,不能为表 ‘t_user‘ 中的标识列插入显式值
    根据键名解析特定属性的值相关API
    iNFTnews | 元宇宙的欢乐世界:别开生面的游戏、音乐会、主题公园和电影
    计算机毕业设计ssm+vue+elementUI高校志愿者服务招募网站
    【无标题】
  • 原文地址:https://blog.csdn.net/weixin_44604159/article/details/127187947