• Kafka integration with Spark


    1. Create a Scala project

    For a step-by-step walkthrough, see the CSDN blog tutorial on creating a Scala project in IDEA.

    1.1 Right-click the project name - Add Framework Support - check Scala

    1.2 Under the main directory create a scala directory - right-click the scala directory - Mark Directory as - Sources Root

    1.3 Create the package com.ljr.spark

    1.4 Add the dependencies (pom.xml); note that the _2.12 suffix in each artifactId is the Scala version and must match the project's Scala SDK

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.12</artifactId>
            <version>3.0.0</version>
        </dependency>
    </dependencies>

    1.5 Copy log4j.properties from the Spark conf/ directory into the project's resources directory
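    If a Spark installation is not at hand, a minimal log4j.properties along these lines keeps the console output readable (a sketch; WARN is an assumption, Spark's bundled template defaults to INFO):

    # Minimal log4j 1.x configuration (Spark 3.0.x ships log4j 1.2); levels are assumptions
    log4j.rootCategory=WARN, console
    log4j.appender.console=org.apache.log4j.ConsoleAppender
    log4j.appender.console.target=System.err
    log4j.appender.console.layout=org.apache.log4j.PatternLayout
    log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n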

    2. Integrating Spark as a producer

    Create SparkKafkaProducer (note: choose object rather than class; a Scala object is a singleton, so its main method can be run directly)

    package com.ljr.spark

    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
    import org.apache.kafka.common.serialization.StringSerializer

    import java.util.Properties

    object SparkKafkaProducer {
      def main(args: Array[String]): Unit = {
        // 1. Producer configuration
        val props = new Properties()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer])

        // 2. Create the producer
        val producer = new KafkaProducer[String, String](props)

        // 3. Send five test messages to the customers topic
        for (i <- 1 to 5) {
          producer.send(new ProducerRecord[String, String]("customers", "Lili" + i))
        }

        // 4. Release resources
        producer.close()
      }
    }
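    If the customers topic does not exist yet and the brokers do not auto-create topics, create it before running. A typical command looks like this (the partition and replication-factor values are assumptions; adjust them to the cluster):

    kafka-topics.sh --bootstrap-server node1:9092 --create --topic customers --partitions 3 --replication-factor 2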

    Run it, then start a Kafka console consumer to read the data:

    kafka-console-consumer.sh --bootstrap-server node1:9092 --topic customers

    The consumer should print Lili1 through Lili5, which shows that Spark integrates with Kafka successfully as a producer.

    3. Integrating Spark as a consumer

    package com.ljr.spark

    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object SparkKafkaConsumer {
      def main(args: Array[String]): Unit = {
        // 1. Initialize the streaming context (3-second batches)
        val conf = new SparkConf().setMaster("local[*]").setAppName("spark-kafka")
        val ssc = new StreamingContext(conf, Seconds(3))

        // 2. Consumer configuration
        val kafkaParams = Map[String, Object](
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "node1:9092,node2:9092",
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
          ConsumerConfig.GROUP_ID_CONFIG -> "KFK-SP"
        )

        // Subscribe to the customers topic; PreferConsistent spreads partitions evenly across executors
        val kafkaDstream = KafkaUtils.createDirectStream(
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Set("customers"), kafkaParams)
        )
        val valueDstream = kafkaDstream.map(record => record.value())
        valueDstream.print()

        // 3. Start the computation and block until terminated
        ssc.start()
        ssc.awaitTermination()
      }
    }
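    The example above only prints the raw message values. As a minimal sketch of further processing (not part of the original post), the valueDstream.print() call could be replaced with a per-batch word count, assuming whitespace-separated values:

    // Sketch: count words in each 3-second batch
    // (replaces valueDstream.print() in SparkKafkaConsumer above)
    val wordCounts = valueDstream
      .flatMap(_.split("\\s+"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    wordCounts.print()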

    Run it, then start a Kafka console producer to send some data:

    kafka-console-producer.sh --bootstrap-server node1:9092 --topic customers

    Lines typed into the producer appear in the Spark console each batch, which shows that Spark integrates with Kafka successfully as a consumer.
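    A side note (an operational suggestion, not from the original post): rather than killing the process, the streaming job can be stopped gracefully so in-flight batches finish; StreamingContext provides an overload for this:

    // Stops the job after pending batches complete, and also stops the SparkContext
    ssc.stop(stopSparkContext = true, stopGracefully = true)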

  • Original post: https://blog.csdn.net/v15220/article/details/139576642