• Spark Structured Streaming - 1


    1.Spark Streaming和Structured Streaming

    1.1 Spark Streaming时代

    RDD的API的流式工具, 其本质还是RDD, 存储和执行过程依然类似RDD

    在这里插入图片描述

    1.2 Structured Streaming时代

    是Dataset的API的流式工具,API和Dataset保持高度一致

    在这里插入图片描述

    在这里插入图片描述

    1.3 Spark Streaming和Structured Streaming
    1. 进步就类似于Dataset相比于RDD的进步
    2. Structured Streaming已经支持了连续流模型, 也就是类似于Flink那样的实时流, 而不是小批量, 但在使用的时候仍然有限制, 大部分情况还是应该采用小批量模式
    3. 在2.2.0以后Structured Streaming被标注为稳定版本, 意味着以后的Spark流式开发不应该在采用Spark Streaming了

    Spark Streaming:

    1. 基于微批,延迟高不能做到真正的实时

    2. DStream基于RDD,不直接支持SQL

    3. 批处理的API应用层不统一,(流用的DStream-底层是RDD,批用的DF/DS/RDD)

    4. 不支持EventTime事件时间
      注:
      EventTime事件时间 :事件真正发生的事件
      PorcessingTime处理时间:事件被流系统处理的时间
      IngestionTime摄入时间:事件到底流系统的时间
      如: 一条错误日志10月1日,23:59:00秒产生的(事件时间),因为网路延迟,到10月2日 00:00:10到达日志处理系统(摄入时间),10月2日 00:00:20被流系统处理(处理时间)
      如果要统计10月1日的系统bug数量,那么SparkStreaming不能正确统计,因为它不支持事件时间

    5. 数据的Exactly-Once(恰好一次语义)需要手动实现
      注: 数据的一致性语义
      最多一次
      恰好一次–是我们的目标,SparkStreaming如果要实现恰好一次,需要手动维护偏移量+其他操作
      最少一次

    Structured Streaming:

    1. 编程模型: 动态表格/无界表
    2. 数据抽象: DataFrame/DataSet
    3. 与Spark Sql 无缝连接

    2.word Count

    object SocketWordCount {
      def main(args: Array[String]): Unit = {
        // 1. 创建SparkSession
        val spark = SparkSession.builder()
          .master("local[6]")
          .appName("socket word count")
          .getOrCreate()
    
        spark.sparkContext.setLogLevel("ERROR")
    
        import spark.implicits._
    
        // 2. 读取外部数据源,并转为Dataset[String]
        val source: Dataset[String] = spark.readStream
          .format("socket")
          .option("host", "192.168.88.100")
          .option("port", 7777)
          .load()
          .as[String]
    
        // 3. 统计词频
        val words = source.flatMap(_.split(" "))
          .map((_, 1))
          .groupByKey(_._1)
          .count()
    
        // 4. 输出结果
        words.writeStream
          .outputMode(OutputMode.Complete())   // 统计全局结果,而不是一个批次
          .format("console")
          .start()
          .awaitTermination()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    在这里插入图片描述
    在这里插入图片描述

    • Structured Streaming 依然是小批量的流处理
    • Structured Streaming 的输出是类似 DataFrame 的, 也具有 Schema, 所以也是针对结构化数据进行优化的
    • 从输出的时间特点上来看, 是一个批次先开始, 然后收集数据, 再进行展示, 这一点和 Spark Streaming 不太一样

    3.Structured Streaming的体系和结构

    3.1 无限扩展的表格

    Spark中的DS有两种:

    1. 处理静态批量数据的DS:使用read和write进行读写
    2. 处理动态实时流的DS:使用readStream和writeStream进行读写

    如何使用DS表示流式计算

    1. 可以把流式的数据想象成一个不断增长,无限无界的表
    2. 无论是否有界,全部都使用DS这一套的API,所以可以保障流和批的处理使用完全相同的代码

    在这里插入图片描述

    在这里插入图片描述

    3.2 体系结构

    StreamExecution

    StreamExecution在流上进行基于Dataset的查询, 也就是说,Dataset之所以能够在流上进行查询, 是因为StreamExecution的调度和管理。

    在这里插入图片描述
    增量查询:

    在这里插入图片描述

    1. Structured Streaming虽然从API角度上模拟出来的是一个无限扩展的表,但不能无限存储,并且历史数据中有很多是冗余的,所以要做增量存储。
    2. 所以这里设置了一个全局范围的高可用StateStore,这个时候针对增量的查询变为如下步骤:
      1)从StateStore中取出上次执行完成后的状态
      2)把上次执行的结果加入本批次,再进行计算,得出全局结果
      3)将当前批次的结果放入StateStore中,留待下次使用
  • 相关阅读:
    [极客大挑战 2019]FinalSQL
    RxJava介绍及基本原理
    Cesium冷知识:判断cesium是否使用webgl2
    Ubuntu 20.04.06 PCL C++学习记录(二十六)
    Java.lang.Compiler类之disable()方法具有什么功能呢?
    1333.餐厅过滤器【leetcode】【Java】
    Qt加载SVG矢量图片,放大缩小图片质量不发生变化。
    契约测试理论篇
    gcc和g++的爱恨纠葛
    Java:比较两个字符串是否相等
  • 原文地址:https://blog.csdn.net/qq_43141726/article/details/126623514