The most important part of working with Structured Streaming is applying operations to streaming DataFrames and streaming Datasets.
Starting with Spark 2.0, a DataFrame or Dataset can represent either a static, bounded table or a streaming, unbounded table.
As with static Datasets/DataFrames, we use the common entry point SparkSession to create streaming Datasets/DataFrames from streaming sources and apply the same operations to them as to static Datasets/DataFrames.
Calling spark.readStream returns a DataStreamReader; loading a streaming source through that object yields a streaming DataFrame.
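As a minimal sketch of this pattern (it assumes an existing SparkSession named spark; the socket host and port are placeholder values for illustration only):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.DataStreamReader

// spark.readStream is the entry point for streaming reads
val reader: DataStreamReader = spark.readStream
val socketDF: DataFrame = reader
  .format("socket")            // pick one of the built-in sources
  .option("host", "localhost") // placeholder host
  .option("port", 9999)        // placeholder port
  .load()                      // an unbounded, streaming DataFrame
println(socketDF.isStreaming)  // true for streaming DataFrames
```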
Spark ships with several built-in streaming sources, which cover essentially all common needs:
| Source | Options | Fault-tolerant | Notes |
|---|---|---|---|
| File source | `path`: path to the input directory, common to all file formats.<br>`maxFilesPerTrigger`: maximum number of new files to be considered in every trigger (default: no max).<br>`latestFirst`: whether to process the latest new files first, useful when there is a large backlog of files (default: false).<br>`fileNameOnly`: whether to check new files based on only the filename instead of on the full path (default: false). With this set to true, the following files would be considered the same file, because their filename, "dataset.txt", is the same: "file:///dataset.txt", "s3://a/dataset.txt", "s3n://a/b/dataset.txt", "s3a://a/b/c/dataset.txt".<br>For file-format-specific options, see the related methods in DataStreamReader (Scala/Java/Python/R), e.g. for the "parquet" format see DataStreamReader.parquet(). In addition, there are session configurations that affect certain file formats; see the SQL Programming Guide for more details, e.g. for "parquet" see the Parquet configuration section. | Yes | Supports glob paths, but does not support multiple comma-separated paths/globs. The option `basePath` must be a directory. A schema must be specified when creating a streaming source DataFrame. If some files already exist in the directory, then depending on the file format you may be able to create a static DataFrame on that directory with `spark.read.load(directory)` and infer the schema from it. |
| Socket source | `host`: host to connect to, must be specified.<br>`port`: port to connect to, must be specified. | No | |
| Rate source | `rowsPerSecond` (e.g. 100, default: 1): how many rows should be generated per second.<br>`rampUpTime` (e.g. 5s, default: 0s): how long to ramp up before the generation speed reaches rowsPerSecond; granularities finer than seconds are truncated to integer seconds.<br>`numPartitions` (e.g. 10, default: Spark's default parallelism): the partition number for the generated rows.<br>The source will try its best to reach rowsPerSecond, but the query may be resource constrained; numPartitions can be tweaked to help reach the desired speed. | Yes | |
| Kafka source | See the Kafka Integration Guide. | Yes | |
socket source
For a concrete example, see the quick start covered earlier.
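As a brief recap of that quick start (a minimal sketch; the host and port are placeholders, and it assumes a socket server such as `nc -lk 9999` on the same machine), a socket word count looks like this:

```scala
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SparkSession}

object SocketSourceSketch {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("SocketSourceSketch")
      .getOrCreate()
    import spark.implicits._

    // Read lines from the socket as a streaming DataFrame with a single "value" column
    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // Split the lines into words and keep a running count per word
    val wordCounts: DataFrame = lines.as[String]
      .flatMap(_.split("\\W+"))
      .groupBy("value")
      .count()

    // Print the running counts to the console
    val query: StreamingQuery = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()
    query.awaitTermination()
  }
}
```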
file source
The path must point to a directory. With readStream the schema must be specified explicitly, whereas the batch read can infer it.
```scala
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.types.{LongType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadFromFile {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ReadFromFile")
      .getOrCreate()
    // Define the schema: column names and their types
    val userSchema: StructType = new StructType().add("name", StringType).add("age", LongType).add("job", StringType)
    val user: DataFrame = spark.readStream
      .format("csv")
      .schema(userSchema) // the schema must be specified
      .load("/Users/lzc/Desktop/csv") // must be a directory, not a file name
    val query: StreamingQuery = user.writeStream
      .outputMode("append")
      .trigger(Trigger.ProcessingTime(0)) // trigger interval in milliseconds; 0 means process as soon as possible
      .format("console")
      .start()
    query.awaitTermination()
  }
}
```
Note: once a file has been read, later changes to the data inside that file are not picked up.
The code above that builds user can also be replaced with the following:
```scala
val user: DataFrame = spark.readStream
  .schema(userSchema)
  .csv("/Users/lzc/Desktop/csv")
```
When directories are named in the "key=value" form, Structured Streaming automatically and recursively walks all subdirectories under the given directory and partitions the data according to those directory names.
If the directories do not follow the "key=value" naming convention, automatic partitioning is not triggered. In addition, all directories at the same level must follow the same naming convention.
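For example, a layout like the following (the partition key year and its values are made up here for illustration; only the two CSV files below come from the example) would be read as one dataset partitioned by year:

```
/Users/lzc/Desktop/csv
├── year=2019
│   └── user1.csv
└── year=2020
    └── user2.csv
```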
user1.csv:
```
lisi,male,18
zhiling,female,28
```
user2.csv:
```
lili,female,19
fengjie,female,40
```
```scala
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadFromFile2 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("ReadFromFile2")
      .getOrCreate()
    // Define the schema: column names and their types
    val userSchema: StructType = new StructType().add("name", StringType).add("sex", StringType).add("age", IntegerType)
    val user: DataFrame = spark.readStream
      .schema(userSchema)
      .csv("/Users/lzc/Desktop/csv")
    val query: StreamingQuery = user.writeStream
      .outputMode("append")
      .trigger(Trigger.ProcessingTime(0)) // trigger interval in milliseconds; 0 means process as soon as possible
      .format("console")
      .start()
    query.awaitTermination()
  }
}
```
kafka source
Reference: http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
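The Kafka source ships as a separate module, so the examples below assume the spark-sql-kafka-0-10 connector is on the classpath. A build.sbt sketch (the version is a placeholder; match it to your Spark and Scala versions):

```scala
// build.sbt -- the version below is illustrative, align it with your cluster
libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0"
```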
```scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}

object KafkaSourceDemo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("KafkaSourceDemo")
      .getOrCreate()
    // The resulting df has a fixed schema: key, value, topic, partition, offset, timestamp, timestampType
    val df: DataFrame = spark.readStream
      .format("kafka") // use the kafka source
      .option("kafka.bootstrap.servers", "hadoop201:9092,hadoop202:9092,hadoop203:9092")
      .option("subscribe", "topic1") // multiple topics can also be subscribed: "topic1,topic2"
      .load()
    df.writeStream
      .outputMode("update")
      .format("console")
      .trigger(Trigger.Continuous(1000))
      .option("truncate", false) // show full column values (e.g. the timestamp) instead of truncating them
      .start()
      .awaitTermination()
  }
}
```
For the Kafka source the schema is fixed (key, value, topic, partition, offset, timestamp, timestampType):
value: a byte array (as is key), which is why it is cast to a string below
timestamp: the time at which the record was written to Kafka
```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object KafkaSourceDemo2 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("KafkaSourceDemo2")
      .getOrCreate()
    import spark.implicits._
    // The source df has a fixed schema: key, value, topic, partition, offset, timestamp, timestampType
    val lines: Dataset[String] = spark.readStream
      .format("kafka") // use the kafka source
      .option("kafka.bootstrap.servers", "hadoop201:9092,hadoop202:9092,hadoop203:9092")
      .option("subscribe", "topic1") // multiple topics can also be subscribed: "topic1,topic2"
      .load()
      // cast the binary value to a string
      .selectExpr("CAST(value AS string)")
      .as[String]
    val query: DataFrame = lines.flatMap(_.split("\\W+")).groupBy("value").count()
    query.writeStream
      .outputMode("complete")
      .format("console")
      .option("checkpointLocation", "./ck1") // on the next start, the query resumes from the last recorded position
      .start()
      .awaitTermination()
  }
}
```
This mode generally requires setting the starting and ending offsets for consumption; if they are not set (and no checkpoint is used), the starting offset defaults to earliest and the ending offset to latest. Per-partition offsets can also be given explicitly, as shown in the sketch after the code below.
This mode runs as a one-off (batch) job rather than processing data continuously.
```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object KafkaSourceDemo3 {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("KafkaSourceDemo3")
      .getOrCreate()
    import spark.implicits._
    val lines: Dataset[String] = spark.read // use read instead of readStream
      .format("kafka") // use the kafka source
      .option("kafka.bootstrap.servers", "hadoop201:9092,hadoop202:9092,hadoop203:9092")
      .option("subscribe", "topic1")
      .option("startingOffsets", "earliest")
      .option("endingOffsets", "latest")
      .load()
      .selectExpr("CAST(value AS STRING)")
      .as[String]
    val query: DataFrame = lines.flatMap(_.split("\\W+")).groupBy("value").count()
    query.write // use write instead of writeStream
      .format("console")
      .save()
  }
}
```
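Besides the literals "earliest" and "latest", startingOffsets and endingOffsets also accept a JSON string with explicit per-partition offsets (in that JSON, -2 stands for earliest and -1 for latest). A minimal sketch, reusing the spark session and brokers from above, with made-up partition numbers and offsets:

```scala
// Hypothetical per-partition offsets for topic1; adjust to your topic layout.
val df: DataFrame = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "hadoop201:9092,hadoop202:9092,hadoop203:9092")
  .option("subscribe", "topic1")
  .option("startingOffsets", """{"topic1":{"0":13,"1":-2}}""") // -2 = earliest
  .option("endingOffsets", """{"topic1":{"0":50,"1":-1}}""")   // -1 = latest
  .load()
```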
rate source
Generates data in a fixed format at a fixed rate; it is mainly used to test the performance of Structured Streaming.
```scala
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.{DataFrame, SparkSession}

object RateSourceDemo {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[*]")
      .appName("RateSourceDemo")
      .getOrCreate()
    val rows: DataFrame = spark.readStream
      .format("rate") // use the rate source
      .option("rowsPerSecond", 10) // number of rows generated per second, default 1
      .option("rampUpTime", 1) // ramp-up time in seconds before the target rate is reached, default 0
      .option("numPartitions", 2) // number of partitions, default is Spark's default parallelism
      .load()
    rows.writeStream
      .outputMode("append")
      .trigger(Trigger.Continuous(1000))
      .format("console")
      .start()
      .awaitTermination()
  }
}
```