streamingContext.checkpoint(checkpointDirectory). In addition, if you want your application to recover from a Driver failure, the application must create a new StreamingContext and set up all its streams on the first start, and re-create the StreamingContext from the checkpoint data when it is restarted after a failure. StreamingContext.getOrCreate covers both cases.
object Case09_DriverHAWordCount {
val checkpointPath = "hdfs://node01:8020/checkpoint"
def creatingFunc(): StreamingContext = {
Logger.getLogger("org").setLevel(Level.ERROR)
val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint(checkpointPath)
val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)
val result: DStream[(String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunc)
result.print()
ssc
}
/**
* @param currentValues all the 1s emitted for a word in the current batch, e.g. (hadoop,1), (hadoop,1), ..., (hadoop,1)
* @param historyValues the accumulated count of the word across all previous batches, e.g. (hadoop,100)
*/
def updateFunc(currentValues: Seq[Int], historyValues: Option[Int]): Option[Int] = {
val newValue: Int = currentValues.sum + historyValues.getOrElse(0)
Some(newValue)
}
def main(args: Array[String]): Unit = {
val ssc: StreamingContext = StreamingContext.getOrCreate(checkpointPath, creatingFunc _)
ssc.start()
ssc.awaitTermination()
}
}
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
object Case10_WordCountStreamingAndSql {
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
// 1. Create the SparkConf object; note that at least two threads are needed, one thread alone cannot run the job
val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
// 2. Create the StreamingContext object
val ssc = new StreamingContext(sparkConf, Seconds(1))
// 3. Receive socket data
val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)
// 4. Process the data
val words: DStream[String] = socketTextStream.flatMap(_.split(" "))
// 5. Inside foreachRDD, convert each RDD into a DataFrame
words.foreachRDD(rdd => {
// Get (or create) the SparkSession
val sparkSession: SparkSession = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
import sparkSession.implicits._
val dataFrame: DataFrame = rdd.toDF("word")
// Register the DataFrame as a temporary view
dataFrame.createOrReplaceTempView("words")
// Count the occurrences of each word
val result: DataFrame = sparkSession.sql("select word, count(*) as count from words group by word")
// Show the result
result.show()
})
// 6. Start the streaming computation
ssc.start()
ssc.awaitTermination()
}
}




# Standalone: when submitting with spark-submit, add the two options `--deploy-mode cluster` and `--supervise`
spark-submit \
--master spark://node01:7077 \
--deploy-mode cluster \
--supervise \
--class com.yw.spark.example.streaming.cases.Case01_SocketWordCount \
--executor-memory 1g \
--total-executor-cores 2 \
original-spark-demo-1.0.jar
# Yarn: when submitting with spark-submit, add the option `--deploy-mode cluster`, and set `yarn.resourcemanager.am.max-attempts`
spark-submit \
--master yarn \
--deploy-mode cluster \
--class com.yw.spark.example.streaming.cases.Case01_SocketWordCount \
--executor-memory 1g \
--total-executor-cores 2 \
original-spark-demo-1.0.jar
<property>
<name>yarn.resourcemanager.am.max-attempts</name>
<value>4</value>
<description>
The maximum number of application master execution attempts.
</description>
</property>
streamingContext.checkpoint(hdfsDirectory)
// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {
val ssc = new StreamingContext(...) // new context
val lines = ssc.socketTextStream(...) // create DStreams
...
ssc.checkpoint(checkpointDirectory) // set checkpoint directory
ssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...
// Start the context
context.start()
context.awaitTermination()

streamingContext.checkpoint(hdfsDirectory)
sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999, StorageLevel.MEMORY_AND_DISK_SER)
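Taken together, enabling the receiver write-ahead log means setting the flag on SparkConf, giving the StreamingContext an HDFS checkpoint directory (the WAL is written underneath it), and using a serialized storage level for the receiver, since the WAL already provides durability. A minimal sketch, reusing the placeholder host node01, port 9999 and HDFS path from the snippets above:
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WalEnabledReceiverSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("WalEnabledReceiverSketch") // placeholder app name
      .setMaster("local[2]")
      // Turn on the write-ahead log for all receivers
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    // The WAL is stored under the checkpoint directory, so this is required
    ssc.checkpoint("hdfs://node01:8020/wal")
    // With the WAL enabled there is no need for replicated storage; serialized memory+disk is enough
    val lines: ReceiverInputDStream[String] =
      ssc.socketTextStream("node01", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    lines.count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}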


If the fraction of tasks in a stage that have completed successfully exceeds spark.speculation.quantile, and a running task has been running longer than spark.speculation.multiplier times the median runtime of the successfully completed tasks, that running task is resubmitted and waits to be scheduled again (speculative execution).
# Periodically check which running tasks need to be rescheduled
spark.speculation = true
# How often the check runs
spark.speculation.interval = 100ms
# Fraction of tasks that must have completed before speculation kicks in
spark.speculation.quantile = 0.75
# How many times slower than the median a running task must be
spark.speculation.multiplier = 1.5
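These properties can also be set programmatically when the application builds its SparkConf. A minimal fragment, assuming it is placed where the SparkConf is created in the examples above (the values simply mirror the properties listed here):
import org.apache.spark.SparkConf

val speculativeConf = new SparkConf()
  .setAppName("SpeculationConfSketch")          // placeholder app name
  .setMaster("local[2]")
  .set("spark.speculation", "true")             // periodically check running tasks for stragglers
  .set("spark.speculation.interval", "100ms")   // how often the check runs
  .set("spark.speculation.quantile", "0.75")    // fraction of tasks that must have finished first
  .set("spark.speculation.multiplier", "1.5")   // how much slower than the median a task must be to be re-launched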

object Case11_GracefullyShutdown {
private val HDFS: String = "hdfs://node01:8020"
private val CHECKPOINT: String = HDFS + "/checkpoint"
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val ssc: StreamingContext = StreamingContext.getActiveOrCreate(CHECKPOINT, () => createSsc())
new Thread(new MonitorStop(ssc)).start()
ssc.start()
ssc.awaitTermination()
}
def createSsc(): StreamingContext = {
val updateFunc: (Seq[Int], Option[Int]) => Some[Int] = (values: Seq[Int], status: Option[Int]) => {
Some(values.sum + status.getOrElse(0))
}
val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
// Enable graceful shutdown
sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
val ssc = new StreamingContext(sparkConf, Seconds(5))
ssc.checkpoint(CHECKPOINT)
val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)
val wordAndCount: DStream[(String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_, 1))
.updateStateByKey(updateFunc)
wordAndCount.print()
ssc
}
class MonitorStop(ssc: StreamingContext) extends Runnable {
override def run(): Unit = {
val fs: FileSystem = FileSystem.get(new URI(HDFS), new Configuration(), "hadoop")
while (true) {
try {
TimeUnit.SECONDS.sleep(5)
} catch {
case e: InterruptedException => e.printStackTrace()
}
val state: StreamingContextState = ssc.getState()
val bool: Boolean = fs.exists(new Path(HDFS + "/stopSpark"))
if (bool && state == StreamingContextState.ACTIVE) {
ssc.stop(stopSparkContext = true, stopGracefully = true)
System.exit(0)
}
}
}
}
}

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.4.8</version>
</dependency>
import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
/**
* Spark Streaming consuming Kafka messages with the 0.8 API, using the receiver-based approach
*/
object Case12_KafkaReceiver08 {
private val zkQuorum = "192.168.254.120:2181"
private val groupId = "KafkaReceiver08"
private val topics = Map("test" -> 1)
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
// Enable the WAL (write-ahead log) mechanism
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Set the checkpoint directory so the received data is persisted to HDFS
ssc.checkpoint("hdfs://node01:8020/wal")
// Receive Kafka data
val receiverDstream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
// Extract the message value from the (key, value) pairs
val data: DStream[String] = receiverDstream.map(_._2)
// Word count
val result: DStream[(String, Int)] = data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
result.print()
ssc.start()
ssc.awaitTermination()
}
}
# Create the topic
bin/kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test --zookeeper 192.168.254.120:2181
# Produce and send data
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
If the following error occurs while the program is running:
java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.<init>(Ljava/io/InputStream;Z)V
it is caused by an lz4 version conflict between spark-core and kafka-clients. It can be worked around by specifying a different compression codec in the program:
new SparkConf().set("spark.io.compression.codec", "snappy")
/**
* Spark Streaming consuming Kafka messages with the 0.8 API, using the Direct approach.
* The Direct API reads messages straight from the Kafka brokers, without going through ZooKeeper.
*/
object Case13_KafkaDirect08 {
private val kafkaCluster = "node01:9092,node02:9092,node03:9092"
private val groupId = "KafkaDirect08"
private val topics = Set("test")
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
// Enable the WAL (write-ahead log) mechanism
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Kafka parameters for receiving data
val kafkaParams = Map(
"metadata.broker.list" -> kafkaCluster,
"group.id" -> groupId
)
// Receive the data using the Direct approach
val kafkaDstream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
// Extract the message value from the topic data
val data: DStream[String] = kafkaDstream.map(_._2)
// Word count
val result: DStream[(String, Int)] = data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
result.print()
ssc.start()
ssc.awaitTermination()
}
}
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
/**
* Spark Streaming consuming Kafka messages with the 0.10 API, using the Direct approach
*/
object Case14_KafkaDirect10 {
private val kafkaCluster = "node01:9092,node02:9092,node03:9092"
private val groupId = "KafkaDirect10"
private val topics = Set("test")
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
// 1. Create the StreamingContext object
val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// 2. Receive Kafka data with the Direct approach
val kafkaParams = Map(
"bootstrap.servers" -> kafkaCluster,
"group.id" -> groupId,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"enable.auto.commit" -> "false"
)
val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
ssc,
// Location strategy for placing partitions on executors
LocationStrategies.PreferConsistent,
// The topics to subscribe to
ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)
// 3. Process the data
// Note: to get the consumed offsets you must work with this original DStream;
// once other transformations are applied, the resulting DStreams no longer carry the offset information
kafkaDStream.foreachRDD(rdd => {
// Get the message values
val dataRdd: RDD[String] = rdd.map(_.value())
// Print them
dataRdd.foreach(line => println(line))
// 4. Commit the offsets back to Kafka
val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
})
// 5. Start the streaming computation
ssc.start()
ssc.awaitTermination()
}
}

/**
* Spark Streaming consuming Kafka messages with the 0.8 API, using the Direct approach,
* saving the offsets manually in ZooKeeper
*/
object Case15_KafkaManageOffset08 {
private val kafkaCluster = "node01:9092,node02:9092,node03:9092"
private val zkQuorum = "192.168.254.120:2181"
private val groupId = "consumer-manager"
private val topic = "wordcount"
private val topics = Set(topic)
def main(args: Array[String]): Unit = {
Logger.getLogger("org").setLevel(Level.ERROR)
val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
// Enable the WAL (write-ahead log) mechanism
.set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Create a ZKGroupTopicDirs object, which defines the ZK directory where the offsets are stored
val topicDirs = new ZKGroupTopicDirs(groupId, topic)
// The resulting ZK path: "/consumers/consumer-manager/offsets/wordcount"
val zkTopicPath = topicDirs.consumerOffsetDir
// Build a ZK client for reading and writing the offset data
val zkClient = new ZkClient(zkQuorum)
// Prepare the Kafka parameters
val kafkaParams = Map(
"metadata.broker.list" -> kafkaCluster,
"group.id" -> groupId,
"enable.auto.commit" -> "false"
)
// Declare the kafkaStream
var kafkaStream: InputDStream[(String, String)] = null
// Count the children of the given ZK node
val childrenNum = zkClient.countChildren(zkTopicPath)
// Decide whether offsets were saved before: the child count is greater than 0 if so
if (childrenNum > 0) {
var fromOffsets: Map[TopicAndPartition, Long] = Map()
for (i <- 0 until childrenNum) {
// Read the child node, i.e. the stored offset of partition i
val partitionOffset: String = zkClient.readData[String](s"$zkTopicPath/$i")
val tp = TopicAndPartition(topic, i)
// Collect the offsets: save each partition's offset into the map
fromOffsets += (tp -> partitionOffset.toLong)
}
// Type parameters: the key is the Kafka message key, the value is the message itself, e.g. "hello tom hello jerry"
// Create a handler that turns each message into a (topic_name, message) tuple
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
// Create the DStream with the low-level Direct API (offsets were saved before, so consume from the stored positions)
kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
} else {
// Create the DStream with the low-level Direct API (nothing was consumed before, so this is the first read)
// No child nodes in ZK means this is the first run, so create the direct stream without initial offsets
kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
}
// Operate on kafkaStream directly:
// iterate over the KafkaRDDs inside the DStream; only a KafkaRDD can be cast to HasOffsetRanges to read the offset information.
// Work on the RDDs afterwards, not on the DStream: after a transformation the result is no longer a KafkaRDD, so the offsets can no longer be obtained
kafkaStream.foreachRDD(kafkaRDD => {
// Cast to HasOffsetRanges to get the offset ranges
val offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
// Get the message values
val lines: RDD[String] = kafkaRDD.map(_._2)
// Process the RDD and trigger an action
lines.foreachPartition(partition => partition.foreach(x => println(x)))
// Manually commit the offsets to the ZK cluster
for (o <- offsetRanges) {
// Build the ZK path for this partition
val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
// Save the partition's offset to ZooKeeper
ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)
}
})
ssc.start()
ssc.awaitTermination()
}
}
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc_${scala.version}</artifactId>
<version>${scalikejdbc.version}</version>
</dependency>
<dependency>
<groupId>org.scalikejdbc</groupId>
<artifactId>scalikejdbc-config_${scala.version}</artifactId>
<version>${scalikejdbc.version}</version>
</dependency>
/**
* Spark Streaming EOS (exactly-once semantics):
* Input: Kafka
* Process: Spark Streaming
* Output: MySQL
* Guaranteeing EOS:
* 1. Manage the offsets yourself, i.e. enable.auto.commit=false; here they are stored in MySQL
* 2. Use createDirectStream
* 3. Transactional output: the results and the offset commit are written in the same MySQL transaction on the Driver
*/
object Case16_EOSKafkaMysqlAtomic {
@transient lazy val log = LoggerFactory.getLogger(this.getClass)
private val kafkaCluster = "node01:9092,node02:9092,node03:9092"
private val groupId = "consumer-eos"
private val topic = "topic_eos"
private val mysqlUrl = "jdbc:mysql://node01:3306/test"
private val mysqlUsr = "root"
private val mysqlPwd = "123456"
def main(args: Array[String]): Unit = {
// Prepare the Kafka parameters
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> kafkaCluster,
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean),
"group.id" -> groupId
)
// Database connection pool
ConnectionPool.singleton(mysqlUrl, mysqlUsr, mysqlPwd)
val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(5))
// 1. On the first start or on a restart, build the TopicPartitions and offsets from the values stored in MySQL
// 2. While running, the per-partition offsets are kept internally in currentOffsets = Map[TopicPartition, Long]()
// 3. If the Kafka topic later gets more partitions, this is not detected automatically while the job is running
val initOffset = DB.readOnly(implicit session => {
sql"select `partition`,offset from kafka_topic_offset where topic =${topic} and `group`=${groupId}"
.map(item => new TopicPartition(topic, item.get[Int]("partition")) -> item.get[Long]("offset"))
.list().apply().toMap
})
// createDirectStream: start consuming from the given topic, partitions and offsets
val sourceDStream = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Assign[String, String](initOffset.keys, kafkaParams, initOffset)
)
sourceDStream.foreachRDD(rdd => {
if (!rdd.isEmpty()) {
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
offsetRanges.foreach(offsetRange => {
log.info(s"Topic: ${offsetRange.topic}, Group: ${groupId}, Partition: ${offsetRange.partition}, fromOffset: ${offsetRange.fromOffset}, untilOffset: ${offsetRange.untilOffset}")
})
// Aggregation
val sparkSession = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
import sparkSession.implicits._
val dataFrame = sparkSession.read.json(rdd.map(_.value()).toDS())
dataFrame.createOrReplaceTempView("tmpTable")
val result = sparkSession.sql(
"""
| select eventTimeMinute, language, count(1) pv, count(distinct(userID)) uv
| from (select *, substr(eventTime,0,16) eventTimeMinute from tmpTable) as tmp
| group by eventTimeMinute, language
""".stripMargin).collect()
// On the Driver, store the results and commit the offsets atomically in one MySQL transaction (the offsets live in MySQL here)
DB.localTx(implicit session => {
result.foreach(row => {
sql"""
insert into twitter_pv_uv (eventTimeMinute,language,pv,uv) values (
${row.getAs[String]("eventTimeMinute")},
${row.getAs[String]("language")},
${row.getAs[Long]("pv")},
${row.getAs[Long]("uv")},
) on duplicate key update pv = pv, uv = uv
""".update.apply()
})
// Commit the offsets
offsetRanges.foreach(offsetRange => {
val affectedRows =
sql"""
update kafka_topic_offset set offset = ${offsetRange.untilOffset}
where topic = ${topic} and `group` = ${groupId} and `partition` = ${offsetRange.partition} and offset = ${offsetRange.fromOffset}
""".update.apply()
if (affectedRows != 1) {
throw new Exception(s"""Commit Kafka Topic: ${topic} Offset Failed!""")
}
})
})
}
})
ssc.start()
ssc.awaitTermination()
}
}
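The example above reads and updates a kafka_topic_offset table and upserts into twitter_pv_uv, but their schemas are not shown. Below is a possible layout inferred from the columns used in the code; the column types, key choices and the seed row are assumptions, not the original schema. Note that with ConsumerStrategies.Assign the kafka_topic_offset table must already contain one row per partition before the first start, otherwise initOffset is empty and nothing is consumed.
import scalikejdbc._

object Case16TablesSketch {
  def main(args: Array[String]): Unit = {
    // Same (placeholder) connection settings as the example above
    ConnectionPool.singleton("jdbc:mysql://node01:3306/test", "root", "123456")
    DB.autoCommit { implicit session =>
      // Offset bookkeeping table: one row per (topic, group, partition)
      sql"""
        create table if not exists kafka_topic_offset (
          topic       varchar(128) not null,
          `group`     varchar(128) not null,
          `partition` int          not null,
          `offset`    bigint       not null,
          primary key (topic, `group`, `partition`)
        )
      """.execute.apply()
      // Result table: the primary key is what makes "on duplicate key update" work
      sql"""
        create table if not exists twitter_pv_uv (
          eventTimeMinute varchar(16) not null,
          language        varchar(64) not null,
          pv              bigint      not null,
          uv              bigint      not null,
          primary key (eventTimeMinute, language)
        )
      """.execute.apply()
      // Seed a starting offset (assumed example: partition 0 of topic_eos from offset 0)
      sql"""
        insert ignore into kafka_topic_offset (topic, `group`, `partition`, `offset`)
        values ('topic_eos', 'consumer-eos', 0, 0)
      """.update.apply()
    }
  }
}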