Join Operations
Structured Streaming supports joining a streaming Dataset/DataFrame with a static Dataset/DataFrame, as well as joining a streaming Dataset/DataFrame with another streaming Dataset/DataFrame.
The join results are generated continuously, similar to the streaming aggregation results covered earlier.
Sample static data:
lisi,male
zhiling,female
zs,male
Sample streaming data:
lisi,20
zhiling,40
ww,30
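To feed the stream, serve the lines above over a socket, e.g. with netcat: run nc -lk 10000 in a terminal and paste the lines one at a time (the host and port must match the options in the code below).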
import org.apache.spark.sql.{DataFrame, SparkSession}

object StreamingStatic {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("StreamingStatic")
      .getOrCreate()
    import spark.implicits._

    // 1. Static DataFrame
    val arr = Array(("lisi", "male"), ("zhiling", "female"), ("zs", "male"))
    val staticDF: DataFrame = spark.sparkContext.parallelize(arr).toDF("name", "sex")

    // 2. Streaming DataFrame
    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 10000)
      .load()
    val streamDF: DataFrame = lines.as[String].map(line => {
      val arr = line.split(",")
      (arr(0), arr(1).toInt)
    }).toDF("name", "age")

    // 3. Equi-join (inner join) on name: a.name = b.name
    val joinResult: DataFrame = streamDF.join(staticDF, "name") // or pass Seq("name")

    // 4. Output
    joinResult.writeStream
      .outputMode("append")
      .format("console")
      .start()
      .awaitTermination()
  }
}
+-------+---+------+
| name|age| sex|
+-------+---+------+
|zhiling| 40|female|
| lisi| 20| male|
+-------+---+------+
Left outer join (the outer side must be the side the stream is on):
val joinResult: DataFrame = streamDF.join(staticDF, Seq("name"), "left") // the stream is on the left, so use a left join
+-------+---+------+
| name|age| sex|
+-------+---+------+
|zhiling| 40|female|
| ww| 30| null|
| lisi| 20| male|
+-------+---+------+
Stream-stream joins are supported since Spark 2.3.
Spark automatically maintains state for both streams so that data arriving later can still be joined with data that arrived earlier, but this state can grow without bound. As with aggregations, the watermark mechanism can be used when joining two streams to expire stale state and keep it from growing indefinitely.
The example below joins two streams; only the append output mode is supported.
Format of the first stream: name,sex,event time
lisi,female,2019-09-16 11:50:00
zs,male,2019-09-16 11:51:00
ww,female,2019-09-16 11:52:00
zhiling,female,2019-09-16 11:53:00
fengjie,female,2019-09-16 11:54:00
yifei,female,2019-09-16 11:55:00
Format of the second stream: name,age,event time
lisi,18,2019-09-16 11:50:00
zs,19,2019-09-16 11:51:00
ww,20,2019-09-16 11:52:00
zhiling,22,2019-09-16 11:53:00
yifei,30,2019-09-16 11:54:00
fengjie,98,2019-09-16 11:55:00
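As before, the two streams can be fed with two separate netcat sessions, e.g. nc -lk 10000 and nc -lk 20000, matching the two ports used in the code below.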
import java.sql.Timestamp
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Date 2019/8/16 5:09 PM
 */
object StreamStream1 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("StreamStream1")
      .getOrCreate()
    import spark.implicits._

    // First stream: name,sex,event time
    val nameSexStream: DataFrame = spark.readStream
      .format("socket")
      .option("host", "hadoop201")
      .option("port", 10000)
      .load()
      .as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (arr(0), arr(1), Timestamp.valueOf(arr(2)))
      }).toDF("name", "sex", "ts1")

    // Second stream: name,age,event time
    val nameAgeStream: DataFrame = spark.readStream
      .format("socket")
      .option("host", "hadoop201")
      .option("port", 20000)
      .load()
      .as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))
      }).toDF("name", "age", "ts2")

    // Inner equi-join on name
    val joinResult: DataFrame = nameSexStream.join(nameAgeStream, "name")

    joinResult.writeStream
      .outputMode("append")
      .format("console")
      .trigger(Trigger.ProcessingTime(0))
      .start()
      .awaitTermination()
  }
}
+-------+------+-------------------+---+-------------------+
| name| sex| ts1|age| ts2|
+-------+------+-------------------+---+-------------------+
|zhiling|female|2019-09-16 11:53:00| 22|2019-09-16 11:53:00|
| ww|female|2019-09-16 11:52:00| 20|2019-09-16 11:52:00|
| yifei|female|2019-09-16 11:55:00| 30|2019-09-16 11:54:00|
| zs| male|2019-09-16 11:51:00| 19|2019-09-16 11:51:00|
|fengjie|female|2019-09-16 11:54:00| 98|2019-09-16 11:55:00|
| lisi|female|2019-09-16 11:50:00| 18|2019-09-16 11:50:00|
+-------+------+-------------------+---+-------------------+
The join output appears slowly, so expect to wait. Without watermarks, Spark must keep the full history of both streams as state; the next example bounds that state with watermarks and a time-range join condition.
import java.sql.Timestamp
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * Date 2019/8/16 5:09 PM
 */
object StreamStream2 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("StreamStream2")
      .getOrCreate()
    import spark.implicits._

    // First stream: name,sex,event time (watermark: 2 minutes)
    val nameSexStream: DataFrame = spark.readStream
      .format("socket")
      .option("host", "hadoop201")
      .option("port", 10000)
      .load()
      .as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (arr(0), arr(1), Timestamp.valueOf(arr(2)))
      }).toDF("name1", "sex", "ts1")
      .withWatermark("ts1", "2 minutes")

    // Second stream: name,age,event time (watermark: 1 minute)
    val nameAgeStream: DataFrame = spark.readStream
      .format("socket")
      .option("host", "hadoop201")
      .option("port", 20000)
      .load()
      .as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))
      }).toDF("name2", "age", "ts2")
      .withWatermark("ts2", "1 minutes")

    // Join on name with a time-range constraint between the event times
    val joinResult: DataFrame = nameSexStream.join(
      nameAgeStream,
      expr(
        """
          |name1=name2 and
          |ts2 >= ts1 and
          |ts2 <= ts1 + interval 1 minutes
        """.stripMargin))

    joinResult.writeStream
      .outputMode("append")
      .format("console")
      .trigger(Trigger.ProcessingTime(0))
      .start()
      .awaitTermination()
  }
}
+-------+------+-------------------+-------+---+-------------------+
| name1| sex| ts1| name2|age| ts2|
+-------+------+-------------------+-------+---+-------------------+
|zhiling|female|2019-09-16 11:53:00|zhiling| 22|2019-09-16 11:53:00|
| ww|female|2019-09-16 11:52:00| ww| 20|2019-09-16 11:52:00|
| zs| male|2019-09-16 11:51:00| zs| 19|2019-09-16 11:51:00|
|fengjie|female|2019-09-16 11:54:00|fengjie| 98|2019-09-16 11:55:00|
| lisi|female|2019-09-16 11:50:00| lisi| 18|2019-09-16 11:50:00|
+-------+------+-------------------+-------+---+-------------------+
Outer joins require a watermark.
Compared with the inner join, the code is almost identical; you only need to specify the join type, e.g. joinType = "left_outer". Note that the NULL results of an outer join are emitted with a delay, because the engine must wait until the watermark guarantees that no matching row can still arrive.
val joinResult: DataFrame = nameSexStream.join(
  nameAgeStream,
  expr(
    // join condition
    """
      |name1=name2 and
      |ts2 >= ts1 and
      |ts2 <= ts1 + interval 1 minutes
    """.stripMargin),
  joinType = "left_outer")
Operations not supported on streaming DataFrames/Datasets
As of now, some DataFrame/Dataset operations are not supported on streaming DataFrames/Datasets:
count() cannot return a single row directly; use ds.groupBy().count() instead.
foreach() cannot be called directly; use ds.writeStream.foreach(...) instead.
show() cannot be called directly; use the console sink instead.
Executing any of these raises an exception like: operation XYZ is not supported with streaming DataFrames/Datasets.
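As a minimal sketch of these workarounds (the socket host and port are placeholder values), the following counts all rows with groupBy().count() and prints each batch through the console sink, instead of calling count() or show() directly:

import org.apache.spark.sql.{DataFrame, SparkSession}

object UnsupportedOpsWorkaround {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("UnsupportedOpsWorkaround")
      .getOrCreate()

    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 10000)
      .load()

    // lines.count() would throw "operation ... not supported";
    // a streaming aggregation with groupBy().count() works instead
    val counts: DataFrame = lines.groupBy().count()

    // lines.show() would also throw; print each batch via the console sink
    counts.writeStream
      .outputMode("complete") // complete mode, since the query contains an aggregation
      .format("console")
      .start()
      .awaitTermination()
  }
}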