是RDD的API的流式工具, 其本质还是RDD, 存储和执行过程依然类似RDD
是Dataset的API的流式工具,API和Dataset保持高度一致
Spark Streaming:
基于微批,延迟高不能做到真正的实时
DStream基于RDD,不直接支持SQL
流批处理的API应用层不统一,(流用的DStream-底层是RDD,批用的DF/DS/RDD)
不支持EventTime事件时间
注:
EventTime事件时间 :事件真正发生的事件
PorcessingTime处理时间:事件被流系统处理的时间
IngestionTime摄入时间:事件到底流系统的时间
如: 一条错误日志10月1日,23:59:00秒产生的(事件时间),因为网路延迟,到10月2日 00:00:10到达日志处理系统(摄入时间),10月2日 00:00:20被流系统处理(处理时间)
如果要统计10月1日的系统bug数量,那么SparkStreaming不能正确统计,因为它不支持事件时间
数据的Exactly-Once(恰好一次语义)需要手动实现
注: 数据的一致性语义
最多一次
恰好一次–是我们的目标,SparkStreaming如果要实现恰好一次,需要手动维护偏移量+其他操作
最少一次
Structured Streaming:
object SocketWordCount {
def main(args: Array[String]): Unit = {
// 1. 创建SparkSession
val spark = SparkSession.builder()
.master("local[6]")
.appName("socket word count")
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
// 2. 读取外部数据源,并转为Dataset[String]
val source: Dataset[String] = spark.readStream
.format("socket")
.option("host", "192.168.88.100")
.option("port", 7777)
.load()
.as[String]
// 3. 统计词频
val words = source.flatMap(_.split(" "))
.map((_, 1))
.groupByKey(_._1)
.count()
// 4. 输出结果
words.writeStream
.outputMode(OutputMode.Complete()) // 统计全局结果,而不是一个批次
.format("console")
.start()
.awaitTermination()
}
}
Spark中的DS有两种:
如何使用DS表示流式计算
StreamExecution
StreamExecution在流上进行基于Dataset的查询, 也就是说,Dataset之所以能够在流上进行查询, 是因为StreamExecution的调度和管理。
增量查询: