• SparkStreaming (六) --------- 优雅关闭


    目录


    流式任务需要 7*24 小时不间断运行,但有时因为升级代码等原因需要主动停止程序。由于是分布式程序,没办法逐个进程去杀死,所以配置优雅关闭就显得至关重要了。做法是使用外部文件系统(例如 HDFS 上的标记文件)来控制内部程序的关闭。

    ➢ MonitorStop

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.streaming.{StreamingContext, StreamingContextState}
    /**
     * Background watcher that shuts a [[StreamingContext]] down gracefully once a
     * marker file appears on HDFS.
     *
     * Every 5 seconds the watcher checks whether `hdfs://linux1:9000/stopSpark`
     * exists. When it does and the context is ACTIVE, the context is stopped with
     * `stopGracefully = true` and the JVM exits with status 0.
     *
     * The watcher terminates on its own when the context is already STOPPED or
     * when its thread is interrupted, so it never keeps the JVM alive needlessly.
     *
     * @param ssc the streaming context to monitor and stop
     */
    class MonitorStop(ssc: StreamingContext) extends Runnable {
    	override def run(): Unit = {
    		val fs: FileSystem = FileSystem.get(new URI("hdfs://linux1:9000"), new Configuration(), "fancyry")
    		// Loop-invariant: build the marker path once instead of on every poll.
    		val stopMarker = new Path("hdfs://linux1:9000/stopSpark")
    		var stopRequested = false
    		var polling = true
    		try {
    			while (polling) {
    				try {
    					Thread.sleep(5000)
    					val state: StreamingContextState = ssc.getState
    					if (state == StreamingContextState.STOPPED) {
    						// Someone else already stopped the context; nothing left to watch.
    						polling = false
    					} else if (state == StreamingContextState.ACTIVE && fs.exists(stopMarker)) {
    						ssc.stop(stopSparkContext = true, stopGracefully = true)
    						stopRequested = true
    						polling = false
    					}
    				} catch {
    					case _: InterruptedException =>
    						// Honour cancellation: restore the flag and leave the loop.
    						Thread.currentThread().interrupt()
    						polling = false
    					case e: java.io.IOException =>
    						// A transient HDFS failure must not disable the stop mechanism;
    						// report it and retry on the next poll.
    						e.printStackTrace()
    				}
    			}
    		} finally {
    			// Release the HDFS client on every exit path.
    			fs.close()
    		}
    		// Exit only after the file system has been closed (System.exit never returns).
    		if (stopRequested) System.exit(0)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    ➢ SparkTest

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    /**
     * Stateful socket word count that can be stopped gracefully from outside.
     *
     * The streaming context is restored from the `./ck` checkpoint directory when
     * possible, otherwise built by [[createSSC]]. A [[MonitorStop]] thread is
     * started alongside the job.
     */
    object SparkTest {
    	/**
    	 * Builds a fresh streaming context: 5-second batches, checkpointing to `./ck`,
    	 * reading lines from `linux1:9999` and printing a running count per word.
    	 *
    	 * @return the configured (not yet started) streaming context
    	 */
    	def createSSC(): _root_.org.apache.spark.streaming.StreamingContext = {
    		// New state = sum of the current batch + previous state (0 when absent).
    		val accumulate: (Seq[Int], Option[Int]) => Option[Int] =
    			(batchValues, previous) => Some(batchValues.sum + previous.getOrElse(0))

    		val conf: SparkConf = new SparkConf()
    			.setMaster("local[4]")
    			.setAppName("SparkTest")
    			// Enable graceful stop on JVM shutdown.
    			.set("spark.streaming.stopGracefullyOnShutdown", "true")

    		val context = new StreamingContext(conf, Seconds(5))
    		context.checkpoint("./ck")

    		context
    			.socketTextStream("linux1", 9999)
    			.flatMap(_.split(" "))
    			.map((_, 1))
    			.updateStateByKey(accumulate)
    			.print()

    		context
    	}

    	/**
    	 * Entry point: obtains the context, launches the stop monitor, then starts
    	 * the job and blocks until it terminates.
    	 *
    	 * @param args command-line arguments (unused)
    	 */
    	def main(args: Array[String]): Unit = {
    		val factory: () => StreamingContext = () => createSSC()
    		val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck", factory)

    		val monitor = new Thread(new MonitorStop(ssc))
    		monitor.start()

    		ssc.start()
    		ssc.awaitTermination()
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
  • 相关阅读:
    upload-Labs靶场“11-15”关通关教程
    手把手安装Origin软件
    rsync远程同步
    js知识之选择与循环
    实体店怎么通过抖音推广引流?seo优化玩转抖音
    合宙esp32c3不带串口芯片的型号如何arduino和micropython
    leetcode - 02 树专题 226~662~98~297~剑指Offer36~958~剑指Offer54~111~104~222
    设计模式之【建造者模式】
    快速删除B站的关注列表
    ES6 class类
  • 原文地址:https://blog.csdn.net/m0_51111980/article/details/126705331