• Kafka消息队列


    Kafka消息队列

    一、概念

    Kafka是一个基于发布订阅者模式的消息队列,实现数据缓存、流量削峰等等功能。

    在大数据生态下,Kafka主要是用来和实时计算框架对接去处理海量的实时数据

    1.1 消息队列一般有两种架构

    1.1.1 点对点模式
    1.1.2 发布/订阅者模式

    二、架构

    2.1 生产者producer

    生产消息、数据

    Kafka的生产者,生产者是为了给topic的partition生产数据的,生产者可以是Flume,也可以是我们自定义的操作,还可以是Kafka自带的控制台生产者。 生产者生产的数据放到Kafka主题的哪个分区? 生产者生产的数据都是key和value格式类型的数据,只不过key值可以不存在。

    1. 如果生产的消息只有value,没有key,那么消息会采用轮询机制选址一个主题分区放入数据\

    2. 如果生产的消息key和value都有,但是没有指定分区机制,会按照key的hashcode值和topic的分区数取一个余数,放到对应的分区

    3. 如果生产的消息key和value都有。那么为了避免数据倾斜,我们可以自定义分区机制

      【注意】:kafka在生产数据的时候,每一条数据都会带着一个offset偏移量,消费数据的时候需要根据offset偏移量去读取对应的数据进行消费

    2.2 Kafka集群cluster

    2.2.1 Broker

    Kafka集群的一个节点,每一个broker节点都会有一个唯一的编号

    2.2.2 Topic

    主题,就是消息队列中的消息队,一个Broker中可以存在多个主题,一个主题也可以存在于多个Broker上

    Kafka中消息主题,就是消息队列,是Kafka用来存储消息的组件,topic中存放的数据是有序的

    2.2.3 Partition

    分区,每一个Topic主题都可以指定存储的分区数,一般情况下,一个Broker会存储一个主题的一个分区数据,而且每一个分区还可以设置副本数保证存储数据的安全性,分区和分区副本之间有一个主从架构关系。分区副本数不能随便设置,必须小于等于broker的数量。

    Topic存放的消息最底层是以分区的形式存在的,Topic所谓的数据有序,不是整体有序,而是每一个分区内部是有序的。分区设置副本机制的,副本数量必须小于等于broker的节点数量 Kafka主题分区的数据不是永久存在的,而是有一个数据清理机制(基于时间的清理机制、基于分区数据大小的清理机制)

    2.2.4 zookeeper

    Kafka中主题、分区、消费者等等元数据信息都是交给zookeeper统一管理的

    2.3 消费者Consumer

    订阅主题,消费数据

    消费者:消费数据的最小单位,但是一个消费者可以订阅多个topic的数据

    2.4 消费者组Consumer Group

    将多个消费者组合起来,同时消费同一个主题的数据

    消费者组:一个消费者组可以有多个消费者,其中topic一个分区的数据只能被消费者组的一个消费者消费,如果我们想要让一个消费者消费topic所有分区的数据,那么我们需要保证消费者组中只有一个消费者。

    三、Kafka的安装

    3.1 解压、重命名

    3.2 修改Broker配置文件

    server.properties

    3.2.1 启用删除主题的功能
    3.2.2、logs日志文件目录的配置
    3.2.3、分区日志文件的滚动和删除规则
    3.2.4、broker的编号
    3.2.5、配置zookeeper的地址

    四、Kafka的启动和关闭

    4.1 启动

    4.1.1 启动zookeeper
    4.1.2 启动kafka

    kafka-server-start.sh /opt/xxxx/server.properties &

    4.2 关闭

    kafka-server-stop.sh

    五、kafka的基本使用

    5.1 Kafka的命令行操作方式

    5.1.1 主题的操作
    5.1.1.1 创建主题

    kafka-topic.sh --create --topic topicName --partitions num --replication-factor num<=borkerCount --zookeeper zkserverxxx

    5.1.1.2删除主题

    kafka-topic.sh --delete --topic topicName --zookeeper xxx 必须开启主题的删除功能

    5.1.1.3修改主题

    kafka-topic.sh --alter --topic topicName --partitions num --zookeeper xxx 主题分区数一般只能增加

    5.1.1.4查询某个主题的详细信息

    kafka-topic.sh --describe --topic topicName --zookeeper xxx

    5.1.1.5查询所有的主题

    kafka-topic.sh --list --zookeeper xxx

    5.1.2 生产者的操作

    kafka-console-producer.sh --bootstrap-server ip:9092,ip:9092 --topic topicName

    5.1.3 消费者的操作

    kafka-console-consumer.sh --bootstrap-server ip:9092 --from-beginning --topic topicName --group groupName

    5.2 Kafa的Java API操作方法

    5.2.1 主题的操作

    主要通过AdminClient类来实现kafka的各种操作

    创建AdminClient类需要写一个配置项 bootstrap.servers

    5.2.2 生产者的操作

    生产者:KafkaProducer ProducerRecord

    生产者创建需要赋予一些参数:参数的key都是在ProducerConfig类中封装的

    参数作用
    key.serializer生产的key值的序列化类型
    value.serializer生产的value值得序列化类型
    bootstrap.serverskafka服务的列表
    5.2.3 消费者的操作

    消费者:KafkaConsumer ConsumerRecords ConsumerRecord

    消费者创建需要赋予一些参数:参数的key都是ConsumerConfig类中封装的

    参数作用
    key.deserializer消息的key的反序列化类型
    value.deserializer消息的value的反序列化类型
    bootstrap.serverskafka服务的列表
    group.id当前消费者所属的消费者组
    auto.offset.reset消费者消费消息的时候是从头开始消息还是从最新的位置开始消费的核心,两个取值earliest、latest
    5.2.4 earliest、latest区别

    【earliest、latest区别】: earliest当消费者所属的消费者组没有任何的消费记录,从头开始消费 latest当消费者所属的消费者组没有任何的消费记录,从最新的位置开始消费 如果他们所属的消费者组有消费记录,那么他们两者都是从消费记录的位置继续开始消费

    六、kafka的可视化监控工具

    offset explorer

    kafka eagle

    七、kafka和Flume的整合

    后期我们在做实时计算的时候,我们经常会做如下操作,会通过Flume采集相关数据到Kafka中缓存,然后再使用实时计算框架对接kafka进行计算。

    Flume采集的数据给kafka,那么此时也就意味着Flume就相当于是Kafka的生产者,kafka相当于是Flume的sink下沉地

    kafka除了当作flume的sink,也可以充当flume的source和channel

    八、Kafka和Spark Streaming的整合

    通过Spark Streaming消费Kafka中的数据,然后对数据进行实时计算处理

    8.1 Spark Streaming整合Kafka有两种方式

    8.1.1 Receiver模式

    采用一个Reciver接受器去接受Kafka的数据,然后数据缓存到Spark的executor内存中,这种方式很容易出现数据丢失问题,如果想要实现数据的安全性,需要开启Spark的 WAL预写日志机制保证数据的安全性 receiver模式连接的zookooper实现

    8.1.2 Direct模式

    不需要接收器,直接连接Kafka节点获取数据,同时由Spark自动维护offset偏移量,此时我们不需要担心数据丢失。

    8.2 整合步骤

    8.2.1 引入一个编程依赖

    spark-streaming-kafka-0.10/0.8 在Spark3版本之后,在KafkaUtils中把receiver模式移除了

    九、整合案例的流程

    想使用Flume采集端口的数据(以空格分割的单词)到kafka的某个主题中,然后借助Spark Streaming统计端口数据中每一个单词出现的总次数。

    9.1 分析

    9.1.1 编写Flume脚本

    编写Flume脚本采集端口数据到Kafka中 sink指定到kafka即可,flume充当kafka的生产者

    9.1.2 整合Spark Streaming

    整合Spark Streaming代码读取kafka中的数据,此时Spark相当于kafka的消费者

    十、相关代码

    #1、起别名
    demo.sources=s1
    demo.channels=c1
    demo.sinks=k1
    
    #2、配置source数据源
    demo.sources.s1.type=netcat
    demo.sources.s1.bind=single
    demo.sources.s1.port=44444
    
    #3、配置channel管道
    demo.channels.c1.type=memory
    demo.channels.c1.capacity=20000
    demo.channels.c1.transactionCapacity=10000
    
    #4、配置sink下沉地
    demo.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
    demo.sinks.k1.kafka.bootstrap.servers=single:9092
    demo.sinks.k1.kafka.topic=flume-topic
    
    # 5、关联
    demo.sources.s1.channels=c1
    demo.sinks.k1.channel=c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    a.sources=s1
    a.channels=c1
    a.sinks=k1
    
    a.sources.s1.type=org.apache.flume.source.kafka.KafkaSource
    a.sources.s1.kafka.bootstrap.servers=single:9092
    a.sources.s1.kafka.consumer.group.id=flume
    a.sources.s1.kafka.topics=flume-topic
    
    a.channels.c1.type=memory
    a.channels.c1.capacity=20000
    a.channels.c1.transactionCapacity=10000
    
    a.sinks.k1.type=logger
    
    a.sources.s1.channels=c1
    a.sinks.k1.channel=c1
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
  • 相关阅读:
    斯坦福计算机视觉cs231n-assignment3总结
    设计模式之状态模式
    K8S自动化运维容器Docker集群
    python聚类分析如何可视化?
    基于node.js的学生管理系统设计
    代码随想录算法训练营第四十一天| LeetCode343. 整数拆分、LeetCode96. 不同的二叉搜索树
    SQL 语言数据操纵语言 DML
    YOLOv8-Seg改进:轻量级Backbone改进 | VanillaNet极简神经网络模型 | 华为诺亚2023
    从爆款IP到独家IP,汽车之家内容战略如何登高?
    【滤波跟踪】基于北方苍鹰和粒子群算法优化粒子滤波器实现目标滤波跟踪附matlab代码
  • 原文地址:https://blog.csdn.net/cai_4/article/details/133758745