上节我们完成了如下内容:
Kafka在引入幂等性之前,Producer向Broker发送消息,然后Broker将消息追加到消息流中后给Producer返回ACK信号值,实现流程如下:
生产中,会出现各种不确定的因素,比如在Producer在发送给Broker的时候出现网络异常。比如以下这种异常情况的出现:
上图这种情况,当Producer第一次发送消息给Broker时,Broker消息(x2,y2)追加到消息中,但是在返回ACK信号给Producer时失败了(比如网络异常)。此时,Producer端触发重试机制,将消息(x2,y2)重新发送给Broker,Broker接收到消息后,再次将该消息追加到消息流中,然后成功返回ACK信号给Producer。这样下来,消息流中就被重复追加两条相同的(x2,y2)的消息。
保证咋消息重发的时候,消费者不会重复处理,即使在消费者收到重复消息的时候,重复处理,也要保证最终结果的一致性。
所谓幂等性,数学概念就是:f(f(x)) = f(x),f函数表示对消费的处理
比如,银行转账,如果失败,需要重试。不管重试多少次,都要保证结果一定一致的。
添加唯一Id,类似于数据库的主键,用于标记一个消息:
Kafka为了实现幂等性,它在底层设计架构中引入了Producer和SequenceNumber
同样的,这是一种理想状态下的发送流程。实际情况下,会有很多不确定的因素,比如Broker在发送ACK信号给Producer时出现了网络异常,导致发送失败。异常情况如下图所示:
当Producer发送消息(x2,y2)给Broker时,Broker接收到消息并将其追加到消息流中。此时,Broker返回ACK信号给Producer时,发生异常导致Producer接收ACK信号失败。对于Producer来说,会触发重试机制,将消息(x2,y2)再次发送,但是,由于引入幂等性,在每条消息中附带了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber发送给Broker,而之前Broker缓存之前发送过的相同的消息,那么在消息流中的消息就只有一条(x2,y2),不会出现重复发送的情况。
// 实例化⼀个Producer对象
Producer<String, String> producer = new KafkaProducer<>(props);
在 org.apache.kafka.clients.producer.iinternals.Sender 类中,在run()中有一个maybeWaitForPid()方法,用来生成一个ProducerID,实现代码如下:
private void maybeWaitForPid() {
if (transactionState == null) {
return;
}
while (!transactionState.hasPid()) {
try {
Node node = awaitLeastLoadedNodeReady(requestTimeout);
if (node != null) {
ClientResponse response = sendAndAwaitInitPidRequest(node);
if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
} else {
log.error("Received an unexpected response type for an InitPidRequest from {}. We will back off and try again.", node);
}
} else {
log.debug("Could not find an available broker to send InitPidRequest to. We will back off and try again.");
}
} catch (Exception e) {
log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
}
log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
time.sleep(retryBackoffMs);
metadata.requestUpdate();
}
}
在Kafka事务中,一个原子性操作,根据操作类型可以分为3中情况,情况如下:
// 初始化事务,需要注意确保transation.id属性被分配
void initTransactions();
// 开启事务
void beginTransaction() throws ProducerFencedException;
// 为Consumer提供的在事务内Commit Offsets的操作
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws ProducerFencedException;
// 提交事务
void commitTransaction() throws ProducerFencedException;
// 放弃事务,类似于回滚事务的操作
void abortTransaction() throws ProducerFencedException;
package icu.wzk.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.HashMap;
import java.util.Map;
public class MyTransactionalProducer {
public static void main(String[] args) {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "h121.wzk.icu:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 提供客户端ID
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer");
// 事务ID
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my_tx_id");
// 要求ISR确认
configs.put(ProducerConfig.ACKS_CONFIG, "all");
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
// 初始化事务
producer.initTransactions();
// 开启事务
producer.beginTransaction();
try {
// 发送消息
producer.send(new ProducerRecord<>("tp_tx_01", "tx_msg_02"));
// 可以在这里设置一些异常来测试 比如:
// int n = 1 / 0;
} catch (Exception e) {
// 中止事务
producer.abortTransaction();
} finally {
producer.close();
}
}
}
运行之后,控制台输出结果如下:
package icu.wzk.kafka;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class MyTransactional {
public static void main(String[] args) {
KafkaProducer<String, String> producer = getProducer();
KafkaConsumer<String, String> consumer = getConsumer();
// 事务的初始化
producer.initTransactions();
// 订阅主题
consumer.subscribe(Collections.singleton("tp_tx_01"));
final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
// 开启事务
producer.beginTransaction();
try {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (ConsumerRecord<String, String> record : records) {
System.out.println(record);
producer.send(new ProducerRecord<>("tp_tx_out_01", record.key(), record.value()));
offsets.put(
new TopicPartition(record.topic(), record.partition()),
// 偏移量表示下一条要消费的消息
new OffsetAndMetadata(record.offset() + 1)
);
}
// 将该消息的偏移量提交作为事务的一部分,随事务提交和回滚(不提交消费偏移量)
producer.sendOffsetsToTransaction(offsets, "consumer_grp_02");
// 提交事务
producer.commitTransaction();
} catch (Exception e) {
e.printStackTrace();
// 中止事务
producer.abortTransaction();
} finally {
producer.close();
consumer.close();
}
}
public static KafkaProducer<String, String> getProducer() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "h121.wzk.icu:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 设置ClientID
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "tx_producer_01");
// 设置事务ID
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "tx_id_02");
// 要求ISR确认
configs.put(ProducerConfig.ACKS_CONFIG, "all");
// 启用幂等性
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
return producer;
}
public static KafkaConsumer<String, String> getConsumer() {
Map<String, Object> configs = new HashMap<>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "h121.wzk.icu:9092");
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// 设置消费组ID
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_grp_02");
// 不启⽤消费者偏移量的⾃动确认,也不要⼿动确认
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
configs.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer_client_02");
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
return consumer;
}
}
(由于我测试的云服务器的Kafka掉线了,我又启动了一次,重新执行一次案例1。)
下面是案例2直接的结果如下图: