• 【Spark | SparkStreaming】


    原理

    架构

    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述

    实战

    RDD 队列

    val rddQueue = new mutable.QueueRDD[Int]

    自定义数据源

    用法及说明
    需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。

    class CustomerReceiver(host: String, port: Int) extends 
    Receiver[String](StorageLevel.MEMORY_ONLY) {
     //最初启动的时候,调用该方法,作用为:读数据并将数据发送给 Spark
     override def onStart(): Unit = {
     new Thread("Socket Receiver") {
     override def run() {
     receive()
     }
     }.start()
     }
     //读数据并将数据发送给 Spark
     def receive(): Unit = {
     //创建一个 Socket
     var socket: Socket = new Socket(host, port)
     //定义一个变量,用来接收端口传过来的数据
     var input: String = null
     //创建一个 BufferedReader 用于读取端口传来的数据
     val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, 
    StandardCharsets.UTF_8))
     //读取数据
     input = reader.readLine()
     //当 receiver 没有关闭并且输入数据不为空,则循环发送数据给 Spark
     while (!isStopped() && input != null) {
     store(input)
     input = reader.readLine()
     }
     //跳出循环则关闭资源
     reader.close()
     socket.close()
     //重启任务
     restart("restart")
     }
     override def onStop(): Unit = {}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    使用自定义的数据源采集数据

    object FileStream {
     def main(args: Array[String]): Unit = {
     //1.初始化 Spark 配置信息
    val sparkConf = new SparkConf().setMaster("local[*]")
    .setAppName("StreamWordCount")
     //2.初始化 SparkStreamingContext
     val ssc = new StreamingContext(sparkConf, Seconds(5))
    //3.创建自定义 receiver 的 Streaming
    val lineStream = ssc.receiverStream(new CustomerReceiver("hadoop102", 9999))
     //4.将每一行数据做切分,形成一个个单词
     val wordStream = lineStream.flatMap(_.split("\t"))
     //5.将单词映射成元组(word,1)
    val wordAndOneStream = wordStream.map((_, 1))
     //6.将相同的单词次数做统计
     val wordAndCountStream = wordAndOneStream.reduceByKey(_ + _)
     //7.打印
     wordAndCountStream.print()
     //8.启动 SparkStreamingContext
     ssc.start()
     ssc.awaitTermination()
     }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    kafka数据源

    Kafka 0-10 Direct 模式
    1)需求:通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简单计算,最终打印
    到控制台。
    2)导入依赖

    <dependency>
     <groupId>org.apache.sparkgroupId>
     <artifactId>spark-streaming-kafka-0-10_2.12artifactId>
     <version>3.0.0version>
    dependency>
    <dependency>
     <groupId>com.fasterxml.jackson.coregroupId>
     <artifactId>jackson-coreartifactId>
     <version>2.10.1version>
     dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    3)编写代码

    
    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, 
    LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    object DirectAPI {
     def main(args: Array[String]): Unit = {
     //1.创建 SparkConf
     val sparkConf: SparkConf = new 
    SparkConf().setAppName("ReceiverWordCount").setMaster("local[*]")
     //2.创建 StreamingContext
     val ssc = new StreamingContext(sparkConf, Seconds(3))
     //3.定义 Kafka 参数
     val kafkaPara: Map[String, Object] = Map[String, Object](
     ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> 
    "linux1:9092,linux2:9092,linux3:9092",
     ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
     "key.deserializer" -> 
    "org.apache.kafka.common.serialization.StringDeserializer",
     "value.deserializer" -> 
    "org.apache.kafka.common.serialization.StringDeserializer"
     )
     //4.读取 Kafka 数据创建 DStream
     val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = 
    KafkaUtils.createDirectStream[String, String](ssc,
     LocationStrategies.PreferConsistent,
     ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara))
     //5.将每条消息的 KV 取出
     val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
     //6.计算 WordCount
     valueDStream.flatMap(_.split(" "))
     .map((_, 1))
     .reduceByKey(_ + _)
     .print()
     //7.开启任务
     ssc.start()
     ssc.awaitTermination()
     }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    查看 Kafka 消费进度

    bin/kafka-consumer-groups.sh --describe --bootstrap-server linux1:9092 --group 
    atguigu
    
    • 1
    • 2

    DStream 转换

    无状态转化操作

    无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。
    在这里插入图片描述

    Transform

    Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。

    join

    两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是
    对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同。

    有状态转化操作

    UpdateStateByKey

    针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。
    updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的。
    updateStateByKey 操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,需要做下面两步:

    1. 定义状态,状态可以是一个任意的数据类型。
    2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更
      新。
      使用 updateStateByKey 需要对检查点目录进行配置,会使用检查点来保存状态
    WindowOperations

    Window Operations 可以设置窗口的大小和滑动窗口的间隔来动态的获取当前 Steaming 的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。
    ➢ 窗口时长:计算内容的时间范围;

    Window 的操作
    (1)window(windowLength, slideInterval): 基于对源 DStream 窗化的批次进行计算返回一个新的 Dstream;
    (2)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;
    (3)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;
    (4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对的 DStream 上调用此函数,会返回一个新(K,V)对的 DStream,此处通过对滑动窗口中批次数据使用 reduce 函数来整合每个 key 的 value 值。
    (5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 这个函数是上述函数的变化版本,每个窗口的 reduce 值都是通过用前一个窗的 reduce 值来递增计算。
    通过 reduce 进入到滑动窗口数据并”反向 reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对 keys 的“加”“减”计数。

    DStream 输出

    输出操作如下:
    ➢ print():在运行流程序的驱动结点上打印 DStream 中每一批次数据的最开始 10 个元素。这用于开发和调试。在 Python API 中,同样的操作叫 print()。
    ➢ saveAsTextFiles(prefix, [suffix]):以 text 文件形式存储这个 DStream 的内容。每一批次的存储文件名基于参数中的 prefix 和 suffix。”prefix-Time_IN_MS[.suffix]”。
    ➢ saveAsObjectFiles(prefix, [suffix]):以 Java 对象序列化的方式将 Stream 中的数据保存为SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]“. Python中目前不可用。
    ➢ saveAsHadoopFiles(prefix, [suffix]):将 Stream 中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]”。Python API 中目前不可用。
    ➢ foreachRDD(func):这是最通用的输出操作,即将函数 func 用于产生于 stream 的每一个RDD。其中参数传入的函数 func 应该实现将每一个 RDD 中数据推送到外部系统,如将
    RDD 存入文件或者通过网络将其写入数据库。

    优雅关闭

    使用外部文件系统来控制内部程序关闭。

  • 相关阅读:
    C++面试100问!(三)
    redux-saga中间件
    【JS】复习和学习几个好用的js小知识
    Python代码中引用已经写好的模块、方法
    在HTML当中引入Vue控件,以element-ui为例
    【Python】Python中一些有趣的用法
    3900页手册415集视频426G资料迅为RK3568开发板
    判断是否工作在docker环境
    gradle7.0 打包插件发布到本地maven仓库
    解决gpedit.msc命令无法打开的问题
  • 原文地址:https://blog.csdn.net/pingyufeng/article/details/127667713