定义:
Spark Streaming 用于流式数据的处理。Spark Streaming 支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后可以用 Spark 的高度抽象原语如:map、reduce、join、window 等进行运算。而结果也能保存在很多地方,如 HDFS,数据库等。
SparkStreaming 准实时(延迟时间:秒,分钟为单位),微批次(设置一段时间进行处理数据 如:3s、5s等)的数据处理框架。
和Spark 基于 RDD 的概念很相似,Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作 DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”)。所以简单来将,DStream 就是对 RDD 在实时数据处理场景的一种封装
Discretized Stream:
Discretized Stream 是 Spark Streaming 的基础抽象,代表持续性的数据流和经过各种 Spark 原语操作后的结果数据流。在内部实现上,DStream 是一系列连续的 RDD 来表示。每个 RDD 含有一段时间间隔内的数据。
SparkStreaming特点:
案例说明:使用 netcat 工具向 9999 端口不断的发送数据,通过 SparkStreaming 读取端口数据并
统计不同单词出现的次数
netcat 工具解压后进入该文件夹,输入cmd,在cmd中输入:nc -lp 9999
环境配置
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.12</artifactId>
<version>3.0.0</version>
</dependency>
代码
package com.bigdata.SparkStreaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* @author wangbo
* @version 1.0
*/
object SparkStreaming_01_wordcount {
def main(args: Array[String]): Unit = {
//TODO 创建环境对象
//StreamingContext创建时,需要传递两个参数
//第一个参数表示环境配置
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
//第二个参数表示批量处理的周期(采集周期)
val ssc = new StreamingContext(sparkConf, Seconds(3)) //这里的时间参数以秒为单位
//TODO 逻辑处理
//获取端口数据
//注意这里需要到 F:\尚硅谷-大数据\spark3.0\2.资料\netcat-win32-1.12 输入 cmd ,然后在cmd中输入 nc -lp 9999
val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)
val wordToCount: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
wordToCount.print()
/*
由于SparkStreaming采集器是长期执行的任务,所以不能直接关闭
如果main方法执行完毕,应用程序也会自动结束,所以不能让mian执行完毕
*/
//启动采集器
ssc.start()
//等待采集器的关闭
ssc.awaitTermination()
}
}
DStream的创建有很多,主要有以下三种方式:
用法及其说明:可以通过使用 ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的 RDD,都会作为一个 DStream 处理
package com.bigdata.SparkStreaming
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
/**
* @author wangbo
* @version 1.0
*/
object SparkStreaming_02_Queue {
def main(args: Array[String]): Unit = {
//TODO 创建环境对象
//StreamingContext创建时,需要传递两个参数
//第一个参数表示环境配置
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
//第二个参数表示批量处理的周期(采集周期)
val ssc = new StreamingContext(sparkConf, Seconds(3)) //这里的时间参数以秒为单位
//3.创建 RDD 队列
val rddQueue = new mutable.Queue[RDD[Int]]()
//4.创建 QueueInputDStream
val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)
//5.处理队列中的 RDD 数据
val mappedStream = inputStream.map((_,1))
val reducedStream = mappedStream.reduceByKey(_ + _)
//6.打印结果
reducedStream.print()
//7.启动采集器
ssc.start()
//8.循环创建并向 RDD 队列中放入 RDD
for (i <- 1 to 5) {
rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
Thread.sleep(2000)
}
//8.等待采集器的关闭
ssc.awaitTermination()
}
}
用法及其说明:需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。
需求:自定义数据源,实现监控某个端口号,获取该端口号内容
package com.bigdata.SparkStreaming
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
import scala.util.Random
/**
* @author wangbo
* @version 1.0
*/
/**
* 自定义数据源的方式创建DStream
*/
object SparkStreaming_02_DIY {
def main(args: Array[String]): Unit = {
//TODO 创建环境对象
//StreamingContext创建时,需要传递两个参数
//第一个参数表示环境配置
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
//第二个参数表示批量处理的周期(采集周期)
val ssc = new StreamingContext(sparkConf, Seconds(3)) //这里的时间参数以秒为单位
val messageDStream: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver)
messageDStream.print()
//7.启动采集器
ssc.start()
//8.等待采集器的关闭
ssc.awaitTermination()
}
/*
自定义数据采集器
1. 继承Receiver,定义泛型,传递参数
2. 重写方法
*/
class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY){
private var flag = true
override def onStart(): Unit = {
new Thread(new Runnable { //随机生成数
override def run(): Unit = {
while (true){
val message: String = "采集的数据为:" + new Random().nextInt(10).toString
store(message)
Thread.sleep(500)
}
}
})
}
override def onStop(): Unit = {
flag = false
}
}
}
版本:DirectAPI 是由计算的 Executor 来主动消费 Kafka 的数据,速度由自身控制。
需求:通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简单计算,最终打印到控制台,Kafka 0-10 Direct 模式
注意:zookeeper和kafka要启动
添加依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.10.1</version>
</dependency>
代码部分
package com.bigdata.SparkStreaming
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.util.Random
/**
* @author wangbo
* @version 1.0
*/
/**
* 自定义数据源的方式创建DStream
*/
object SparkStreaming_04_Kafka {
def main(args: Array[String]): Unit = {
//TODO 创建环境对象
//第一个参数表示环境配置
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
//第二个参数表示批量处理的周期(采集周期)
val ssc = new StreamingContext(sparkConf, Seconds(3)) //这里的时间参数以秒为单位
//定义kafka参数
val kafkaPara: Map[String, Object] = Map[String, Object](
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->
"hadoop100:9092,hadoop102:9092,hadoop103:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "kafka",
"key.deserializer" ->
"org.apache.kafka.common.serialization.StringDeserializer",
"value.deserializer" ->
"org.apache.kafka.common.serialization.StringDeserializer"
)
val kafkaData: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](Set("kafka"), kafkaPara) //Set("kafka")主题为kafka,kafkaPara为kafka的配置
)
kafkaData.map(_.value()).print()
//7.启动采集器
ssc.start()
//8.等待采集器的关闭
ssc.awaitTermination()
}
}