• spark Spark Streaming、kafka数据源Direct模式、 自定义数据源


    前言

      Spark Streaming广泛运用于流式数据的处理(准实时、微批次的数据处理框架)。使用离散化流(discretized stream)作为抽象表示,即DStream。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而DStream是由这些RDD所组成的序列。典型的流式数据输入源就是kafka

    本文使用的spark版本3.0.0


    1. Kafka数据源

    1.1 选型

      ReceiverAPI:需要一个专门的Executor去接收数据,然后发送给其他的Executor做计算。由于接收数据的Executor和计算的Executor速度会有所不同,特别在接收数据的Executor速度大于计算的Executor速度,会导致计算数据的节点内存溢出。早期版本中提供此方式,当前版本不适用
      DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制。

    1.2 Kafka 0-10 Direct模式

    依赖:

        <dependency>
          <groupId>org.apache.sparkgroupId>
          <artifactId>spark-core_2.12artifactId>
          <version>3.0.0version>
        dependency>
        <dependency>
          <groupId>org.apache.sparkgroupId>
          <artifactId>spark-streaming_2.12artifactId>
          <version>3.0.0version>
        dependency>
        <dependency>
          <groupId>org.apache.sparkgroupId>
          <artifactId>spark-sql_2.12artifactId>
          <version>3.0.0version>
        dependency>
        <dependency>
          <groupId>org.apache.sparkgroupId>
          <artifactId>spark-streaming-kafka-0-10_2.12artifactId>
          <version>3.0.0version>
        dependency>
        <dependency>
          <groupId>com.fasterxml.jackson.coregroupId>
          <artifactId>jackson-coreartifactId>
          <version>2.10.1version>
        dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    代码:

    import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.dstream.{DStream, InputDStream}
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    
    
    object T5{
      def main(args: Array[String]): Unit = {
        val streamingConf = new SparkConf().setMaster("local[*]").setAppName("streaming")
        val ssc = new StreamingContext(streamingConf, Seconds(3))
    
        //定义Kafka参数
        val kafkaPara: Map[String, Object] = Map[String, Object](
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.42.102:9092,192.168.42.103:9092,192.168.42.104:9092",
          ConsumerConfig.GROUP_ID_CONFIG -> "cz",
          "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
          "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
        )
    
        //读取Kafka数据创建DStream
        val kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Set("di18600"), kafkaPara))
    
        //将每条消息的KV取出
        val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
    
        //计算WordCount
        valueDStream.flatMap(_.split(" "))
          .map((_, 1))
          .reduceByKey(_ + _)
          .print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    kafka生产者发送消息:

    kafka-console-producer.sh --broker-list  192.168.42.102:9092,192.168.42.103:9092,192.168.42.104:9092 --topic di18600
    
    • 1

    在这里插入图片描述


    2. 自定义数据源

      需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集
    onStart: This method is called by the system when the receiver is started. This function must initialize all resources (threads, buffers, etc.) necessary for receiving data.This function must be non-blocking, so receiving the data must occur on a different
    thread. Received data can be stored with Spark by calling store(data).(重点在于初始化源,调用store(data)方法使spark接收数据)

    onStop:This method is called by the system when the receiver is stopped. All resources (threads, buffers, etc.) set up in onStart() must be cleaned up in this method.(重点在于清理onStart()中的线程、缓存)

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    import scala.util.Random
    
    object T4 {
      def main(args: Array[String]): Unit = {
        val streamingConf = new SparkConf().setMaster("local[*]").setAppName("streaming")
        val ssc = new StreamingContext(streamingConf, Seconds(3))
        val ds = ssc.receiverStream(new MyReceiver())
        ds.print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
      private var flag = true
    
      override def onStart(): Unit = {
        new Thread(() => {
          while (flag) {
            val i = new Random().nextInt(100)
            store("" + i)
            Thread.sleep(500)
          }
        }).start()
      }
    
      override def onStop(): Unit = {
        flag = false
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

  • 相关阅读:
    操作表 函数的使用
    Mysql技术文档--慢mysql的优化--工作流--按步排查
    Vue3——Suspense组件
    腾讯云标准型SA4服务器AMD处理器性能测评
    IBM Spectrum LSF 作业调度系统,简化计算集群管理并划分工作负载优先级
    简单工厂模式的实践
    go线程安全哈希表concurrent-map
    MySQL之MHA高可用配置及故障切换
    Facebook的创新实验室:人工智能与新技术探索
    网络安全之WebShell截获
  • 原文地址:https://blog.csdn.net/javahelpyou/article/details/126333647