• Spring boot整合Activemq的原理


    自动配置

    如果未自定义 ConnectionFactory Bean,则使用此配置。引入了 ActiveMQXAConnectionFactoryConfiguration 和 ActiveMQConnectionFactoryConfiguration,使用了配置属性类:ActiveMQProperties 和 JmsProperties。

    /**
     * Auto-configuration entry point for ActiveMQ.
     *
     * <p>As declared by the annotations below, it applies only when both
     * {@code ConnectionFactory} and {@code ActiveMQConnectionFactory} are on the
     * classpath and no {@code ConnectionFactory} bean has been defined. It is ordered
     * before {@code JmsAutoConfiguration} and after
     * {@code JndiConnectionFactoryAutoConfiguration}, enables binding of
     * {@code ActiveMQProperties} and {@code JmsProperties}, and imports the XA and
     * plain connection-factory configurations. The class body itself is empty.
     */
    @Configuration(proxyBeanMethods = false)
    @AutoConfigureBefore(JmsAutoConfiguration.class)
    @AutoConfigureAfter({ JndiConnectionFactoryAutoConfiguration.class })
    @ConditionalOnClass({ ConnectionFactory.class, ActiveMQConnectionFactory.class })
    @ConditionalOnMissingBean(ConnectionFactory.class)
    @EnableConfigurationProperties({ ActiveMQProperties.class, JmsProperties.class })
    @Import({ ActiveMQXAConnectionFactoryConfiguration.class, ActiveMQConnectionFactoryConfiguration.class })
    public class ActiveMQAutoConfiguration {
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    配置属性

    # Example application properties for the ActiveMQ / JMS auto-configuration.
    spring:
      activemq:
        user: admin
        password: ${PASSWORD}
        broker-url: tcp://127.0.0.1:61616
        pool:
          enabled: true
          max-connections: 10
      jms:
        pub-sub-domain: false  # Listen on queues only, not topics.
        # Default settings for listeners can be configured here.
        listener:
          concurrency: 1
          maxConcurrency: 2
        # Default settings for the template can be configured here.
        template:
          
          
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    ActiveMQProperties

    /**
     * Configuration properties bound from the {@code spring.activemq} prefix.
     * (Excerpt: accessors and the remainder of the class are not shown here.)
     */
    @ConfigurationProperties(prefix = "spring.activemq")
    public class ActiveMQProperties {
    
    	/** URL of the ActiveMQ broker. */
    	private String brokerUrl;
    
    	/** Whether the default broker runs in in-memory mode only. Defaults to true. */
    	private boolean inMemory = true;
    	// NOTE(review): presumably the broker login credentials - confirm against the Spring Boot reference.
    	private String user;
    	private String password;
    
    	/**
    	 * Time to wait before considering a close complete.
    	 */
    	private Duration closeTimeout = Duration.ofSeconds(15);
    
    	/**
    	 * Whether to stop message delivery before re-delivering messages from a rolled back
    	 * transaction. This implies that message order is not preserved when this is enabled.
    	 */
    	private boolean nonBlockingRedelivery = false;
    
    	/** Timeout when waiting for the response to a message send. 0 means wait forever. */
    	private Duration sendTimeout = Duration.ofMillis(0);
    
    	/** Connection pool settings, bound as nested properties. */
    	@NestedConfigurationProperty
    	private final JmsPoolConnectionFactoryProperties pool = new JmsPoolConnectionFactoryProperties();
    
    	/** Package settings; read later as trust-all / trusted packages when the connection factory is built. */
    	private final Packages packages = new Packages();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    连接池属性

    //package org.springframework.boot.autoconfigure.jms;
    /**
     * Settings for a pooled JMS connection factory.
     * (Excerpt: accessors and the remainder of the class are not shown here.)
     */
    public class JmsPoolConnectionFactoryProperties {
    	/** Whether the connection pool is enabled. */
    	private boolean enabled;
    
    	/**
    	 * Whether to block when the pool is full. When false, a JMSException is thrown instead.
    	 */
    	private boolean blockIfFull = true;
    
    	/** Blocking period; an exception is thrown if the pool is still full afterwards. */
    	private Duration blockIfFullTimeout = Duration.ofMillis(-1);
    
    	/**
    	 * Connection idle timeout.
    	 */
    	private Duration idleTimeout = Duration.ofSeconds(30);
    
    	/** Maximum number of connections. */
    	private int maxConnections = 1;
    
    	/** Maximum number of sessions per connection. */
    	private int maxSessionsPerConnection = 500;
    
    	/**
    	 * Time to sleep between runs of the idle connection eviction thread. When negative,
    	 * no idle connection eviction thread runs.
    	 */
    	private Duration timeBetweenExpirationCheck = Duration.ofMillis(-1);
    
    	/**
    	 * Whether to use only a single anonymous producer. When false, an anonymous
    	 * producer is requested each time one is needed.
    	 */
    	private boolean useAnonymousProducers = true;
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    JmsProperties

    
    /**
     * Configuration properties bound from the {@code spring.jms} prefix.
     * (Excerpt: accessors and the nested {@code Cache} type are not shown here.)
     */
    @ConfigurationProperties(prefix = "spring.jms")
    public class JmsProperties {
    
    	/** Whether the default destination type is topic. Defaults to false. */
    	private boolean pubSubDomain = false;
    
    	/**
    	 * Connection factory JNDI name. When set, takes precedence to others connection  factory auto-configurations.
    	 */
    	private String jndiName;
    	// Cache settings.
    	private final Cache cache = new Cache();
    	// Listener settings.
    	private final Listener listener = new Listener();
      
    	// Template settings.
    	private final Template template = new Template();
      
      
    	/** Listener-related settings. */
    	public static class Listener {
    		/** Whether to start automatically. Defaults to true. */
    		private boolean autoStartup = true;
    		/** Acknowledge mode. NOTE(review): the original note says the default is "auto"; no initializer is visible here - confirm. */
    		private AcknowledgeMode acknowledgeMode;
    		private Integer concurrency;
    		private Integer maxConcurrency;
    		/**
    		 * Timeout to use for receive calls. Use -1 for a no-wait receive or 0 for no
    		 * timeout at all. The latter is only feasible if not running within a transaction
    		 * manager and is generally discouraged since it prevents clean shutdown.
    		 */
    		private Duration receiveTimeout = Duration.ofSeconds(1);
    
      }
      
      
    	/** Template-related settings. */
    	public static class Template {
    
    		/** Default destination name to fall back on when none is given explicitly. */
    		private String defaultDestination;
    
    		private Duration deliveryDelay;
    
    		private DeliveryMode deliveryMode;
    		private Integer priority;
    		private Duration timeToLive;
    
    		private Boolean qosEnabled;
    		private Duration receiveTimeout;
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    Configuration

    
    /**
     * Configuration that registers the ActiveMQ {@code ConnectionFactory} bean. Three
     * variants are declared, selected by the conditions on the nested classes and bean
     * methods: plain, caching and pooled.
     */
    @Configuration(proxyBeanMethods = false)
    // Applied only when no ConnectionFactory bean exists.
    @ConditionalOnMissingBean(ConnectionFactory.class)
    class ActiveMQConnectionFactoryConfiguration {
    
    	@Configuration(proxyBeanMethods = false)
    	// Active when pooling is not enabled: spring.activemq.pool.enabled is "false"
    	// or absent (matchIfMissing = true).
    	@ConditionalOnProperty(prefix = "spring.activemq.pool", name = "enabled", havingValue = "false",
    			matchIfMissing = true)
    	static class SimpleConnectionFactoryConfiguration {
    
    		// Plain factory, registered only when spring.jms.cache.enabled is explicitly "false".
    		@Bean
    		@ConditionalOnProperty(prefix = "spring.jms.cache", name = "enabled", havingValue = "false")
    		ActiveMQConnectionFactory jmsConnectionFactory(ActiveMQProperties properties,
    				ObjectProvider<ActiveMQConnectionFactoryCustomizer> factoryCustomizers) {
    			return createJmsConnectionFactory(properties, factoryCustomizers);
    		}
         
    		// Shared helper: collects the customizers in order and builds the factory.
    		private static ActiveMQConnectionFactory createJmsConnectionFactory(ActiveMQProperties properties,
    				ObjectProvider<ActiveMQConnectionFactoryCustomizer> factoryCustomizers) {
          
    			return new ActiveMQConnectionFactoryFactory(properties,
    					factoryCustomizers.orderedStream().collect(Collectors.toList()))
    							// The factory-of-factories builds the actual connection factory.
    							.createConnectionFactory(ActiveMQConnectionFactory.class);
    		}
    
    		// Active when caching is enabled: spring.jms.cache.enabled is "true" or absent,
    		// and CachingConnectionFactory is on the classpath.
    		@Configuration(proxyBeanMethods = false)
    		@ConditionalOnClass(CachingConnectionFactory.class)
    		@ConditionalOnProperty(prefix = "spring.jms.cache", name = "enabled", havingValue = "true",
    				matchIfMissing = true)
    		static class CachingConnectionFactoryConfiguration {
    
    			@Bean
    			CachingConnectionFactory cachingJmsConnectionFactory(JmsProperties jmsProperties,
    					ActiveMQProperties properties,
    					ObjectProvider<ActiveMQConnectionFactoryCustomizer> factoryCustomizers) {
    				// Read the cache settings.
    				JmsProperties.Cache cacheProperties = jmsProperties.getCache();
    				// Wrap a freshly created ActiveMQ factory in a caching factory.
    				CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
    						createJmsConnectionFactory(properties, factoryCustomizers));
    				// Copy the cache settings onto the caching factory.
    				connectionFactory.setCacheConsumers(cacheProperties.isConsumers());
    				connectionFactory.setCacheProducers(cacheProperties.isProducers());
    				connectionFactory.setSessionCacheSize(cacheProperties.getSessionCacheSize());
    				return connectionFactory;
    			}
    
    		}
    
    	}
    
    	// Configuration used when the connection pool is enabled; requires
    	// JmsPoolConnectionFactory and PooledObject on the classpath.
    	@Configuration(proxyBeanMethods = false)
    	@ConditionalOnClass({ JmsPoolConnectionFactory.class, PooledObject.class })
    	static class PooledConnectionFactoryConfiguration {
    
    		// Registered only when spring.activemq.pool.enabled is "true"; "stop" is
    		// declared as the bean's destroy method.
    		@Bean(destroyMethod = "stop")
    		@ConditionalOnProperty(prefix = "spring.activemq.pool", name = "enabled", havingValue = "true")
    		JmsPoolConnectionFactory pooledJmsConnectionFactory(ActiveMQProperties properties,
    				ObjectProvider<ActiveMQConnectionFactoryCustomizer> factoryCustomizers) {
    			// Build the underlying ActiveMQ factory first ...
    			ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactoryFactory(properties,
    					factoryCustomizers.orderedStream().collect(Collectors.toList()))
    							.createConnectionFactory(ActiveMQConnectionFactory.class);
          
    			// ... then wrap it in a pooled factory configured from the pool properties.
    			return new JmsPoolConnectionFactoryFactory(properties.getPool())
    					.createPooledConnectionFactory(connectionFactory);
    		}
    
    	}
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77

    JmsAnnotationDrivenConfiguration

    启用Jms的注解配置类。

    /**
     * Configuration for annotation-driven JMS. It registers a listener container factory
     * configurer, a default listener container factory, the {@code @EnableJms} switch and,
     * under JNDI, a destination resolver.
     * (Excerpt: the constructor that populates the fields is not shown here.)
     */
    // Precondition: the EnableJms class is on the classpath.
    @ConditionalOnClass(EnableJms.class)
    class JmsAnnotationDrivenConfiguration {
    	// Key collaborators.
    	private final ObjectProvider<DestinationResolver> destinationResolver;
    	private final ObjectProvider<JtaTransactionManager> transactionManager;
    	private final ObjectProvider<MessageConverter> messageConverter;
    	private final JmsProperties properties;
    	// Default configurer: DefaultJmsListenerContainerFactoryConfigurer, populated with
    	// the unique resolver / transaction manager / converter (if any) and the JMS properties.
      @Bean
    	@ConditionalOnMissingBean
    	DefaultJmsListenerContainerFactoryConfigurer jmsListenerContainerFactoryConfigurer() {
    		DefaultJmsListenerContainerFactoryConfigurer configurer = new DefaultJmsListenerContainerFactoryConfigurer();
    		configurer.setDestinationResolver(this.destinationResolver.getIfUnique());
    		configurer.setTransactionManager(this.transactionManager.getIfUnique());
    		configurer.setMessageConverter(this.messageConverter.getIfUnique());
    		configurer.setJmsProperties(this.properties);
    		return configurer;
    	}
    	// Default container factory: DefaultJmsListenerContainerFactory. Registered when there is
    	// a single candidate ConnectionFactory and no bean named "jmsListenerContainerFactory".
    	@Bean
    	@ConditionalOnSingleCandidate(ConnectionFactory.class)
    	@ConditionalOnMissingBean(name = "jmsListenerContainerFactory")
    	DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
    			DefaultJmsListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
    		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    		configurer.configure(factory, connectionFactory);
    		return factory;
    	}
    
    	// Applies @EnableJms unless the JMS listener annotation processor bean already exists.
    	@Configuration(proxyBeanMethods = false)
    	@EnableJms
    	@ConditionalOnMissingBean(name = JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
    	static class EnableJmsConfiguration {
    
    	}
    
    	// Registers a default DestinationResolver when the JNDI condition matches.
    	@Configuration(proxyBeanMethods = false)
    	@ConditionalOnJndi
    	static class JndiConfiguration {
    
    		@Bean
    		@ConditionalOnMissingBean(DestinationResolver.class)
    		JndiDestinationResolver destinationResolver() {
    			JndiDestinationResolver resolver = new JndiDestinationResolver();
    			resolver.setFallbackToDynamicDestination(true);
    			return resolver;
    		}
    
    	}
      
      
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    ActiveMQConnectionFactoryFactory

    主要用于创建 ActiveMQConnectionFactory。根据配置的 properties 和 ActiveMQConnectionFactoryCustomizer 构造 ActiveMQConnectionFactory 实例。

    /**
     * Factory that creates {@code ActiveMQConnectionFactory} instances from
     * {@code ActiveMQProperties} and a list of customizers.
     * (Excerpt: the constructor, {@code createConnectionFactoryInstance} and
     * {@code customize} are not shown here.)
     */
    class ActiveMQConnectionFactoryFactory {
    
    	private static final String DEFAULT_EMBEDDED_BROKER_URL = "vm://localhost?broker.persistent=false";
    
    	private static final String DEFAULT_NETWORK_BROKER_URL = "tcp://localhost:61616";
    	// Bound ActiveMQ properties.
    	private final ActiveMQProperties properties;
    	// Customizers for the created factory (presumably applied by customize(factory) below).
    	private final List<ActiveMQConnectionFactoryCustomizer> factoryCustomizers;
    
    	/**
    	 * Creates a factory of the requested type and copies the configured properties onto it:
    	 * close timeout and send timeout (each only when non-null, converted to int milliseconds),
    	 * non-blocking redelivery, trust-all (only when non-null) and the trusted package list
    	 * (only when not empty). Finally hands the factory to {@code customize}.
    	 * @param factoryClass the concrete factory type to create
    	 * @return the configured factory
    	 * @throws Exception as declared; the throwing calls are outside this excerpt
    	 */
      private <T extends ActiveMQConnectionFactory> T doCreateConnectionFactory(Class<T> factoryClass) throws Exception {
    		T factory = createConnectionFactoryInstance(factoryClass);
    		if (this.properties.getCloseTimeout() != null) {
    			factory.setCloseTimeout((int) this.properties.getCloseTimeout().toMillis());
    		}
    		factory.setNonBlockingRedelivery(this.properties.isNonBlockingRedelivery());
    		if (this.properties.getSendTimeout() != null) {
    			factory.setSendTimeout((int) this.properties.getSendTimeout().toMillis());
    		}
    		Packages packages = this.properties.getPackages();
    		if (packages.getTrustAll() != null) {
    			factory.setTrustAllPackages(packages.getTrustAll());
    		}
    		if (!packages.getTrusted().isEmpty()) {
    			factory.setTrustedPackages(packages.getTrusted());
    		}
    		// Apply the ActiveMQConnectionFactoryCustomizer settings.
    		customize(factory);
    		return factory;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    JmsPoolConnectionFactoryFactory

    创建带连接池的ConnectionFactory的工厂。

    /**
     * Factory that builds a pooled {@code JmsPoolConnectionFactory} around an existing
     * {@code ConnectionFactory}, configured from {@code JmsPoolConnectionFactoryProperties}.
     * (Excerpt: the constructor and part of the method body are elided in this listing.)
     */
    public class JmsPoolConnectionFactoryFactory {
    
    	private final JmsPoolConnectionFactoryProperties properties;
    	
    		/**
    		 * Wraps the given connection factory in a pooled one and applies the pool settings.
    		 * @param connectionFactory the factory to wrap
    		 * @return the pooled connection factory
    		 */
    		public JmsPoolConnectionFactory createPooledConnectionFactory(ConnectionFactory connectionFactory) {
    		JmsPoolConnectionFactory pooledConnectionFactory = new JmsPoolConnectionFactory();
    		// Wrap the underlying ConnectionFactory.
    		pooledConnectionFactory.setConnectionFactory(connectionFactory);
    		// Apply the remaining JmsPoolConnectionFactory settings.
    		pooledConnectionFactory.setBlockIfSessionPoolIsFull(this.properties.isBlockIfFull());
    		if (this.properties.getBlockIfFullTimeout() != null) {
    			pooledConnectionFactory
    					.setBlockIfSessionPoolIsFullTimeout(this.properties.getBlockIfFullTimeout().toMillis());
    		}
    
    		......
    		}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    JmsPoolConnectionFactory

    连接池,创建连接。

    /**
     * Pooled connection factory implementing the generic, queue and topic
     * connection-factory interfaces. (Excerpt: only one method is shown.)
     */
    public class JmsPoolConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory {
        /**
         * Creates a connection by delegating to the two-argument overload with two
         * {@code null} strings (presumably user name and password - confirm).
         * @throws JMSException as declared
         */
        public Connection createConnection() throws JMSException {
            return this.createConnection((String)null, (String)null);
        }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    ActiveMQConnectionFactoryCustomizer

    用于在ActiveMQConnectionFactoryFactory 生成 ActiveMQConnectionFactory时 设置 ActiveMQConnectionFactory 的值。

    /**
     * Callback interface for customizing an {@link ActiveMQConnectionFactory}.
     * Declares a single method, so it can be implemented with a lambda.
     */
    @FunctionalInterface
    public interface ActiveMQConnectionFactoryCustomizer {
    
    	/**
    	 * Customize the {@link ActiveMQConnectionFactory}.
    	 * @param factory the factory to customize
    	 */
    	void customize(ActiveMQConnectionFactory factory);
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    Listener

    @JmsListener

    /**
     * Annotation marking a method (or a meta-annotation) as a JMS listener.
     * Retained at runtime and repeatable via {@code JmsListeners}.
     */
    @Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Repeatable(JmsListeners.class)
    @MessageMapping
    public @interface JmsListener {
    	/** Id of the MethodJmsListenerEndpoint; used at registration time and must be unique. */
    	String id() default "";
    
    	/** Bean name of the container factory to use; empty by default. */
    	String containerFactory() default "";
    
    	/** Destination to listen on; the only attribute without a default. */
    	String destination();
    
    	/** Name of the durable subscription. */
    	String subscription() default "";
    	/** Message selector. */
    	String selector() default "";
    
    	/**
    	 * Concurrency limit for consumers. If set, it overrides the container factory's
    	 * setting. Accepts values such as "5-10" or "10".
    	 * Note that it may not be supported by every container.
    	 */
    	String concurrency() default "";
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    @JmsListeners

    /**
     * Container annotation that holds several {@code JmsListener} annotations.
     */
    public @interface JmsListeners {
    
    	/** The contained listener annotations. */
    	JmsListener[] value();
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    JmsListenerConfigurer

    主要用于配置 JmsListenerEndpointRegistrar

    /**
     * Callback interface for configuring a {@code JmsListenerEndpointRegistrar}.
     */
    @FunctionalInterface
    public interface JmsListenerConfigurer {
    	/**
    	 * Configure JMS listeners through the given registrar.
    	 * @param registrar the registrar to configure
    	 */
    	void configureJmsListeners(JmsListenerEndpointRegistrar registrar);
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    JmsListenerAnnotationBeanPostProcessor

    Spring 会对标注了 @JmsListener、@JmsListeners 的方法进行处理:使用 JmsListenerAnnotationBeanPostProcessor 来处理标注了 @JmsListener、@JmsListeners 的类和方法。

    /**
     * Bean post-processor that finds methods annotated with {@code @JmsListener}
     * (including repeated ones held in {@code @JmsListeners}), turns each into a
     * {@code MethodJmsListenerEndpoint} and hands it to the registrar.
     * (Excerpt: fields and helper methods such as {@code resolve} and
     * {@code getEndpointId} are not shown here.)
     */
    public class JmsListenerAnnotationBeanPostProcessor
    		implements MergedBeanDefinitionPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {
    
    	/**
    	 * Scans the bean's target class for listener methods and processes each annotation
    	 * found. Classes without any are remembered in {@code nonAnnotatedClasses} so they
    	 * are not scanned again. The bean is always returned unchanged.
    	 */
    @Override
    	public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    		if (bean instanceof AopInfrastructureBean || bean instanceof JmsListenerContainerFactory ||
    				bean instanceof JmsListenerEndpointRegistry) {
    			// Ignore AOP infrastructure such as scoped proxies.
    			return bean;
    		}
    
    		Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
    		if (!this.nonAnnotatedClasses.contains(targetClass) &&
    				// Only inspect classes that can carry the annotation at all.
    				AnnotationUtils.isCandidateClass(targetClass, JmsListener.class)) {
          
    			Map<Method, Set<JmsListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
    					(MethodIntrospector.MetadataLookup<Set<JmsListener>>) method -> {
    						// Collect the (possibly repeated) @JmsListener annotations of the method.
    						Set<JmsListener> listenerMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
    								method, JmsListener.class, JmsListeners.class);
    						return (!listenerMethods.isEmpty() ? listenerMethods : null);
    					});
    			if (annotatedMethods.isEmpty()) {
    				this.nonAnnotatedClasses.add(targetClass);
    				if (logger.isTraceEnabled()) {
    					logger.trace("No @JmsListener annotations found on bean type: " + targetClass);
    				}
    			}
    			else {
    				// Non-empty set of methods
    				annotatedMethods.forEach((method, listeners) ->
    						listeners.forEach(listener -> processJmsListener(listener, method, bean)));
    				if (logger.isDebugEnabled()) {
    					logger.debug(annotatedMethods.size() + " @JmsListener methods processed on bean '" + beanName +
    							"': " + annotatedMethods);
    				}
    			}
    		}
    		return bean;
    	}
    
    	/**
    	 * Processes a single {@code @JmsListener}: builds the endpoint, copies the annotation
    	 * attributes onto it, looks up the named container factory (if one is given) and
    	 * registers the endpoint with the registrar.
    	 * @throws BeanInitializationException if the named container factory bean is not found
    	 */
      	protected void processJmsListener(JmsListener jmsListener, Method mostSpecificMethod, Object bean) {
    		Method invocableMethod = AopUtils.selectInvocableMethod(mostSpecificMethod, bean.getClass());
    
    		MethodJmsListenerEndpoint endpoint = createMethodJmsListenerEndpoint();
    		endpoint.setBean(bean);
    		endpoint.setMethod(invocableMethod);
    		endpoint.setMostSpecificMethod(mostSpecificMethod);
    		endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
    		endpoint.setEmbeddedValueResolver(this.embeddedValueResolver);
    		endpoint.setBeanFactory(this.beanFactory);
    		endpoint.setId(getEndpointId(jmsListener));
    		// Set the endpoint attributes from the annotation; optional ones only when they have text.
    		endpoint.setDestination(resolve(jmsListener.destination()));
    		if (StringUtils.hasText(jmsListener.selector())) {
    			endpoint.setSelector(resolve(jmsListener.selector()));
    		}
    		if (StringUtils.hasText(jmsListener.subscription())) {
    			endpoint.setSubscription(resolve(jmsListener.subscription()));
    		}
    		if (StringUtils.hasText(jmsListener.concurrency())) {
    			endpoint.setConcurrency(resolve(jmsListener.concurrency()));
    		}
    
    		// The factory stays null unless the annotation names a container factory bean.
    		JmsListenerContainerFactory<?> factory = null;
    		String containerFactoryBeanName = resolve(jmsListener.containerFactory());
    		if (StringUtils.hasText(containerFactoryBeanName)) {
    			Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
    			try {
    				factory = this.beanFactory.getBean(containerFactoryBeanName, JmsListenerContainerFactory.class);
    			}
    			catch (NoSuchBeanDefinitionException ex) {
    				throw new BeanInitializationException("Could not register JMS listener endpoint on [" +
    						mostSpecificMethod + "], no " + JmsListenerContainerFactory.class.getSimpleName() +
    						" with id '" + containerFactoryBeanName + "' was found in the application context", ex);
    			}
    		}
    
    		this.registrar.registerEndpoint(endpoint, factory);
    	}
    
      
    	/**
    	 * Runs once all singletons exist: applies every {@code JmsListenerConfigurer} bean in
    	 * order, passes the container factory bean name and endpoint registry to the registrar
    	 * where needed, installs a custom handler method factory if the registrar has one, and
    	 * finally triggers the registrar.
    	 */
    	@Override
    	public void afterSingletonsInstantiated() {
    		// Remove resolved singleton classes from cache
    		this.nonAnnotatedClasses.clear();
    
    		if (this.beanFactory instanceof ListableBeanFactory) {
    			// JmsListenerConfigurer beans may be defined elsewhere; each one gets the registrar.
    			Map<String, JmsListenerConfigurer> beans =
    					((ListableBeanFactory) this.beanFactory).getBeansOfType(JmsListenerConfigurer.class);
    			List<JmsListenerConfigurer> configurers = new ArrayList<>(beans.values());
    			AnnotationAwareOrderComparator.sort(configurers);
    			for (JmsListenerConfigurer configurer : configurers) {
    				configurer.configureJmsListeners(this.registrar);
    			}
    		}
    
    		if (this.containerFactoryBeanName != null) {
    			this.registrar.setContainerFactoryBeanName(this.containerFactoryBeanName);
    		}
    
    		if (this.registrar.getEndpointRegistry() == null) {
    			// Determine JmsListenerEndpointRegistry bean from the BeanFactory
    			if (this.endpointRegistry == null) {
    				Assert.state(this.beanFactory != null, "BeanFactory must be set to find endpoint registry by bean name");
    				this.endpointRegistry = this.beanFactory.getBean(
    						JmsListenerConfigUtils.JMS_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME, JmsListenerEndpointRegistry.class);
    			}
    			this.registrar.setEndpointRegistry(this.endpointRegistry);
    		}
    
    
    		// Set the custom handler method factory once resolved by the configurer
    		MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
    		if (handlerMethodFactory != null) {
    			this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
    		}
    
    		// Actually register all listeners
    		this.registrar.afterPropertiesSet();
    	}
    
      
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127

    JmsListenerEndpointRegistrar

    用于注册Listener与对应的ContainerFactory。

    /**
     * Registrar for listener endpoints and their container factories.
     * (Excerpt: fields and {@code resolveContainerFactory} are not shown here.)
     */
    public class JmsListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
    	/**
    	 * Registers an endpoint together with an optional container factory. While holding
    	 * the mutex, the endpoint is either registered with the endpoint registry right away
    	 * (when {@code startImmediately} is set) or queued in {@code endpointDescriptors}.
    	 * @param endpoint the listener endpoint (wraps a listener); must be non-null with an id
    	 * @param factory the container factory to use, may be {@code null}
    	 */
      public void registerEndpoint(JmsListenerEndpoint endpoint, @Nullable JmsListenerContainerFactory<?> factory) {
    		Assert.notNull(endpoint, "Endpoint must not be null");
    		Assert.hasText(endpoint.getId(), "Endpoint id must be set");
    
    		// Wrap the endpoint and factory in a JmsListenerEndpointDescriptor.
    		JmsListenerEndpointDescriptor descriptor = new JmsListenerEndpointDescriptor(endpoint, factory);
    
    		synchronized (this.mutex) {
    			if (this.startImmediately) {  // register and start immediately
    				Assert.state(this.endpointRegistry != null, "No JmsListenerEndpointRegistry set");
    				this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
    						resolveContainerFactory(descriptor), true);
    			}
    			else {
    				this.endpointDescriptors.add(descriptor);
    			}
    		}
    	}
    }  
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    MethodJmsListenerEndpoint

    @JmsListener 的包装类

    /**
     * Listener endpoint backed by a bean method; it builds the message listener adapter
     * that invokes that method.
     * (Excerpt: accessors and other members are not shown here.)
     */
    public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint implements BeanFactoryAware {
    	@Nullable
    	private Object bean;
    	// The method annotated with @JmsListener.
    	@Nullable
    	private Method method;
    
    	@Nullable
    	private Method mostSpecificMethod;
    
    	@Nullable
    	private MessageHandlerMethodFactory messageHandlerMethodFactory;
    
    	@Nullable
    	private StringValueResolver embeddedValueResolver;
      
    	/**
    	 * Creates the listener adapter for this endpoint: wires in an invocable handler for
    	 * the bean method, sets the default response destination (topic or queue, depending
    	 * on the container) and copies the container's reply QoS settings, message converter
    	 * and destination resolver when they are non-null.
    	 * @param container the listener container (per the original note, usually a
    	 * DefaultMessageListenerContainer)
    	 */
    	@Override
    	protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) {
    		Assert.state(this.messageHandlerMethodFactory != null,
    				"Could not create message listener - MessageHandlerMethodFactory not set");
    		MessagingMessageListenerAdapter messageListener = createMessageListenerInstance();
    		Object bean = getBean();
    		Method method = getMethod();
    		Assert.state(bean != null && method != null, "No bean+method set on endpoint");
    		InvocableHandlerMethod invocableHandlerMethod =
    				this.messageHandlerMethodFactory.createInvocableHandlerMethod(bean, method);
    		messageListener.setHandlerMethod(invocableHandlerMethod);
    		String responseDestination = getDefaultResponseDestination();
    		if (StringUtils.hasText(responseDestination)) {
    			// Set the default response as a topic or a queue name, depending on the container.
    			if (container.isReplyPubSubDomain()) {
    				messageListener.setDefaultResponseTopicName(responseDestination);
    			}
    			else {
    				messageListener.setDefaultResponseQueueName(responseDestination);
    			}
    		}
    		// Copy the container's settings onto the listener adapter.
    		QosSettings responseQosSettings = container.getReplyQosSettings();
    		if (responseQosSettings != null) {
    			messageListener.setResponseQosSettings(responseQosSettings);
    		}
    		MessageConverter messageConverter = container.getMessageConverter();
    		if (messageConverter != null) {
    			messageListener.setMessageConverter(messageConverter);
    		}
    		DestinationResolver destinationResolver = container.getDestinationResolver();
    		if (destinationResolver != null) {
    			messageListener.setDestinationResolver(destinationResolver);
    		}
    		return messageListener;
    	}
    
      
    	/**
    	 * Create an empty {@link MessagingMessageListenerAdapter} instance.
    	 * @return a new {@code MessagingMessageListenerAdapter} or subclass thereof
    	 */
    	protected MessagingMessageListenerAdapter createMessageListenerInstance() {
    		return new MessagingMessageListenerAdapter();
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    消息消费

    MessagingMessageListenerAdapter

    处理消息的适配器。

    /**
     * Listener adapter that converts an incoming JMS message to a messaging
     * {@code Message} and invokes the configured handler method with it.
     * (Excerpt: accessors and helpers such as {@code handleResult} are not shown here.)
     */
    public class MessagingMessageListenerAdapter extends AbstractAdaptableMessageListener {
    	// The handler for the method annotated with @JmsListener.
      	@Nullable
    	private InvocableHandlerMethod handlerMethod;
      
    	/**
    	 * Converts the JMS message, invokes the handler and, when the handler returns a
    	 * non-null result, passes that result to {@code handleResult}.
    	 */
    	@Override
    	public void onMessage(javax.jms.Message jmsMessage, @Nullable Session session) throws JMSException {
    		// Convert the message format.
    		Message<?> message = toMessagingMessage(jmsMessage);
    		if (logger.isDebugEnabled()) {
    			logger.debug("Processing [" + message + "]");
    		}
    		// Invoke the handler; its return value, if any, is handled below.
    		Object result = invokeHandler(jmsMessage, session, message);
    		if (result != null) {
    			handleResult(result, jmsMessage, session);
    		}
    		else {
    			logger.trace("No result object given - no result to handle");
    		}
    	}
    
    	/**
    	 * Invokes the handler method with the converted message, passing the raw JMS message
    	 * and session as extra arguments. Any exception is rethrown wrapped in a
    	 * {@code ListenerExecutionFailedException}.
    	 */
    	@Nullable
    	private Object invokeHandler(javax.jms.Message jmsMessage, @Nullable Session session, Message<?> message) {
    		// Obtain the handler method.
    		InvocableHandlerMethod handlerMethod = getHandlerMethod();
    		try {
    			// Invoke the method.
    			return handlerMethod.invoke(message, jmsMessage, session);
    		}
    		catch (MessagingException ex) {
    			throw new ListenerExecutionFailedException(
    					createMessagingErrorMessage("Listener method could not be invoked with incoming message"), ex);
    		}
    		catch (Exception ex) {
    			throw new ListenerExecutionFailedException("Listener method '" +
    					handlerMethod.getMethod().toGenericString() + "' threw exception", ex);
    		}
    	}
    	/**
    	 * Converts the JMS message into a messaging {@code Message} using the messaging
    	 * message converter.
    	 * @throws MessageConversionException if the converter throws a JMSException
    	 */
    	protected Message<?> toMessagingMessage(javax.jms.Message jmsMessage) {
    		try {
    			return (Message<?>) getMessagingMessageConverter().fromMessage(jmsMessage);
    		}
    		catch (JMSException ex) {
    			throw new MessageConversionException("Could not convert JMS message", ex);
    		}
    	}
      
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    image-20221020165944484

    //package org.springframework.messaging.handler.invocation;
    // Excerpt: only the invoke(...) entry point is shown here.
    public class InvocableHandlerMethod extends HandlerMethod {
      	@Nullable
      // message: the received message.
      // providedArgs: additional arguments passed by the caller; for the JMS listener adapter
      // these are the javax.jms.Message and the (nullable) Session.
      // Resolves the argument values via getMethodArgumentValues and then calls doInvoke with them.
    	public Object invoke(Message<?> message, Object... providedArgs) throws Exception {
    		Object[] args = getMethodArgumentValues(message, providedArgs);
    		if (logger.isTraceEnabled()) {
    			logger.trace("Arguments: " + Arrays.toString(args));
    		}
    		return doInvoke(args);
    	}
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    AbstractMessageListenerContainer

    @JmsListener注解的方法的封装。里面设置有其注解里的属性。

    public abstract class AbstractMessageListenerContainer extends AbstractJmsListeningContainer
    {
      //@JmsListener 中定义的
    	@Nullable
    	private volatile Object destination;
      //@JmsListener 中定义的
    	@Nullable
    	private volatile String messageSelector;
      // MessagingMessageListenerAdapter ,里面包含JmsListener处理方法
    	@Nullable
    	private volatile Object messageListener;
      
    	private boolean subscriptionDurable = false;
    
    	private boolean subscriptionShared = false;
      //@JmsListener 中定义的
    	@Nullable
    	private String subscriptionName;
      //是topic则设置为true
    	@Nullable
    	private Boolean replyPubSubDomain;
    
    	@Nullable
    	private QosSettings replyQosSettings;
      
      
      protected void doExecuteListener(Session session, Message message) throws JMSException {
    		if (!isAcceptMessagesWhileStopping() && !isRunning()) {
    			if (logger.isWarnEnabled()) {
    				logger.warn("Rejecting received message because of the listener container " +
    						"having been stopped in the meantime: " + message);
    			}
    			rollbackIfNecessary(session);
    			throw new MessageRejectedWhileStoppingException();
    		}
    
    		try {
          //处理消息。
    			invokeListener(session, message);
    		}
    		catch (JMSException | RuntimeException | Error ex) {
    			rollbackOnExceptionIfNecessary(session, ex);
    			throw ex;
    		}
    		commitIfNecessary(session, message);
    	}
    
      	protected void commitIfNecessary(Session session, @Nullable Message message) throws JMSException {
    		// 是事务型的。
    		if (session.getTransacted()) {
    			// Commit necessary - but avoid commit call within a JTA transaction.
          //本地事务,
    			if (isSessionLocallyTransacted(session)) {
    				// Transacted session created by this container -> commit.
    				JmsUtils.commitIfNecessary(session);
    			}
    		}
          //非事务型的,并且是客户端ACK,则自动应答
          //session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE
    		else if (message != null && isClientAcknowledge(session)) {
    			message.acknowledge();
    		}
    	}
      //事务提交
      public static void commitIfNecessary(Session session) throws JMSException {
    		Assert.notNull(session, "Session must not be null");
    		try {
    			session.commit();
    		}
    		catch (javax.jms.TransactionInProgressException | javax.jms.IllegalStateException ex) {
    			// Ignore -> can only happen in case of a JTA transaction.
    		}
    	}
      
      protected void invokeListener(Session session, Message message) throws JMSException {
    		Object listener = getMessageListener();
    
    		if (listener instanceof SessionAwareMessageListener) {
    			doInvokeListener((SessionAwareMessageListener) listener, session, message);
    		}
    		else if (listener instanceof MessageListener) {
    			doInvokeListener((MessageListener) listener, message);
    		}
    		else if (listener != null) {
    			throw new IllegalArgumentException(
    					"Only MessageListener and SessionAwareMessageListener supported: " + listener);
    		}
    		else {
    			throw new IllegalStateException("No message listener specified - see property 'messageListener'");
    		}
    	}
      
      protected void doInvokeListener(SessionAwareMessageListener listener, Session session, Message message)
    			throws JMSException {
    
    		Connection conToClose = null;
    		Session sessionToClose = null;
    		try {
          //主要是为了判断 session是否变化
    			Session sessionToUse = session;
    			if (!isExposeListenerSession()) {
    				// We need to expose a separate Session.
    				conToClose = createConnection();
    				sessionToClose = createSession(conToClose);
    				sessionToUse = sessionToClose;
    			}
          //处理消息。
    			listener.onMessage(message, sessionToUse);
    			// Clean up specially exposed Session, if any.
    			if (sessionToUse != session) {
    				if (sessionToUse.getTransacted() && isSessionLocallyTransacted(sessionToUse)) {
    					// Transacted session created by this container -> commit.
    					JmsUtils.commitIfNecessary(sessionToUse);
    				}
    			}
    		}
    		finally {
    			JmsUtils.closeSession(sessionToClose);
    			JmsUtils.closeConnection(conToClose);
    		}
    	}
      
      protected void invokeListener(Session session, Message message) throws JMSException {
        //获取到MessagingMessageListenerAdapter ,里面包含JmsListener处理方法
    		Object listener = getMessageListener();
    
    		if (listener instanceof SessionAwareMessageListener) {
    			doInvokeListener((SessionAwareMessageListener) listener, session, message);
    		}
    		else if (listener instanceof MessageListener) {
    			doInvokeListener((MessageListener) listener, message);
    		}
    		else if (listener != null) {
    			throw new IllegalArgumentException(
    					"Only MessageListener and SessionAwareMessageListener supported: " + listener);
    		}
    		else {
    			throw new IllegalStateException("No message listener specified - see property 'messageListener'");
    		}
    	}
      
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142

    DefaultMessageListenerContainer

    DefaultMessageListenerContainer是一个用于异步消息监听的管理类。循环去broker获取消息。

    DefaultMessageListenerContainer 内部的每个消费线程(AsyncMessageListenerInvoker)持有自己的 Session 和 Consumer;并发数(concurrency)为 1 时,一个容器实例对应一个 Consumer。

    // Excerpt of the polling listener container; parts of the code are elided ("....").
    // NOTE(review): executeOngoingLoop/invokeListener use this.session/this.consumer and so
    // presumably belong to an inner invoker class in the real source - confirm.
    public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer {
      
    		// Loops while 'active' is true, invoking the listener on each pass;
    		// returns whether any pass received a message.
    		private boolean executeOngoingLoop() throws JMSException {
    			boolean messageReceived = false;
    			boolean active = true;
    			while (active) {
    				synchronized (lifecycleMonitor) {
    					....
    				}
            // Still active: keep receiving messages
    				if (active) {
    					messageReceived = (invokeListener() || messageReceived);
    				}
    			}
    			return messageReceived;
    		}
    
      
      
    		// Records the current thread as the receiving thread, initializes resources if needed,
    		// then performs one receive-and-execute pass; the thread reference is always cleared.
    		private boolean invokeListener() throws JMSException {
    			this.currentReceiveThread = Thread.currentThread();
    			try {
    				initResourcesIfNecessary();
    				boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
    				this.lastMessageSucceeded = true;
    				return messageReceived;
    			}
    			finally {
    				this.currentReceiveThread = null;
    			}
    		}
      // Method of AbstractPollingMessageListenerContainer:
      // receives a message from the consumer, using the configured receive timeout.
      	@Nullable
    	protected Message receiveMessage(MessageConsumer consumer) throws JMSException {
    		return receiveFromConsumer(consumer, getReceiveTimeout());
    	}
      // Processes a received message: rejects it (after rollback) if the container has been
      // stopped, otherwise invokes the listener, rolling back on failure and committing on success.
      protected void doExecuteListener(Session session, Message message) throws JMSException {
    		if (!isAcceptMessagesWhileStopping() && !isRunning()) {
    			if (logger.isWarnEnabled()) {
    				logger.warn("Rejecting received message because of the listener container " +
    						"having been stopped in the meantime: " + message);
    			}
    			rollbackIfNecessary(session);
    			throw new MessageRejectedWhileStoppingException();
    		}
    
    		try {
    			invokeListener(session, message);
    		}
    		catch (JMSException | RuntimeException | Error ex) {
    			rollbackOnExceptionIfNecessary(session, ex);
    			throw ex;
    		}
    		commitIfNecessary(session, message);
    	}
      
      
      
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    DefaultJmsListenerContainerFactory

    DefaultMessageListenerContainer 会由DefaultJmsListenerContainerFactory 创建, DefaultJmsListenerContainerFactory 又是由DefaultJmsListenerContainerFactoryConfigurer 进行配置。

    // Applies the connection factory, the optional collaborators held by this configurer and
    // the JmsProperties listener settings to a DefaultJmsListenerContainerFactory.
    public final class DefaultJmsListenerContainerFactoryConfigurer {
    /**
     * Configures the given factory.
     * @param factory the factory to configure (must not be null)
     * @param connectionFactory the connection factory to use (must not be null)
     */
    public void configure(DefaultJmsListenerContainerFactory factory, ConnectionFactory connectionFactory) {
    		Assert.notNull(factory, "Factory must not be null");
    		Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
    		factory.setConnectionFactory(connectionFactory);
    		factory.setPubSubDomain(this.jmsProperties.isPubSubDomain());
    		if (this.transactionManager != null) {
    			factory.setTransactionManager(this.transactionManager);
    		}
    		else {
          // No transaction manager configured: use a transacted session instead.
    			factory.setSessionTransacted(true);
    		}
    		if (this.destinationResolver != null) {
    			factory.setDestinationResolver(this.destinationResolver);
    		}
    		if (this.messageConverter != null) {
    			factory.setMessageConverter(this.messageConverter);
    		}
      // Apply the listener settings from JmsProperties.
    		JmsProperties.Listener listener = this.jmsProperties.getListener();
    		factory.setAutoStartup(listener.isAutoStartup());
    		if (listener.getAcknowledgeMode() != null) {
    			factory.setSessionAcknowledgeMode(listener.getAcknowledgeMode().getMode());
    		}
    		String concurrency = listener.formatConcurrency();
    		if (concurrency != null) {
    			factory.setConcurrency(concurrency);
    		}
    		Duration receiveTimeout = listener.getReceiveTimeout();
    		if (receiveTimeout != null) {
    			factory.setReceiveTimeout(receiveTimeout.toMillis());
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    // Container factory whose container type is DefaultMessageListenerContainer.
    public class DefaultJmsListenerContainerFactory
    		extends AbstractJmsListenerContainerFactory<DefaultMessageListenerContainer> {
      // Creates a new, empty DefaultMessageListenerContainer.
      @Override
    	protected DefaultMessageListenerContainer createContainerInstance() {
    		return new DefaultMessageListenerContainer();
    	}
      // (Methods of AbstractJmsListenerContainerFactory are not shown here.)
      
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    AbstractJmsListenerContainerFactory

    /**
     * Creates a container via createContainerInstance(), copies every factory setting that
     * is non-null onto it, calls initializeContainer, and finally lets the endpoint set up
     * the container.
     * @param endpoint the endpoint the container is created for
     * @return the configured container instance
     */
    @Override
    	public C createListenerContainer(JmsListenerEndpoint endpoint) {
    		C instance = createContainerInstance();
    
    		// Each setting is applied only when it was explicitly set on this factory.
    		if (this.connectionFactory != null) {
    			instance.setConnectionFactory(this.connectionFactory);
    		}
    		if (this.destinationResolver != null) {
    			instance.setDestinationResolver(this.destinationResolver);
    		}
    		if (this.errorHandler != null) {
    			instance.setErrorHandler(this.errorHandler);
    		}
    		if (this.messageConverter != null) {
    			instance.setMessageConverter(this.messageConverter);
    		}
    		if (this.sessionTransacted != null) {
    			instance.setSessionTransacted(this.sessionTransacted);
    		}
    		if (this.sessionAcknowledgeMode != null) {
    			instance.setSessionAcknowledgeMode(this.sessionAcknowledgeMode);
    		}
    		if (this.pubSubDomain != null) {
    			instance.setPubSubDomain(this.pubSubDomain);
    		}
    		if (this.replyPubSubDomain != null) {
    			instance.setReplyPubSubDomain(this.replyPubSubDomain);
    		}
    		if (this.replyQosSettings != null) {
    			instance.setReplyQosSettings(this.replyQosSettings);
    		}
    		if (this.subscriptionDurable != null) {
    			instance.setSubscriptionDurable(this.subscriptionDurable);
    		}
    		if (this.subscriptionShared != null) {
    			instance.setSubscriptionShared(this.subscriptionShared);
    		}
    		if (this.clientId != null) {
    			instance.setClientId(this.clientId);
    		}
    		if (this.phase != null) {
    			instance.setPhase(this.phase);
    		}
    		if (this.autoStartup != null) {
    			instance.setAutoStartup(this.autoStartup);
    		}
    
    		initializeContainer(instance);
        // Let the endpoint configure the container
    		endpoint.setupListenerContainer(instance);
    
    		return instance;
    	}
    
    	/**
    	 * Create an empty container instance.
    	 * @return a new container; createListenerContainer applies the factory settings to it
    	 */
    	protected abstract C createContainerInstance();
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    ActiveMQMessageConsumer

    创建消费者实例。根据ActiveMQSession的参数创建。

    // Constructor excerpt (parts omitted): resolves the redelivery policy for the destination,
    // validates and stores the selector, registers the consumer with the session and the
    // broker, and starts it if the connection is already started.
    public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
                String name, String selector, int prefetch,
                int maximumPendingMessageCount, boolean noLocal, boolean browser,
                boolean dispatchAsync, MessageListener messageListener) throws JMSException {
           //..... .... (some code omitted)
            this.session = session;
           // Look up the redelivery policy registered for this destination.
            this.redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest);
            if (this.redeliveryPolicy == null) {
                this.redeliveryPolicy = new RedeliveryPolicy();
            }
            setTransformer(session.getTransformer());
            // The following code just sets a number of properties.
            // ......
    
           //....... 
    
            this.info.setDestination(dest);
            this.info.setBrowser(browser);
             // selector: an explicit non-blank selector wins, otherwise one already present on info is used
            if (selector != null && selector.trim().length() != 0) {
                // Validate the selector
                SelectorParser.parse(selector);
                this.info.setSelector(selector);
                this.selector = selector;
            } else if (info.getSelector() != null) {
                // Validate the selector
                SelectorParser.parse(this.info.getSelector());
                this.selector = this.info.getSelector();
            } else {
                this.selector = null;
            }
    
           //..... .... 
            if (messageListener != null) {
                setMessageListener(messageListener);
            }
            try {
                this.session.addConsumer(this);
                this.session.syncSendPacket(info);
            } catch (JMSException e) {
                // Registration failed: undo the local registration before rethrowing.
                this.session.removeConsumer(this);
                throw e;
            }
    
            if (session.connection.isStarted()) {
                start();
            }
        }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    Destination

    ActiveMQ 的 Destination 主要分为 Topic 和 Queue,还有其他类型。

    destination 的 name 通过 DynamicDestinationResolver 解析,它是在 JmsDestinationAccessor 里创建的;JmsDestinationAccessor 是 DefaultMessageListenerContainer 的父类。

    // Skeleton of the destination type hierarchy (members omitted).

    // Marker interface for a destination.
    public interface Destination {
    }
    
    // A queue destination, identified by its queue name.
    public interface Queue extends Destination {
        String getQueueName() throws JMSException;
    
        String toString();
    }
    
    // Common base class of the ActiveMQ destination implementations shown below.
    public abstract class ActiveMQDestination
    {
    }
    
    // ActiveMQ implementation of Queue.
    public class ActiveMQQueue extends ActiveMQDestination implements Queue {
    }
    
    // ActiveMQ implementation of Topic.
    public class ActiveMQTopic extends ActiveMQDestination implements Topic {
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    image-20221021155133158

    spring 整合 Jms

    为JmsListener 设置 containerFactory

    默认情况下,当使用了@JmsListener 注解,而又没有自定义 JmsListenerContainerFactory时,Spring Boot 会自动创建一个默认的对象。

    如果想创建多个 JmsListenerContainerFactory,可使用 Spring Boot 提供的DefaultJmsListenerContainerFactoryConfigurer 来创建

    @Configuration
    static class JmsConfiguration {

      /**
       * Registers an additional listener container factory under the bean name
       * "myContainerFactory". The auto-configured default bean is named
       * "jmsListenerContainerFactory"; using that same name here would stop the
       * default one from being created.
       */
      @Bean(name="myContainerFactory")
      public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer) {
        final DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
        // Apply the Boot defaults first, then override the message converter.
        configurer.configure(containerFactory, connectionFactory());
        containerFactory.setMessageConverter(myMessageConverter());
        return containerFactory;
      }

    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    上面代码定义完 JmsListenerContainerFactory 之后,在 @JmsListener 注解中把 containerFactory 指定为对应 Factory 的 bean 名字即可(即 @Bean 的 name:myContainerFactory,而不是方法名 myFactory)。

    一个应用内同时消费队列和发布订阅两个类型的消息

      /**
       * Listener container factory for consuming topics (publish/subscribe).
       *
       * @param configurer Spring Boot's configurer that applies the standard JMS listener defaults
       * @return a factory whose containers subscribe to topics
       */
    	@Bean(name = { "jmsListenerContainerFactory4Topic" })
    	public DefaultJmsListenerContainerFactory jmsListenerContainerFactory4Topic(DefaultJmsListenerContainerFactoryConfigurer configurer) {
    		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    		// configure(...) already applies the transaction manager (or a transacted session when
    		// none is set), auto-startup, acknowledge mode and concurrency from JmsProperties,
    		// so none of that needs to be repeated here.
    		configurer.configure(factory, connectionFactory());

    		// Override the configured default: enable publish/subscribe.
    		factory.setPubSubDomain(true);
    		return factory;
    	}
    	  /**
    	   * Listener container factory for consuming queues (point-to-point).
    	   *
    	   * @param configurer Spring Boot's configurer that applies the standard JMS listener defaults
    	   * @return a factory whose containers consume from queues
    	   */
    	@Bean(name = { "jmsListenerContainerFactory4Queue" })
    	public DefaultJmsListenerContainerFactory jmsListenerContainerFactory4Queue(DefaultJmsListenerContainerFactoryConfigurer configurer) {
    		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    		// configure(...) already applies the transaction manager (or a transacted session when
    		// none is set), auto-startup, acknowledge mode and concurrency from JmsProperties,
    		// so none of that needs to be repeated here.
    		configurer.configure(factory, connectionFactory());

    		// Override the configured default: disable publish/subscribe.
    		factory.setPubSubDomain(false);
    		return factory;
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    一个应用内实现生产者 P2P 与 Pub/Sub 兼容模式

    定义2个JmsTemplate

     
    /**
     * Defines two custom {@link JmsTemplate} beans, one for queues (point-to-point) and one
     * for topics (publish/subscribe), both with transacted sessions enabled.
     */
    @Configuration
    public class JmsTemplateConfiguration {

    	private final JmsProperties properties;
    	private final ObjectProvider<DestinationResolver> destinationResolver;
    	private final ObjectProvider<MessageConverter> messageConverter;

    	public JmsTemplateConfiguration(JmsProperties properties,
    			ObjectProvider<DestinationResolver> destinationResolver,
    			ObjectProvider<MessageConverter> messageConverter) {
    		this.properties = properties;
    		this.destinationResolver = destinationResolver;
    		this.messageConverter = messageConverter;
    	}

    	/**
    	 * JmsTemplate for queue (point-to-point) producers; this is the primary JmsTemplate bean.
    	 * @param connectionFactory the factory used to create connections
    	 * @return the configured template
    	 */
    	@Bean(name="jmsQueueTemplate")
    	@Primary
    	public JmsTemplate jmsQueueTemplate(ConnectionFactory connectionFactory) {
    		return createTemplate(connectionFactory, false);
    	}

    	/**
    	 * JmsTemplate for topic (publish/subscribe) producers.
    	 * @param connectionFactory the factory used to create connections
    	 * @return the configured template
    	 */
    	@Bean(name="jmsTopicTemplate")
    	public JmsTemplate jmsTopicTemplate(ConnectionFactory connectionFactory) {
    		return createTemplate(connectionFactory, true);
    	}

    	/** Builds a transacted, persistent-delivery JmsTemplate for the given messaging domain. */
    	private JmsTemplate createTemplate(ConnectionFactory connectionFactory, boolean pubSubDomain) {
    		JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);

    		// false = queue (point-to-point), true = topic (publish/subscribe)
    		jmsTemplate.setPubSubDomain(pubSubDomain);

    		// Use the resolver/converter beans only when exactly one candidate exists.
    		DestinationResolver resolver = this.destinationResolver.getIfUnique();
    		if (resolver != null) {
    			jmsTemplate.setDestinationResolver(resolver);
    		}
    		MessageConverter converter = this.messageConverter.getIfUnique();
    		if (converter != null) {
    			jmsTemplate.setMessageConverter(converter);
    		}
    		// Switch for deliveryMode, priority and timeToLive: they only take effect when this
    		// is true (the default is false).
    		jmsTemplate.setExplicitQosEnabled(true);
    		// DeliveryMode.NON_PERSISTENT=1: non-persistent; DeliveryMode.PERSISTENT=2: persistent
    		jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
    		// Transactions are off by default; print the default for demonstration.
    		System.out.println("默认是否开启事务:"+jmsTemplate.isSessionTransacted());
    		// Per the article: without a transacted session XA transactions do not take effect;
    		// a producer that needs transaction support must set sessionTransacted to true.
    		jmsTemplate.setSessionTransacted(true);

    		return jmsTemplate;
    	}

    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88

    使用定义的JmsTemplate,定义JmsMessagingTemplate

    /**
     * Declares the demo destinations and wraps the two named JmsTemplate beans in
     * JmsMessagingTemplate beans.
     */
    @Configuration
    public class JmsMessageConfiguration {

    	/** Queue template, selected by bean name. */
    	@Autowired
    	@Qualifier("jmsQueueTemplate")
    	private JmsTemplate queueTemplate;

    	/** Topic template, selected by bean name. */
    	@Autowired
    	@Qualifier("jmsTopicTemplate")
    	private JmsTemplate topicTemplate;

    	/** The point-to-point queue. */
    	@Bean
    	public Queue queue() {
    		final String queueName = "my.queue";
    		return new ActiveMQQueue(queueName);
    	}

    	/** The topic. */
    	@Bean
    	public Topic topic() {
    		final String topicName = "my.topic";
    		return new ActiveMQTopic(topicName);
    	}

    	/** Messaging template for queue messages. */
    	@Bean(name="jmsQueueMessagingTemplate")
    	public JmsMessagingTemplate jmsQueueMessagingTemplate() {
    		JmsMessagingTemplate messagingTemplate = new JmsMessagingTemplate(queueTemplate);
    		return messagingTemplate;
    	}

    	/** Messaging template for publish/subscribe messages. */
    	@Bean(name="jmsTopicMessagingTemplate")
    	public JmsMessagingTemplate jmsTopicMessagingTemplate() {
    		JmsMessagingTemplate messagingTemplate = new JmsMessagingTemplate(topicTemplate);
    		return messagingTemplate;
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    使用JmsMessagingTemplate

    /**
     * Demo producer: sends one queue message and one topic message inside a transaction,
     * then fails on purpose.
     */
    @Service
    public class SendService {

    	@Autowired
    	private Queue queue;

    	@Autowired
    	private Topic topic;

    	@Autowired
    	@Qualifier("jmsQueueMessagingTemplate")
    	private JmsMessagingTemplate queueMessaging;

    	@Autowired
    	@Qualifier("jmsTopicMessagingTemplate")
    	private JmsMessagingTemplate topicMessaging;

    	/**
    	 * Sends both messages transactionally. The final division by zero throws an
    	 * ArithmeticException, which is listed in rollbackFor.
    	 */
    	@Transactional(propagation=Propagation.REQUIRED,rollbackFor=ArithmeticException.class)
    	public void send(){
    		// Queue (point-to-point) message
    		final String queuePayload = "生产者辛苦生产的点对点消息成果";
    		queueMessaging.convertAndSend(this.queue, queuePayload);
    		System.out.println("生产者:辛苦生产的点对点消息成果");

    		// Topic (publish/subscribe) message
    		final String topicPayload = "生产者辛苦生产的发布订阅消息成果";
    		topicMessaging.convertAndSend(this.topic, topicPayload);
    		System.out.println("生产者:辛苦生产的发布订阅消息成果");

    		// Deliberate failure
    		System.out.println(1/0);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    在@JmsListener 方法中访问session

        /**
         * Listener that also receives the JMS Session: declaring a Session parameter is
         * enough for it to be supplied (the raw JMS message can be obtained the same way).
         */
        @JmsListener(destination = "so55038881")
        public void listen(String in, Session session) {
            final String line = in + ":" + session;
            System.out.println(line);
        }
      // Constructor injection: the JmsTemplate is injected via @Autowired and stored in a field.
      @Autowired
      public MyBean(JmsTemplate jmsTemplate) {
      		this.jmsTemplate = jmsTemplate;
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    每个@JmsListener 都会产生一个container。每个container 产生一个session。每个session 都有自己的id

      JmsPoolSession { ActiveMQSession {id=ID:DESKTOP-F4Q4JRF-52518-1666324884549-1:1:1,started=true} java.lang.Object@150e4fcb }
      JmsPoolSession { ActiveMQSession {id=ID:DESKTOP-F4Q4JRF-52518-1666324884549-1:2:1,started=true} java.lang.Object@3aa2221d }
    
    • 1
    • 2

    不同Destination使用不同重投递策略

    使用 ActiveMQConnectionFactoryCustomizer 修改 ConnectionFactory 的 RedeliveryPolicyMap 属性。

    下面代码配置了2个ActiveMQConnectionFactoryCustomizer,都被执行了。其中一个设置了指定queue的 RedeliveryPolicy。

    
        /**
         * First customizer of the {@link ActiveMQConnectionFactory}; it only prints a marker
         * line so that its execution can be observed.
         */
        @Bean
        public ActiveMQConnectionFactoryCustomizer myCustomizer() {
            return factory -> System.out.println("Customizer 1");
        }
    
        /**
         * Second customizer of the {@link ActiveMQConnectionFactory}: registers a dedicated
         * redelivery policy (at most 3 redeliveries) for the queue "queue.dlq.test".
         */
        @Bean
        public ActiveMQConnectionFactoryCustomizer myCustomizer2() {
            return factory -> {
                // Policy that applies to this one queue only.
                RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
                queuePolicy.setMaximumRedeliveries(3);

                ActiveMQQueue targetQueue = new ActiveMQQueue("queue.dlq.test");
                factory.getRedeliveryPolicyMap().put(targetQueue, queuePolicy);

                System.out.println("Customizer 2");
            };
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    也可以在DefaultJmsListenerContainerFactory 创建时设置。

    RedeliveryPolicyMap

    用于存储不同destination 对应的不同 重投递策略。

    Key:ActiveMQDestination,通过 physicalName 的 equals 判断两个 destination 是否相等(见下面的代码)。

    Value:RedeliveryPolicy

    return physicalName.equals(d.physicalName);
    
    • 1
  • 相关阅读:
    【LeetCode-数组】-- 寻找数组的中心索引
    Redis配置与优化
    Vue3 中 keepAlive 如何搭配 VueRouter 来更自由的控制页面的状态缓存?
    Java岗:字节面经分享+Java面试必考题
    【Python】编码
    Android 的Memory Profiler详解
    常用 Git 命令
    Android Audio实战——音量设置Hal(二十)
    Linux进程控制
    后端 | 如何卸载 setup.py 安装 python 包 | Python
  • 原文地址:https://blog.csdn.net/demon7552003/article/details/127464640