• kafka集成spark


    1.新建Scala项目

    具体教程可见在idea中创建Scala项目教程-CSDN博客

    1.1右键项目名-添加框架支持-勾选scala

    1.2main目录下新建scala目录-右键Scala目录-将目录标记为-勾选源代码根目录

    1.3创建包com.ljr.spark

    1.4引入依赖(pox.xml)

    1. org.apache.spark
    2. spark-streaming-kafka-0-10_2.12
    3. 3.0.0
    4. org.apache.spark
    5. spark-core_2.12
    6. 3.0.0
    7. org.apache.spark
    8. spark-streaming_2.12
    9. 3.0.0

    1.5把spark conf/目录下的log4j.properties 复制到项目的resources目录

    2.集成spark生产者

    新建SparkKafkaProducer (注意选择的是object而不是class)

    1. package com.ljr.spark
    2. import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
    3. import org.apache.kafka.common.serialization.StringSerializer
    4. import java.util.Properties
    5. object SparkKafkaProducer {
    6. def main(args: Array[String]): Unit = {
    7. //1 属性配置
    8. val pros = new Properties()
    9. pros.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092")
    10. pros.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])
    11. pros.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,classOf[StringSerializer])
    12. //2 创建生产者
    13. val producer = new KafkaProducer[String, String](pros)
    14. //3 发送数据
    15. for (i <- 1 to 5) {
    16. producer.send(new ProducerRecord[String,String]("customers","Lili" + i))
    17. }
    18. //4 关闭资源
    19. producer.close()
    20. }
    21. }

    运行,开启Kafka 消费者消费数据

    kafka-console-consumer.sh --bootstrap-server node1:9092 --topic customers

    能接收到信息,可见spark作为生产者集成Kafka成功

    3.集成spark消费者

    1. package com.ljr.spark
    2. import org.apache.kafka.clients.consumer.ConsumerConfig
    3. import org.apache.kafka.common.serialization.StringDeserializer
    4. import org.apache.spark.SparkConf
    5. import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    6. import org.apache.spark.streaming.{Seconds, StreamingContext}
    7. object SparkKafkaConsumer {
    8. def main(args: Array[String]): Unit = {
    9. //1 初始化上下文环境
    10. val conf = new SparkConf().setMaster("local[*]").setAppName("spark-kafka")
    11. val sc = new StreamingContext(conf, Seconds(3))
    12. //2 消费数据
    13. val kafkapara = Map[String, Object](
    14. ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG->"node1:9092,node2:9092",
    15. ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],
    16. ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG->classOf[StringDeserializer],
    17. ConsumerConfig.GROUP_ID_CONFIG->"KFK-SP"
    18. )
    19. val kafkaDstream = KafkaUtils.createDirectStream(sc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Set("customers"), kafkapara))
    20. val valueDstream = kafkaDstream.map(record => record.value())
    21. valueDstream.print()
    22. //3 执行代码并阻塞
    23. sc.start()
    24. sc.awaitTermination()
    25. }
    26. }

    运行,

    开启Kafka 生产者生产数据

    kafka-console-producer.sh.sh --bootstrap-server node1:9092 --topic customers

    控制台可以消费到数据,可见spark作为消费者集成Kafka成功。

  • 相关阅读:
    首发AI原生应用开发平台——千帆AI原生应用开发工作台,加速企业AI应用落地
    【LeetCode 1758】生成交替二进制字符串的最少操作数
    Rust 函数
    《向量数据库指南》——选择向量数据库时需要考量的点Milvus Cloud
    (4) OpenCV图像处理kNN近邻算法-识别数字0和1
    【微服务容器化】第二章-Docker常用命令
    flask自定义序列化
    二维数组的最小路径和问题
    京东大型API网关实践之路
    Codeforces 398C LRU (概率dp)
  • 原文地址:https://blog.csdn.net/v15220/article/details/139576642