• 操作 Structured Streaming


    可以在streaming DataFrames/Datasets上应用各种操作.

    主要分两种:

    1. 直接执行 sql
    2. 特定类型的 api(DSL)

    基本操作

    Most of the common operations on DataFrame/Dataset are supported for streaming. 在 DF/DS 上大多数通用操作都支持作用在 Streaming DataFrame/Streaming DataSet 上

    一会要处理的数据 people.json 内容:

    {"name": "Michael","age": 29,"sex": "female"}
    {"name": "Andy","age": 30,"sex": "male"}
    {"name": "Justin","age": 19,"sex": "male"}
    {"name": "Lisi","age": 18,"sex": "male"}
    {"name": "zs","age": 10,"sex": "female"}
    {"name": "zhiling","age": 40,"sex": "female"}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    1. 弱类型 api

    import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType}
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    object BasicOperation {
        def main(args: Array[String]): Unit = {
            val spark: SparkSession = SparkSession
                .builder()
                .master("local[*]")
                .appName("BasicOperation")
                .getOrCreate()
            val peopleSchema: StructType = new StructType()
                .add("name", StringType)
                .add("age", LongType)
                .add("sex", StringType)
            val peopleDF: DataFrame = spark.readStream
                .schema(peopleSchema)
                .json("/Users/lzc/Desktop/data")
    
    
            val df: DataFrame = peopleDF.select("name","age", "sex").where("age > 20") // 弱类型 api
            df.writeStream
                .outputMode("append")
                .format("console")
                .start
                .awaitTermination()
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    2. 强类型 api

    import org.apache.spark.sql.types.{LongType, StringType, StructType}
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
    
    object BasicOperation2 {
        def main(args: Array[String]): Unit = {
            val spark: SparkSession = SparkSession
                .builder()
                .master("local[*]")
                .appName("BasicOperation")
                .getOrCreate()
            import spark.implicits._
    
            val peopleSchema: StructType = new StructType()
                .add("name", StringType)
                .add("age", LongType)
                .add("sex", StringType)
            val peopleDF: DataFrame = spark.readStream
                .schema(peopleSchema)
                .json("/Users/lzc/Desktop/data")
    
            val peopleDS: Dataset[People] = peopleDF.as[People] // 转成 ds
    
    
            val df: Dataset[String] = peopleDS.filter(_.age > 20).map(_.name)
            df.writeStream
                .outputMode("append")
                .format("console")
                .start
                .awaitTermination()
    
    
        }
    }
    
    case class People(name: String, age: Long, sex: String)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    3. 直接执行 sql(重要)

    import org.apache.spark.sql.types.{LongType, StringType, StructType}
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
    
    object BasicOperation3 {
        def main(args: Array[String]): Unit = {
            val spark: SparkSession = SparkSession
                .builder()
                .master("local[*]")
                .appName("BasicOperation")
                .getOrCreate()
            import spark.implicits._
    
            val peopleSchema: StructType = new StructType()
                .add("name", StringType)
                .add("age", LongType)
                .add("sex", StringType)
            val peopleDF: DataFrame = spark.readStream
                .schema(peopleSchema)
                .json("/Users/lzc/Desktop/data")
    
            peopleDF.createOrReplaceTempView("people") // 创建临时表
            val df: DataFrame = spark.sql("select * from people where age > 20")
    
            df.writeStream
                .outputMode("append")
                .format("console")
                .start
                .awaitTermination()
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    基于 event-time 的窗口操作

    event-time 窗口理解

    在 Structured Streaming 中, 可以按照事件发生时的时间对数据进行聚合操作, 即基于 event-time 进行操作.

    在这种机制下, 即不必考虑 Spark 陆续接收事件的顺序是否与事件发生的顺序一致, 也不必考虑事件到达 Spark 的时间与事件发生时间的关系.

    因此, 它在提高数据处理精度的同时, 大大减少了开发者的工作量.

    我们现在想计算 10 分钟内的单词, 每 5 分钟更新一次, 也就是说在 10 分钟窗口 12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20等之间收到的单词量. 注意, 12:00 - 12:10 表示数据在 12:00 之后 12:10 之前到达.

    现在,考虑一下在 12:07 收到的单词。单词应该增加对应于两个窗口12:00 - 12:10和12:05 - 12:15的计数。因此,计数将由分组键(即单词)和窗口(可以从事件时间计算)索引。

    import java.sql.Timestamp
    
    import org.apache.spark.sql.streaming.StreamingQuery
    import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
    
    object WordCountWindow {
        def main(args: Array[String]): Unit = {
    
            val spark: SparkSession = SparkSession
                .builder()
                .master("local[*]")
                .appName("WordCount1")
                .getOrCreate()
    
            import spark.implicits._
            val lines: DataFrame = spark.readStream
                .format("socket") // 设置数据源
                .option("host", "localhost")
                .option("port", 10000)
                .option("includeTimestamp", true) // 给产生的数据自动添加时间戳
                .load
    
            // 把行切割成单词, 保留时间戳
            val words: DataFrame = lines.as[(String, Timestamp)]
            				.flatMap(line => {
                				line._1.split("\\W+").map((_, line._2))  // \\W+非单词字符
           					})
            				.toDF("word", "timestamp")
            // window函数在下面包中
            import org.apache.spark.sql.functions._
    
            // 按照窗口和单词分组, 并且计算每组的单词的个数
            val wordCounts: Dataset[Row] = words.groupBy(
                /** 调用 window 函数, 返回的是一个 Column 
                    参数 1: df 中表示时间戳的列 $符取列名
                    参数 2: 窗口长度 
                    参数 3: 滑动步长
                */
                window($"timestamp", "10 minutes", "5 minutes"),
                $"word"
            ).count().orderBy($"window")  // 计数, 并按照窗口排序 或者sort("window")
    
            val query: StreamingQuery = wordCounts.writeStream
                .outputMode("complete")
                .format("console")
                .option("truncate", "false")  // 不截断.为了在控制台能看到完整信息, 最好设置为 false
                .start
            query.awaitTermination()
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    由此可以看出, 在这种窗口机制下, 无论事件何时到达, 以怎样的顺序到达, Structured Streaming 总会根据事件时间生成对应的若干个时间窗口, 然后按照指定的规则聚合.

    event-time 窗口生成规则

    org.apache.spark.sql.catalyst.analysis.TimeWindowing
    
    // 窗口个数
    /* 最大的窗口数=向上取整(窗口长度/滑动步长)*/
    maxNumOverlapping = ceil(windowDuration / slideDuration) 
    for (i <- 0 until maxNumOverlapping)
       /**
          timestamp是event-time 传进的时间戳
          startTime是window窗口参数,默认是0 second 从时间的0s
          含义:event-time从1970年...有多少个滑动步长,如果说浮点数会向上取整
       */
       windowId <- ceil((timestamp - startTime) / slideDuration)
       /**
          windowId * slideDuration  向上取能整除滑动步长的时间
          (i - maxNumOverlapping) * slideDuration 每一个窗口开始时间相差一个步长
        */
       windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime
       windowEnd <- windowStart + windowDuration
       return windowStart, windowEnd
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    [将event-time向上取能整除滑动步长的时间减去最大窗口数成×滑动步长] 作为"初始窗口"的开始时间, 然后按照窗口滑动宽度逐渐向时间轴前方推进, 直到某个窗口不再包含该 event-time 为止. 最终以"初始窗口"与"结束窗口"之间的若干个窗口作为最终生成的 event-time 的时间窗口.

    每个窗口的起始时间与结束时间都是**前闭后开的区间**, 因此初始窗口和结束窗口都不会包含 event-time, 最终不会被使用.

    窗口推算
    
    window($"timestamp", "10 minutes", "5 minutes")
    2020-04-23 09:50:00,hello
    [40:00-50:00) // 无效窗口
    [45:00-55:00)
    [50:00-00:00)
    
    2020-04-23 09:49:00,hello
    [40:00-50:00) 
    [45:00-55:00)
    
    window($"timestamp", "10 minutes", "3 minutes")
    2020-04-23 09:50:00,hello
    [39:00-49:00) // 无效窗口
    [42:00-52:00) 
    [45:00-55:00)
    [48:00-58:00)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
  • 相关阅读:
    Node.js -- path模块
    JZ11 旋转数组的最小数字
    搭建和mybatis-plus官网一样主题的网站(cos+宝塔+vercel)
    正向代理和反向代理快速理解
    神经网络应用场景——自然语言处理
    文件解析工具
    5 运算符、表达式和语句
    Simulink 自动代码生成电机控制:Keil工程转到CubeIDE相关问题(2/2)
    数字资产革命:Web3带来的新商业机会
    图解LeetCode——640. 求解方程(难度:中等)
  • 原文地址:https://blog.csdn.net/wangxw1803/article/details/127457597