• spark DStream从不同数据源采集数据(RDD 队列、文件、diy 采集器、kafka)(scala 编程)


    目录

    1. RDD队列

    2 textFileStream

    3 DIY采集器

    4 kafka数据源【重点】


    1. RDD队列

           a、使用场景:测试
           b、实现方式: 通过ssc.queueStream(queueOfRDDs)创建DStream,每一个推送这个队列的RDD,都会作为一个DStream处理

    1. val sparkconf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")
    2. val ssc = new StreamingContext(sparkconf,Seconds(3))
    3. // 创建一个队列对象,队列中存放的是RDD
    4. val queue = new mutable.Queue[RDD[String]]()
    5. // 通过队列创建DStream
    6. val queueDS: InputDStream[String] = ssc.queueStream(queue)
    7. queueDS.print()
    8. // 启动采集器
    9. ssc.start()
    10. //这个操作之所以放在这个位置,是为了模拟流式的感觉,数据源源不断的生产
    11. for(i <- 1 to 5 ){
    12. // 循环创建rdd
    13. val rdd: RDD[String] = ssc.sparkContext.makeRDD(List(i.toString))
    14. // 将RDD存放到队列中
    15. queue.enqueue(rdd)
    16. // 当前线程休眠1秒
    17. Thread.sleep(6000)
    18. }
    19. // 等待采集器的结束
    20. ssc.awaitTermination()
    21. }

    2 textFileStream

    1. val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("textFileStream")
    2. val ssc = new StreamingContext(sparkConf,Seconds(3))
    3. //从文件中读取数据
    4. val textDS: DStream[String] = ssc.textFileStream("in")
    5. textDS.print()
    6. // 启动采集器
    7. ssc.start()
    8. // 等待采集器的结束
    9. ssc.awaitTermination()

    3 DIY采集器

        1. 自定义采集器
        2. 什么情况下需要自定采集器呢?
             比如从mysql、hbase中读取数据。
             采集器的作用是从指定的地方,按照采集周期对数据进行采集。
             目前有:采集kafka、采集netcat工具的指定端口的数据、采集文件目录中的数据等
        3. 自定义采集器的步骤,模仿socketTextStream
             a、自定采集器类,继承extends,并指定数据泛型,同时对父类的属性赋值,指定数据存储的级别
             b、重写onStart和onStop方法
                onStart:采集器的如何启动
                onStop:采集的如何停止

    1. val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DIY")
    2. val ssc = new StreamingContext(sparkConf, Seconds(3))
    3. // 获取采集的流
    4. val ds: ReceiverInputDStream[String] = ssc.receiverStream(new MyReciver("localhost",9999))
    5. ds.print()
    6. ssc.start()
    7. ssc.awaitTermination()
    8. }
    9. // 继承extends Reciver,并指定数据泛型,同时对父类的属性赋值,指定数据存储的级别
    10. class MyReciver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {
    11. private var socket: Socket = _
    12. def receive = {
    13. // 获取输入流
    14. val reader = new BufferedReader(
    15. new InputStreamReader(
    16. socket.getInputStream,
    17. "UTF-8"
    18. )
    19. )
    20. // 设定一个间接变量
    21. var s: String = null
    22. while (true) {
    23. // 按行读取数据
    24. s = reader.readLine()
    25. if (s != null) {
    26. // 将数据进行封装
    27. store(s)
    28. }
    29. }
    30. }
    31. // 1. 启动采集器
    32. override def onStart(): Unit = {
    33. socket = new Socket(host, port)
    34. new Thread("Socket Receiver") {
    35. setDaemon(true)
    36. override def run() {
    37. receive
    38. }
    39. }.start()
    40. }
    41. // 2. 停止采集器
    42. override def onStop(): Unit = {
    43. socket.close()
    44. socket = null
    45. }
    46. }

    4 kafka数据源【重点】

    -- DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制。
    -- 配置信息基本上是固定写法

    1. // TODO Spark环境
    2. // SparkStreaming使用核数最少是2个
    3. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")
    4. val ssc = new StreamingContext(sparkConf, Seconds(3))
    5. // TODO 使用SparkStreaming读取Kafka的数据
    6. // Kafka的配置信息
    7. val kafkaPara: Map[String, Object] = Map[String, Object](
    8. ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop105:9092,hadoop106:9092,hadoop107:9092",
    9. ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
    10. "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
    11. "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    12. )
    13. val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
    14. KafkaUtils.createDirectStream[String, String](
    15. ssc,
    16. LocationStrategies.PreferConsistent,
    17. ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
    18. )
    19. // 获取数据,key是null,value是真实的数据
    20. val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())
    21. valueDStream.flatMap(_.split(" "))
    22. .map((_, 1))
    23. .reduceByKey(_ + _)
    24. .print()
    25. ssc.start()
    26. // 等待采集器的结束
    27. ssc.awaitTermination()

     

  • 相关阅读:
    3.5 讲一讲关于小红书的搜索引流技巧【玩赚小红书】
    ubuntu18.04 RTX3060 rangnet++训练 bonnetal语义分割
    Elasticsearch-8.4.2 集群安装
    Day39 LeetCode
    Java面试八股之什么是mybatis流式查询
    tp5使用sum()聚合函数分组查询
    PostgreSQL-HA 高可用集群在 Rainbond 上的部署方案
    蜂蜜配送销售商城小程序的作用是什么
    鸿蒙HarmonyOS $r(““)与$rawfile(““)的区别
    springboot毕设项目高校宿舍管理系统的设计与实现ukgdt(java+VUE+Mybatis+Maven+Mysql)
  • 原文地址:https://blog.csdn.net/jojo_oulaoula/article/details/133852399