我们先来回顾下6.Kafka系列之设计思想(四)-消息传递语义中的一些内容
1.发布消息时,我们有消息被“提交”到日志的概念。一旦发布的消息被提交,只要复制该消息写入的分区的一个代理保持“活动”状态,它就不会丢失。如果生产者尝试发布消息并遇到网络错误,则无法确定此错误是发生在消息提交之前还是之后。在 0.11.0.0 之前,如果生产者未能收到指示消息已提交的响应,它别无选择,只能重新发送消息。这提供了至少一次传递语义,因为如果原始请求实际上已经成功,则消息可能会在重新发送期间再次写入日志
2.从 0.11.0.0 开始,Kafka 生产者还支持幂等传递选项,保证重新发送不会导致日志中出现重复条目。为此,代理为每个生产者分配一个 ID,并使用生产者随每条消息发送的序列号对消息进行重复数据删除
3.同样从 0.11.0.0 开始,生产者支持使用类似事务的语义将消息发送到多个主题分区的能力:即 要么所有消息都已成功写入,要么都没有
1.它可以读取消息,然后保存它在日志中的位置,最后处理消息。在这种情况下,消费者进程有可能在保存其位置之后但在保存其消息处理的输出之前崩溃。在这种情况下,接管处理的进程将从保存的位置开始,即使该位置之前的一些消息还没有被处理。这对应于“至多一次”语义,因为在消费者失败消息的情况下可能不会被处理
2.它可以读取消息、处理消息并最终保存其位置。在这种情况下,消费者进程有可能在处理消息之后但在保存其位置之前崩溃。在这种情况下,当新进程接管它接收到的前几条消息时,它已经被处理过了。这对应于消费者失败情况下的“至少一次”语义。在许多情况下,消息有一个主键,因此更新是幂等的(两次接收相同的消息只是用它自己的另一个副本覆盖记录)
3.那么 exactly once 语义(即你真正想要的东西)呢?从 Kafka 主题消费并生产到另一个主题时(如Kafka Streams 应用程序),我们可以利用上面提到的 0.11.0.0 中的新事务生产者功能。消费者的位置作为消息存储在主题中,因此我们可以在与接收处理数据的输出主题相同的事务中将偏移量写入 Kafka。如果交易被中止,消费者的位置将恢复到它的旧值,并且输出主题的产生的数据将对其他消费者不可见,这取决于他们的“隔离级别”。在默认的“read_uncommitted”隔离级别中,所有消息对消费者都是可见的,即使它们是中止事务的一部分,但在“read_committed”中,消费者将只返回来自已提交事务的消息(以及任何不属于一部分的消息)交易)
我们在客户端参数显示设置enable.idempotence=true就会开启生产者幂等消息传递
注意:序列号实现幂等只是针对每一对
通过事务可以弥补幂等性不能跨多个分区的缺陷,且可以保证对多个分区写入操作的原子性
kafka-topics.sh --create --topic my-topic --partitions 10 --bootstrap-server localhost:9092
kafka-topics.sh --describe --topic my-topic --bootstrap-server localhost:9092

/**
* Kafka事务DEMO
*
* @author shenjian
* @since 2023/5/27
*/
public class KafkaTransactionDemo {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-server:30092");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
// 开启幂等传递
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.initTransactions();
try {
producer.beginTransaction();
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
producer.abortTransaction();
}
producer.close();
}
}
kafka-console-consumer.sh --topic my-topic --from-beginning --bootstrap-server localhost:9092

# --time -1 表示获取的最新位移值
# --time -2 表示获取的最早的位移值,可能由于最早的数据由于过期被删除,所以最早的位移不一定是0
# 通过两数相减,就可以知道当前分区的数据条数
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-topic --time -2
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-topic --time -1

不放心的小伙伴可以去统计要总数量是否一致奥
通过事务,从生产者角度,Kafka可以保证:
从消费者角度,事务能保证的语义相对偏弱,对于一些特殊的情况,Kafka并不能保证已提交的事务中的所有消息都能被消费:
为了实现事务,Kafka引入了事务协调器(TransactionCoodinator)负责事务的处理,所有的事务逻辑包括分派PID等都是由TransactionCoodinator 负责实施的
broker节点有一个专门管理事务的内部主题 __transaction_state,TransactionCoodinator 会将事务状态持久化到该主题中
如此一来,表面当前事务已经结束,此时就可以删除主题 __transaction_state 中所有关于该事务的消息
欢迎关注公众号算法小生