• Structured Streaming join 操作


    join 操作

    Structured Streaming 支持 streaming DataSet/DataFrame 与静态的DataSet/DataFrame 进行 join, 也支持 streaming DataSet/DataFrame与另外一个streaming DataSet/DataFrame 进行 join.

    join 的结果也是持续不断的生成, 类似于前面学习的 streaming 的聚合结果.

    Stream-static Joins

    模拟的静态数据:

    lisi,male
    zhiling,female
    zs,male
    
    • 1
    • 2
    • 3

    模拟的流式数据:

    lisi,20
    zhiling,40
    ww,30
    
    • 1
    • 2
    • 3
    内连接
    import org.apache.spark.sql.streaming.Trigger
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    object StreamingStatic {
        def main(args: Array[String]): Unit = {
            val spark: SparkSession = SparkSession
                .builder()
                .master("local[*]")
                .appName("StreamingStatic")
                .getOrCreate()
            import spark.implicits._
    
            // 1. 静态 df
            val arr = Array(("lisi", "male"), ("zhiling", "female"), ("zs", "male"));
            var staticDF: DataFrame = spark.sparkContext.parallelize(arr).toDF("name", "sex")
    
            // 2. 流式 df
            val lines: DataFrame = spark.readStream
                .format("socket")
                .option("host", "localhost")
                .option("port", 10000)
                .load()
            val streamDF: DataFrame = lines.as[String].map(line => {
                val arr = line.split(",")
                (arr(0), arr(1).toInt)
            }).toDF("name", "age")
    
            // 3. join   等值内连接  a.name=b.name
            val joinResult: DataFrame = streamDF.join(staticDF, "name")  // 或者传seq("name")
    
            // 4. 输出
            joinResult.writeStream
                .outputMode("append")
                .format("console")
                .start
                .awaitTermination()
    
        }
    }
    +-------+---+------+
    |   name|age|   sex|
    +-------+---+------+
    |zhiling| 40|female|
    |   lisi| 20|  male|
    +-------+---+------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    外连接
    val joinResult: DataFrame = streamDF.join(staticDF, Seq("name"), "left") // 流在那边写那边
    +-------+---+------+
    |   name|age|   sex|
    +-------+---+------+
    |zhiling| 40|female|
    |     ww| 30|  null|
    |   lisi| 20|  male|
    +-------+---+------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    Stream-stream Joins

    在 Spark2.3, 开始支持 stream-stream join.

    Spark 会自动维护两个流的状态, 以保障后续流入的数据能够和之前流入的数据发生 join 操作, 但这会导致状态无限增长. 因此, 在对两个流进行 join 操作时, 依然可以用 watermark 机制来消除过期的状态, 避免状态无限增长.

    inner join

    对 2 个流式数据进行 join 操作. 输出模式仅支持append模式

    第 1 个数据格式: 姓名,年龄,事件时间

    lisi,female,2019-09-16 11:50:00
    zs,male,2019-09-16 11:51:00
    ww,female,2019-09-16 11:52:00
    zhiling,female,2019-09-16 11:53:00
    fengjie,female,2019-09-16 11:54:00
    yifei,female,2019-09-16 11:55:00
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    第 2 个数据格式: 姓名,性别,事件时间

    lisi,18,2019-09-16 11:50:00
    zs,19,2019-09-16 11:51:00
    ww,20,2019-09-16 11:52:00
    zhiling,22,2019-09-16 11:53:00
    yifei,30,2019-09-16 11:54:00
    fengjie,98,2019-09-16 11:55:00
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    不带 watermark的 inner join
    import java.sql.Timestamp
    
    import org.apache.spark.sql.streaming.Trigger
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    /**
      * 
      * Date 2019/8/16 5:09 PM
      */
    object StreamStream1 {
        def main(args: Array[String]): Unit = {
            val spark: SparkSession = SparkSession
                .builder()
                .master("local[*]")
                .appName("StreamStream1")
                .getOrCreate()
            import spark.implicits._
    
            // 第 1 个 stream
            val nameSexStream: DataFrame = spark.readStream
                .format("socket")
                .option("host", "hadoop201")
                .option("port", 10000)
                .load
                .as[String]
                .map(line => {
                    val arr: Array[String] = line.split(",")
                    (arr(0), arr(1), Timestamp.valueOf(arr(2)))
                }).toDF("name", "sex", "ts1")
    
            // 第 2 个 stream
            val nameAgeStream: DataFrame = spark.readStream
                .format("socket")
                .option("host", "hadoop201")
                .option("port", 20000)
                .load
                .as[String]
                .map(line => {
                    val arr: Array[String] = line.split(",")
                    (arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))
                }).toDF("name", "age", "ts2")
    
    
            // join 操作
            val joinResult: DataFrame = nameSexStream.join(nameAgeStream, "name")
    
            joinResult.writeStream
                .outputMode("append")
                .format("console")
                .trigger(Trigger.ProcessingTime(0))
                .start()
                .awaitTermination()
        }
    }
    +-------+------+-------------------+---+-------------------+
    |   name|   sex|                ts1|age|                ts2|
    +-------+------+-------------------+---+-------------------+
    |zhiling|female|2019-09-16 11:53:00| 22|2019-09-16 11:53:00|
    |     ww|female|2019-09-16 11:52:00| 20|2019-09-16 11:52:00|
    |  yifei|female|2019-09-16 11:55:00| 30|2019-09-16 11:54:00|
    |     zs|  male|2019-09-16 11:51:00| 19|2019-09-16 11:51:00|
    |fengjie|female|2019-09-16 11:54:00| 98|2019-09-16 11:55:00|
    |   lisi|female|2019-09-16 11:50:00| 18|2019-09-16 11:50:00|
    +-------+------+-------------------+---+-------------------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64

    join 的速度很慢, 需要等待.

    带 watermark的 inner join
    import java.sql.Timestamp
    
    import org.apache.spark.sql.streaming.Trigger
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions._
    /**
      * 
      * Date 2019/8/16 5:09 PM
      */
    object StreamStream2 {
        def main(args: Array[String]): Unit = {
            val spark: SparkSession = SparkSession
                .builder()
                .master("local[*]")
                .appName("StreamStream1")
                .getOrCreate()
            import spark.implicits._
    
            // 第 1 个 stream
            val nameSexStream: DataFrame = spark.readStream
                .format("socket")
                .option("host", "hadoop201")
                .option("port", 10000)
                .load
                .as[String]
                .map(line => {
                    val arr: Array[String] = line.split(",")
                    (arr(0), arr(1), Timestamp.valueOf(arr(2)))
                }).toDF("name1", "sex", "ts1")
                .withWatermark("ts1", "2 minutes")
    
            // 第 2 个 stream
            val nameAgeStream: DataFrame = spark.readStream
                .format("socket")
                .option("host", "hadoop201")
                .option("port", 20000)
                .load
                .as[String]
                .map(line => {
                    val arr: Array[String] = line.split(",")
                    (arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))
                }).toDF("name2", "age", "ts2")
                .withWatermark("ts2", "1 minutes") 
    
    
            // join 操作
            val joinResult: DataFrame = nameSexStream.join(
                nameAgeStream,
                expr(
                    """
                      |name1=name2 and
                      |ts2 >= ts1 and
                      |ts2 <= ts1 + interval 1 minutes
                    """.stripMargin))
    
            joinResult.writeStream
                .outputMode("append")
                .format("console")
                .trigger(Trigger.ProcessingTime(0))
                .start()
                .awaitTermination()
        }
    }
    +-------+------+-------------------+-------+---+-------------------+
    |  name1|   sex|                ts1|  name2|age|                ts2|
    +-------+------+-------------------+-------+---+-------------------+
    |zhiling|female|2019-09-16 11:53:00|zhiling| 22|2019-09-16 11:53:00|
    |     ww|female|2019-09-16 11:52:00|     ww| 20|2019-09-16 11:52:00|
    |     zs|  male|2019-09-16 11:51:00|     zs| 19|2019-09-16 11:51:00|
    |fengjie|female|2019-09-16 11:54:00|fengjie| 98|2019-09-16 11:55:00|
    |   lisi|female|2019-09-16 11:50:00|   lisi| 18|2019-09-16 11:50:00|
    +-------+------+-------------------+-------+---+-------------------+
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    outer join

    外连接必须使用 watermast

    和你连接相比, 代码几乎一致, 只需要在连接的时候指定下连接类型即可:joinType = "left_join"

    val joinResult: DataFrame = nameSexStream.join(
                nameAgeStream,
                expr(
                    // 连接条件
                    """
                      |name1=name2 and
                      |ts2 >= ts1 and
                      |ts2 <= ts1 + interval 1 minutes
                    """.stripMargin),
                joinType = "left_join")
    
    Streaming DF/DS 不支持的操作
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    到目前, DF/DS 的有些操作 Streaming DF/DS 还不支持.

    1. 多个Streaming 聚合(例如在 DF 上的聚合链)目前还不支持
    2. limit 和取前 N 行 updatem模式还不支持
    3. distinct 也不支持
    4. 仅仅支持对 complete 模式下的聚合操作进行排序操作
    5. 仅支持有限的外连接
    6. 有些方法不能直接用于查询和返回结果, 因为他们用在流式数据上没有意义.
      • count() 不能返回单行数据, 必须是s.groupBy().count()
      • foreach() 不能直接使用, 而是使用: ds.writeStream.foreach(...)
      • show() 不能直接使用, 而是使用 console sink

    如果执行上面操作会看到这样的异常: operation XYZ is not supported with streaming DataFrames/Datasets.

  • 相关阅读:
    # Web server failed to start. Port 9793 was already in use
    计算机毕业设计Java精准扶贫管理系统统(源码+mysql数据库+系统+lw文档)
    全国核辐射检测数据月度表
    YOLOv8血细胞检测(4):Dual-ViT:一种多尺度双视觉Transformer ,Dualattention助力小目标检测| 顶刊TPAMI 2023
    整理最新java面试宝典2019
    UE4 解决车轮等模型快速旋转时画面模糊
    使用树莓派搭建文件共享服务器-samba服务器
    DC电源模块对效率有什么要求?
    在Vue关于ue的computed属性中传递参数
    最近面了12个人,发现这个测试基础题都答不上来...
  • 原文地址:https://blog.csdn.net/wangxw1803/article/details/127457715