目录
💟这里是CS大白话专场,让枯燥的学习变得有趣!
💟没有对象不要怕,我们new一个出来,每天对ta说不尽情话!
💟好记性不如烂键盘,自己总结不如收藏别人!
acks = 0 | acks = 1 | acks = -1 | |
机制 | 生产者发完数据不接收应答 | 生产者发完数据Leader应答 | 生产者发完数据Leader和ISR队列里所有Follower应答 |
可靠性 | 低 | 中 | 高 |
效率 | 高 | 中 | 低 |
使用 | 少 | 传输普通日志 允许丢数据 | 传输重要数据 不允许丢数据 |
💌Leader维护一个动态的ISR(Leader+Follower),若Follower长时间没有反应,则被踢出ISR。
💌至少一次:保证数据不丢,但可能数据重复。
ACK == -1 && 分区副本数 >= 2 && ISR里应答的最小副本数 >= 2
- //asks
- properties.put(ProducerConfig.ACKS_CONFIG,"1");
- //重试次数 默认int最大值2147483647
- properties.put(ProducerConfig.RETRIES_CONFIG,3);
💌在Leader应答的一瞬间挂掉了,选择一个Follower作为Leader发送应答,但是Follower本身已经同步了一份数据,因此在应答的时候数据重复了。
💌至多一次:保证数据不重复,但可能数据丢失。
ACK == 0
💌精确一次:既不重复也不丢失数据。
幂等性 && 至少一次
💌幂等性:Broker端只会持久化一条Producer发来的同样的数据,由以下三个参数判定:
🍠PID:每次重启分配一个新的,保证单会话内不重复。
🍠Partition:区号。
🍠SeqNumber:单调自增。
💌开启事务必须开启幂等性。每一个Broker节点都有一个事务协调器,根据唯一的事务id对50求模,确定事务存到主题的哪一个分区,分区所在的节点就是本次事务的主负责人。整体流程就是申请完事务id之后发送数据持久化请求,将数据存储到主题中,再确认数据是否真正发送成功。
- package com.jodie.kafka.producer;
-
- import org.apache.kafka.clients.producer.*;
- import org.apache.kafka.common.serialization.StringSerializer;
-
- import java.util.Properties;
- import java.util.concurrent.ExecutionException;
-
- /**
- * @author Jodie
- * @date 2022/9/20-18:17
- */
- public class CustomProducerTransactions {
- public static void main(String[] args) throws ExecutionException, InterruptedException {
- Properties properties = new Properties();
- //连接集群
- properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.10.102:9092,192.168.10.103:9092");
- //指定序列化类型
- //properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
- properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
- //指定事务ID
- properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"01");
- //创建生产者对象
- KafkaProducer
kafkaProducer = new KafkaProducer<>(properties); - //初始化事务
- kafkaProducer.initTransactions();
- //开启事务
- kafkaProducer.beginTransaction();
-
- try {
- kafkaProducer.send(new ProducerRecord<>("first","jodie"));
- //提交事务
- kafkaProducer.commitTransaction();
- }catch (Exception e){
- //放弃事务
- kafkaProducer.abortTransaction();
- }finally {
- kafkaProducer.close();
- }
- }
- }
💌在1.x版本之后保证数据单分区有序,条件如下:
🍠未开启幂等性:max.in.flight.requests.per.connection=1。
🍠开启幂等性:max.in.flight.requests.per.connection<=5,保证最近五个request的数据有序。