• KafKa3.x基础


    来源:B站

    定义

    Kafka传统定义Kafka是一个分布式的基于发布/订阅模式的消息队列(MessageQueue),主要应用于大数据实时处理领域。
    发布/订阅消息的发布者不会将消息直接发送给特定的订阅者,而是将发布的消息分为不同的类别,订阅者只接收感兴趣的消息。
    Kafka最 新定义 : Kafka是 一个开源的分布式事件流平台 (Event Streaming Platform),被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用。

    消息队列

    目 前企 业中比 较常 见的 消息 队列产 品主 要有 Kafka、ActiveMQ 、RabbitMQ 、RocketMQ 等。
    在大数据场景主要采用 Kafka 作为消息队列。在 JavaEE 开发中主要采用 ActiveMQ、RabbitMQ、RocketMQ。

    传统消息队列的应用场景

    传统的消息队列的主要应用场景包括:缓存/消峰、解耦和异步通信

    • 缓冲/消峰:有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况
    • 解耦:允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
    • 异步通信:允许用户把一个消息放入队列,但并不立即处理它,然后在需要的时候再去处理它们。

    消息队列的两种模式

    • 1)点对点模式
      • 消费者主动拉取数据,消息收到后清除消息
        在这里插入图片描述
    • 2)发布/订阅模式
      • 可以有多个topic主题(浏览、点赞、收藏、评论等)
      • 消费者消费数据之后,不删除数据
      • 每个消费者相互独立,都可以消费到数据

    Kafka 基础架构

    在这里插入图片描述
    (1)Producer:消息生产者,就是向 Kafka broker 发消息的客户端。
    (2)Consumer:消息消费者,向 Kafka broker 取消息的客户端。
    (3)Consumer Group(CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。
    (4)Broker一台 Kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个broker 可以容纳多个 topic。
    (5)Topic:可以理解为一个队列,生产者和消费者面向的都是一个 topic。
    (6)Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列。
    (7)Replica:副本。一个 topic 的每个分区都有若干个副本,一个 Leader 和若干个Follower
    (8)Leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 Leader。
    (9)Follower:每个分区多个副本中的“从”,实时从 Leader 中同步数据,保持和Leader 数据的同步。Leader 发生故障时,某个 Follower 会成为新的 Leader。

    Kafka 命令行操作

    主题命令行操作

    1)查看操作主题命令参数

    [jjm@hadoop102 kafka]$ bin/kafka-topics.sh
    
    • 1
    参数描述
    –bootstrap-server 连接的 Kafka Broker 主机名称和端口号。
    –topic 操作的 topic 名称。
    –create创建主题。
    –delete删除主题。
    –alter修改主题。
    –list查看所有主题。
    –describe查看主题详细描述。
    –partitions 设置分区数。
    –replication-factor设置分区副本。
    –config 更新系统默认的配置。

    2)查看当前服务器中的所有 topic

    [jjm@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
    hadoop102:9092 --list
    
    • 1
    • 2

    3)创建 first topic

    [jjm@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
    hadoop102:9092 --create --partitions 1 --replication-factor 3 --topic first
    
    • 1
    • 2

    选项说明:
    –topic 定义 topic 名
    –replication-factor 定义副本数
    –partitions 定义分区数
    4)查看 first 主题的详情

    [jjm@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
    hadoop102:9092 --describe --topic first
    
    • 1
    • 2

    5)修改分区数(注意:分区数只能增加,不能减少

    [jjm@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
    hadoop102:9092 --alter --topic first --partitions 3
    
    • 1
    • 2

    6)再次查看 first 主题的详情

    [jjm@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
    hadoop102:9092 --describe --topic first
    
    • 1
    • 2

    7)删除 topic

    [jjm@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server 
    hadoop102:9092 --delete --topic first
    
    • 1
    • 2

    生产者命令行操作

    1)查看操作生产者命令参数

    [jjm@hadoop102 kafka]$ bin/kafka-console-producer.sh
    
    • 1
    参数描述
    –bootstrap-server 连接的 Kafka Broker 主机名称和端口号。
    –topic 操作的 topic 名称

    2)发送消息

    [jjm@hadoop102 kafka]$ bin/kafka-console-producer.sh --bootstrap-server hadoop102:9092 --topic first
    >hello world
    >hello kafka
    
    • 1
    • 2
    • 3

    消费者命令行操作

    1)查看操作消费者命令参数

    [jjm@hadoop102 kafka]$ bin/kafka-console-consumer.sh
    
    • 1
    参数描述
    –bootstrap-server 连接的 Kafka Broker 主机名称和端口号。
    –topic 操作的 topic 名称。
    –from-beginning从头开始消费。
    –group 指定消费者组名称。

    2)消费消息
    (1)消费 first 主题中的数据。

    [jjm@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
    
    • 1

    (2)把主题中所有的数据都读取出来(包括历史数据)。

    [jjm@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
    
    • 1

    Kafka 生产者

    生产者消息发送流程

    发送原理

    在消息发送的过程中,涉及到了两个线程——main 线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker
    在这里插入图片描述

    生产者重要参数列表

    参数名称描述
    bootstrap.servers生产者连接集群所需的 broker 地 址 清 单 。 例 如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置 1 个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker里查找到其他 broker 信息。
    key.serializer 和 value.serializer指定发送消息的 key 和 value 的序列化类型。一定要写全类名。
    buffer.memory RecordAccumulator缓冲区总大小,默认 32m
    batch.size缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
    linger.ms如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟生产环境建议该值大小为 5-100ms 之间。
    acks0:生产者发送过来的数据,不需要等数据落盘应答。1:生产者发送过来的数据,Leader 收到数据后应答。-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和all 是等价的。
    max.in.flight.requests.per.connection允许最多没有返回 ack 的次数,默认为 5开启幂等性要保证该值是 1-5 的数字
    retries当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是 int 最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
    retry.backoff.ms两次重试之间的时间间隔,默认是 100ms
    enable.idempotence是否开启幂等性,默认 true,开启幂等性。
    compression.type生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd

    异步发送 API

    普通异步发送

    1)需求:创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker
    2)代码编写
    (1)创建工程 kafka
    (2)导入依赖

    <dependencies>
     <dependency>
     <groupId>org.apache.kafkagroupId>
     <artifactId>kafka-clientsartifactId>
     <version>3.0.0version>
     dependency>
    dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    (3)创建包名:com.jjm.kafka.producer
    (4)编写不带回调函数的 API 代码

    package com.jjm.kafka.producer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;
    public class CustomProducer {
    	 public static void main(String[] args) throws InterruptedException {
    		 // 1. 创建 kafka 生产者的配置对象
    		 Properties properties = new Properties();
    		 // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
    		 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
    		 
    		 // key,value 序列化(必须):key.serializer,value.serializer 
    		 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
    	"org.apache.kafka.common.serialization.StringSerializer");
    		 //或者可以使用下面这个,同理下面的value也是
    		 //properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName();
    		 properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
    		"org.apache.kafka.common.serialization.StringSerializer");
    		 // 3. 创建 kafka 生产者对象
    		 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    		 // 4. 调用 send 方法,发送消息
    		 for (int i = 0; i < 5; i++) {
    			 kafkaProducer.send(new ProducerRecord<>("first","jjm" + i));
    		 }
    		 // 5. 关闭资源
    		 kafkaProducer.close();
    	 }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    测试:
    ①在 hadoop102 上开启 Kafka 消费者。

    [jjm@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
    
    • 1

    ②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

    [jjm@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
    jjm 0
    jjm 1
    jjm 2
    jjm 3
    jjm 4
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    带回调函数的异步发送

    回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。
    注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

    package com.jjm.kafka.producer;
    import org.apache.kafka.clients.producer.*;
    import java.util.Properties;
    public class CustomProducerCallback {
    	 public static void main(String[] args) throws InterruptedException {
    		// 1. 创建 kafka 生产者的配置对象
    		Properties properties = new Properties();
    		// 2. 给 kafka 配置对象添加配置信息
    		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
    		 // key,value 序列化(必须):key.serializer,value.serializer
    		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    		 
    		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    		// 3. 创建 kafka 生产者对象
    		KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    		 // 4. 调用 send 方法,发送消息
    		for (int i = 0; i < 5; i++) {
    			// 添加回调
    			kafkaProducer.send(new ProducerRecord<>("first", "jjm" + i), new Callback() {
    				// 该方法在 Producer 收到 ack 时调用,为异步调用
    				@Override
    				public void onCompletion(RecordMetadata metadata, Exception exception) {
    					if (exception == null) {
    						// 没有异常,输出信息到控制台
    						System.out.println(" 主题: " + 
    						metadata.topic() + "->" + "分区:" + metadata.partition());
    					 } else {
    						 // 出现异常打印
    						 exception.printStackTrace();
    					 }
    				 }
    			 });
    			 // 延迟一会会看到数据发往不同分区
    			 Thread.sleep(2);
    		 }
    		 // 5. 关闭资源
    		 kafkaProducer.close();
    	 }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    测试:
    ①在 hadoop102 上开启 Kafka 消费者。

    [jjm@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
    
    • 1

    ②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

    [jjm@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
    atguigu 0
    atguigu 1
    atguigu 2
    atguigu 3
    atguigu 4
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    ③在 IDEA 控制台观察回调信息。

    主题:first->分区:0
    主题:first->分区:0
    主题:first->分区:1
    主题:first->分区:1
    主题:first->分区:1
    
    • 1
    • 2
    • 3
    • 4
    • 5

    同步发送 API

    只需在异步发送的基础上,再调用一下 get()方法即可。

    package com.jjm.kafka.producer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    public class CustomProducerSync {
    	public static void main(String[] args) throws InterruptedException, ExecutionException {
    	 // 1. 创建 kafka 生产者的配置对象
    	Properties properties = new Properties();
    	 // 2. 给 kafka 配置对象添加配置信息
    	properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102
    	:9092");
    	 // key,value 序列化(必须):key.serializer,value.serializer
    	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    	 
    	properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    	 // 3. 创建 kafka 生产者对象
    	KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
    	// 4. 调用 send 方法,发送消息
    	for (int i = 0; i < 10; i++) {
    		 // 异步发送 默认
    		// kafkaProducer.send(new 
    		ProducerRecord<>("first","kafka" + i));
    		 // 同步发送
    		kafkaProducer.send(new ProducerRecord<>("first","kafka" + i)).get();
    	 }
    	 // 5. 关闭资源
    	kafkaProducer.close();
    	 }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    测试:
    ①在 hadoop102 上开启 Kafka 消费者。

    [jjm@hadoop103 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
    
    • 1

    ②在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息。

    [jjm@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
    atguigu 0
    atguigu 1
    atguigu 2
    atguigu 3
    atguigu 4
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    生产者分区

    分区好处

    (1)便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
    (2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

    生产者发送消息的分区策略

    • 1)默认的分区器 DefaultPartitioner
      在 IDEA 中 ctrl +n,全局查找 DefaultPartitioner。
      (1)指明partition的情况下,直接将指明的值作为partition值;例如partition=0,所有数据写入分区0。
      (2)没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值
      例如:key1的hash值=5, key2的hash值=6 ,topic的partition数=2,那么key1 对应的value1写入1号分区,key2对应的value2写入0号分区。
      (3)既没有partition值又没有key值的情况下,Kafka采用Sticky Partitio(黏性分区器),会随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或者已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。
      例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进行使用(如果还是0会继续随机)。
  • 相关阅读:
    STL:string容器操作
    Kotlin当中的小技巧
    leetcode-每日一题-119-杨辉三角2(简单,dp)
    去耦电路设计应用指南(二)电容的噪声抑制
    【面试专栏】java线程第一篇:Java线程、线程状态、线程方法
    流媒体传输 - RTP 协议
    基于servlet3.0搭建spring mvc应用 无web.xml 无spring boot
    【Python】Matplotlib-多张图像的显示
    工业控制系统安全标准
    乐趣国学—品读《弟子规》中的“亲仁”之道
  • 原文地址:https://blog.csdn.net/belle_mei/article/details/136183890