• kafka生产者2


    1.数据可靠

    • 0:生产者发送过来的数据,不需要等数据落盘应答。 

    风险:leader挂了之后,follower还没有收到消息。。。。

    • 1:生产者发送过来的数据,Leader收到数据后应答。

    风险:leader应答完成之后,还没有开始同步副本。。。。

     

     生产者发送过来的数据,Leader和ISR队列里面 的所有节点收齐数据后应答。

    可靠性总结: acks=0,生产者发送过来数据就不管了,可靠性差,效率高; acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等; acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低; 在生产环境中,acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据, 对可靠性要求比较高的场景。

    1. // 设置 acks
    2. properties.put(ProducerConfig.ACKS_CONFIG, "all");
    3. // 重试次数 retries,默认是 int 最大值,2147483647
    4. properties.put(ProducerConfig.RETRIES_CONFIG, 3);

    2.数据去重 

     

    3.生产者事务

     说明:开启事务,必须开启幂等性。

    1. package com.atguigu.kafka.producer;
    2. import org.apache.kafka.clients.producer.KafkaProducer;
    3. import org.apache.kafka.clients.producer.ProducerConfig;
    4. import org.apache.kafka.clients.producer.ProducerRecord;
    5. import java.util.Properties;
    6. public class CustomProducerTransaction {
    7. public static void main(String[] args) {
    8. // 1. 创建 kafka 生产者的配置对象
    9. Properties properties = new Properties();
    10. // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
    11. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
    12. "hadoop100:9092");
    13. // key,value 序列化(必须):key.serializer,value.serializer
    14. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
    15. "org.apache.kafka.common.serialization.StringSerializer");
    16. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
    17. "org.apache.kafka.common.serialization.StringSerializer");
    18. properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"tranactional_id_01");
    19. // 3. 创建 kafka 生产者对象
    20. KafkaProducer kafkaProducer = new
    21. KafkaProducer(properties);
    22. kafkaProducer.initTransactions();
    23. kafkaProducer.beginTransaction();
    24. // 4. 调用 send 方法,发送消息
    25. try{
    26. for (int i = 0; i < 5; i++) {
    27. kafkaProducer.send(new
    28. ProducerRecord<>("first","atguigu " + i));
    29. int i1 = 1 / 0;
    30. }
    31. kafkaProducer.commitTransaction();
    32. }catch (Exception e){
    33. kafkaProducer.abortTransaction();
    34. }finally {
    35. // 5. 关闭资源
    36. kafkaProducer.close();
    37. }
    38. }
    39. }

    4.数据乱序

     

    5.副本 

    Kafka 中副本分为:Leader 和 Follower。Kafka 生产者只会把数据发往 Leader, 然后 Follower 找 Leader 进行同步数据。

    Kafka 分区中的所有副本统称为 AR(Assigned Repllicas)。

    AR = ISR + OSR

    ISR表示和 Leader 保持同步的 Follower 集合。如果 Follower 长时间未向 Leader 发送  通信请求或同步数据,则该 Follower 将被踢出 ISR,进入OSR。该时间阈值由 replica.lag.time.max.ms 参数设定,默认 30s。Leader 发生故障之后,就会从 ISR 中选举新的 Leader。

    OSR,表示 Follower 与 Leader 副本同步时,延迟过多的副本。

    6.leader选举流程

    在ISR中存活为前提,按照 AR中排在前面的优先。假设broker1挂了,isr数组中的1会被抹去。按照isr中的存活前提,AR中按照优先顺序来选举新的leader。 

    7.follow故障处理

     

    8.leader故障处理 

     

    9.自动平衡

    10.后续增加副本  

  • 相关阅读:
    线下活动|来开源集市和Jina AI面对面say hi!
    Python中主要数据结构的使用
    金仓数据库兼容Oracle exp/imp的导出导入工具手册(6. 附录B:imp导入参数说明)
    Python图像处理【3】Python图像处理库应用
    数据治理的改造系统
    DataNode进入Stale状态问题排查
    第3章 Spring Boot进阶,开发社区核心功能(下)
    910数据结构(2018年真题)
    如何下载并编译UE4源码
    JDBC封装查询单个和查询多个
  • 原文地址:https://blog.csdn.net/aaaa1234561/article/details/136268869