前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。
消息监听器顾名思义用来接收消息,它是使用消息监听容器的必须条件。目前有8个消息监听器:
public interface MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data);
}
public interface AcknowledgingMessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
public interface BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data);
}
public interface BatchAcknowledgingMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
1、Consumer对象不是线程安全的;2、不要在这个过程中操作消费者位置和/或监听器中已提交偏移量。
消息监听容器有两种,一种是单线程消费,一种是多线程消费。
单线程实现为 KafkaMessageListenerContainer,KafkaMessageListenerContainer在单个线程上接收来自所有主题或分区的所有消息。
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
通过 ContainerProperties 可以对主题和分区以及其他信息进行配置
//主题设置
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
//监听器设置
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
//消费者工厂配置
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
权限配置:authorizationExceptionRetryInterval。这是一个容器属性,它会从KafkaConsumer中获取获取信息,当配置的用户被拒绝读取特定主题时,
就会发生触发 AuthorizationException。
多线程实现为 ConcurrentMessageListenerContainer ,ConcurrentMessageListenerContainer实际上是在给一个或多个KafkaMessageListenerContainer实例提供多线程消费,
本质上最后进行工作的还是KafkaMessageListenerContainer,故此它的实现和 KafkaMessageListenerContainer类似,
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它有一个concurrency属性,通过设置这个属性可以指定创建几个 KafkaMessageListenerContainer实例。
假设我们有3个主题,每个主题有5个分区。同时设置 container.setConcurrency(15),我们希望的是有15个线程活动着,实际上只有5个活着,
这是因为 Kafka 中默认的 PartitionAssignor是RangeAssignor。 在SpringBoot中可以设置: spring.kafka.consumer.properties.partition.assignment.strategy=
org.apache.kafka.clients.consumer.RoundRobinAssignor 来更改策略。
自动提交
设置 enable.auto.commit 消费者属性为 true 即可。这个也是默认状态。
手动提交
设置 enable.auto.commit 消费者属性为 false 即可;同时注意设置 AckMode。以下是spring支持的集中类型说明(无事务)
如何提交
public interface Acknowledgment {
void acknowledge();
}
如果要提交部分批次,使用nack(),使用事务时,设置AckMode为MANUAL; 调用nack()会将成功处理的记录的偏移量发送到事务。
nack()只能在调用侦听器的消费者线程上调用。