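Earlier sections covered consuming data with the kafka-topic-consumer.sh script and the kafka-tool utility. This section covers how Spring Kafka consumes data: the Kafka message listener.
Kafka message listening generally comes in two forms: (1) single-record listening and (2) batch listening. GenericMessageListener is Spring Kafka's base message listener interface. It is also a functional interface, and data is consumed by implementing its onMessage method.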
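    @FunctionalInterface
    public interface GenericMessageListener<T> {

        // Invoked with the data received from Kafka
        void onMessage(T data);

        // Variant that also receives the Acknowledgment for manual commits
        default void onMessage(T data, @Nullable Acknowledgment acknowledgment) {
            throw new UnsupportedOperationException("Container should never call this");
        }

        // Variant that also receives the underlying Kafka Consumer
        default void onMessage(T data, Consumer<?, ?> consumer) {
            throw new UnsupportedOperationException("Container should never call this");
        }

        // Variant that receives both the Acknowledgment and the Consumer
        default void onMessage(T data, @Nullable Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
            throw new UnsupportedOperationException("Container should never call this");
        }
    }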
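The pattern above (one abstract method plus default overloads that throw) can be tried out without any Kafka dependency. The following is a minimal sketch using hypothetical stand-in types (`SimpleListener`, `Ack`); they are not Spring Kafka's classes. It only illustrates why a lambda is enough to implement the interface and what happens if an overload the lambda did not implement is called.

```java
import java.util.ArrayList;
import java.util.List;

class ListenerOverloadDemo {

    /** Hypothetical stand-in for an acknowledgment handle. */
    interface Ack {
        void acknowledge();
    }

    /** Simplified model: one abstract method plus a default overload that throws. */
    @FunctionalInterface
    interface SimpleListener<T> {
        void onMessage(T data);

        default void onMessage(T data, Ack ack) {
            throw new UnsupportedOperationException("Container should never call this");
        }
    }

    /** Delivers one value through a lambda-based listener and returns what the listener saw. */
    static List<String> deliver(String value) {
        List<String> received = new ArrayList<>();
        SimpleListener<String> listener = data -> received.add(data);
        listener.onMessage(value);
        return received;
    }

    /** Returns true if the overload the lambda did not implement rejects the call. */
    static boolean ackOverloadThrows() {
        SimpleListener<String> listener = data -> { };
        try {
            listener.onMessage("kv", () -> { });
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(deliver("kv"));        // [kv]
        System.out.println(ackOverloadThrows());  // true
    }
}
```

In this model the single-argument method is the only one a lambda has to provide, while the other overloads exist so that more specific listener types can override them.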
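Building on this interface, Spring Kafka derives the single-record listener interface MessageListener, the batch listener interface BatchMessageListener, and the ACK-aware listener interfaces AcknowledgingMessageListener and BatchAcknowledgingMessageListener.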
[Diagram: GenericMessageListener and the listener interfaces derived from it]
GenericMessageListener
    MessageListener: single-record listener
    BatchMessageListener: batch listener
    AcknowledgingMessageListener: single-record listener with ACK
    BatchAcknowledgingMessageListener: batch listener with ACK
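To make the difference between these listener kinds concrete, here is a small dependency-free sketch. The interfaces (`SingleListener`, `BatchListener`, `AckingSingleListener`, `Ack`) are hypothetical simplifications that use plain strings instead of ConsumerRecord, and the dispatch methods are an assumption about the general idea, not the container's actual code.

```java
import java.util.ArrayList;
import java.util.List;

class ListenerDispatchDemo {

    /** Hypothetical stand-ins; the real interfaces are generic over ConsumerRecord<K, V>. */
    interface Ack { void acknowledge(); }
    interface SingleListener { void onMessage(String record); }
    interface BatchListener { void onMessage(List<String> records); }
    interface AckingSingleListener { void onMessage(String record, Ack ack); }

    /** Single-record mode: the listener is invoked once per record in the polled batch. */
    static int dispatchSingle(List<String> polled, SingleListener listener) {
        int calls = 0;
        for (String record : polled) {
            listener.onMessage(record);
            calls++;
        }
        return calls;
    }

    /** Batch mode: the listener is invoked once with the whole polled batch. */
    static int dispatchBatch(List<String> polled, BatchListener listener) {
        listener.onMessage(polled);
        return 1;
    }

    /** ACK mode: the listener decides which records to acknowledge; we record those. */
    static List<String> dispatchWithAck(List<String> polled, AckingSingleListener listener) {
        List<String> acknowledged = new ArrayList<>();
        for (String record : polled) {
            listener.onMessage(record, () -> acknowledged.add(record));
        }
        return acknowledged;
    }

    public static void main(String[] args) {
        List<String> polled = List.of("a", "b", "c");
        System.out.println(dispatchSingle(polled, r -> { }));   // 3
        System.out.println(dispatchBatch(polled, rs -> { }));   // 1
        // Acknowledge every record except "b"
        System.out.println(dispatchWithAck(polled, (r, ack) -> {
            if (!r.equals("b")) {
                ack.acknowledge();
            }
        }));                                                     // [a, c]
    }
}
```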
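A MessageListener is hosted by a listener container, described by the MessageListenerContainer interface. Its setupMessageListener() method sets up the listener on the container, and the interface also defines methods such as pause() and resume() for controlling consumption.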
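    public interface MessageListenerContainer extends SmartLifecycle {

        // Set up the message listener for this container
        void setupMessageListener(Object messageListener);

        // Return the consumer metrics, grouped by client id
        Map<String, Map<MetricName, ? extends Metric>> metrics();

        // Other methods, such as pause() and resume(), are omitted here
    }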
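The effect of pause() and resume() can be illustrated with a toy model. The `ToyContainer` class below is hypothetical, single-threaded, and involves no broker; it only sketches the idea that a paused container stops handing records to its listener and picks up where it left off after resume().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

class PausableContainerDemo {

    /** Toy container: a hypothetical model, not Spring Kafka's implementation. */
    static class ToyContainer {
        private final Deque<String> pending = new ArrayDeque<>();
        private Consumer<String> listener = record -> { };
        private boolean paused;

        void setupMessageListener(Consumer<String> messageListener) {
            this.listener = messageListener;
        }

        void publish(String record) { pending.add(record); }
        void pause() { paused = true; }
        void resume() { paused = false; }

        /** One poll cycle: delivers everything pending unless paused; returns the count delivered. */
        int pollOnce() {
            if (paused) {
                return 0;
            }
            int delivered = 0;
            while (!pending.isEmpty()) {
                listener.accept(pending.poll());
                delivered++;
            }
            return delivered;
        }
    }

    static List<String> run() {
        List<String> received = new ArrayList<>();
        ToyContainer container = new ToyContainer();
        container.setupMessageListener(received::add);

        container.publish("a");
        container.pollOnce();          // delivers "a"

        container.pause();
        container.publish("b");
        container.pollOnce();          // paused: "b" stays pending
        received.add("(paused)");

        container.resume();
        container.pollOnce();          // delivers "b"
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());     // [a, (paused), b]
    }
}
```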
spring-kafka provides two container implementations: KafkaMessageListenerContainer and ConcurrentMessageListenerContainer.
[Diagram: listener container type hierarchy]
MessageListenerContainer
    GenericMessageListenerContainer
        AbstractMessageListenerContainer
            KafkaMessageListenerContainer
            ConcurrentMessageListenerContainer
Listener containers are created and managed centrally by a container factory, KafkaListenerContainerFactory:
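    public interface KafkaListenerContainerFactory<C extends MessageListenerContainer> {

        // Create a listener container for the given endpoint
        C createListenerContainer(KafkaListenerEndpoint endpoint);

        // Create a listener container from explicit topic, partition, and offset settings
        C createContainer(TopicPartitionOffset... topicPartitions);

        // Create a listener container for the given topics
        C createContainer(String... topics);

        // Create a listener container for topics matching the given regular expression
        C createContainer(Pattern topicPattern);
    }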
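As a hedged illustration of createContainer(String... topics), the configuration fragment below asks spring-kafka's concrete factory, ConcurrentKafkaListenerContainerFactory, for a container and attaches a listener through setupMessageListener(). The class name, bean name, and topic name "demo-topic" are placeholders, and the sketch assumes a factory bean typed `<String, String>` exists elsewhere and that the consumer configuration already supplies a group.id.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

@Configuration
public class FactoryCreatedContainerConfig {

    // Assumes a ConcurrentKafkaListenerContainerFactory<String, String> bean is defined elsewhere
    @Bean
    public ConcurrentMessageListenerContainer<String, String> factoryCreatedContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> factory) {
        // "demo-topic" is a placeholder topic name
        ConcurrentMessageListenerContainer<String, String> container =
                factory.createContainer("demo-topic");
        // Attach a single-record listener that prints each value
        container.setupMessageListener(
                (MessageListener<String, String>) record ->
                        System.out.println("Received: " + record.value()));
        // Returned as a bean so that Spring manages the container's lifecycle
        return container;
    }
}
```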
spring-kafka provides the listener container factory ConcurrentKafkaListenerContainerFactory, which has two important pieces of configuration: ContainerProperties and ConsumerFactory.
[Diagram: listener container factory type hierarchy]
KafkaListenerContainerFactory
    AbstractKafkaListenerContainerFactory
        ConcurrentKafkaListenerContainerFactory
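ContainerProperties defines, among other things, the topics to consume and the MessageListener that processes the messages.
Implementing a message listener therefore follows this flow: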
[Diagram: flow for building a message listener]
KafkaProperties -> ConsumerFactory -> ContainerProperties -> ContainerFactory -> ListenerContainer -> MessageListener
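A Spring Kafka consumer is hosted by a listener container (ListenerContainer). Each container has a configuration object, ContainerProperties, which extends the consumer configuration class ConsumerProperties and also carries the message listener settings.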
[Diagram: ContainerProperties extends ConsumerProperties]
We start with listeners configured without annotations. Just as the producer side needs a ProducerFactory, the consumer side needs a ConsumerFactory:
    // kafkaProperties is assumed to be the injected Spring Boot KafkaProperties bean
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());
    }
Next, create the listener container factory, ConcurrentKafkaListenerContainerFactory:
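    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Number of concurrent consumers
        factory.setConcurrency(KafkaConsts.DEFAULT_PARTITION_NUM);
        // Batch mode: @KafkaListener methods built by this factory must accept a List of records
        factory.setBatchListener(true);
        // Maximum time, in milliseconds, to block in each poll
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }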
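With that in place, a MessageListenerContainer can be produced by registering a bean. The example below builds a KafkaMessageListenerContainer directly from the ConsumerFactory: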
    @Bean
    public KafkaMessageListenerContainer<String, String> kafkaMessageListenerContainer(
            ConsumerFactory<String, String> consumerFactory) {
        // Topic to consume
        ContainerProperties containerProperties = new ContainerProperties("numb");
        // Single-record listener that prints each value
        containerProperties.setMessageListener(
                (MessageListener<String, String>) data -> System.out.println("Received message: " + data.value()));
        return new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
    }
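In this kafkaMessageListenerContainer, ContainerProperties supplies both the topic to consume and the messageListener. Because the container is a Spring bean that implements SmartLifecycle, Spring starts it when the application starts, and every record it fetches is passed to the message listener automatically.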
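Testing that the consumer receives data:
    @Test
    public void test_send_and_consume() {
        ExecutorService threadPool = Executors.newCachedThreadPool();
        // Send one message per second from a background thread
        threadPool.submit(() -> {
            while (true) {
                kafkaTemplate.send(KafkaConsts.TOPIC_TEST, UUID.randomUUID().toString(), "kv");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop sending
                    Thread.currentThread().interrupt();
                    return;
                }
                System.out.println("Send finished");
            }
        });
        // Keep the test thread alive so the listener can keep consuming
        while (true);
    }
Output:
    Send finished
    Send succeeded
    Received message: kv
    Send finished
    Send succeeded
    Received message: kv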
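So far, after configuring the ConcurrentKafkaListenerContainerFactory, we still had to configure a MessageListenerContainer in code to specify the topic and the message listener. That step can be replaced entirely by the @KafkaListener annotation. Because the handler below takes a single ConsumerRecord plus an Acknowledgment, the container factory it references must deliver records one at a time (batch listening disabled) and use a manual ack mode.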
    @Component
    @Slf4j
    public class MessageHandler {

        @KafkaListener(topics = KafkaConsts.TOPIC_TEST, containerFactory = "kafkaListenerContainerFactory", id = "consumer_numb"
        // , topicPartitions = { @TopicPartition(topic = "numb", partitionOffsets = {@PartitionOffset(partition = "0", initialOffset="1")})}
        )
        public void handleMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
            try {
                String message = record.value();
                log.info("Received message: {}", message);
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            } finally {
                // Manually commit the offset; this runs even when processing fails,
                // so a failed record is not redelivered
                acknowledgment.acknowledge();
            }
        }
    }
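A handler that receives one ConsumerRecord together with an Acknowledgment needs a matching container factory. The fragment below is a sketch of such a factory, not the author's configuration: the class name and the bean name `manualAckContainerFactory` are hypothetical, and it assumes a `ConsumerFactory<String, String>` bean exists and that auto-commit is disabled on the consumer.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

@Configuration
public class ManualAckFactoryConfig {

    // Hypothetical bean name; a listener would reference it with
    // containerFactory = "manualAckContainerFactory"
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> manualAckContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Single-record delivery, matching a handler that takes one ConsumerRecord
        factory.setBatchListener(false);
        // Offsets are committed only after the listener calls Acknowledgment.acknowledge()
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
        return factory;
    }
}
```

With a factory like this, the choice of when to call acknowledge() belongs to the handler, for example only after processing succeeds.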
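id: the listener's id
groupId: the consumer group id
idIsGroup: whether to use id as the group id when groupId is not set. It defaults to true, in which case the listener id is used as the group id. An explicitly set groupId always takes precedence; set idIsGroup to false to fall back to the group.id from the consumer factory.
topics: the topics to listen to (mutually exclusive with topicPattern and topicPartitions)
topicPattern: a pattern matching the topics to listen to (mutually exclusive with topics and topicPartitions)
topicPartitions: explicit partition assignment, which gives the listener specific topics and partitions (and, optionally, initial offsets)
    @KafkaListener(id = "thing2", topicPartitions =
            { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
              @TopicPartition(topic = "topic2", partitions = "0",
                 partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
            })
containerFactory: bean name of the listener container factory to use
errorHandler: bean name of the listener error handler (a KafkaListenerErrorHandler)
beanRef: pseudo bean name used in SpEL expressions within the annotation to refer to the bean in which the listener is defined; defaults to "__listener"
clientIdPrefix: prefix for the consumer's client id
concurrency: overrides the concurrency configured on the container factory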
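The interplay of id, groupId, and idIsGroup is easy to get wrong, so here is the rule written out as plain Java. `resolveGroupId` is a hypothetical helper that mirrors the rule as described above; it is not a Spring Kafka API, and a null result stands for "use the group.id from the consumer factory".

```java
class GroupIdResolutionDemo {

    /**
     * Hypothetical helper: an explicit groupId wins; otherwise the listener id
     * is used when idIsGroup is true; otherwise null (fall back to the
     * consumer factory's group.id).
     */
    static String resolveGroupId(String id, String groupId, boolean idIsGroup) {
        if (groupId != null && !groupId.isEmpty()) {
            return groupId;
        }
        if (idIsGroup && id != null && !id.isEmpty()) {
            return id;
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(resolveGroupId("consumer_numb", "", true));        // consumer_numb
        System.out.println(resolveGroupId("consumer_numb", "orders", true));  // orders
        System.out.println(resolveGroupId("consumer_numb", "", false));       // null
    }
}
```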