• spark学习笔记(十二)——sparkStreaming-RDD队列/自定义数据源/kafka数据源/DStream转换/DStream输出/优雅关闭


    目录

    RDD队列

    自定义数据源

     kafka数据源

    DStream转换

    无状态转化操作

    有状态转化操作

    DStream输出

    优雅关闭


    用到的全部依赖:

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.apache.spark</groupId>
    4. <artifactId>spark-core_2.12</artifactId>
    5. <version>3.0.0</version>
    6. </dependency>
    7. <dependency>
    8. <groupId>org.apache.spark</groupId>
    9. <artifactId>spark-sql_2.12</artifactId>
    10. <version>3.0.0</version>
    11. </dependency>
    12. <dependency>
    13. <groupId>mysql</groupId>
    14. <artifactId>mysql-connector-java</artifactId>
    15. <version>5.1.27</version>
    16. </dependency>
    17. <dependency>
    18. <groupId>org.apache.spark</groupId>
    19. <artifactId>spark-hive_2.12</artifactId>
    20. <version>3.0.0</version>
    21. </dependency>
    22. <dependency>
    23. <groupId>org.apache.hive</groupId>
    24. <artifactId>hive-exec</artifactId>
    25. <version>1.2.1</version>
    26. </dependency>
    27. <dependency>
    28. <groupId>org.apache.spark</groupId>
    29. <artifactId>spark-streaming_2.12</artifactId>
    30. <version>3.0.0</version>
    31. </dependency>
    32. <dependency>
    33. <groupId>org.apache.spark</groupId>
    34. <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    35. <version>2.4.1</version>
    36. </dependency>
    37. <dependency>
    38. <groupId>org.apache.spark</groupId>
    39. <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    40. <version>3.0.0</version>
    41. </dependency>
    42. <dependency>
    43. <groupId>com.fasterxml.jackson.core</groupId>
    44. <artifactId>jackson-core</artifactId>
    45. <version>2.10.1</version>
    46. </dependency>
    47. </dependencies>
    48. <build>
    49. <plugins>
    50. <!--该插件用于把Scala代码编译成为class文件-->
    51. <plugin>
    52. <groupId>net.alchim31.maven</groupId>
    53. <artifactId>scala-maven-plugin</artifactId>
    54. <version>3.2.2</version>
    55. <executions>
    56. <execution>
    57. <!--声明绑定到maven的compile阶段-->
    58. <goals>
    59. <goal>testCompile</goal>
    60. </goals>
    61. </execution>
    62. </executions>
    63. </plugin>
    64. <plugin>
    65. <groupId>org.apache.maven.plugins</groupId>
    66. <artifactId>maven-assembly-plugin</artifactId>
    67. <version>3.1.0</version>
    68. <configuration>
    69. <descriptorRefs>
    70. <descriptorRef>jar-with-dependencies</descriptorRef>
    71. </descriptorRefs>
    72. </configuration>
    73. <executions>
    74. <execution>
    75. <id>make-assembly</id>
    76. <phase>package</phase>
    77. <goals>
    78. <goal>single</goal>
    79. </goals>
    80. </execution>
    81. </executions>
    82. </plugin>
    83. </plugins>
    84. </build>

    RDD队列

    可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD都会作为一个DStream处理。

    循环创建几个RDD并放入队列。通过SparkStream创建Dstream计算WordCount。

    代码:

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.rdd.RDD
    3. import org.apache.spark.streaming.{Seconds, StreamingContext}
    4. import scala.collection.mutable
    5. object RDD {
    6. def main(args: Array[String]): Unit = {
    7. //TODO 创建配置环境
    8. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    9. val ssc = new StreamingContext(sparkConf,Seconds(5))
    10. //TODO 操作
    11. //创建RDD队列
    12. val Rdd = new mutable.Queue[RDD[Int]]()
    13. //创建queueStream
    14. val imputStream = ssc.queueStream(Rdd, oneAtATime = false)
    15. //处理RDD数据
    16. val mapRDD = imputStream.map((_, 1))
    17. val reduceRDD = mapRDD.reduceByKey(_ + _)
    18. //打印
    19. reduceRDD.print()
    20. //启动任务
    21. ssc.start()
    22. //循环创建RDD
    23. for (i <- 1 to 50) {
    24. Rdd += ssc.sparkContext.makeRDD(1 to 100,5)
    25. Thread.sleep(2000)
    26. }
    27. ssc.awaitTermination()
    28. }
    29. }

    结果:

    自定义数据源

    继承Receiver,实现onStart、onStop方法来自定义数据源采集。

    案例1代码:

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.storage.StorageLevel
    3. import org.apache.spark.streaming.receiver.Receiver
    4. import org.apache.spark.streaming.{Seconds, StreamingContext}
    5. import scala.util.Random
    6. object UserDefined_DataSource {
    7. def main(args: Array[String]): Unit = {
    8. //TODO 创建配置环境
    9. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
    10. val ssc = new StreamingContext(sparkConf,Seconds(5))
    11. //采集数据
    12. val messageDS = ssc.receiverStream(new MyReceiver())
    13. messageDS.print()
    14. //开始
    15. ssc.start()
    16. ssc.awaitTermination()
    17. }
    18. /*
    19. 自定义数据采集器
    20. 1.继承Receiver,定义泛型,传递参数
    21. 2.重写方法
    22. */
    23. class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
    24. private var fig = true
    25. //最初启动,读数据
    26. override def onStart(): Unit = {
    27. new Thread(new Runnable {
    28. override def run(): Unit = {
    29. while (true) {
    30. val message = "采集的数据为:" + new Random().nextInt(10).toString
    31. store(message)
    32. Thread.sleep(500)
    33. }
    34. }
    35. }).start()
    36. }
    37. //停止
    38. override def onStop(): Unit = {
    39. fig = false
    40. }
    41. }
    42. }

    结果:

    案列2代码:

    1. import java.io.{BufferedReader, InputStreamReader}
    2. import java.net.Socket
    3. import java.nio.charset.StandardCharsets
    4. import org.apache.spark.SparkConf
    5. import org.apache.spark.storage.StorageLevel
    6. import org.apache.spark.streaming.receiver.Receiver
    7. import org.apache.spark.streaming.{Seconds, StreamingContext}
    8. object UserDefined_DataSource02 {
    9. def main(args: Array[String]): Unit = {
    10. //初始化Spark配置信息
    11. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    12. //初始化SparkStreamingContext
    13. val ssc = new StreamingContext(sparkConf, Seconds(10))
    14. //创建自定义数据接收器
    15. val lineStream = ssc.receiverStream(new MyReceiver("hadoop01", 9999))
    16. //数据切分
    17. val word = lineStream.flatMap(_.split("\t"))
    18. //映射为(word,1)
    19. val word2 = word.map((_, 1))
    20. //统计
    21. val wordCount = word2.reduceByKey(_ + _)
    22. //7.打印
    23. wordCount.print()
    24. //启动
    25. ssc.start()
    26. ssc.awaitTermination()
    27. }
    28. /*
    29. 自定义数据采集器
    30. 1.继承Receiver,定义泛型,传递参数
    31. 2.重写方法
    32. */
    33. class MyReceiver(host: String,port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
    34. //最初启动,调用方法
    35. override def onStart(): Unit = {
    36. new Thread("Socket Receiver") {
    37. override def run() {
    38. receive()
    39. }
    40. }.start()
    41. }
    42. //读数据并发送给Spark
    43. def receive(): Unit = {
    44. //创建Socket
    45. var socket: Socket = new Socket(host, port)
    46. //定义变量,接收数据
    47. var input: String = null
    48. //创建BufferedReader,读取数据
    49. val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
    50. //读数据
    51. input = reader.readLine()
    52. //如果receiver没有关闭且输入数据不为空
    53. //循环发送数据给Spark
    54. while (!isStopped() && input != null) {
    55. store(input)
    56. input = reader.readLine()
    57. }
    58. //跳出循环,关闭资源
    59. reader.close()
    60. socket.close()
    61. //重启
    62. restart("restart")
    63. }
    64. override def onStop(): Unit = {}
    65. }
    66. }

     结果:

       

      

     kafka数据源

    ReceiverAPI:需要一个专门的Executor接收数据,然后发送给其他的Executor计算。存在数据的节点内存溢出问题。

    DirectAPI:由计算的Executor主动消费Kafka的数据,速度由自身控制。

    添加依赖

    1. <dependency>
    2. <groupId>org.apache.spark</groupId>
    3. <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    4. <version>3.0.0</version>
    5. </dependency>
    6. <dependency>
    7. <groupId>com.fasterxml.jackson.core</groupId>
    8. <artifactId>jackson-core</artifactId>
    9. <version>2.10.1</version>
    10. </dependency>

    代码:

    1. import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
    2. import org.apache.spark.SparkConf
    3. import org.apache.spark.streaming.dstream.InputDStream
    4. import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    5. import org.apache.spark.streaming.{Seconds, StreamingContext}
    6. object UserDefined_DataSource03_kafka {
    7. def main(args: Array[String]): Unit = {
    8. //初始化Spark配置信息
    9. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Kafka")
    10. //初始化SparkStreamingContext
    11. val ssc = new StreamingContext(sparkConf, Seconds(15))
    12. //定义kafka参数
    13. val kafkaPara: Map[String, Object] = Map[String, Object](
    14. ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop01:9092",
    15. ConsumerConfig.GROUP_ID_CONFIG -> "testkafka",
    16. "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    17. "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    18. )
    19. //读取Kafka数据创建DStream
    20. val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
    21. KafkaUtils.createDirectStream[String, String](
    22. ssc,
    23. LocationStrategies.PreferConsistent,
    24. ConsumerStrategies.Subscribe[String, String](Set("testkafka"), kafkaPara))
    25. //取出KV对
    26. val value = kafkaDStream.map(record => record.value())
    27. //计算
    28. value.flatMap(_.split(" "))
    29. .map((_,1))
    30. .reduceByKey(_+_)
    31. .print()
    32. //启动
    33. ssc.start()
    34. ssc.awaitTermination()
    35. }
    36. }

    测试:

    1)开启zookeeper

    2)开启kafka

    3)创建主题

    进入kafka目录,运行:

    bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic testkafka

    4)打开生产者

    进入kafka目录,运行:

    bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic testkafka

    5)打开消费者

    打开另一个窗口,进入kafka目录,运行:

    bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic testkafka

    6)运行程序

    结果

     

     

    DStream转换

    DStream的操作和RDD类似,分为Transformations(转换)和Output Operations(输出)两种,转换操作中还有一些比较特殊的原语:updateStateByKey()transform()以及各种Window相关的原语。

    无状态转化和有状态转化:是否保存一个采集周期的数据,保存就是有状态,不保存就是无状态。

    无状态转化操作

    无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个 RDD

    注意:1)针对键值对的DStream转化操作要添加import StreamingContext._才能在Scala上使用。

    2)每个DStream在内部是由许多RDD组成,且无状态转化操作是分别应用到每个RDD上,例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。

    join

    两个流之间的join需要两个流的批次大小一致才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的RDD进行join使与两个RDDjoin效果相同。

    代码:

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.streaming.{Seconds, StreamingContext}
    3. object Join {
    4. def main(args: Array[String]): Unit = {
    5. //初始化Spark配置信息
    6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Join")
    7. //初始化SparkStreamingContext
    8. val ssc = new StreamingContext(sparkConf, Seconds(15) )
    9. //从端口获取数据创建流
    10. val line01 = ssc.socketTextStream("hadoop01", 9999)
    11. val line02 = ssc.socketTextStream("hadoop01", 8888)
    12. //转换为KV类型
    13. val word01 = line01.flatMap(_.split(" ")).map((_,1))
    14. val word02 = line02.flatMap(_.split(" "))map((_,"YES"))
    15. //JOIN
    16. val join = word01.join(word02)
    17. //打印
    18. join.print()
    19. //启动任务
    20. ssc.start()
    21. ssc.awaitTermination()
    22. }
    23. }

    测试结果:

     

    transform

    transform允许DStream上执行任意的RDD-to-RDD函数;该函数并没有在DStream的API中暴露出来方便扩展Spark API;该函数每一批次调度一次;其实也就是对DStream中的RDD应用转换。

     代码:

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.streaming.{Seconds, StreamingContext}
    3. object Transform {
    4. def main(args: Array[String]): Unit = {
    5. //初始化Spark配置信息
    6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Traneform")
    7. //初始化SparkStreamingContext
    8. val ssc = new StreamingContext(sparkConf, Seconds(15) )
    9. //创建Dstream
    10. val line = ssc.socketTextStream("hadoop01", 9999)
    11. //转化为RDD
    12. val DStreamToRDD = line.transform(
    13. rdd => {
    14. val word = rdd.flatMap(_.split(" "))
    15. val wordmap = word.map((_, 1))
    16. val value = wordmap.reduceByKey(_ + _)
    17. value
    18. }
    19. )
    20. //打印
    21. DStreamToRDD.print()
    22. //启动任务
    23. ssc.start()
    24. ssc.awaitTermination()
    25. }
    26. }

     测试结果:

    有状态转化操作

    UpdateStateByKey

    UpdateStateByKey原语用于记录历史记录。

    有时我们需要在DStream中跨批次维护状态(如:流计算中累加wordcount);updateStateByKey()提供对一个状态变量的访问,给一个由(键,事件)对构成的DStream,传递一个函数,该函数指定根据新的事件更新每个键对应的状态并构建出一个新的DStream,其内部数据为(键,状态) 对。

    updateStateByKey() 的结果是一个新的DStream,其内部的RDD序列是由每个时间区间对应的(键,状态)对组成的。

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.streaming.{Seconds, StreamingContext}
    3. object UpdateStateByKey {
    4. def main(args: Array[String]): Unit = {
    5. //初始化Spark配置信息
    6. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Count")
    7. //初始化SparkStreamingContext
    8. val ssc = new StreamingContext(sparkConf, Seconds(3))
    9. //使用有状态操作时需要设置检查点路径
    10. ssc.checkpoint("/ck")
    11. //创建DStream
    12. val lines = ssc.socketTextStream("hadoop01", 9999)
    13. val wordMap = lines.map((_,1))
    14. //updateStateByKey:根据key进行数据状态更新
    15. //第一个值:相同key的value数据
    16. //第二个值:缓冲区相同key的value数据
    17. val wordCount = wordMap.updateStateByKey(
    18. (seq: Seq[Int], buff: Option[Int]) => {
    19. val newCount = buff.getOrElse(0) + seq.sum
    20. Option(newCount)
    21. }
    22. )
    23. wordCount.print()
    24. //启动任务
    25. ssc.start()
    26. ssc.awaitTermination()
    27. }
    28. }

    Window

    Window Operations可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态;所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动周期。

    窗口时长:计算内容的时间范围;

    滑动周期:隔多久触发一次计算;

    注:这两者都必须为采集周期大小的整数倍。

    例1:采集周期为3秒,窗口时长为12秒,滑动周期为6秒。

    解:3秒采集一次数据,6秒计算一次,计算的时间范围为12秒也就是4次采集的数据,部分数据可能会重复计算

    例2:采集周期为3秒,窗口时长为6秒,滑动周期为6秒。

    (3秒采集一次数据,6秒计算一次,计算的时间范围为6秒也就是2次采集的数据,数据不会重复计算,因为计算时间和滑动时间一样)

    代码:

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.streaming.{Seconds, StreamingContext}
    3. object WindowOperations {
    4. def main(args: Array[String]): Unit = {
    5. //TODO 采集数据周期为:3秒 窗口时长为:12秒 滑动周期为:6秒
    6. //初始化Spark配置信息
    7. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    8. //初始化SparkStreamingContext
    9. val ssc = new StreamingContext(sparkConf, Seconds(3) )
    10. //创建Dstream
    11. val line = ssc.socketTextStream("hadoop01", 9999)
    12. //window(窗口时长,滑动周期)
    13. //窗口的范围应该是采集数据周期的整数倍
    14. //默认以一个采集周期进行滑动
    15. //为了避免重复数据可以改变滑动的幅度,窗口时长=滑动周期时避免重复计算
    16. val word = line.map((_, 1))
    17. //窗口时长为:12秒 滑动周期为:6秒
    18. val windowDS = word.window(Seconds(12),Seconds(6))
    19. val Count = windowDS.reduceByKey(_ + _)
    20. Count.print()
    21. //启动任务
    22. ssc.start()
    23. ssc.awaitTermination()
    24. }
    25. }

    测试结果:

    输入13个a

    结果有21个a,数据重复计算了。

     

    Window操作的其它方法:

    1)window(windowLength, slideInterval): 基于对源DStream窗化的批次进行计算返回一个新的 Dstream;

    2)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;

    3)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;

    4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对的DStream上调用此函数会返回一个新的(K,V)对的DStream,通过对滑动窗口中的批次数据使用reduce函数来整合每个key的value值;

    5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 上述函数的变化版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算;通过reduce进入到滑动窗口数据并反向reduce离开窗口的旧数据来实现这个操作;

    一个例子是随着窗口滑动对keys的“加”“减”计数。

    当窗口时长大,滑动周期小时,通过增加数据和删除数据避免重复计算;

    例1:采集周期为3秒,窗口时长为6秒,滑动周期为3秒。

    输入数据(个)(3秒内):0,6,4,3,1,0,0

    输出(3秒):

    0,6(0+6),10(6-0+4),7(10-6+3),4(7-4+1),1(4-3+0),0(1-1+0)

    代码:

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.streaming.{Seconds, StreamingContext}
    3. object WindowOperations2 {
    4. def main(args: Array[String]): Unit = {
    5. //TODO 采集数据周期为:3秒 窗口时长为:12秒 滑动周期为:6秒
    6. //初始化Spark配置信息
    7. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    8. //初始化SparkStreamingContext
    9. val ssc = new StreamingContext(sparkConf, Seconds(3) )
    10. ssc.checkpoint("ck")
    11. //创建Dstream
    12. val line = ssc.socketTextStream("hadoop01", 9999)
    13. val word = line.map((_, 1))
    14. //窗口时长为:12秒 滑动周期为:6秒
    15. val windowDS = word.reduceByKeyAndWindow(
    16. (x:Int,y:Int) => {x + y},
    17. (x:Int,y:Int) => {x - y},
    18. Seconds(6),
    19. Seconds(3)
    20. )
    21. windowDS.print()
    22. //启动任务
    23. ssc.start()
    24. ssc.awaitTermination()
    25. }
    26. }

    DStream输出

    DStream输出操作:流数据经转化操作得到的数据所要执行的操作(保存到外部数据库或输出到屏幕)。

    RDD中的惰性求值类似,一个DStream及其派生DStream都没有被执行输出操作,则这些DStream就不会被求值;如果StreamingContext中没有设定输出操作,整个context就都不会启动。

    输出操作:

    (1)print():在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。

    (2)foreachRDD(func):这是最通用的输出操作,将函数func用于产生于Stream的每一个RDD;其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统(如将RDD存入文件或者通过网络将其写入数据库)。

    (3)saveAsTextFiles(prefix, [suffix]):以text文件形式存储这个DStream的内容;每一批次的存储文件名基于参数中的prefixsuffix。”prefix-Time_IN_MS[.suffix]”。

    (4)saveAsObjectFiles(prefix, [suffix]):以Java对象序列化的方式将Stream中的数据保存为SequenceFiles,每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]".。

    (5)saveAsHadoopFiles(prefix, [suffix]):将Stream中的数据保存为Hadoop files,每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"

    通用的输出操作foreachRDD(),它用来对DStream中的RDD运行任意计算;这和transform()有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。

    注: 1) 连接不能写在driver层面(序列化);

    2) 如果写在foreach则每个RDD中的每一条数据都创建,得不偿失;

    3) 增加foreachPartition,在分区创建(获取)。

    优雅关闭

    优雅关闭:计算节点不再接收新的数据,把已经有的数据处理完毕后关闭。

    流式任务需要7*24小时执行,有时升级代码需要主动停止程序,分布式程序没办法做到一个个进程的杀死。 应该使用外部文件系统来控制内部程序关闭。

    MonitorStop

    代码:

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}
    3. object MonitorStop2 {
    4. def main(args: Array[String]): Unit = {
    5. /*
    6. 线程关闭
    7. val thread = new Thread()
    8. thread.start()//线程开启
    9. thread.stop()//强制关闭
    10. */
    11. //初始化Spark配置信息
    12. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    13. //初始化SparkStreamingContext
    14. val ssc = new StreamingContext(sparkConf, Seconds(3) )
    15. //创建Dstream
    16. val line = ssc.socketTextStream("hadoop01", 9999)
    17. val word = line.map((_, 1))
    18. word.print()
    19. //启动任务
    20. ssc.start()
    21. //创建新的线程用作关闭线程
    22. //而且需要在第三方程序中增加关闭程序
    23. new Thread(
    24. new Runnable {
    25. override def run(): Unit = {
    26. //优雅关闭:计算节点不再接收新的数据,把已经有的数据处理完毕后关闭
    27. //Mysql:Table(stopSpark) => Row => data
    28. //Redis:Data(K-V)
    29. //ZK:/stopSpark
    30. //HDFS:/stopSpark
    31. Thread.sleep(5000)
    32. //获取SparkStreaming状态
    33. val state = ssc.getState()
    34. if (state == StreamingContextState.ACTIVE) {
    35. ssc.stop(true,true)
    36. }
    37. //退出线程
    38. System.exit(0)
    39. }
    40. }
    41. ).start()
    42. ssc.awaitTermination() //block阻塞main线程
    43. }
    44. }

    HDFS

    代码:

    1. import java.net.URI
    2. import org.apache.hadoop.conf.Configuration
    3. import org.apache.hadoop.fs.{FileSystem, Path}
    4. import org.apache.spark.streaming.{StreamingContext, StreamingContextState}
    5. class MonitorStop(ssc: StreamingContext) extends Runnable {
    6. override def run(): Unit = {
    7. val fs: FileSystem = FileSystem.get(new URI("hdfs://hadoop01:9000"), new
    8. Configuration(), "spark")
    9. while (true) {
    10. try
    11. Thread.sleep(5000)
    12. catch {
    13. case e: InterruptedException =>
    14. e.printStackTrace()
    15. }
    16. val state: StreamingContextState = ssc.getState
    17. val bool: Boolean = fs.exists(new Path("hdfs://hadoop01:9000/stopSpark"))
    18. if (bool) {
    19. if (state == StreamingContextState.ACTIVE) {
    20. ssc.stop(stopSparkContext = true, stopGracefully = true)
    21. System.exit(0)
    22. }
    23. }
    24. }
    25. }
    26. }

    SparkTest

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    3. import org.apache.spark.streaming.{Seconds, StreamingContext}
    4. object SparkTest {
    5. def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = {
    6. val update: (Seq[Int], Option[Int]) => Some[Int] = (values: Seq[Int], status:
    7. Option[Int]) => {
    8. //当前批次内容的计算
    9. val sum: Int = values.sum
    10. //取出状态信息中上一次状态
    11. val lastStatu: Int = status.getOrElse(0)
    12. Some(sum + lastStatu)
    13. }
    14. val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkTest")
    15. //设置优雅的关闭
    16. sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    17. val ssc = new StreamingContext(sparkConf, Seconds(5))
    18. ssc.checkpoint("./ck")
    19. val line: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop01", 9999)
    20. val word: DStream[String] = line.flatMap(_.split(" "))
    21. val wordAndOne: DStream[(String, Int)] = word.map((_, 1))
    22. val wordAndCount: DStream[(String, Int)] = wordAndOne.updateStateByKey(update)
    23. wordAndCount.print()
    24. ssc
    25. }
    26. def main(args: Array[String]): Unit = {
    27. val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck", () => createSSC())
    28. new Thread(new MonitorStop(ssc)).start()
    29. ssc.start()
    30. ssc.awaitTermination()
    31. }
    32. }

    恢复数据

    设置checkpoint用于恢复数据。

    1. import org.apache.spark.SparkConf
    2. import org.apache.spark.streaming.{Seconds, StreamingContext}
    3. object RecoverData {
    4. def main(args: Array[String]): Unit = {
    5. //TODO 恢复数据
    6. val ssc = StreamingContext.getActiveOrCreate("ck", () => {
    7. //初始化Spark配置信息
    8. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming")
    9. //初始化SparkStreamingContext
    10. val ssc = new StreamingContext(sparkConf, Seconds(3))
    11. //创建Dstream
    12. val line = ssc.socketTextStream("hadoop01", 9999)
    13. val word = line.map((_, 1))
    14. word.print()
    15. ssc
    16. })
    17. ssc.checkpoint("ck")
    18. //启动任务
    19. ssc.start()
    20. ssc.awaitTermination() //block阻塞main线程
    21. }
    22. }

    本文仅仅是学习笔记的记录!!!

  • 相关阅读:
    Redis高级篇——Redis的优化
    测试到底是个啥
    2022年朝阳区科技创新课之“产品创新与成果转化”训练营活动圆满结束
    AR人脸道具SDK解决方案,实现道具与人脸的自然融合
    LeetCode:2. 两数之和
    C++如何监控键盘输入输出
    300. 最长递增子序列
    23年计算机408复习规划以及高分技巧--上岸学姐总结
    Selenium4.0 + Python手撸自动化框架系列之 Web截图 与 封装
    计算输入的正数和负数个数
  • 原文地址:https://blog.csdn.net/qq_55906442/article/details/126328882