数据处理的延迟
在计算开始前已知所有输入数据,输入数据不会产生变化,一般计算量级较大,计算时间也较长。例如今天早上一点,把昨天累积的日志,计算出所需结果。最经典的就是Hadoop的MapReduce方式;
输入数据是可以以序列化的方式一个一个地输入并进行处理,也就是说在开始的时候并不需要知道所有的输入数据。与离线计算相比,运行时间短,计算量级相对较小。强调计算过程的时间要短,即所查当下给出结果。
数据处理的方式
近年来,在Web应用、网络监控、传感监测等领域,兴起了一种新的数据密集型应用——流数据,即数据以大量、快速、时变的流形式持续到达。实例:PM2.5检测、电子商务网站用户点击流。
流数据具有如下特征:
注重数据的整体价值,不过分关注个别数据
Spark流使得构建可扩展的容错流应用程序变得更加容易。
Spark Streaming用于流式数据的处理。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。
在 Spark Streaming 中,处理数据的单位是一批而不是单条,而数据采集却是逐条进行的,因此 Spark
Streaming 系统需要设置间隔使得数据汇总到一定的量后再一并操作,这个间隔就是批处理间隔。批处理间隔是Spark Streaming的核心概念和关键参数,它决定了Spark Streaming提交作业的频率和数据处理的延迟,同时也影响着数据处理的吞吐量和性能。
和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为RDD 存在,而DStream是由这些RDD所组成的序列(因此得名“离散化”)。DStreams可以由来自数据源的输入数据流来创建, 也可以通过在其他的DStreams上应用一些高阶操作来得到。所以简单来讲,DStream就是对RDD在实时数据处理场景的一种封装。
易整合到Spark体系
缺点
Spark Streaming是一种“微量批处理”架构, 和其他基于“一次处理一条记录”架构的系统相比, 它的延迟会相对高一些。
整体架构图
SparkStreaming架构图
Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,这样虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。
为了更好的协调数据接收速率与资源处理能力,1.5版本开始Spark Streaming可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即Spark Streaming Backpressure): 根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。
通过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制,默认值false,即不启用。
需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数
org.apache.spark
spark-streaming_2.12
3.0.0
object StreamWordCount {
def main(args: Array[String]): Unit = {
//初始化Spark配置信息,Streaming程序执行至少需要两个线程(采集、执行)
//不能设置为local
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")
//初始化SparkStreamingContext,程序执行入口对象
//Seconds 底层调用 new Duration
val ssc = new StreamingContext(sparkConf, Seconds(3))
//通过监控端口创建DStream,读进来的数据为一行一行的
val lineStreams = ssc.socketTextStream("hadoop101", 9999)
//将每一行数据做切分,形成一个个单词
val wordStreams = lineStreams.flatMap(_.split(" "))
//将单词映射成元组(word,1)
val wordAndOneStreams = wordStreams.map((_, 1))
//将相同的单词次数做统计
val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_)
//打印
wordAndCountStreams.print()
//启动SparkStreamingContext采集器
ssc.start()
//默认情况下采集器不能关闭,等待采集结束之后,终止程序
ssc.awaitTermination()
}
}
# 开启SecureCRT
nc -lk 9999
输入内容
Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据。
对数据的操作也是按照RDD为单位来进行的
计算过程由Spark Engine来完成
测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理
需求:循环创建几个RDD,将RDD放入队列。通过SparkStream创建Dstream,计算WordCount
object RDDStream {
def main(args: Array[String]) {
//1.初始化Spark配置信息
val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")
//2.初始化SparkStreaming上下文环境对象
val ssc = new StreamingContext(conf, Seconds(4))
//3.创建RDD队列
val rddQueue = new mutable.Queue[RDD[Int]]()
//4.创建QueueInputDStream,从队列中采集数据,获取DS
val inputStream = ssc.queueStream(rddQueue,oneAtATime = false)
//5.处理队列中的RDD数据
val mappedStream = inputStream.map((_,1))
val reducedStream = mappedStream.reduceByKey(_ + _)
//6.打印结果
reducedStream.print()
//7.启动采集器
ssc.start()
//8.循环创建并向RDD队列中放入RDD
for (i <- 1 to 5) {
rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
Thread.sleep(2000)
}
ssc.awaitTermination()
}
}
结果展示
-------------------------------------------
Time: 1662277280000 ms
-------------------------------------------
(196,1)
(296,1)
(96,1)
(52,1)
(4,1)
(180,1)
(16,1)
(156,1)
(216,1)
(28,1)
...
-------------------------------------------
Time: 1662277284000 ms
-------------------------------------------
(196,2)
(296,2)
(96,2)
(52,2)
(4,2)
(180,2)
(16,2)
(156,2)
(216,2)
(28,2)
...
-------------------------------------------
Time: 1662277288000 ms
-------------------------------------------
(196,2)
(296,2)
(96,2)
(52,2)
(4,2)
(180,2)
(16,2)
(156,2)
(216,2)
(28,2)
...
-------------------------------------------
Time: 1662277292000 ms
-------------------------------------------
需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集
需求:自定义数据源,实现监控某个端口号,获取该端口号内容
//泛型表示读取数据的类型
class CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
//最初启动的时候,调用该方法,作用为:读数据并将数据发送给Spark
override def onStart(): Unit = {
new Thread("Socket Receiver") {
override def run() {
receive()
}
}.start()
}
//读数据并将数据发送给Spark,真正处理接收数据的逻辑
def receive(): Unit = {
//创建一个Socket连接
var socket: Socket = new Socket(host, port)
//定义一个变量,用来接收端口传过来的数据
var input: String = null
//创建一个BufferedReader用于按行读取端口传来的数据
//InputStreamReader 将字节流转换为字符流
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
//读取一行数据
input = reader.readLine()
//当receiver没有关闭并且输入数据不为空,则循环发送数据给Spark
while (!isStopped() && input != null) {
store(input)
input = reader.readLine()
}
//跳出循环则关闭资源
reader.close()
socket.close()
//重启任务
restart("restart")
}
override def onStop(): Unit = {
if(socket != null ){
socket.close()
socket = null
}
}
}
object FileStream {
def main(args: Array[String]): Unit = {
//1.初始化Spark配置信息
val sparkConf = new SparkConf().setMaster("local[*]")
.setAppName("StreamWordCount")
//2.初始化SparkStreamingContext
val ssc = new StreamingContext(sparkConf, Seconds(5))
//3.创建自定义数据源创建receiver的Streaming
val lineStream = ssc.receiverStream(new CustomerReceiver("hadoop101", 9999))
//4.将每一行数据做切分,形成一个个单词
val wordStream = lineStream.flatMap(_.split("\t"))
//5.将单词映射成元组(word,1)
val wordAndOneStream = wordStream.map((_, 1))
//6.将相同的单词次数做统计
val wordAndCountStream = wordAndOneStream.reduceByKey(_ + _)
//7.打印
wordAndCountStream.print()
//8.启动SparkStreamingContext
ssc.start()
ssc.awaitTermination()
}
}
ReceiverAPI:需要一个专门的Executor去接收数据,然后发送给其他的Executor做计算。存在的问题,接收数据的Executor和计算的Executor速度会有所不同,特别在接收数据的Executor速度大于计算的Executor速度,会导致计算数据的节点内存溢出。早期版本中提供此方式,当前版本不适用。默认情况下,offset维护在zk中。
DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制。默认情况下,offseet维护在checkpoint检查点,需要改变SparkStreamingContext的创建方式;也可以手动指定offset维护位置,为了保证数据的精准一致性,一般维护在有事务的存储上。
通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算,最终打印到控制台。
org.apache.spark
spark-streaming-kafka-0-8_2.11
2.4.5
// 通过ReciverAPI连接kafka数据源,获取数据
object Spark04_ReceiverAPI {
def main(args: Array[String]): Unit = {
//1.创建SparkConf
val sparkConf: SparkConf = new SparkConf().setAppName("Spark04_ReceiverAPI").setMaster("local[*]")
//2.创建StreamingContext,第二个参数为采集周期
val ssc = new StreamingContext(sparkConf, Seconds(3))
//3.使用ReceiverAPI读取Kafka数据创建DStream
val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(
//Streaming Context
ssc,
//Zookeeper地址
"hadoop101:2181,hadoop102:2181,hadoop103:2181",
//groupid,消费者组
"bigdata",
//k表示主题的名字,v表示主题的分区数
Map("mybak" -> 2))
//4.计算WordCount并打印 new KafkaProducer[String,String]().send(new ProducerRecord[]())
//获取kafka中的消息,只需要v的部分
val lineDStream: DStream[String] = kafkaDStream.map(_._2)
val word: DStream[String] = lineDStream.flatMap(_.split(" "))
val wordToOneDStream: DStream[(String, Int)] = word.map((_, 1))
val wordToCountDStream: DStream[(String, Int)] = wordToOneDStream.reduceByKey(_ + _)
wordToCountDStream.print()
//5.开启任务
ssc.start()
ssc.awaitTermination()
}
}
# 启动Zookeeper
zk start
# 启动kafka
kafka start
# 查看所有主题
bin/kafka-topics.sh --list --bootstrap-server hadoop101:9092
# 创建一个和上述代码中相同的主题
bin/kafka-topics.sh --create --bootstrap-server hadoop101:9092 --topic bigdata --partitions 2 --replication-factor 2
# 生产者向topic发送消息
bin/kafka-console-producer.sh --broker-list hadoop101:9092 --topic bigdata
# 启动上述代码,查看是否可以连接到kafka,并且接收到生产者传来的消息
# 发送的内容
hello word
hello scala
hello java
0-8 Receive模式,offset维护在zk中,程序停止后,继续生产数据,再次启动程序,仍然可以继续消费。可通过get /consumers/bigdata/offsets/主题名/分区号 查看,注意会存在一些延迟