A Look at the Implementation of KafkaListener


    This article mainly examines how the KafkaListener annotation is implemented.

    KafkaListener

    org/springframework/kafka/annotation/KafkaListener.java

    @Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
    @Retention(RetentionPolicy.RUNTIME)
    @MessageMapping
    @Documented
    @Repeatable(KafkaListeners.class)
    public @interface KafkaListener {
    	String id() default "";
    	String containerFactory() default "";
    	String[] topics() default {};
    	String topicPattern() default "";
    	TopicPartition[] topicPartitions() default {};
    	String containerGroup() default "";
    	String errorHandler() default "";
    	String groupId() default "";
    	boolean idIsGroup() default true;
    	String clientIdPrefix() default "";
    	String beanRef() default "__listener";
    	String concurrency() default "";
    	String autoStartup() default "";
    	String[] properties() default {};
    }
    

    The KafkaListener annotation defines attributes such as id, topics, and groupId; a minimal usage example follows.
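    For orientation, here is a minimal usage sketch (the id, topic, and group names below are made up for illustration); each attribute maps onto one of the annotation fields listed above:

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class DemoListener {

        // id becomes the endpoint/container id; groupId overrides id as the consumer group
        @KafkaListener(id = "demo-listener", topics = "demo-topic", groupId = "demo-group")
        public void onMessage(ConsumerRecord<String, String> record) {
            System.out.println("received: " + record.value());
        }
    }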

    KafkaListenerAnnotationBeanPostProcessor

    org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

    public class KafkaListenerAnnotationBeanPostProcessor
    		implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {
    
    	private final KafkaListenerEndpointRegistrar registrar = new KafkaListenerEndpointRegistrar();	
    
    	@Override
    	public int getOrder() {
    		return LOWEST_PRECEDENCE;
    	}
    
    	@Override
    	public void setBeanFactory(BeanFactory beanFactory) {
    		this.beanFactory = beanFactory;
    		if (beanFactory instanceof ConfigurableListableBeanFactory) {
    			this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();
    			this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory,
    					this.listenerScope);
    		}
    	}
    
    	@Override
    	public void afterSingletonsInstantiated() {
    		this.registrar.setBeanFactory(this.beanFactory);
    
    		if (this.beanFactory instanceof ListableBeanFactory) {
    			Map<String, KafkaListenerConfigurer> instances =
    					((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
    			for (KafkaListenerConfigurer configurer : instances.values()) {
    				configurer.configureKafkaListeners(this.registrar);
    			}
    		}
    
    		if (this.registrar.getEndpointRegistry() == null) {
    			if (this.endpointRegistry == null) {
    				Assert.state(this.beanFactory != null,
    						"BeanFactory must be set to find endpoint registry by bean name");
    				this.endpointRegistry = this.beanFactory.getBean(
    						KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
    						KafkaListenerEndpointRegistry.class);
    			}
    			this.registrar.setEndpointRegistry(this.endpointRegistry);
    		}
    
    		if (this.defaultContainerFactoryBeanName != null) {
    			this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
    		}
    
    		// Set the custom handler method factory once resolved by the configurer
    		MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
    		if (handlerMethodFactory != null) {
    			this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
    		}
    		else {
    			addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);
    		}
    
    		// Actually register all listeners
    		this.registrar.afterPropertiesSet();
    	}
    
    	@Override
    	public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    		if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
    			Class<?> targetClass = AopUtils.getTargetClass(bean);
    			Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
    			final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
    			final List<Method> multiMethods = new ArrayList<>();
    			Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
    					(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
    						Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
    						return (!listenerMethods.isEmpty() ? listenerMethods : null);
    					});
    			if (hasClassLevelListeners) {
    				Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
    						(ReflectionUtils.MethodFilter) method ->
    								AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
    				multiMethods.addAll(methodsWithHandler);
    			}
    			if (annotatedMethods.isEmpty()) {
    				this.nonAnnotatedClasses.add(bean.getClass());
    				if (this.logger.isTraceEnabled()) {
    					this.logger.trace("No @KafkaListener annotations found on bean type: " + bean.getClass());
    				}
    			}
    			else {
    				// Non-empty set of methods
    				for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
    					Method method = entry.getKey();
    					for (KafkaListener listener : entry.getValue()) {
    						processKafkaListener(listener, method, bean, beanName);
    					}
    				}
    				if (this.logger.isDebugEnabled()) {
    					this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '"
    							+ beanName + "': " + annotatedMethods);
    				}
    			}
    			if (hasClassLevelListeners) {
    				processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
    			}
    		}
    		return bean;
    	}		
    }		
    

    KafkaListenerAnnotationBeanPostProcessor implements the BeanPostProcessor, Ordered, BeanFactoryAware, and SmartInitializingSingleton interfaces; its getOrder method returns LOWEST_PRECEDENCE.
    Its afterSingletonsInstantiated method (from SmartInitializingSingleton) first looks up all KafkaListenerConfigurer beans and invokes configureKafkaListeners(registrar) on each of them, then resolves the KafkaListenerEndpointRegistry and the default container factory for the registrar, and finally calls registrar.afterPropertiesSet() to actually register all listeners.
    Its postProcessAfterInitialization method (from BeanPostProcessor) collects the @KafkaListener-annotated methods of each bean and invokes processKafkaListener for every one of them. A sketch of a custom KafkaListenerConfigurer is shown below.
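    As a hedged sketch of the hook above: any bean implementing KafkaListenerConfigurer is handed the registrar in afterSingletonsInstantiated, so it can, for example, supply a custom MessageHandlerMethodFactory (the configuration class name below is made up):

    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.KafkaListenerConfigurer;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
    import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

    @Configuration
    public class CustomKafkaListenerConfig implements KafkaListenerConfigurer {

        @Override
        public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
            // picked up by afterSingletonsInstantiated() before the endpoints are registered
            DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
            registrar.setMessageHandlerMethodFactory(factory);
        }
    }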

    processKafkaListener

    	protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
    		Method methodToUse = checkProxy(method, bean);
    		MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
    		endpoint.setMethod(methodToUse);
    		processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
    	}
    
    	protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
    			Object bean, Object adminTarget, String beanName) {
    
    		String beanRef = kafkaListener.beanRef();
    		if (StringUtils.hasText(beanRef)) {
    			this.listenerScope.addListener(beanRef, bean);
    		}
    		endpoint.setBean(bean);
    		endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
    		endpoint.setId(getEndpointId(kafkaListener));
    		endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
    		endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
    		endpoint.setTopics(resolveTopics(kafkaListener));
    		endpoint.setTopicPattern(resolvePattern(kafkaListener));
    		endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
    		String group = kafkaListener.containerGroup();
    		if (StringUtils.hasText(group)) {
    			Object resolvedGroup = resolveExpression(group);
    			if (resolvedGroup instanceof String) {
    				endpoint.setGroup((String) resolvedGroup);
    			}
    		}
    		String concurrency = kafkaListener.concurrency();
    		if (StringUtils.hasText(concurrency)) {
    			endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
    		}
    		String autoStartup = kafkaListener.autoStartup();
    		if (StringUtils.hasText(autoStartup)) {
    			endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
    		}
    		resolveKafkaProperties(endpoint, kafkaListener.properties());
    
    		KafkaListenerContainerFactory<?> factory = null;
    		String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
    		if (StringUtils.hasText(containerFactoryBeanName)) {
    			Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
    			try {
    				factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
    			}
    			catch (NoSuchBeanDefinitionException ex) {
    				throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
    						+ "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
    						+ " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
    			}
    		}
    
    		endpoint.setBeanFactory(this.beanFactory);
    		String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
    		if (StringUtils.hasText(errorHandlerBeanName)) {
    			endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
    		}
    		this.registrar.registerEndpoint(endpoint, factory);
    		if (StringUtils.hasText(beanRef)) {
    			this.listenerScope.removeListener(beanRef);
    		}
    	}	
    

    The processKafkaListener method resolves the target method via checkProxy, wraps it in a MethodKafkaListenerEndpoint, and then calls processListener, which mainly copies the information from the @KafkaListener annotation onto the MethodKafkaListenerEndpoint (id, groupId, topics, concurrency, autoStartup, and so on), determines the KafkaListenerContainerFactory, and finally calls registrar.registerEndpoint(endpoint, factory). Because the attributes go through the resolveExpression* helpers, they accept property placeholders and expressions, as sketched below.
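    A hedged sketch of that attribute resolution (the property keys and the error-handler bean name are assumptions, not defaults shipped with the framework):

    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    @Component
    public class ResolvedAttributesListener {

        @KafkaListener(
                topics = "${app.input-topic}",           // resolveTopics(...) resolves the placeholder
                groupId = "${app.group-id}",             // getEndpointGroupId(...)
                concurrency = "${app.concurrency}",      // resolveExpressionAsInteger(...)
                autoStartup = "${app.listener.enabled}", // resolveExpressionAsBoolean(...)
                errorHandler = "myKafkaErrorHandler")    // looked up as a KafkaListenerErrorHandler bean
        public void handle(String payload) {
            // process the payload ...
        }
    }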

    registrar.registerEndpoint

    org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java

    	/**
    	 * Register a new {@link KafkaListenerEndpoint} alongside the
    	 * {@link KafkaListenerContainerFactory} to use to create the underlying container.
    	 * <p>The {@code factory} may be {@code null} if the default factory has to be
    	 * used for that endpoint.
    	 * @param endpoint the {@link KafkaListenerEndpoint} instance to register.
    	 * @param factory the {@link KafkaListenerContainerFactory} to use.
    	 */
    	public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
    		Assert.notNull(endpoint, "Endpoint must be set");
    		Assert.hasText(endpoint.getId(), "Endpoint id must be set");
    		// Factory may be null, we defer the resolution right before actually creating the container
    		KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
    		synchronized (this.endpointDescriptors) {
    			if (this.startImmediately) {
    				// Register and start immediately
    				this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
    						resolveContainerFactory(descriptor), true);
    			}
    			else {
    				this.endpointDescriptors.add(descriptor);
    			}
    		}
    	}


    KafkaListenerEndpointRegistrar's registerEndpoint wraps the endpoint and factory in a KafkaListenerEndpointDescriptor; if startImmediately is set it calls endpointRegistry.registerListenerContainer right away, otherwise the descriptor is queued and registered later when afterPropertiesSet() runs.

    endpointRegistry.registerListenerContainer

    org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

    	public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
    			boolean startImmediately) {
    		Assert.notNull(endpoint, "Endpoint must not be null");
    		Assert.notNull(factory, "Factory must not be null");
    
    		String id = endpoint.getId();
    		Assert.hasText(id, "Endpoint id must not be empty");
    		synchronized (this.listenerContainers) {
    			Assert.state(!this.listenerContainers.containsKey(id),
    					"Another endpoint is already registered with id '" + id + "'");
    			MessageListenerContainer container = createListenerContainer(endpoint, factory);
    			this.listenerContainers.put(id, container);
    			if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
    				List<MessageListenerContainer> containerGroup;
    				if (this.applicationContext.containsBean(endpoint.getGroup())) {
    					containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
    				}
    				else {
    					containerGroup = new ArrayList<MessageListenerContainer>();
    					this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
    				}
    				containerGroup.add(container);
    			}
    			if (startImmediately) {
    				startIfNecessary(container);
    			}
    		}
    	}
    
    	/**
    	 * Start the specified {@link MessageListenerContainer} if it should be started
    	 * on startup.
    	 * @param listenerContainer the listener container to start.
    	 * @see MessageListenerContainer#isAutoStartup()
    	 */
    	private void startIfNecessary(MessageListenerContainer listenerContainer) {
    		if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
    			listenerContainer.start();
    		}
    	}	
    

    KafkaListenerEndpointRegistry's registerListenerContainer method creates a MessageListenerContainer from the endpoint and factory and stores it in listenerContainers (optionally adding it to a container-group bean); when startImmediately is true it calls startIfNecessary, which mainly calls listenerContainer.start(). The containers it holds can later be looked up by listener id, as sketched below.
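    Since every container is registered under its endpoint id, it can be fetched back from the registry and stopped or started by hand; a minimal sketch (the listener id "demo-listener" is carried over from the earlier made-up example):

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
    import org.springframework.kafka.listener.MessageListenerContainer;
    import org.springframework.stereotype.Component;

    @Component
    public class ListenerLifecycleOps {

        @Autowired
        private KafkaListenerEndpointRegistry registry;

        public void stopDemoListener() {
            MessageListenerContainer container = registry.getListenerContainer("demo-listener");
            if (container != null && container.isRunning()) {
                container.stop();   // ends up in AbstractMessageListenerContainer.stop()
            }
        }

        public void startDemoListener() {
            MessageListenerContainer container = registry.getListenerContainer("demo-listener");
            if (container != null && !container.isRunning()) {
                container.start();  // ends up in the concrete container's doStart()
            }
        }
    }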

    MessageListenerContainer

    org/springframework/kafka/listener/MessageListenerContainer.java

    public interface MessageListenerContainer extends SmartLifecycle {
    	void setupMessageListener(Object messageListener);
    	Map<String, Map<MetricName, ? extends Metric>> metrics();
    	default ContainerProperties getContainerProperties() {
    		throw new UnsupportedOperationException("This container doesn't support retrieving its properties");
    	}
    	default Collection<TopicPartition> getAssignedPartitions() {
    		throw new UnsupportedOperationException("This container doesn't support retrieving its assigned partitions");
    	}
    	default void pause() {
    		throw new UnsupportedOperationException("This container doesn't support pause");
    	}
    	default void resume() {
    		throw new UnsupportedOperationException("This container doesn't support resume");
    	}
    	default boolean isPauseRequested() {
    		throw new UnsupportedOperationException("This container doesn't support pause/resume");
    	}
    	default boolean isContainerPaused() {
    		throw new UnsupportedOperationException("This container doesn't support pause/resume");
    	}
    	default void setAutoStartup(boolean autoStartup) {
    		// empty
    	}
    	default String getGroupId() {
    		throw new UnsupportedOperationException("This container does not support retrieving the group id");
    	}
    	@Nullable
    	default String getListenerId() {
    		throw new UnsupportedOperationException("This container does not support retrieving the listener id");
    	}
    }
    

    MessageListenerContainer extends the SmartLifecycle interface. It has a generic sub-interface, GenericMessageListenerContainer, whose abstract base class is AbstractMessageListenerContainer; that class in turn has two subclasses, KafkaMessageListenerContainer and ConcurrentMessageListenerContainer.

    AbstractMessageListenerContainer

    public abstract class AbstractMessageListenerContainer<K, V>
    		implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware {
    
    	@Override
    	public final void start() {
    		checkGroupId();
    		synchronized (this.lifecycleMonitor) {
    			if (!isRunning()) {
    				Assert.isTrue(this.containerProperties.getMessageListener() instanceof GenericMessageListener,
    						() -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
    				doStart();
    			}
    		}
    	}
    
    	@Override
    	public final void stop() {
    		synchronized (this.lifecycleMonitor) {
    			if (isRunning()) {
    				final CountDownLatch latch = new CountDownLatch(1);
    				doStop(latch::countDown);
    				try {
    					latch.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS); // NOSONAR
    					publishContainerStoppedEvent();
    				}
    				catch (InterruptedException e) {
    					Thread.currentThread().interrupt();
    				}
    			}
    		}
    	}
    	//......
    }		
    

    AbstractMessageListenerContainer's start method delegates to the subclass's doStart method, and its stop method delegates to the subclass's doStop method.

    KafkaMessageListenerContainer

    org/springframework/kafka/listener/KafkaMessageListenerContainer.java

    public class KafkaMessageListenerContainer<K, V> // NOSONAR comment density
    		extends AbstractMessageListenerContainer<K, V> {
    
    	@Override
    	protected void doStart() {
    		if (isRunning()) {
    			return;
    		}
    		if (this.clientIdSuffix == null) { // stand-alone container
    			checkTopics();
    		}
    		ContainerProperties containerProperties = getContainerProperties();
    		checkAckMode(containerProperties);
    
    		Object messageListener = containerProperties.getMessageListener();
    		Assert.state(messageListener != null, "A MessageListener is required");
    		if (containerProperties.getConsumerTaskExecutor() == null) {
    			SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
    					(getBeanName() == null ? "" : getBeanName()) + "-C-");
    			containerProperties.setConsumerTaskExecutor(consumerExecutor);
    		}
    		Assert.state(messageListener instanceof GenericMessageListener, "Listener must be a GenericListener");
    		GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
    		ListenerType listenerType = deteremineListenerType(listener);
    		this.listenerConsumer = new ListenerConsumer(listener, listenerType);
    		setRunning(true);
    		this.listenerConsumerFuture = containerProperties
    				.getConsumerTaskExecutor()
    				.submitListenable(this.listenerConsumer);
    	}
    
    	//......
    }		
    

    KafkaMessageListenerContainer's doStart method fetches the messageListener from the container properties, wraps it in a ListenerConsumer, and submits that consumer to the consumer task executor. Such a container can also be assembled without the annotation machinery, as sketched below.
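    A rough, hedged sketch of building the container by hand (the broker address and topic name are made up, and the ContainerProperties package differs slightly between spring-kafka versions):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
    import org.springframework.kafka.listener.ContainerProperties;
    import org.springframework.kafka.listener.KafkaMessageListenerContainer;
    import org.springframework.kafka.listener.MessageListener;

    public class ManualContainerDemo {

        public static KafkaMessageListenerContainer<String, String> createContainer() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-demo");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

            // ContainerProperties carries the topics and the MessageListener that
            // doStart() later wraps into a ListenerConsumer
            ContainerProperties containerProps = new ContainerProperties("demo-topic");
            containerProps.setMessageListener(
                    (MessageListener<String, String>) record -> System.out.println(record.value()));

            return new KafkaMessageListenerContainer<>(
                    new DefaultKafkaConsumerFactory<String, String>(props), containerProps);
        }
    }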

    ConcurrentMessageListenerContainer

    org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

    public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
    
    	@Override
    	protected void doStart() {
    		if (!isRunning()) {
    			checkTopics();
    			ContainerProperties containerProperties = getContainerProperties();
    			TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
    			if (topicPartitions != null && this.concurrency > topicPartitions.length) {
    				this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
    						+ "equal to the number of partitions; reduced from " + this.concurrency + " to "
    						+ topicPartitions.length);
    				this.concurrency = topicPartitions.length;
    			}
    			setRunning(true);
    
    			for (int i = 0; i < this.concurrency; i++) {
    				KafkaMessageListenerContainer<K, V> container;
    				if (topicPartitions == null) {
    					container = new KafkaMessageListenerContainer<>(this, this.consumerFactory, containerProperties);
    				}
    				else {
    					container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
    							containerProperties, partitionSubset(containerProperties, i));
    				}
    				String beanName = getBeanName();
    				container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
    				if (getApplicationEventPublisher() != null) {
    					container.setApplicationEventPublisher(getApplicationEventPublisher());
    				}
    				container.setClientIdSuffix("-" + i);
    				container.setGenericErrorHandler(getGenericErrorHandler());
    				container.setAfterRollbackProcessor(getAfterRollbackProcessor());
    				container.setRecordInterceptor(getRecordInterceptor());
    				container.setEmergencyStop(() -> {
    					stop(() -> {
    						// NOSONAR
    					});
    					publishContainerStoppedEvent();
    				});
    				if (isPaused()) {
    					container.pause();
    				}
    				container.start();
    				this.containers.add(container);
    			}
    		}
    	}
    
    	//......	
    }
    

    ConcurrentMessageListenerContainer's doStart method creates one KafkaMessageListenerContainer per unit of concurrency (capped at the number of explicitly assigned partitions) and calls start on each of them. The concurrency typically comes from the container factory, as sketched below.
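    In a typical setup the concurrency is configured on a ConcurrentKafkaListenerContainerFactory bean, which @KafkaListener endpoints then pick up as their default containerFactory; a sketch (the broker address is an assumption):

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.annotation.EnableKafka;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    @Configuration
    @EnableKafka
    public class ConcurrencyConfig {

        @Bean
        public ConsumerFactory<String, String> consumerFactory() {
            Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            return new DefaultKafkaConsumerFactory<>(props);
        }

        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            // each endpoint built by this factory gets a ConcurrentMessageListenerContainer
            // whose doStart() spins up 3 child KafkaMessageListenerContainers
            factory.setConcurrency(3);
            return factory;
        }
    }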

    ListenerConsumer

    org/springframework/kafka/listener/KafkaMessageListenerContainer.java

    private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback {
    
    		@Override
    		public void run() {
    			this.consumerThread = Thread.currentThread();
    			if (this.genericListener instanceof ConsumerSeekAware) {
    				((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
    			}
    			if (this.transactionManager != null) {
    				ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);
    			}
    			this.count = 0;
    			this.last = System.currentTimeMillis();
    			initAsignedPartitions();
    			while (isRunning()) {
    				try {
    					pollAndInvoke();
    				}
    				catch (@SuppressWarnings(UNUSED) WakeupException e) {
    					// Ignore, we're stopping or applying immediate foreign acks
    				}
    				catch (NoOffsetForPartitionException nofpe) {
    					this.fatalError = true;
    					ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
    					break;
    				}
    				catch (Exception e) {
    					handleConsumerException(e);
    				}
    				catch (Error e) { // NOSONAR - rethrown
    					Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
    					if (runnable != null) {
    						runnable.run();
    					}
    					this.logger.error("Stopping container due to an Error", e);
    					wrapUp();
    					throw e;
    				}
    			}
    			wrapUp();
    		}
    
    		protected void pollAndInvoke() {
    			if (!this.autoCommit && !this.isRecordAck) {
    				processCommits();
    			}
    			processSeeks();
    			checkPaused();
    			ConsumerRecords<K, V> records = this.consumer.poll(this.pollTimeout);
    			this.lastPoll = System.currentTimeMillis();
    			checkResumed();
    			debugRecords(records);
    			if (records != null && records.count() > 0) {
    				if (this.containerProperties.getIdleEventInterval() != null) {
    					this.lastReceive = System.currentTimeMillis();
    				}
    				invokeListener(records);
    			}
    			else {
    				checkIdle();
    			}
    		}
    
    		private void invokeListener(final ConsumerRecords<K, V> records) {
    			if (this.isBatchListener) {
    				invokeBatchListener(records);
    			}
    			else {
    				invokeRecordListener(records);
    			}
    		}
    
    		private void doInvokeBatchOnMessage(final ConsumerRecords<K, V> records,
    				List<ConsumerRecord<K, V>> recordList) {
    			switch (this.listenerType) {
    				case ACKNOWLEDGING_CONSUMER_AWARE:
    					this.batchListener.onMessage(recordList,
    							this.isAnyManualAck
    									? new ConsumerBatchAcknowledgment(records)
    									: null, this.consumer);
    					break;
    				case ACKNOWLEDGING:
    					this.batchListener.onMessage(recordList,
    							this.isAnyManualAck
    									? new ConsumerBatchAcknowledgment(records)
    									: null);
    					break;
    				case CONSUMER_AWARE:
    					this.batchListener.onMessage(recordList, this.consumer);
    					break;
    				case SIMPLE:
    					this.batchListener.onMessage(recordList);
    					break;
    			}
    		}
    
    		private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg) {
    			ConsumerRecord<K, V> record = recordArg;
    			if (this.recordInterceptor != null) {
    				record = this.recordInterceptor.intercept(record);
    			}
    			if (record == null) {
    				if (this.logger.isDebugEnabled()) {
    					this.logger.debug("RecordInterceptor returned null, skipping: " + recordArg);
    				}
    			}
    			else {
    				switch (this.listenerType) {
    					case ACKNOWLEDGING_CONSUMER_AWARE:
    						this.listener.onMessage(record,
    								this.isAnyManualAck
    										? new ConsumerAcknowledgment(record)
    										: null, this.consumer);
    						break;
    					case CONSUMER_AWARE:
    						this.listener.onMessage(record, this.consumer);
    						break;
    					case ACKNOWLEDGING:
    						this.listener.onMessage(record,
    								this.isAnyManualAck
    										? new ConsumerAcknowledgment(record)
    										: null);
    						break;
    					case SIMPLE:
    						this.listener.onMessage(record);
    						break;
    				}
    			}
    		}									
    	//......	
    }
    

    ListenerConsumer implements the org.springframework.scheduling.SchedulingAwareRunnable interface (which extends Runnable) as well as the org.springframework.kafka.listener.ConsumerSeekCallback interface.
    Its run method first calls initAsignedPartitions and then loops over pollAndInvoke while the container is running; a NoOffsetForPartitionException breaks out of the loop, any other Exception is handled by handleConsumerException, and an Error triggers the emergencyStop runnable and wrapUp before being rethrown.
    pollAndInvoke mainly calls consumer.poll(), dispatches the records through invokeListener(records), and finally reaches doInvokeBatchOnMessage or doInvokeOnMessage, which call back into the listener's onMessage method. With a manual AckMode the listener also receives an Acknowledgment, as sketched below.
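    For illustration, a listener taking part in that dispatch with manual acknowledgment might look like the sketch below (topic/group/id names are made up; it assumes the container factory's AckMode is set to MANUAL, otherwise the Acknowledgment argument stays null):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.support.Acknowledgment;
    import org.springframework.stereotype.Component;

    @Component
    public class ManualAckListener {

        // With a manual AckMode, isAnyManualAck is true, so doInvokeOnMessage() passes a
        // non-null ConsumerAcknowledgment that the listener adapter binds to this parameter
        @KafkaListener(id = "ack-listener", topics = "demo-topic", groupId = "demo-group")
        public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
            // process the record ...
            ack.acknowledge(); // commits the offset through the container
        }
    }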

    Summary

    KafkaListenerAnnotationBeanPostProcessor mainly collects the @KafkaListener-annotated methods of each bean and invokes processKafkaListener for every one of them; processKafkaListener turns the method into a MethodKafkaListenerEndpoint and calls registrar.registerEndpoint(endpoint, factory).
    KafkaListenerEndpointRegistry's registerListenerContainer method creates a MessageListenerContainer from the endpoint and factory and stores it in listenerContainers; when startImmediately is set it calls startIfNecessary, which calls listenerContainer.start().
    MessageListenerContainer has two main implementations, KafkaMessageListenerContainer and ConcurrentMessageListenerContainer; the latter's start creates as many KafkaMessageListenerContainer instances as the configured concurrency, and each KafkaMessageListenerContainer's start creates a ListenerConsumer and submits it to the task executor. The ListenerConsumer loops over pollAndInvoke, polling records and calling back into the listener's onMessage method.
    The overall chain is KafkaListenerAnnotationBeanPostProcessor --> KafkaListenerEndpointRegistry --> MessageListenerContainer --> GenericMessageListener.onMessage.

  • Original article: https://blog.csdn.net/hello_ejb3/article/details/133980409