• kafka笔记(二):生产者-同步发送/异步发送/生产者分区/数据可靠性/数据去重/消息发送流程


    目录

    kafka生产者

    生产者消息发送流程

    发送原理

    生产者重要参数列表

    异步发送API

    普通异步发送

    带回调函数的异步发送

    同步发送API

    生产者分区

    分区好处

    生产者发送消息的分区策略

    自定义分区器

    生产经验—提高生产者的吞吐量

    生产经验—数据可靠性

    生产经验—数据去重

    数据传递语义

    幂等性

    生产者事务

    生产经验—数据有序

    生产经验—数据乱序


    kafka生产者

    生产者消息发送流程

    发送原理

    在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。

    batch.size:只有数据积累到batch.size之后,sender才会发送数据。默认16k。

    linger.ms:如果数据迟迟未达到batch.size,sender等待linger.ms设置的时间到了之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。

    应答acks:

    0:生产者发送过来的数据,不需要等数据落盘应答。 

    1:生产者发送过来的数据,Leader收到数据后应答。

    -1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。-1和all等价。

    生产者重要参数列表

    参数名称描述
    bootstrap.servers

    生产者连接集群所需的broker地址清单。例如:hadoop01:9092,hadoop02:9092,hadoop03:9092,可以设置1个或者多个,中间用逗号隔开。注意,这里并非需要所有的broker地址,因为生产者从给定的broker里查找到其他broker信息。

    key.serializer和value.serializer

    指定发送消息的key和value的序列化类型。(注:要写全列名)

    buffer.memory

    RecordAccumulator缓冲区总大小,默认32M。

    batch.size

    缓冲区一批数据的最大值,默认16k。适当增加该值可以提高吞吐量,但是如果该值设置太大会导致数据传输延迟增加。

    linger.ms

    如果数据迟迟未达到batch.size,sender等待linger.time设置的时间到了就会发送数据。单位ms,默认值是0ms:表示没有延迟。

    生产环境建议该值大小为5-100ms之间。

    acks

    0:生产者发送过来的数据,不需要等数据落盘应答。

    1:生产者发送过来的数据,Leader收到数据后应答。

    -1(all):生产者发送过来的数据,Leader和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的。

    max.in.flight.requests.per.connection

    允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是1-5的数字。

    retry.backoff.ms

    两次重试之间的时间间隔,默认是100ms。

    enable.idempotence

    是否开启幂等性,默认true,开启幂等性。

    compression.type

    生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。

    支持压缩类型:none、gzip、snappy、lz4和zstd。

    retries

    当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值:2147483647。

    如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。

    异步发送API

    普通异步发送

    (1)需求:创建kafka生产者,采用异步发送的方式发送到kafka broker。

    (2)代码编写:

    1)创建maven工程

    2)导入依赖

    1. <dependencies>
    2. <dependency>
    3. <groupId>org.apache.kafka</groupId>
    4. <artifactId>kafka-clients</artifactId>
    5. <version>2.4.1</version>
    6. </dependency>
    7. </dependencies>
    8. <build>
    9. <plugins>
    10. <plugin>
    11. <artifactId>maven-compiler-plugin</artifactId>
    12. <version>3.6.0</version>
    13. <configuration>
    14. <source>1.8</source>
    15. <target>1.8</target>
    16. </configuration>
    17. </plugin>
    18. <plugin>
    19. <artifactId>maven-assembly-plugin</artifactId>
    20. <configuration>
    21. <descriptorRefs>
    22. <descriptorRef>jar-with-dependencies</descriptorRef>
    23. </descriptorRefs>
    24. </configuration>
    25. <executions>
    26. <execution>
    27. <id>make-assembly</id>
    28. <phase>package</phase>
    29. <goals>
    30. <goal>single</goal>
    31. </goals>
    32. </execution>
    33. </executions>
    34. </plugin>
    35. </plugins>
    36. </build>

    3)创建包

    4)编写API代码

    1. import org.apache.kafka.clients.producer.KafkaProducer;
    2. import org.apache.kafka.clients.producer.ProducerConfig;
    3. import org.apache.kafka.clients.producer.ProducerRecord;
    4. import java.util.Properties;
    5. public class CustomProducer {
    6. public static void main(String[] args) {
    7. /**
    8. 编写不带回调函数的API代码
    9. */
    10. //1.创建kafka生产者的配置对象
    11. Properties properties = new Properties();
    12. //2.给kafka配置对象添加配置信息:bootstrap.server
    13. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
    14. //key,value序列化:key.serializer,value.serializer
    15. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    16. "org.apache.kafka.common.serialization.StringSerializer");
    17. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    18. "org.apache.kafka.common.serialization.StringSerializer");
    19. //3.创建kafka生产者对象
    20. KafkaProducer kafkaProducer = new KafkaProducer<>(properties);
    21. //4.调用send方法,发送信息
    22. for (int i = 0;i < 5;i++) {
    23. kafkaProducer.send(new ProducerRecord<>("first","hello kafka" + i));
    24. }
    25. //5.关闭资源
    26. kafkaProducer.close();
    27. }
    28. }

    (3)测试

    1)开启kafka

    bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

    2)执行idea的代码,查看kafka是否接收到消息

    带回调函数的异步发送

    回调函数会在producer收到ack时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果Exceptionnull说明消息发送成功,如果Exception不为null说明消息发送失败。

    注:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

    (1)代码编写

    1. import org.apache.kafka.clients.producer.*;
    2. import org.apache.kafka.common.serialization.StringSerializer;
    3. import java.util.Properties;
    4. public class CustomProducerCallBack {
    5. public static void main(String[] args) throws InterruptedException {
    6. /**
    7. * 编写回调函数异步发送API代码
    8. */
    9. //1.创建kafka生产者环境
    10. Properties properties = new Properties();
    11. //2.给kafka配置对象添加配置信息:bootstrap.server
    12. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
    13. //key.value序列化:key.serializer,value.serializer
    14. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    15. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    16. //3.创建kafka生产者对象
    17. KafkaProducer kafkaProducer = new KafkaProducer<>(properties);
    18. //4.调用send方法,发送信息
    19. for (int a = 0;a < 5; a++) {
    20. kafkaProducer.send(new ProducerRecord<>("first", "hello mykafka" + a), new Callback() {
    21. //在kafkaProducer接收到ack时调用,异步调用
    22. @Override
    23. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    24. if (e == null) {
    25. System.out.println("主题:"+ recordMetadata.topic() + "->"+"分区:"+recordMetadata.partition());
    26. } else {
    27. //出现异常打印
    28. e.printStackTrace();
    29. }
    30. }
    31. });
    32. //延迟一会会看到数据发往不同的分区
    33. Thread.sleep(2);
    34. }
    35. //5.关闭环境
    36. kafkaProducer.close();
    37. }
    38. }

    (2)测试

    1)开启消费者

    bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

    2)执行idea的代码,查看控制台中是否有接收到回调消息

    3)查看kafka是否接收到消息

    同步发送API

    只需在异步发送的基础上再调用一下get()方法即可。

    (1)代码编写

    1. import org.apache.kafka.clients.producer.KafkaProducer;
    2. import org.apache.kafka.clients.producer.ProducerConfig;
    3. import org.apache.kafka.clients.producer.ProducerRecord;
    4. import org.apache.kafka.common.serialization.StringSerializer;
    5. import java.util.Properties;
    6. import java.util.concurrent.ExecutionException;
    7. public class CustomProducerS {
    8. public static void main(String[] args) throws ExecutionException,InterruptedException {
    9. /**
    10. * 同步发送API
    11. */
    12. //1.创建kafka生产者的配置环境
    13. Properties properties = new Properties();
    14. //2.给kafka配置对象添加配置信息
    15. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
    16. //key,value序列化:key.serializer,value.serializer
    17. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    18. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    19. //3.创建kafka生产者对象
    20. KafkaProducer kafkaProducer = new KafkaProducer<>(properties);
    21. //4.调用send方法,发送消息
    22. for (int i = 0; i < 8; i++) {
    23. //异步发送(默认)
    24. //kafkaProducer.send(new ProducerRecord<>("first","hello!"+i));
    25. //同步发送
    26. kafkaProducer.send(new ProducerRecord<>("first","kafka"+i)).get();
    27. }
    28. //5.关闭资源
    29. kafkaProducer.close();
    30. }
    31. }

    (2)测试

    1)开启消费者

    bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

    2)执行idea的代码,查看是否有接收到消息

    生产者分区

    分区好处

    (1)便于合理使用存储资源:每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。

    (2)提高并行度:生产者可以以分区为单位发送数据,消费者可以以分区为单位进行消费数据。

    生产者发送消息的分区策略

    (1)默认分区器:DefaultPartitioner

    1)指明partition的情况下,直接将指明的值作为partition值;

    例如partition=0,所有数据写入分区0。

    2)没有指明partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值;

    例如:key1的hash值=5, key2的hash值=6 ,topic的partition数=2,那

    么key1对应的value1写入1号分区,key2对应的value2写入0号分区。

    3)既没有partition值又没有key值的情况下,Kafka采用Sticky Partition(黏性分区器)随机选择一个分区,并尽可能一直使用该分区,待该分区的batch已满或已完成,Kafka再随机一个分区进行使用(和上一次的分区不同)。

    例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进行使用(如果还是0会继续随机)。

    (2)案例一

    将数据发送指定partition,如:将数据指定发送分区0中。

    1)代码

    1. import org.apache.kafka.clients.producer.*;
    2. import org.apache.kafka.common.serialization.StringSerializer;
    3. import java.util.Properties;
    4. import java.util.concurrent.ExecutionException;
    5. public class CustomProducerPartitions {
    6. public static void main(String[] args) throws ExecutionException,InterruptedException{
    7. /**
    8. * 将数据发送到指定分区
    9. * 将所有数据发送到分区0中
    10. */
    11. //1.创建kafka生产者的配置对象
    12. Properties properties = new Properties();
    13. //2.给kafka配置对象添加配置信息
    14. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
    15. //key,value序列化:key.serializer,value.serializer
    16. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    17. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    18. //3.创建kafka生产者对象
    19. KafkaProducer kafkaProducer = new KafkaProducer<>(properties);
    20. //4.调用send方式,方式消息
    21. for (int i = 0; i < 5; i++) {
    22. //指定数据发送到1分区
    23. kafkaProducer.send(new ProducerRecord<>("first", 0, "", "hi spark" + i), new Callback() {
    24. @Override
    25. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    26. if (e == null) {
    27. System.out.println("主题:" + recordMetadata.topic() + "->" + "分区:" + recordMetadata.partition());
    28. } else {
    29. e.printStackTrace();
    30. }
    31. }
    32. });
    33. }
    34. //5.关闭环境
    35. kafkaProducer.close();
    36. }
    37. }

    2)测试

    开启kafka消费者

    bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

    执行idea中代码,观察控制台和kafka的消息

    控制台

    kafka

    (3)案例二

    没有指定partition值但是有key的情况,将key的hash值与topic的partition数进行取余得到partition值

    1)代码

    1. import org.apache.kafka.clients.producer.*;
    2. import org.apache.kafka.common.serialization.StringSerializer;
    3. import java.util.Properties;
    4. public class CustomProducerPartitions2 {
    5. public static void main(String[] args) {
    6. /**
    7. * 没有指定partition值但是有key的情况
    8. * 将key的hash值与topic的partition数进行取余得到partition值
    9. */
    10. //1.创建kafka生产者的配置对象
    11. Properties properties = new Properties();
    12. //2.给kafka配置对象添加配置信息
    13. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
    14. //key,value序列化:key.serializer,value.serializer
    15. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    16. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    17. //3.创建kafka生产者对象
    18. KafkaProducer kafkaProducer = new KafkaProducer<>(properties);
    19. //4.调用send方式,方式消息
    20. for (int i = 0; i < 5; i++) {
    21. //
    22. kafkaProducer.send(new ProducerRecord<>("first", "a","world" + i), new Callback() {
    23. @Override
    24. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    25. if (e == null) {
    26. System.out.println("主题:" + recordMetadata.topic() + "->" + "分区:" + recordMetadata.partition());
    27. } else {
    28. e.printStackTrace();
    29. }
    30. }
    31. });
    32. }
    33. //5.关闭环境
    34. kafkaProducer.close();
    35. }
    36. }

    2)测试

    开启kafka消费者

    bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

    key=a        查看控制台结果

    key=b        查看控制台结果

    key=c        查看控制台结果

     key=f        查看控制台结果

    自定义分区器

    根据需求可以自己重新实现分区器

    (1)需求:实现一个分区器,发送的数据出现jeffry就发送到分区1,否则发送到分区2.

    (2)实现:

    1)代码

    partition:

    1. import org.apache.kafka.clients.producer.*;
    2. import org.apache.kafka.common.Cluster;
    3. import java.util.Map;
    4. /**
    5. * 1.实现接口partitioner
    6. * 2.实现三个方法:partition,close,configure
    7. * 3.编写partition方法,返回分区号
    8. */
    9. public class MyPartition implements Partitioner {
    10. /**
    11. *
    12. * @param topic 主题
    13. * @param key 消息的key
    14. * @param keyBytes 消息的key序列化后的字节数组
    15. * @param value 消息的value
    16. * @param valueBytes 消息的value序列化后的字节数组
    17. * @param cluster 集群元数据可以查看分区信息
    18. * @return
    19. */
    20. @Override
    21. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    22. //获取消息
    23. String msgValue = value.toString();
    24. //创建partition
    25. int partition;
    26. //判断消息是否包含jeffry
    27. if (msgValue.contains("jeffry")) {
    28. partition = 1;
    29. } else {
    30. partition = 2;
    31. }
    32. //返回分区号
    33. return partition;
    34. }
    35. //关闭资源
    36. @Override
    37. public void close() {
    38. }
    39. //配置方法
    40. @Override
    41. public void configure(Map configs) {
    42. }
    43. }

    测试自定义partition:

    1. import org.apache.kafka.clients.producer.*;
    2. import org.apache.kafka.common.serialization.StringSerializer;
    3. import java.util.Properties;
    4. public class CustomProducerMyPartitions {
    5. public static void main(String[] args) {
    6. /**
    7. * 测试自定义partition
    8. */
    9. Properties properties = new Properties();
    10. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
    11. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    12. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    13. properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.zj.kafka.MyPartition");
    14. KafkaProducer kafkaProducer = new KafkaProducer<>(properties);
    15. for (int i = 0;i < 5;i++) {
    16. kafkaProducer.send(new ProducerRecord<>("first", "jeffry" + i), new Callback() {
    17. @Override
    18. public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    19. if (e == null) {
    20. System.out.println("主题:" + recordMetadata.topic() + "->" + "分区:" + recordMetadata.partition());
    21. } else {
    22. e.printStackTrace();
    23. }
    24. }
    25. });
    26. }
    27. kafkaProducer.close();
    28. }
    29. }

    2)测试

    开启kafka消费者

    bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

    在idea启动执行程序

    查看控制台的信息

     查看kafka的信息

     

    生产经验—提高生产者的吞吐量

    batch.size:批次大小,默认16k;

    linger.ms:等待时间,修改为5-100ms;

    compression.type:压缩形式为snappy;

    RecordAccumulator:缓冲区大小,修改为64m;

    (1)代码

    1. import org.apache.kafka.clients.producer.KafkaProducer;
    2. import org.apache.kafka.clients.producer.ProducerConfig;
    3. import org.apache.kafka.clients.producer.ProducerRecord;
    4. import org.apache.kafka.common.serialization.StringSerializer;
    5. import java.util.Properties;
    6. public class Parameters {
    7. public static void main(String[] args) {
    8. //创建kafka生产者配置对象
    9. Properties properties = new Properties();
    10. //给kafka配置对象添加配置信息
    11. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
    12. //key,value序列化
    13. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    14. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    15. //batch.size:批次大小,默认16K
    16. properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
    17. //linger.ms:等待时间,默认0
    18. properties.put(ProducerConfig.LINGER_MS_CONFIG,1);
    19. //RecordAccumulator:缓冲区大小,默认32M buffer。memory
    20. properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
    21. //compression.type:压缩,默认none,可以配置gzip、snappy、lzo、zstd
    22. properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
    23. //创建kafka生产者对象
    24. KafkaProducer kafkaProducer = new KafkaProducer<>(properties);
    25. //调用send方法,发送信息
    26. for (int i = 0;i < 5;i++) {
    27. kafkaProducer.send(new ProducerRecord<>("first","jeffryOne" + i));
    28. }
    29. //关闭环境
    30. kafkaProducer.close();
    31. }
    32. }

    (2)测试

    1)开启kafka消费者

    bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

    2)在idea中启动执行程序

    观察kafka是否接收到消息

    生产经验—数据可靠性

    (1)ack应答原理

     

    数据可靠性分析:

    如果分区副本设置为1个,或者ISR里应答的最小副本数量设置为1( min.insync.replicas默认为1,和ack=1的效果是一样的,仍然有丢数的风险(leader:0,isr:0)。

    数据完全可靠条件 = ACK级别为-1+分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

    可靠性总结:

    acks=0,生产者发送过来数据就不管了,可靠性差,效率高;

    acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;

    acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低;

    在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1一般用于传输和钱相关的数据,对可靠性要求比较高的场景。

    数据重复分析:

    acks: -1(all):生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。

    (2)代码编写

    1. import org.apache.kafka.clients.producer.KafkaProducer;
    2. import org.apache.kafka.clients.producer.ProducerConfig;
    3. import org.apache.kafka.clients.producer.ProducerRecord;
    4. import org.apache.kafka.common.serialization.StringSerializer;
    5. import java.util.Properties;
    6. public class CustomProducerAck {
    7. public static void main(String[] args) {
    8. //创建kafka生产者的配置对象
    9. Properties properties = new Properties();
    10. //给kafka配置对象添加配置信息
    11. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
    12. //keyvalue序列化
    13. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    14. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    15. //设置acks
    16. properties.put(ProducerConfig.ACKS_CONFIG,"all");
    17. //重试次数retries。默认是int最大的值:2147483647
    18. properties.put(ProducerConfig.RETRIES_CONFIG,5);
    19. //创建kafka生产者对象
    20. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    21. //调用send方法,发送信息
    22. for (int i = 0;i < 5;i++) {
    23. kafkaProducer.send(new ProducerRecord<>("first","jeffry" + i));
    24. }
    25. //关闭环境
    26. kafkaProducer.close();
    27. }
    28. }

    (3)测试

    1)开启kafka消费者

    bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic first

    2)在idea中启动执行程序,查看kafka情况

    生产经验—数据去重

    数据传递语义

    (1)至少一次(At Least Once)= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2;

    At Least Once可以保证数据不丢失,但是不能保证数据不重复。

    (2)最多一次(At Most Once)= ACK级别设置为0;

    At Most Once可以保证数据不重复,但是不能保证数据不丢失。

    (3)精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。

    幂等性

    (1)幂等性原理

    幂等性:指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。

    精确一次 = 幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数>=2)。

    判断重复数据的标准:具有相同主键的消息提交时,Broker只会持久化一条。

    PID是Kafka每次重启都会分配一个新的;

    Partition表示分区号;

    Sequence Number是单调自增的。

    幂等性只能保证在单分区单会话内不重复。

    (2)如何使用幂等性

    开启enable.idempotence默认为true,false关闭。

    生产者事务

    (1)kafka事务原理

    注:开启事务必须开启幂等性。 

     (2)kafka事务的5个API

    1. //1初始化事务
    2. void initTransactions();
    3. //2开启事务
    4. void beginTransaction() throws ProducerFencedException;
    5. //3在事务内提交已经消费的偏移量
    6. void sendOffsetsToTransaction(Map offsets,
    7. String consumerGroupId) throws ProducerFencedException;
    8. //4提交事务
    9. void commitTransaction() throws ProducerFencedException;
    10. //5放弃事务
    11. void abortTransaction() throws ProducerFencedException;

    (3)单个producer,使用事务保证消息的仅一次发送

    1. import org.apache.kafka.clients.producer.KafkaProducer;
    2. import org.apache.kafka.clients.producer.ProducerConfig;
    3. import org.apache.kafka.clients.producer.ProducerRecord;
    4. import org.apache.kafka.common.serialization.StringSerializer;
    5. import java.util.Properties;
    6. public class Transaction {
    7. public static void main(String[] args) throws InterruptedException {
    8. //1.创建kafka生产者的配置对象
    9. Properties properties = new Properties();
    10. //2.给kafka配置对象添加配置信息
    11. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092");
    12. //keyvalue序列化
    13. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    14. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    15. //设置事务id(必须),事务id可以任意起名
    16. properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactionis-0");
    17. //3.创建kafka生产者对象
    18. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    19. //初始化事务
    20. kafkaProducer.initTransactions();
    21. //开启事务
    22. kafkaProducer.beginTransaction();
    23. try {
    24. //4.调用send方法,发送信息
    25. for (int i = 0;i < 5;i++) {
    26. //发送消息
    27. kafkaProducer.send(new ProducerRecord<>("first","tom" + i));
    28. }
    29. int i = 1/0;
    30. //提交事务
    31. kafkaProducer.commitTransaction();
    32. } catch (Exception e) {
    33. //中止事务
    34. kafkaProducer.abortTransaction();
    35. } finally {
    36. //5.关闭环境
    37. kafkaProducer.close();
    38. }
    39. }
    40. }

    生产经验—数据有序

    单分区内数据是有序的;

    多发区内数据是无序的;

    生产经验—数据乱序

     (1)kafka在1.x版本之前保证数据单分区有序,条件如下:  

     max.in.flight.requests.per.connection=1(不需要考虑是否开启幂等性)。

    (2)kafka在1.x及以后版本保证数据单分区有序,条件如下:

    1)开启幂等性

    max.in.flight.requests.per.connection需要设置小于等于5。

    2)未开启幂等性

    max.in.flight.requests.per.connection需要设置为1。

    原因说明:因为在kafka1.x以后,启用幂等后,kafka服务端会缓存producer发来的最近5个request的元数据,故无论如何,都可以保证最近5个request的数据都是有序的。    

    如果开启了幂等性且缓存的请求个数小于5个。会在服务端重新排序。

    本文为学习笔记!!!

  • 相关阅读:
    电子元器件行业B2B交易系统:规范企业交易流程,提升销售管理效率
    Cadence Allegro 在Gerber光绘中生成板卡层叠结构文件
    【Qt】Qt中的中心部件意义
    Elasticsearch 注册为Windows服务
    ValueError: Unknown engine: openpyxl,pandas指定读取新版本execl
    【NLP】python进行word文档编辑——构建不同层级标题
    excel表的筛选后自动求和
    开服当GM的基本准则
    linux中busybox与文件系统的关系
    2023年第十六届山东省职业院校技能大赛中职组“网络安全”赛项规程
  • 原文地址:https://blog.csdn.net/qq_55906442/article/details/126857953