• Flink消费Kafka主题消息的演示


    一、说明


    本案例实现的功能统计对Kafka中的消息单词出现的次数,即词频统计。其主要演示了Flink流式程序消费kafka中的消息,其目的想让初学者了解Flink如何编写消费Kafka中消息的程序以及通过程序的演示来进一步学习flink。

    实验环境:

    1. Kafka版本是:kafka_2.11-2.3.1
    2. Flink版本是:flink-1.10.1
    3. Kafka单节点部署在slave01上(安装过程可以参考我的其他博文)
    4. 开发工具IDEA

    二、编写程序


    1. 创建maven项目
    2. 添加flink依赖,可参考Flink的流批WordCount入门案例
    3. 添加Kafka的依赖到pom.xml中,将如下内容添加至pom.xml中:
      <dependency>
              <groupId>org.apache.flink</groupId>
              <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
              <version>1.10.1</version>
          </dependency>
      
      • 1
      • 2
      • 3
      • 4
      • 5
    4. 编写程序,完成代码如下所示:
      import org.apache.flink.api.common.serialization.SimpleStringSchema
      import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
      import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
      
      import java.util.Properties
      
      object SourceFromKafka {
        def main(args: Array[String]): Unit = {
          val  env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
          val properties = new Properties()
          properties.setProperty("bootstrap.servers", "slave01:9092")
          properties.setProperty("group.id", "consumer-group")
          properties.setProperty("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer")
          properties.setProperty("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer")
          properties.setProperty("auto.offset.reset", "latest")
          val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new
              SimpleStringSchema(), properties))
      
          stream3.flatMap(_.split(" "))
            .map((_,1))
            .keyBy(0)
            .sum(1)
            .print()
      
          env.execute("SourceFromKafka Job starting……")
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29

    三、运行演示


    1. 启动Kafka,在Kafka安装目录下的config目录下执行如下命令即可:
      kafka-server-start.sh ./server.properties &
      
      • 1
    2. 创建Kafka主题sensor,执行如下命令:
      kafka-topics.sh --create --zookeeper master:2181,slave01:2181,slave02:2181,--replication-factor 1 --partitions 2 --topic sensor
      
      • 1
    3. 启动Kafka控制台生产者,用于生成数据,执行如下命令:
      kafka-console-producer.sh --broker-list slave01:9092 --topic sensor
      
      • 1
    4. 在IDEA中运行我们写好的程序,成功后如下图所示:
      在这里插入图片描述
    5. Kafka生产者控制台界面中分别输入如下数据:

      hello world
      hello world
      hello world
      hello world
      hello world

    6. 观察IDEA中程序控制台中输出的结果,如下图所示:
      在这里插入图片描述

    四、打包部署到服务器


    1. IDEA中右侧菜单:maven-clean-compile-package
    2. 将以下的Kafka依赖jar包上传至flink安装目录下的lib目录下:下载链接
      1. kafka-clients-2.3.1.jar
      2. flink-connector-kafka-base_2.12-1.10.1.jar
      3. flink-connector-kafka-0.11_2.12-1.10.1.jar
      4. 如果以上三个jar依赖不行,根据报错,可以将下列两个jar包也上传至flink安装目录下的lib目录:
        flink-connector-kafka-0.10_2.12-1.10.1.jar
        flink-connector-kafka-0.9_2.12-1.10.1.jar
        完整下载
    3. 打开Flink的WebUI,并将程序包上传到服务器
    4. 查看程序运行结果
  • 相关阅读:
    使用“文心一言”编写技术博文《搭建企业知识库:基于 Wiki.js 的实践指南》
    springboot快速开发web CRUD
    基于surging网络组件多协议适配的平台化发展
    JavaScript:实现 Polynomials多项式算法 (附完整源码)
    如何批量压缩图片大小?教你3个批量压缩图片的方法
    中国制霸生成器「GitHub 热点速览 v.22.42」
    表格内日期比较计算
    skywalking部署
    智能工厂落地的三种模式,你了解多少?
    简单剖析程序的翻译过程!
  • 原文地址:https://blog.csdn.net/sujiangming/article/details/125451445