• 大数据高级开发工程师——Spark学习笔记(10)


    Spark内存计算框架

    Spark Streaming

    Checkpoint

    1. checkpoint的基本介绍
    • checkpoint 是 SparkStreaming 当中为了解决流式处理程序意外停止造成的数据丢失问题,checkpoint 的目的是保证长时间运行的任务在意外挂掉之后,能够在拉起来时不丢失数据。
    • checkpoint 中包含两种数据:
      • metadata 元数据信息:用户恢复 Driver 端的数据
        • 保存定义了 Streaming 计算逻辑至类似 HDFS 的支持容错的存储系统,用来恢复 Driver,元数据包括:配置(用户创建该 Streaming Application的所有配置)、DStream(一系列的操作)、未完成的batches(那些提交了 job 但尚未执行或未完成的batches)
      • data:保存已生成的 RDDs 至可靠的存储;这在某些 stateful 转换中是需要的,在这种转换中,生成 RDD 需要依赖前面的 batches,会导致依赖随时间而变长。为了避免这种没有尽头的变长,要定期将中间生成的 RDDs 保存到可靠存储来切断依赖链。
    2. 什么时候需要使用checkpoint
    • 满足以下任一条件:
      • 使用了 stateful 转换:如果 Application 中使用了 updateStateByKey 或 reduceByKeyAndWindow 等 stateful 操作,必须提供 checkpoint 目录来允许定时的 RDD 进行 checkpoint。
      • 希望从意外中恢复 Driver
    • 如果 Streaming App 没有 stateful 操作,也允许 Driver 挂掉之后再次重启的进度丢失,就没有启用 checkpoint 的必要了。
    3. 如何使用checkpoint
    • 启用 checkpoint,需要设置一个支持容错的、可靠的文件系统(如HDFS、S3等)目录来保存 checkpoint 数据。
    • 通过调用 streamingContext.checkpoint(checkpointDirectory) 来完成,另外,如果你想让你的 Application 能从 Driver 失败中恢复,你的 Application 要满足:
      • 若 Application 为首次重启,将创建一个新的 StreamContext 实例;
      • 若 Application 是从失败中重启,将会从 checkpoint 目录导入 checkpoint 数据来重新创建 StreamingContext 实例;
      • 通过 StreamingContext.getOrCreate 可以达到目的。
    • checkpoint 不仅仅可以保存运行结果中的数据,还可以存储 Driver 端的信息,通过 checkpoint 可以实现 Driver 端的高可用。代码实现如下:
    object Case09_DriverHAWordCount {
      val checkpointPath = "hdfs://node01:8020/checkpoint"
    
      def creatingFunc(): StreamingContext = {
        Logger.getLogger("org").setLevel(Level.ERROR)
    
        val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        ssc.checkpoint(checkpointPath)
    
        val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)
        val result: DStream[(String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_, 1)).updateStateByKey(updateFunc)
    
        result.print()
    
        ssc
      }
    
      /**
        * @param currentValues  表示在当前批次每个单词出现的所有的1 (hadoop,1)、(hadoop,1)、...、(hadoop,1)
        * @param historyValues  表示在之前所有批次中每个单词出现的总次数 (hadoop,100)
        */
      def updateFunc(currentValues: Seq[Int], historyValues: Option[Int]): Option[Int] = {
        val newValue: Int = currentValues.sum + historyValues.getOrElse(0)
        Some(newValue)
      }
    
      def main(args: Array[String]): Unit = {
        val ssc: StreamingContext = StreamingContext.getOrCreate(checkpointPath, creatingFunc _)
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 如果 checkpointDirectory 存在,那么 context 将导入 checkpoint 数据;如果目录不存在,函数 functionToCreateContext 将被调用并创建新的 context。
    • 除了调用 getOrCreate 外,还需要你的集群模式执行 Driver 挂掉之后重启之。
      • 例如,在 yarn 模式下,Driver 是运行在 ApplicationMaster 中,若 ApplicationMaster 挂掉,yarn 会自动在另一个节点启动一个新的 ApplicationMaster。
    • 需要注意的是,随着 Streaming Application 的持续运行,checkpoint 数据占用的存储空间会不断变大。因此,需要小心设置 checkpoint 的时间间隔。设置得越小,checkpoint 次数会越多,占用空间会越大;如果设置越大,会导致恢复时丢失的数据和进度越多。一般推荐设置为 batch duration 的 5~10 倍。

    Spark Streaming和Spark SQL整合

    • pom.xml 文件添加依赖:
    <dependency>
      <groupId>org.apache.sparkgroupId>
      <artifactId>spark-sql_${scala.version}artifactId>
      <version>${spark.version}version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 代码开发:
    object Case10_WordCountStreamingAndSql {
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)
    
        // 1. 创建SparkConf对象,注意这里至少给两个线程,一个线程没办法执行
        val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    
        // 2. 创建StreamingContext对象
        val ssc = new StreamingContext(sparkConf, Seconds(1))
    
        // 3. 接收Socket数据
        val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)
    
        // 4. 对数据进行处理
        val words: DStream[String] = socketTextStream.flatMap(_.split(" "))
    
        // 5. 对DStream进行处理,将RDD转换成DataFrame
        words.foreachRDD(rdd => {
          // 获取 SparkSession
          val sparkSession: SparkSession = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
          import sparkSession.implicits._
          val dataFrame: DataFrame = rdd.toDF("word")
          // 将dataFrame注册成表
          dataFrame.createOrReplaceTempView("words")
          // 统计每个单词出现的次数
          val result: DataFrame = sparkSession.sql("select word, count(*) as count from words group by word")
    
          // 展示结果
          result.show()
        })
    
        // 6. 开启流式计算
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    Spark Streaming容错

    1. 节点失败容错
    • SparkStreaming运行流程:

    在这里插入图片描述

    • 当一个 Executor 失败时:会将 task 重新发送到备份的数据块所在的 Executor
      • Tasks和Receiver自动的重启,不需要做任何的配置

    在这里插入图片描述

    • 当 Driver 失败时:使用 checkpoint 机制恢复失败的 Driver

    在这里插入图片描述

    • 使用 checkpoint 机制,会定期将 Driver 信息写入到 HDFS 中

    在这里插入图片描述

    • 步骤一:设置自动重启 Driver 程序
    # Standalone: 在spark-submit提交任务时,增加两个参数 `--deploy-mode cluster` 和 `--supervise`
    spark-submit \
    --master spark://node01:7077 \
    --deploy-mode cluster \
    --supervise \
    --class com.yw.spark.example.streaming.cases.Case01_SocketWordCount \
    --executor-memory 1g \
    --total-executor-cores 2 \
    original-spark-demo-1.0.jar
    
    # Yarn: 在spark-submit提交任务时,增加参数 `--deploy-mode cluster`,并设置 `yarn.resourcemanager.am.max-attemps`
    spark-submit \
    --master yarn \
    --deploy-mode cluster \
    --class com.yw.spark.example.streaming.cases.Case01_SocketWordCount \
    --executor-memory 1g \
    --total-executor-cores 2 \
    original-spark-demo-1.0.jar
    
    <property>
      <name>yarn.resourcemanager.am.max-attempts</name>
      <value>4</value>
      <description>
        The maximum number of application master execution attempts.
      </description>
    </property>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 步骤二:设置 HDFS 的 checkpoint 目录
    streamingContext.checkpoint(hdfsDirectory) 
    
    • 1
    • 步骤三:代码实现
    // Function to create and setup a new StreamingContext
    def functionToCreateContext(): StreamingContext = {
      val ssc = new StreamingContext(...)   // new context
      val lines = ssc.socketTextStream(...) // create DStreams
      ...
      ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
      ssc
    }
    
    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
    
    // Do additional setup on context that needs to be done,
    // irrespective of whether it is being started or restarted
    context. ...
    
    // Start the context
    context.start()
    context.awaitTermination()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    2. 数据丢失如何处理
    • 可以利用 WAL 机制,将数据写入到 HDFS 中,这样当发生节点宕机时,可以从 WAL 中恢复

    在这里插入图片描述

    • 步骤一:设置 checkpoint 目录
    streamingContext.checkpoint(hdfsDirectory)
    
    • 1
    • 步骤二:开启 WAL 日志
    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    
    • 1
    • 步骤三:需要 reliable receiver
      • 当数据写完 WAL 后,才告诉数据源已经消费,对于没有告诉数据的数据,可以从数据源中重新消费数据。
    • 步骤四:取消备份
      • 使用 StorageLevel.MEMORY_AND_DISK_SER 来存储数据源,不需要后缀为 2 的策略,因为 HDFS 已经是多副本了。
    val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    
    • 1

    在这里插入图片描述

    • Reliable Receiver:当数据接收到,并且已经备份存储后,再发送回执给数据源;
    • Unreliable Receiver:不发送回执给数据源
    • WAL:使用在文件系统和数据库中用于数据操作的持久性,先把数据写到一个持久化的日志中,然后对数据做操作,如果操作过程中系统挂了,恢复的时候可以重新读取日志文件再次进行操作。
      • 对于像kafka和flume这些使用接收器来接收数据的数据源。接收器作为一个长时间的任务运行在executor中,负责从数据源接收数据,如果数据源支持的话,向数据源确认接收到数据,然后把数据存储在executor的内存中,然后在exector上运行任务处理这些数据。
      • 如果wal启用了,所有接收到的数据会保存到一个日志文件中去(HDFS), 这样保存接收数据的持久性。
      • 此外,如果只有在数据写入到log中之后接收器才向数据源确认,这样driver重启后那些保存在内存中但是没有写入到log中的数据将会重新发送,这两点保证的数据的无丢失。
    3. Task执行很慢容错

    在这里插入图片描述

    • 开启推测机制:假设总的 task 有 10 个,成功的 task 数量 > spark.speculation.quantile * 10,正在运行的 task 的运行时间 > spark.speculation.multoplier * 成功运行task的平均时间,则这个正在运行的 task 需要重新等待调度。
    # 每隔一段时间来检查有哪些正在运行的 task 需要重新调度
    spark.speculation = true
    # 推测间隔时间
    spark.speculation.interval = 100ms
    # 推测数量阈值
    spark.speculation.quantile = 0.75
    spark.speculation.multoplier = 1.5
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    在这里插入图片描述

    • 在分布式环境中,导致某个 task 执行缓慢的情况有很多:负载不均、程序 bug、资源不均、数据倾斜等,而且这些情况在分布式计算环境中是常态。Speculative Task(推测Task) 这种以空间换时间的思路设计对计算资源是种压榨,同时,如果 Speculative Task 本身也变成了 Slow Task 会导致情况进一步恶化。

    优雅关闭

    • 流式任务需要 7 * 24h 执行,但有时涉及到代码升级,需要主动停止程序。但是分布式程序,没办法做到一个个进程去杀死,所以配置优雅关闭就显得至关重要了。
    • 使用外部文件系统来控制内部程序关闭,代码实现如下:
    object Case11_GracefullyShutdown {
      private val HDFS: String = "hdfs://node01:8020"
      private val CHECKPOINT: String = HDFS + "/checkpoint"
    
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)
    
        val ssc: StreamingContext = StreamingContext.getActiveOrCreate(CHECKPOINT, () => createSsc())
    
        new Thread(new MonitorStop(ssc)).start()
    
        ssc.start()
        ssc.awaitTermination()
      }
    
      def createSsc(): _root_.org.apache.spark.streaming.StreamingContext = {
        val updateFunc: (Seq[Int], Option[Int]) => Some[Int] = (values: Seq[Int], status: Option[Int]) => {
          Some(values.sum + status.getOrElse(0))
        }
        val sparkConf: SparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
    
        // 设置优雅关闭
        sparkConf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    
        val ssc = new StreamingContext(sparkConf, Seconds(5))
        ssc.checkpoint(CHECKPOINT)
        val socketTextStream: ReceiverInputDStream[String] = ssc.socketTextStream("node01", 9999)
        val wordAndCount: DStream[(String, Int)] = socketTextStream.flatMap(_.split(" ")).map((_, 1))
          .updateStateByKey(updateFunc)
    
        wordAndCount.print()
    
        ssc
      }
    
      class MonitorStop(ssc: StreamingContext) extends Runnable {
        override def run(): Unit = {
          val fs: FileSystem = FileSystem.get(new URI(HDFS), new Configuration(), "hadoop")
          while (true) {
            try {
              TimeUnit.SECONDS.sleep(5)
            } catch {
              case e: InterruptedException => e.printStackTrace()
            }
            val state: StreamingContextState = ssc.getState()
            val bool: Boolean = fs.exists(new Path(HDFS + "/stopSpark"))
    
            if (bool && state == StreamingContextState.ACTIVE) {
              ssc.stop(stopSparkContext = true, stopGracefully = true)
              System.exit(0)
            }
          }
        }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    Spark Streaming整合kafka

    • 在消费 kafka 数据时,可以有三种语义保证:
      • at most one 至多一次:数据最多处理一次或没有被处理,有可能造成数据丢失的情况;
      • at least once 至少一次:数据最少被处理一次,有可能存在重复消费的问题;
      • exactly once 精准一次:数据消费一次且仅一次
    1. Spark Streaming整合kafka-0-8

    在这里插入图片描述

    方式一:Receiver-based Approach【不推荐使用】
    • 此方法使用Receiver接收数据,Receiver是使用Kafka高级消费者API实现的。
    • 与所有接收器一样,从Kafka通过Receiver接收的数据存储在Spark执行器中,然后由Spark Streaming启动的作业处理数据。
    • 但是在默认配置下,此方法可能会在失败时丢失数据(请参阅接收器可靠性。)
    • 为确保零数据丢失,必须在Spark Streaming中另外启用Write Ahead Logs(WAL 在Spark 1.2中引入)。
    • 这将同步保存所有收到的Kafka数据写入分布式文件系统(例如HDFS)上的预写日志,以便在发生故障时可以恢复所有数据,但是性能不好。
    • pom.xml 添加依赖:
    <dependency>
        <groupId>org.apache.sparkgroupId>
        <artifactId>spark-streaming-kafka-0-8_2.11artifactId>
        <version>2.4.8version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 核心代码:
    import org.apache.spark.streaming.kafka._
    
    val kafkaStream = KafkaUtils.createStream(streamingContext,
         [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
    
    • 1
    • 2
    • 3
    • 4
    • 代码示例:
    /**
      * sparkStreaming使用kafka 0.8API基于recevier来接受消息
      */
    object Case12_KafkaReceiver08 {
      private val zkQuorum = "192.168.254.120:2181"
      private val groupId = "KafkaReceiver08"
      private val topics = Map("test" -> 1)
    
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)
    
        val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
          // 开启 WAL 机制
          .set("spark.streaming.receiver.writeAheadLog.enable", "true")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
    
        // 设置checkpoint,将接收到的数据持久化写入到HDFS
        ssc.checkpoint("hdfs://node01:8020/wal")
    
        // 接收kafka数据
        val receiverDstream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, zkQuorum, groupId, topics)
        // 获取kafka的topic数据
        val data: DStream[String] = receiverDstream.map(_._2)
        // 单词计算
        val result: DStream[(String, Int)] = data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    
        result.print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 创建 kafka 的 topic 并发送数据
    # 创建topic
    bin/kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic test --zookeeper 192.168.254.120:2181
    # 生产发送数据
    bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test
    
    • 1
    • 2
    • 3
    • 4

    如果程序运行过程中,出现错误java.lang.NoSuchMethodError: net.jpountz.lz4.LZ4BlockInputStream.(Ljava/io/InputStream;Z)V这是spark-core与kafka-client中lz4版本不一致导致的,可用以下方式在程序中指定其他的压缩算法进行解决
    new SparkConf().set(“spark.io.compression.codec”, “snappy”)

    方式二:Direct Approach(NoReceivers)
    • 这种新的不基于Receiver的,是直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。
    • 替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。
    • 当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。
    • 这种方式有如下优点:
      • 简化并行读取:
        • 如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。
        • Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。
        • 所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。
      • 高性能:
        • 如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下。
        • 因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。
        • 而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。
      • 一次且仅一次的事务机制:
        • 基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。
        • 这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性。
        • 但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。
      • 降低资源:
        • Direct不需要Receivers,其申请的Executors全部参与到计算任务中。
        • 而Receiver-based则需要专门的Receivers来读取Kafka数据且不参与计算。
        • 因此相同的资源申请,Direct 能够支持更大的业务。
      • 降低内存:
        • Receiver-based的Receiver与其他Exectuor是异步的,并持续不断接收数据。
        • 对于小业务量的场景还好,如果遇到大业务量时,需要提高Receiver的内存,但是参与计算的Executor并无需那么多的内存。
        • 而Direct 因为没有Receiver,而是在计算时读取数据,然后直接计算,所以对内存的要求很低。
        • 实际应用中我们可以把原先的10G降至现在的2-4G左右。
      • 鲁棒性更好:
        • Receiver-based方法需要Receivers来异步持续不断的读取数据,因此遇到网络、存储负载等因素,导致实时任务出现堆积,但Receivers却还在持续读取数据,此种情况很容易导致计算崩溃。
        • Direct 则没有这种顾虑,其Driver在触发batch计算任务时,才会读取数据并计算。队列出现堆积并不会引起程序的失败。
    • 代码示例:
    /**
      * sparkStreaming使用kafka 0.8API基于Direct直连来接受消息
      * spark direct API接收kafka消息,从而不需要经过zookeeper,直接从broker上获取信息。
      */
    object Case13_KafkaDirect08 {
      private val kafkaCluster = "node01:9092,node02:9092,node03:9092"
      private val groupId = "KafkaDirect08"
      private val topics = Set("test")
    
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)
    
        val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
          // 开启 WAL 机制
          .set("spark.streaming.receiver.writeAheadLog.enable", "true")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
    
        // 接收kafka数据
        val kafkaParams = Map(
          "metadata.broker.list" -> kafkaCluster,
          "group.id" -> groupId
        )
        // 使用direct直连的方式接收数据
        val kafkaDstream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
        // 获取kafka的topic数据
        val data: DStream[String] = kafkaDstream.map(_._2)
        // 单词计算
        val result: DStream[(String, Int)] = data.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
    
        result.print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 要想保证数据不丢失,最简单的就是靠checkpoint的机制;但是checkpoint机制有个特点,如果代码升级了,checkpoint机制就失效了。
    • 所以如果想实现数据不丢失,那么就需要自己管理offset。
    2. Spark Streaming与kafka-0-10整合
    • 支持0.10版本,或者更高的版本【推荐使用这个版本】
    • pom.xml 添加依赖:
    <dependency>
      <groupId>org.apache.sparkgroupId>
      <artifactId>spark-streaming-kafka-0-10_${scala.version}artifactId>
      <version>${spark.version}version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 代码示例:
    /**
      * sparkStreaming使用kafka 1.0API基于Direct直连来接受消息
      */
    object Case14_KafkaDirect10 {
      private val kafkaCluster = "node01:9092,node02:9092,node03:9092"
      private val groupId = "KafkaDirect10"
      private val topics = Set("test")
    
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)
        // 1. 创建StreamingContext对象
        val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
    
        // 2. 使用Direct接收kafka数据
        val kafkaParams = Map(
          "bootstrap.servers" -> kafkaCluster,
          "group.id" -> groupId,
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "enable.auto.commit" -> "false"
        )
        val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
          ssc,
          // 数据本地性策略
          LocationStrategies.PreferConsistent,
          // 指定要订阅的topic
          ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
        )
    
        // 3. 对数据进行处理
        // 注意:如果你想获取到消息消费的偏移,这里需要拿到最开始的这个DStream进行操作
        // 如果你对该DStream进行了其他的转换之后,生成了新的DStream,新的DStream不再保存对应的消息的偏移量
        kafkaDStream.foreachRDD(rdd => {
          // 获取消息内容
          val dataRdd: RDD[String] = rdd.map(_.value())
          // 打印
          dataRdd.foreach(line => println(line))
    
          // 4. 提交偏移量,将偏移量信息添加到kafka中
          val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
          kafkaDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        })
    
        // 5. 开启流式计算
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    3. 解决SparkStreaming与kafka-0.8版本正好数据不丢失方案
    • 一般企业来说无论你是使用哪一套api去消费kafka中的数据,都是设置手动提交偏移量。因为自动提交(默认60s提交一次)偏移量风险比较高,可能会出现数据丢失或者数据被重复处理:
      • 数据处理失败了,自动提交了偏移量,会出现数据的丢失;
      • 数据处理成功了,自动提交偏移量失败,之后消费时会从失败的位置再次消费,导致数据重复处理。
    • 一般来说就手动去提交偏移量,将偏移量的提交通过消费者程序自己去维护,示意图如下:

    在这里插入图片描述

    • 代码示例,偏移量存入ZK:
    /**
      * sparkStreaming使用kafka 0.8API基于Direct直连来接受消息
      * 手动将偏移量数据保存到ZK中
      */
    object Case15_KafkaManageOffset08 {
      private val kafkaCluster = "node01:9092,node02:9092,node03:9092"
      private val zkQuorum = "192.168.254.120:2181"
      private val groupId = "consumer-manager"
      private val topic = "wordcount"
      private val topics = Set(topic)
    
      def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)
    
        val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
          // 开启 WAL 机制
          .set("spark.streaming.receiver.writeAheadLog.enable", "true")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
    
        // 创建一个 ZKGroupTopicDirs 对象,就是用来指定在zk中的存储目录,用来保存数据偏移量
        val topicDirs = new ZKGroupTopicDirs(groupId, topic)
        // 获取 ZK 中的路径 "/consumers/consumer-manager/offsets/wordcount"
        val zkTopicPath = topicDirs.consumerOffsetDir
        // 构造一个ZK的客户端,用来读写偏移量数据
        val zkClient = new ZkClient(zkQuorum)
        // 准备kafka的参数
        val kafkaParams = Map(
          "metadata.broker.list" -> kafkaCluster,
          "group.id" -> groupId,
          "enable.auto.commit" -> "false"
        )
        // 定义kafkaStream流
        var kafkaStream: InputDStream[(String, String)] = null
        // 获取指定的zk节点的子节点个数
        val childrenNum = zkClient.countChildren(zkTopicPath)
        // 判断是否保存过数据: 根据子节点的数量是否为0
        if (childrenNum > 0) {
          var fromOffsets: Map[TopicAndPartition, Long] = Map()
          for (i <- 0 until childrenNum) {
            // 获取子节点
            val partitionOffset: String = zkClient.readData[String](s"$zkTopicPath/$i")
            val tp = TopicAndPartition(topic, i)
            // 获取数据偏移量: 将不同分区内的数据偏移量保存到map集合中
            fromOffsets += (tp -> partitionOffset.toLong)
          }
          // 泛型中 key, kafka中的key   value:hello tom hello jerry
          // 创建函数 解析数据 转换为(topic_name, message)的元组
          val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.topic, mmd.message())
          // 利用底层的API创建DStream: 采用直连的方式(若之前已经消费了,则从指定的位置消费)
          kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](ssc, kafkaParams, fromOffsets, messageHandler)
        } else {
          // 利用底层的API创建DStream 采用直连的方式(若之前没有消费,则这是第一次读取数据)
          // zk中没有子节点数据,就是第一次读取数据,直接创建直连对象
          kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
        }
        // 直接操作kafkaStream
        // 依次迭代DStream中的kafkaRDD, 只有kafkaRDD才可以强转为HasOffsetRanges, 从中获取数据偏移量信息
        // 之后是操作的RDD, 不能够直接操作DStream, 因为调用Transformation方法之后就不是kafkaRDD了获取不了偏移量信息
        kafkaStream.foreachRDD(kafkaRDD => {
          // 强转为HasOffsetRanges, 获取offset偏移量数据
          val offsetRanges = kafkaRDD.asInstanceOf[HasOffsetRanges].offsetRanges
          // 获取数据
          val lines: RDD[String] = kafkaRDD.map(_._2)
          // 接下来就是对RDD进行操作 触发action
          lines.foreachPartition(partition => partition.foreach(x => println(x)))
          // 手动提交偏移量到zk集群上
          for (o <- offsetRanges) {
            // 拼接zk路径
            val zkPath = s"${topicDirs.consumerOffsetDir}/${o.partition}"
            // 将 partition 的偏移量数据 offset 保存到zookeeper中
            ZkUtils.updatePersistentPath(zkClient, zkPath, o.untilOffset.toString)
          }
        })
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    4. Spark Streaming如何保证exactly-once
    • 一个流式计算如果要保证 exactly-once,首先要有三点要求:
      • source 支持 reply;
      • 流计算引擎本身处理能保证 exactly-once;
      • sink支持幂等或事务更新
    • 实现数据被处理且仅被处理一次,就需要实现数据结果保存操作与偏移量保存操作再同一个事务中,或者实现幂等操作。
    • 也就是说如果想让 SparkStreaming 的程序保证 exactly-once,需要从以下三个角度出发:
      • 接收数据:从Source中接收数据,保证 exactly-once;
      • 转换数据:用DStream和RDD算子转换,保证 exactly-once;
      • 存储数据: 将结果保存到外部系统,保证 exactly-once。
    • scalikejdbc依赖:
    <dependency>
      <groupId>org.scalikejdbcgroupId>
      <artifactId>scalikejdbc_${scala.version}artifactId>
      <version>${scalikejdbc.version}version>
    dependency>
    <dependency>
      <groupId>org.scalikejdbcgroupId>
      <artifactId>scalikejdbc-config_${scala.version}artifactId>
      <version>${scalikejdbc.version}version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 示例代码:
    /**
      * SparkStreaming EOS:
      * Input: kafka
      * Process: SparkStreaming
      * Output: MySQL
      * 保证EOS:
      * 1、偏移量自己管理,即enable.auto.commit=false,这里保存在Mysql中
      * 2、使用createDirectStream
      * 3、事务输出: 结果存储与Offset提交在Driver端同一Mysql事务中
      */
    class Case16_EOSKafkaMysqlAtomic {
      @transient lazy val log = LoggerFactory.getLogger(this.getClass)
    
      private val kafkaCluster = "node01:9092,node02:9092,node03:9092"
      private val groupId = "consumer-eos"
      private val topic = "topic_eos"
      private val mysqlUrl = "jdbc:mysql://node01:3306/test"
      private val mysqlUsr = "root"
      private val mysqlPwd = "123456"
    
      def main(args: Array[String]): Unit = {
        // 准备kafka参数
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> kafkaCluster,
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean),
          "group.id" -> groupId
        )
        // 数据库连接池
        ConnectionPool.singleton(mysqlUrl, mysqlUsr, mysqlPwd)
    
        val sparkConf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(5))
    
        // 1、初次启动或重启时,从指定的Partition、Offset构建TopicPartition
        // 2、运行过程中,每个Partition、Offset保存在内部currentOffsets = Map[TopicPartition, Long]()变量中
        // 3、后期Kafka Topic分区扩展,在运行过程中不能自动感知
        val initOffset = DB.readOnly(implicit session => {
          sql"select `partition`,offset from kafka_topic_offset where topic =${topic} and `group`=${groupId}"
            .map(item => new TopicPartition(topic, item.get[Int]("partition")) -> item.get[Long]("offset"))
            .list().apply().toMap
        })
    
        // CreateDirectStream: 从指定的Topic、Partition、Offset开始消费
        val sourceDStream = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Assign[String, String](initOffset.keys, kafkaParams, initOffset)
        )
        sourceDStream.foreachRDD(rdd => {
          if (!rdd.isEmpty()) {
            val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            offsetRanges.foreach(offsetRange => {
              log.info(s"Topic: ${offsetRange.topic}, Group: ${groupId}, Partition: ${offsetRange.partition}, fromOffset: ${offsetRange.fromOffset}, untilOffset: ${offsetRange.untilOffset}")
            })
            // 统计分析
            val sparkSession = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
            import sparkSession.implicits._
            val dataFrame = sparkSession.read.json(rdd.map(_.value()).toDS())
            dataFrame.createOrReplaceTempView("tmpTable")
            val result = sparkSession.sql(
              """
                | select eventTimeMinute, language, count(1) pv, count(distinct(userID)) uv
                | from (select *, substr(eventTime,0,16) eventTimeMinute from tmpTable) as tmp
                | group by eventTimeMinute, language
              """.stripMargin).collect()
    
            // 在Driver端存储数据、提交Offset,结果存储与Offset提交在同一事务中原子执行,这里将偏移量保存在Mysql中
            DB.localTx(implicit session => {
              result.foreach(row => {
                sql"""
                  insert into twitter_pv_uv (eventTimeMinute,language,pv,uv) values (
                    ${row.getAs[String]("eventTimeMinute")},
                    ${row.getAs[String]("language")},
                    ${row.getAs[Long]("pv")},
                    ${row.getAs[Long]("uv")},
                  ) on duplicate key update pv = pv, uv = uv
                """.update.apply()
              })
    
              // offset 提交
              offsetRanges.foreach(offsetRange => {
                val affectedRows =
                  sql"""
                    update kafka_topic_offset set offset = ${offsetRange.untilOffset}
                    where topic = ${topic} and `group` = ${groupId} and `partition` = ${offsetRange.partition} and offset = ${offsetRange.fromOffset}
                     """.update.apply()
    
                if (affectedRows != 1) {
                  throw new Exception(s"""Commit Kafka Topic: ${topic} Offset Faild!""")
                }
              })
            })
          }
        })
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
  • 相关阅读:
    搭建Django项目
    题目:2695.包装数组
    如何调节电脑屏幕亮度?让你的眼睛更舒适!
    3.下载Swin-Transformer-Object-Detection
    Explore EP965U HDMI 2.0发射机
    工作比读研简单多了
    排序算法(一)
    JS递归函数详解
    spark on hive
    【远程访问和支持软件】上海道宁为您提供简单、安全、随时可用的远程访问解决方案
  • 原文地址:https://blog.csdn.net/yangwei234/article/details/127720398