如果未自定义ConnectionFactory Bean,则使用此配置。引入了ActiveMQXAConnectionFactoryConfiguration,ActiveMQConnectionFactoryConfiguration,使用了配置:ActiveMQProperties,JmsProperties
@Configuration(proxyBeanMethods = false)
@AutoConfigureBefore(JmsAutoConfiguration.class)
@AutoConfigureAfter({ JndiConnectionFactoryAutoConfiguration.class })
@ConditionalOnClass({ ConnectionFactory.class, ActiveMQConnectionFactory.class })
@ConditionalOnMissingBean(ConnectionFactory.class)
@EnableConfigurationProperties({ ActiveMQProperties.class, JmsProperties.class })
@Import({ ActiveMQXAConnectionFactoryConfiguration.class, ActiveMQConnectionFactoryConfiguration.class })
public class ActiveMQAutoConfiguration {
}
spring:
activemq:
user: admin
password: ${PASSWORD}
broker-url: tcp://127.0.0.1:61616
pool:
enabled: true
max-connections: 10
jms:
pub-sub-domain: false #只能监听队列,不能是topic。
# 可以设置监听器的默认属性
listener:
concurrency: 1
maxConcurrency: 2
#可以设置 template的一些默认属性。
template:
@ConfigurationProperties(prefix = "spring.activemq")
public class ActiveMQProperties {
/** * URL of the ActiveMQ broker.*/
private String brokerUrl;
/** 默认 broker是否仅内存模式。 */
private boolean inMemory = true;
private String user;
private String password;
/**
* Time to wait before considering a close complete.
*/
private Duration closeTimeout = Duration.ofSeconds(15);
/**
* Whether to stop message delivery before re-delivering messages from a rolled back
* transaction. This implies that message order is not preserved when this is enabled.
*/
private boolean nonBlockingRedelivery = false;
/** 发送消息 等待响应的超时。0 表示一直等待。*/
private Duration sendTimeout = Duration.ofMillis(0);
@NestedConfigurationProperty
private final JmsPoolConnectionFactoryProperties pool = new JmsPoolConnectionFactoryProperties();
private final Packages packages = new Packages();
//package org.springframework.boot.autoconfigure.jms;
public class JmsPoolConnectionFactoryProperties {
//是否启用连接池
private boolean enabled;
/** 连接池已满是否block。false:则抛出异常:JMSException
*/
private boolean blockIfFull = true;
/** 阻塞时间。连接池满,则抛出异常。 */
private Duration blockIfFullTimeout = Duration.ofMillis(-1);
/**
* Connection idle timeout.
*/
private Duration idleTimeout = Duration.ofSeconds(30);
/** 最大连接数 */
private int maxConnections = 1;
/** 每个连接的最大session数。*/
private int maxSessionsPerConnection = 500;
/**
* Time to sleep between runs of the idle connection eviction thread. When negative,
* no idle connection eviction thread runs.
*/
private Duration timeBetweenExpirationCheck = Duration.ofMillis(-1);
/** 仅使用一个匿名 生产者。false,则表示每次请求一个匿名producer。 */
private boolean useAnonymousProducers = true;
@ConfigurationProperties(prefix = "spring.jms")
public class JmsProperties {
/** 是否默认destination类型是topic */
private boolean pubSubDomain = false;
/**
* Connection factory JNDI name. When set, takes precedence to others connection factory auto-configurations.
*/
private String jndiName;
//Cache的设置
private final Cache cache = new Cache();
//Listener设置
private final Listener listener = new Listener();
private final Template template = new Template();
public static class Listener {
/** * 自动启动 */
private boolean autoStartup = true;
/**ACK模式,默认:auto */
private AcknowledgeMode acknowledgeMode;
private Integer concurrency;
private Integer maxConcurrency;
/**
* Timeout to use for receive calls. Use -1 for a no-wait receive or 0 for no
* timeout at all. The latter is only feasible if not running within a transaction
* manager and is generally discouraged since it prevents clean shutdown.
*/
private Duration receiveTimeout = Duration.ofSeconds(1);
}
public static class Template {
/** 未设置 defaultDestination时的默认名。 */
private String defaultDestination;
private Duration deliveryDelay;
private DeliveryMode deliveryMode;
private Integer priority;
private Duration timeToLive;
private Boolean qosEnabled;
private Duration receiveTimeout;
}
}
@Configuration(proxyBeanMethods = false)
//不存在 ConnectionFactory Bean时应用配置
@ConditionalOnMissingBean(ConnectionFactory.class)
class ActiveMQConnectionFactoryConfiguration {
@Configuration(proxyBeanMethods = false)
//不启用 连接池 时配置 ( pool.enabled:false)
@ConditionalOnProperty(prefix = "spring.activemq.pool", name = "enabled", havingValue = "false",
matchIfMissing = true)
static class SimpleConnectionFactoryConfiguration {
// 不启用 缓存时配置(cache.enabled:false)。
@Bean
@ConditionalOnProperty(prefix = "spring.jms.cache", name = "enabled", havingValue = "false")
ActiveMQConnectionFactory jmsConnectionFactory(ActiveMQProperties properties,
ObjectProvider<ActiveMQConnectionFactoryCustomizer> factoryCustomizers) {
return createJmsConnectionFactory(properties, factoryCustomizers);
}
private static ActiveMQConnectionFactory createJmsConnectionFactory(ActiveMQProperties properties,
ObjectProvider<ActiveMQConnectionFactoryCustomizer> factoryCustomizers) {
return new ActiveMQConnectionFactoryFactory(properties,
factoryCustomizers.orderedStream().collect(Collectors.toList()))
//通过FactoryFactory 构造 Factory
.createConnectionFactory(ActiveMQConnectionFactory.class);
}
//启用 缓存 时配置
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(CachingConnectionFactory.class)
@ConditionalOnProperty(prefix = "spring.jms.cache", name = "enabled", havingValue = "true",
matchIfMissing = true)
static class CachingConnectionFactoryConfiguration {
@Bean
CachingConnectionFactory cachingJmsConnectionFactory(JmsProperties jmsProperties,
ActiveMQProperties properties,
ObjectProvider<ActiveMQConnectionFactoryCustomizer> factoryCustomizers) {
// 获取cache。
JmsProperties.Cache cacheProperties = jmsProperties.getCache();
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
createJmsConnectionFactory(properties, factoryCustomizers));
//设置到factory。
connectionFactory.setCacheConsumers(cacheProperties.isConsumers());
connectionFactory.setCacheProducers(cacheProperties.isProducers());
connectionFactory.setSessionCacheSize(cacheProperties.getSessionCacheSize());
return connectionFactory;
}
}
}
// 启用 了 连接池 时的配置。
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass({ JmsPoolConnectionFactory.class, PooledObject.class })
static class PooledConnectionFactoryConfiguration {
// 启用了 连接池。
@Bean(destroyMethod = "stop")
@ConditionalOnProperty(prefix = "spring.activemq.pool", name = "enabled", havingValue = "true")
JmsPoolConnectionFactory pooledJmsConnectionFactory(ActiveMQProperties properties,
ObjectProvider<ActiveMQConnectionFactoryCustomizer> factoryCustomizers) {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactoryFactory(properties,
factoryCustomizers.orderedStream().collect(Collectors.toList()))
.createConnectionFactory(ActiveMQConnectionFactory.class);
return new JmsPoolConnectionFactoryFactory(properties.getPool())
.createPooledConnectionFactory(connectionFactory);
}
}
}
启用Jms的注解配置类。
//配置前提:引入了类:EnableJms
@ConditionalOnClass(EnableJms.class)
class JmsAnnotationDrivenConfiguration {
//几个重要属性
private final ObjectProvider<DestinationResolver> destinationResolver;
private final ObjectProvider<JtaTransactionManager> transactionManager;
private final ObjectProvider<MessageConverter> messageConverter;
private final JmsProperties properties;
//默认:DefaultJmsListenerContainerFactoryConfigurer
@Bean
@ConditionalOnMissingBean
DefaultJmsListenerContainerFactoryConfigurer jmsListenerContainerFactoryConfigurer() {
DefaultJmsListenerContainerFactoryConfigurer configurer = new DefaultJmsListenerContainerFactoryConfigurer();
configurer.setDestinationResolver(this.destinationResolver.getIfUnique());
configurer.setTransactionManager(this.transactionManager.getIfUnique());
configurer.setMessageConverter(this.messageConverter.getIfUnique());
configurer.setJmsProperties(this.properties);
return configurer;
}
//默认:DefaultJmsListenerContainerFactory
@Bean
@ConditionalOnSingleCandidate(ConnectionFactory.class)
@ConditionalOnMissingBean(name = "jmsListenerContainerFactory")
DefaultJmsListenerContainerFactory jmsListenerContainerFactory(
DefaultJmsListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
return factory;
}
@Configuration(proxyBeanMethods = false)
@EnableJms
@ConditionalOnMissingBean(name = JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
static class EnableJmsConfiguration {
}
//配置默认:DestinationResolver
@Configuration(proxyBeanMethods = false)
@ConditionalOnJndi
static class JndiConfiguration {
@Bean
@ConditionalOnMissingBean(DestinationResolver.class)
JndiDestinationResolver destinationResolver() {
JndiDestinationResolver resolver = new JndiDestinationResolver();
resolver.setFallbackToDynamicDestination(true);
return resolver;
}
}
}
主要用于创建 ActiveMQConnectionFactory。根据配置的properties,ActiveMQConnectionFactoryCustomizer 构造 ActiveMQConnectionFactory 实例。
class ActiveMQConnectionFactoryFactory {
private static final String DEFAULT_EMBEDDED_BROKER_URL = "vm://localhost?broker.persistent=false";
private static final String DEFAULT_NETWORK_BROKER_URL = "tcp://localhost:61616";
//属性
private final ActiveMQProperties properties;
// 设置属性
private final List<ActiveMQConnectionFactoryCustomizer> factoryCustomizers;
private <T extends ActiveMQConnectionFactory> T doCreateConnectionFactory(Class<T> factoryClass) throws Exception {
T factory = createConnectionFactoryInstance(factoryClass);
if (this.properties.getCloseTimeout() != null) {
factory.setCloseTimeout((int) this.properties.getCloseTimeout().toMillis());
}
factory.setNonBlockingRedelivery(this.properties.isNonBlockingRedelivery());
if (this.properties.getSendTimeout() != null) {
factory.setSendTimeout((int) this.properties.getSendTimeout().toMillis());
}
Packages packages = this.properties.getPackages();
if (packages.getTrustAll() != null) {
factory.setTrustAllPackages(packages.getTrustAll());
}
if (!packages.getTrusted().isEmpty()) {
factory.setTrustedPackages(packages.getTrusted());
}
//使用 ActiveMQConnectionFactoryCustomizer 设置
customize(factory);
return factory;
}
}
创建带连接池的ConnectionFactory的工厂。
public class JmsPoolConnectionFactoryFactory {
private final JmsPoolConnectionFactoryProperties properties;
public JmsPoolConnectionFactory createPooledConnectionFactory(ConnectionFactory connectionFactory) {
JmsPoolConnectionFactory pooledConnectionFactory = new JmsPoolConnectionFactory();
//包装 ConnectionFactory
pooledConnectionFactory.setConnectionFactory(connectionFactory);
//其他设置JmsPoolConnectionFactory 的属性。
pooledConnectionFactory.setBlockIfSessionPoolIsFull(this.properties.isBlockIfFull());
if (this.properties.getBlockIfFullTimeout() != null) {
pooledConnectionFactory
.setBlockIfSessionPoolIsFullTimeout(this.properties.getBlockIfFullTimeout().toMillis());
}
......
}
}
连接池,创建连接。
public class JmsPoolConnectionFactory implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory {
public Connection createConnection() throws JMSException {
return this.createConnection((String)null, (String)null);
}
}
用于在ActiveMQConnectionFactoryFactory 生成 ActiveMQConnectionFactory时 设置 ActiveMQConnectionFactory 的值。
@FunctionalInterface
public interface ActiveMQConnectionFactoryCustomizer {
/**
* Customize the {@link ActiveMQConnectionFactory}.
* @param factory the factory to customize
*/
void customize(ActiveMQConnectionFactory factory);
}
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Repeatable(JmsListeners.class)
@MessageMapping
public @interface JmsListener {
//用于 MethodJmsListenerEndpoint id。在注册时使用,必须唯一。
String id() default "";
String containerFactory() default "";
String destination();
/** * 持久化订阅的名字 */
String subscription() default "";
/**消息选择器 **/
String selector() default "";
/** 消费者的并行数量限制。如果设置了,会覆盖 container factory的设置。可以是“5-10”,“10”这样的设置。
注意可能不是所有的都支持。
*/
String concurrency() default "";
}
public @interface JmsListeners {
JmsListener[] value();
}
主要用于配置 JmsListenerEndpointRegistrar
@FunctionalInterface
public interface JmsListenerConfigurer {
void configureJmsListeners(JmsListenerEndpointRegistrar registrar);
}
Spring 会对注解了@JmsListener,@JmsListeners 的 方 法 进 行 处 理 ,会使用 JmsListenerAnnotationBeanPostProcessor 来处理 标注了 @JmsListener ,@JmsListeners 的类和方法。
public class JmsListenerAnnotationBeanPostProcessor
implements MergedBeanDefinitionPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof AopInfrastructureBean || bean instanceof JmsListenerContainerFactory ||
bean instanceof JmsListenerEndpointRegistry) {
// Ignore AOP infrastructure such as scoped proxies.
return bean;
}
Class<?> targetClass = AopProxyUtils.ultimateTargetClass(bean);
if (!this.nonAnnotatedClasses.contains(targetClass) &&
//
AnnotationUtils.isCandidateClass(targetClass, JmsListener.class)) {
Map<Method, Set<JmsListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<JmsListener>>) method -> {
//获取 @JmsListener
Set<JmsListener> listenerMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(
method, JmsListener.class, JmsListeners.class);
return (!listenerMethods.isEmpty() ? listenerMethods : null);
});
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(targetClass);
if (logger.isTraceEnabled()) {
logger.trace("No @JmsListener annotations found on bean type: " + targetClass);
}
}
else {
// Non-empty set of methods
annotatedMethods.forEach((method, listeners) ->
listeners.forEach(listener -> processJmsListener(listener, method, bean)));
if (logger.isDebugEnabled()) {
logger.debug(annotatedMethods.size() + " @JmsListener methods processed on bean '" + beanName +
"': " + annotatedMethods);
}
}
}
return bean;
}
//对每个 JmsListener 处理。
protected void processJmsListener(JmsListener jmsListener, Method mostSpecificMethod, Object bean) {
Method invocableMethod = AopUtils.selectInvocableMethod(mostSpecificMethod, bean.getClass());
MethodJmsListenerEndpoint endpoint = createMethodJmsListenerEndpoint();
endpoint.setBean(bean);
endpoint.setMethod(invocableMethod);
endpoint.setMostSpecificMethod(mostSpecificMethod);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
endpoint.setEmbeddedValueResolver(this.embeddedValueResolver);
endpoint.setBeanFactory(this.beanFactory);
endpoint.setId(getEndpointId(jmsListener));
//根据配置设置属性。
endpoint.setDestination(resolve(jmsListener.destination()));
if (StringUtils.hasText(jmsListener.selector())) {
endpoint.setSelector(resolve(jmsListener.selector()));
}
if (StringUtils.hasText(jmsListener.subscription())) {
endpoint.setSubscription(resolve(jmsListener.subscription()));
}
if (StringUtils.hasText(jmsListener.concurrency())) {
endpoint.setConcurrency(resolve(jmsListener.concurrency()));
}
JmsListenerContainerFactory<?> factory = null;
String containerFactoryBeanName = resolve(jmsListener.containerFactory());
if (StringUtils.hasText(containerFactoryBeanName)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
try {
factory = this.beanFactory.getBean(containerFactoryBeanName, JmsListenerContainerFactory.class);
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register JMS listener endpoint on [" +
mostSpecificMethod + "], no " + JmsListenerContainerFactory.class.getSimpleName() +
" with id '" + containerFactoryBeanName + "' was found in the application context", ex);
}
}
this.registrar.registerEndpoint(endpoint, factory);
}
@Override
public void afterSingletonsInstantiated() {
// Remove resolved singleton classes from cache
this.nonAnnotatedClasses.clear();
if (this.beanFactory instanceof ListableBeanFactory) {
//可以在其他地方定义 JmsListenerConfigurer
Map<String, JmsListenerConfigurer> beans =
((ListableBeanFactory) this.beanFactory).getBeansOfType(JmsListenerConfigurer.class);
List<JmsListenerConfigurer> configurers = new ArrayList<>(beans.values());
AnnotationAwareOrderComparator.sort(configurers);
for (JmsListenerConfigurer configurer : configurers) {
configurer.configureJmsListeners(this.registrar);
}
}
if (this.containerFactoryBeanName != null) {
this.registrar.setContainerFactoryBeanName(this.containerFactoryBeanName);
}
if (this.registrar.getEndpointRegistry() == null) {
// Determine JmsListenerEndpointRegistry bean from the BeanFactory
if (this.endpointRegistry == null) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to find endpoint registry by bean name");
this.endpointRegistry = this.beanFactory.getBean(
JmsListenerConfigUtils.JMS_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME, JmsListenerEndpointRegistry.class);
}
this.registrar.setEndpointRegistry(this.endpointRegistry);
}
// Set the custom handler method factory once resolved by the configurer
MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
if (handlerMethodFactory != null) {
this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
}
// Actually register all listeners
this.registrar.afterPropertiesSet();
}
}
用于注册Listener与对应的ContainerFactory。
public class JmsListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
//JmsListenerEndpoint 对应Listener包装
//JmsListenerContainerFactory :
public void registerEndpoint(JmsListenerEndpoint endpoint, @Nullable JmsListenerContainerFactory<?> factory) {
Assert.notNull(endpoint, "Endpoint must not be null");
Assert.hasText(endpoint.getId(), "Endpoint id must be set");
// 包装成一个 JmsListenerEndpointDescriptor
JmsListenerEndpointDescriptor descriptor = new JmsListenerEndpointDescriptor(endpoint, factory);
synchronized (this.mutex) {
if (this.startImmediately) { // register and start immediately
Assert.state(this.endpointRegistry != null, "No JmsListenerEndpointRegistry set");
this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
resolveContainerFactory(descriptor), true);
}
else {
this.endpointDescriptors.add(descriptor);
}
}
}
}
@JmsListener 的包装类。
public class MethodJmsListenerEndpoint extends AbstractJmsListenerEndpoint implements BeanFactoryAware {
@Nullable
private Object bean;
//注解了@JmsListener的方法
@Nullable
private Method method;
@Nullable
private Method mostSpecificMethod;
@Nullable
private MessageHandlerMethodFactory messageHandlerMethodFactory;
@Nullable
private StringValueResolver embeddedValueResolver;
//MessageListenerContainer 一般是 DefaultMessageListenerContainer
@Override
protected MessagingMessageListenerAdapter createMessageListener(MessageListenerContainer container) {
Assert.state(this.messageHandlerMethodFactory != null,
"Could not create message listener - MessageHandlerMethodFactory not set");
MessagingMessageListenerAdapter messageListener = createMessageListenerInstance();
Object bean = getBean();
Method method = getMethod();
Assert.state(bean != null && method != null, "No bean+method set on endpoint");
InvocableHandlerMethod invocableHandlerMethod =
this.messageHandlerMethodFactory.createInvocableHandlerMethod(bean, method);
messageListener.setHandlerMethod(invocableHandlerMethod);
String responseDestination = getDefaultResponseDestination();
if (StringUtils.hasText(responseDestination)) {
// 根据是否TOPIC OR QUEUE 设置
if (container.isReplyPubSubDomain()) {
messageListener.setDefaultResponseTopicName(responseDestination);
}
else {
messageListener.setDefaultResponseQueueName(responseDestination);
}
}
//根据 container 的属性设置 ListenerAdapter
QosSettings responseQosSettings = container.getReplyQosSettings();
if (responseQosSettings != null) {
messageListener.setResponseQosSettings(responseQosSettings);
}
MessageConverter messageConverter = container.getMessageConverter();
if (messageConverter != null) {
messageListener.setMessageConverter(messageConverter);
}
DestinationResolver destinationResolver = container.getDestinationResolver();
if (destinationResolver != null) {
messageListener.setDestinationResolver(destinationResolver);
}
return messageListener;
}
/**
* Create an empty {@link MessagingMessageListenerAdapter} instance.
* @return a new {@code MessagingMessageListenerAdapter} or subclass thereof
*/
protected MessagingMessageListenerAdapter createMessageListenerInstance() {
return new MessagingMessageListenerAdapter();
}
}
处理消息的适配器。
public class MessagingMessageListenerAdapter extends AbstractAdaptableMessageListener {
//注解了@JmsListener的方法。
@Nullable
private InvocableHandlerMethod handlerMethod;
@Override
public void onMessage(javax.jms.Message jmsMessage, @Nullable Session session) throws JMSException {
//消息格式转换
Message<?> message = toMessagingMessage(jmsMessage);
if (logger.isDebugEnabled()) {
logger.debug("Processing [" + message + "]");
}
//解析消息
//可以处理返回值。
Object result = invokeHandler(jmsMessage, session, message);
if (result != null) {
handleResult(result, jmsMessage, session);
}
else {
logger.trace("No result object given - no result to handle");
}
}
@Nullable
private Object invokeHandler(javax.jms.Message jmsMessage, @Nullable Session session, Message<?> message) {
//获取对应的Method
InvocableHandlerMethod handlerMethod = getHandlerMethod();
try {
//Method 调用
return handlerMethod.invoke(message, jmsMessage, session);
}
catch (MessagingException ex) {
throw new ListenerExecutionFailedException(
createMessagingErrorMessage("Listener method could not be invoked with incoming message"), ex);
}
catch (Exception ex) {
throw new ListenerExecutionFailedException("Listener method '" +
handlerMethod.getMethod().toGenericString() + "' threw exception", ex);
}
}
//根据需要转换为对应的Message 格式。
protected Message<?> toMessagingMessage(javax.jms.Message jmsMessage) {
try {
return (Message<?>) getMessagingMessageConverter().fromMessage(jmsMessage);
}
catch (JMSException ex) {
throw new MessageConversionException("Could not convert JMS message", ex);
}
}
}

//package org.springframework.messaging.handler.invocation;
public class InvocableHandlerMethod extends HandlerMethod {
@Nullable
//message 这是接收到的消息,
//providedArgs 为传入的其他参数:javax.jms.Message jmsMessage, @Nullable Session session
public Object invoke(Message<?> message, Object... providedArgs) throws Exception {
Object[] args = getMethodArgumentValues(message, providedArgs);
if (logger.isTraceEnabled()) {
logger.trace("Arguments: " + Arrays.toString(args));
}
return doInvoke(args);
}
}
对@JmsListener注解的方法的封装。里面设置有其注解里的属性。
public abstract class AbstractMessageListenerContainer extends AbstractJmsListeningContainer
{
//@JmsListener 中定义的
@Nullable
private volatile Object destination;
//@JmsListener 中定义的
@Nullable
private volatile String messageSelector;
// MessagingMessageListenerAdapter ,里面包含JmsListener处理方法
@Nullable
private volatile Object messageListener;
private boolean subscriptionDurable = false;
private boolean subscriptionShared = false;
//@JmsListener 中定义的
@Nullable
private String subscriptionName;
//是topic则设置为true
@Nullable
private Boolean replyPubSubDomain;
@Nullable
private QosSettings replyQosSettings;
protected void doExecuteListener(Session session, Message message) throws JMSException {
if (!isAcceptMessagesWhileStopping() && !isRunning()) {
if (logger.isWarnEnabled()) {
logger.warn("Rejecting received message because of the listener container " +
"having been stopped in the meantime: " + message);
}
rollbackIfNecessary(session);
throw new MessageRejectedWhileStoppingException();
}
try {
//处理消息。
invokeListener(session, message);
}
catch (JMSException | RuntimeException | Error ex) {
rollbackOnExceptionIfNecessary(session, ex);
throw ex;
}
commitIfNecessary(session, message);
}
protected void commitIfNecessary(Session session, @Nullable Message message) throws JMSException {
// 是事务型的。
if (session.getTransacted()) {
// Commit necessary - but avoid commit call within a JTA transaction.
//本地事务,
if (isSessionLocallyTransacted(session)) {
// Transacted session created by this container -> commit.
JmsUtils.commitIfNecessary(session);
}
}
//非事务型的,并且是客户端ACK,则自动应答
//session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE
else if (message != null && isClientAcknowledge(session)) {
message.acknowledge();
}
}
//事务提交
public static void commitIfNecessary(Session session) throws JMSException {
Assert.notNull(session, "Session must not be null");
try {
session.commit();
}
catch (javax.jms.TransactionInProgressException | javax.jms.IllegalStateException ex) {
// Ignore -> can only happen in case of a JTA transaction.
}
}
protected void invokeListener(Session session, Message message) throws JMSException {
Object listener = getMessageListener();
if (listener instanceof SessionAwareMessageListener) {
doInvokeListener((SessionAwareMessageListener) listener, session, message);
}
else if (listener instanceof MessageListener) {
doInvokeListener((MessageListener) listener, message);
}
else if (listener != null) {
throw new IllegalArgumentException(
"Only MessageListener and SessionAwareMessageListener supported: " + listener);
}
else {
throw new IllegalStateException("No message listener specified - see property 'messageListener'");
}
}
protected void doInvokeListener(SessionAwareMessageListener listener, Session session, Message message)
throws JMSException {
Connection conToClose = null;
Session sessionToClose = null;
try {
//主要是为了判断 session是否变化
Session sessionToUse = session;
if (!isExposeListenerSession()) {
// We need to expose a separate Session.
conToClose = createConnection();
sessionToClose = createSession(conToClose);
sessionToUse = sessionToClose;
}
//处理消息。
listener.onMessage(message, sessionToUse);
// Clean up specially exposed Session, if any.
if (sessionToUse != session) {
if (sessionToUse.getTransacted() && isSessionLocallyTransacted(sessionToUse)) {
// Transacted session created by this container -> commit.
JmsUtils.commitIfNecessary(sessionToUse);
}
}
}
finally {
JmsUtils.closeSession(sessionToClose);
JmsUtils.closeConnection(conToClose);
}
}
protected void invokeListener(Session session, Message message) throws JMSException {
//获取到MessagingMessageListenerAdapter ,里面包含JmsListener处理方法
Object listener = getMessageListener();
if (listener instanceof SessionAwareMessageListener) {
doInvokeListener((SessionAwareMessageListener) listener, session, message);
}
else if (listener instanceof MessageListener) {
doInvokeListener((MessageListener) listener, message);
}
else if (listener != null) {
throw new IllegalArgumentException(
"Only MessageListener and SessionAwareMessageListener supported: " + listener);
}
else {
throw new IllegalStateException("No message listener specified - see property 'messageListener'");
}
}
}
DefaultMessageListenerContainer是一个用于异步消息监听的管理类。循环去broker获取消息。
每个DefaultMessageListenerContainer的实例对应一个Consumer。
public class DefaultMessageListenerContainer extends AbstractPollingMessageListenerContainer {
private boolean executeOngoingLoop() throws JMSException {
boolean messageReceived = false;
boolean active = true;
while (active) {
synchronized (lifecycleMonitor) {
....
}
//active 则继续接收消息
if (active) {
messageReceived = (invokeListener() || messageReceived);
}
}
return messageReceived;
}
private boolean invokeListener() throws JMSException {
this.currentReceiveThread = Thread.currentThread();
try {
initResourcesIfNecessary();
boolean messageReceived = receiveAndExecute(this, this.session, this.consumer);
this.lastMessageSucceeded = true;
return messageReceived;
}
finally {
this.currentReceiveThread = null;
}
}
//AbstractPollingMessageListenerContainer 方法
//从consumer中获取消息
@Nullable
protected Message receiveMessage(MessageConsumer consumer) throws JMSException {
return receiveFromConsumer(consumer, getReceiveTimeout());
}
//处理消息
protected void doExecuteListener(Session session, Message message) throws JMSException {
if (!isAcceptMessagesWhileStopping() && !isRunning()) {
if (logger.isWarnEnabled()) {
logger.warn("Rejecting received message because of the listener container " +
"having been stopped in the meantime: " + message);
}
rollbackIfNecessary(session);
throw new MessageRejectedWhileStoppingException();
}
try {
invokeListener(session, message);
}
catch (JMSException | RuntimeException | Error ex) {
rollbackOnExceptionIfNecessary(session, ex);
throw ex;
}
commitIfNecessary(session, message);
}
}
DefaultMessageListenerContainer 会由DefaultJmsListenerContainerFactory 创建, DefaultJmsListenerContainerFactory 又是由DefaultJmsListenerContainerFactoryConfigurer 进行配置。
public final class DefaultJmsListenerContainerFactoryConfigurer {
public void configure(DefaultJmsListenerContainerFactory factory, ConnectionFactory connectionFactory) {
Assert.notNull(factory, "Factory must not be null");
Assert.notNull(connectionFactory, "ConnectionFactory must not be null");
factory.setConnectionFactory(connectionFactory);
factory.setPubSubDomain(this.jmsProperties.isPubSubDomain());
if (this.transactionManager != null) {
factory.setTransactionManager(this.transactionManager);
}
else {
//没配置TM,则设置为true。
factory.setSessionTransacted(true);
}
if (this.destinationResolver != null) {
factory.setDestinationResolver(this.destinationResolver);
}
if (this.messageConverter != null) {
factory.setMessageConverter(this.messageConverter);
}
// 配置的JmsProperties属性。
JmsProperties.Listener listener = this.jmsProperties.getListener();
factory.setAutoStartup(listener.isAutoStartup());
if (listener.getAcknowledgeMode() != null) {
factory.setSessionAcknowledgeMode(listener.getAcknowledgeMode().getMode());
}
String concurrency = listener.formatConcurrency();
if (concurrency != null) {
factory.setConcurrency(concurrency);
}
Duration receiveTimeout = listener.getReceiveTimeout();
if (receiveTimeout != null) {
factory.setReceiveTimeout(receiveTimeout.toMillis());
}
}
}
public class DefaultJmsListenerContainerFactory
extends AbstractJmsListenerContainerFactory<DefaultMessageListenerContainer> {
//构造 DefaultMessageListenerContainer
@Override
protected DefaultMessageListenerContainer createContainerInstance() {
return new DefaultMessageListenerContainer();
}
// AbstractJmsListenerContainerFactory 的一些方法
}
@Override
public C createListenerContainer(JmsListenerEndpoint endpoint) {
C instance = createContainerInstance();
if (this.connectionFactory != null) {
instance.setConnectionFactory(this.connectionFactory);
}
if (this.destinationResolver != null) {
instance.setDestinationResolver(this.destinationResolver);
}
if (this.errorHandler != null) {
instance.setErrorHandler(this.errorHandler);
}
if (this.messageConverter != null) {
instance.setMessageConverter(this.messageConverter);
}
if (this.sessionTransacted != null) {
instance.setSessionTransacted(this.sessionTransacted);
}
if (this.sessionAcknowledgeMode != null) {
instance.setSessionAcknowledgeMode(this.sessionAcknowledgeMode);
}
if (this.pubSubDomain != null) {
instance.setPubSubDomain(this.pubSubDomain);
}
if (this.replyPubSubDomain != null) {
instance.setReplyPubSubDomain(this.replyPubSubDomain);
}
if (this.replyQosSettings != null) {
instance.setReplyQosSettings(this.replyQosSettings);
}
if (this.subscriptionDurable != null) {
instance.setSubscriptionDurable(this.subscriptionDurable);
}
if (this.subscriptionShared != null) {
instance.setSubscriptionShared(this.subscriptionShared);
}
if (this.clientId != null) {
instance.setClientId(this.clientId);
}
if (this.phase != null) {
instance.setPhase(this.phase);
}
if (this.autoStartup != null) {
instance.setAutoStartup(this.autoStartup);
}
initializeContainer(instance);
// 设置 endpoint 的container
endpoint.setupListenerContainer(instance);
return instance;
}
/**
* Create an empty container instance.
*/
protected abstract C createContainerInstance();
创建消费者实例。根据ActiveMQSession的参数创建。
public ActiveMQMessageConsumer(ActiveMQSession session, ConsumerId consumerId, ActiveMQDestination dest,
String name, String selector, int prefetch,
int maximumPendingMessageCount, boolean noLocal, boolean browser,
boolean dispatchAsync, MessageListener messageListener) throws JMSException {
//..... .... 省略一些
this.session = session;
//此处可以根据 destination 获取 重发策略。
this.redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest);
if (this.redeliveryPolicy == null) {
this.redeliveryPolicy = new RedeliveryPolicy();
}
setTransformer(session.getTransformer());
//以下代码就是设置一些属性。
//。。。。。。
//.......
this.info.setDestination(dest);
this.info.setBrowser(browser);
//selector
if (selector != null && selector.trim().length() != 0) {
// Validate the selector
SelectorParser.parse(selector);
this.info.setSelector(selector);
this.selector = selector;
} else if (info.getSelector() != null) {
// Validate the selector
SelectorParser.parse(this.info.getSelector());
this.selector = this.info.getSelector();
} else {
this.selector = null;
}
//..... ....
if (messageListener != null) {
setMessageListener(messageListener);
}
try {
this.session.addConsumer(this);
this.session.syncSendPacket(info);
} catch (JMSException e) {
this.session.removeConsumer(this);
throw e;
}
if (session.connection.isStarted()) {
start();
}
}
Activemq的Destination主要分为Topic和Queue,还有其他的。
destination的name 通过 DynamicDestinationResolver 解决,其是在JmsDestinationAccessor里创建的,JmsDestinationAccessor是DefaultMessageListenerContainer 的父类。
public interface Destination {
}
public interface Queue extends Destination {
String getQueueName() throws JMSException;
String toString();
}
public abstract class ActiveMQDestination
{
}
public class ActiveMQQueue extends ActiveMQDestination implements Queue {
}
public class ActiveMQTopic extends ActiveMQDestination implements Topic {
}

默认情况下,当使用了@JmsListener 注解,而又没有自定义 JmsListenerContainerFactory时,Spring Boot 会自动创建一个默认的对象。
如果想创建多个 JmsListenerContainerFactory,可使用 Spring Boot 提供的DefaultJmsListenerContainerFactoryConfigurer 来创建
@Configuration
static class JmsConfiguration {
// 默认的bean的名字为jmsListenerContainerFactory,此处可以设置自己的。
//如果与默认的一致,则默认的不会再被创建了。
@Bean(name="myContainerFactory")
public DefaultJmsListenerContainerFactory myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory());
factory.setMessageConverter (myMessageConverter());
return factory;
}
}
上面代码定义完JmsListenerContainerFactory 之 后 , 在 @JmsListener 注 解中指定containerFactory 为对应的 Factory 名字(myFactory) 即可。
//消费topic的
@Bean(name = { "jmsListenerContainerFactory4Topic" })
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory4Topic(DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
//设置一些默认值
configurer.configure(factory, connectionFactory());
//设置 启用发布/订阅
factory.setPubSubDomain(true);
if (this.transactionManager != null) {
factory.setTransactionManager(transactionManager);
} else {
factory.setSessionTransacted(Boolean.valueOf(true));
}
JmsProperties.Listener listener = jmsProperties.getListener();
factory.setAutoStartup(listener.isAutoStartup());
if (listener.getAcknowledgeMode() != null) {
factory.setSessionAcknowledgeMode(Integer.valueOf(listener.getAcknowledgeMode().getMode()));
}
String concurrency = listener.formatConcurrency();
if (concurrency != null)
factory.setConcurrency(concurrency);
return factory;
}
//消费Queue的
@Bean(name = { "jmsListenerContainerFactory4Queue" })
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory4Queue(DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
//设置一些默认值
configurer.configure(factory, connectionFactory());
//不启用发布/订阅
factory.setPubSubDomain(false);
if (this.transactionManager != null) {
factory.setTransactionManager(transactionManager);
} else {
factory.setSessionTransacted(Boolean.valueOf(true));
}
JmsProperties.Listener listener = jmsProperties.getListener();
factory.setAutoStartup(listener.isAutoStartup());
if (listener.getAcknowledgeMode() != null) {
factory.setSessionAcknowledgeMode(Integer.valueOf(listener.getAcknowledgeMode().getMode()));
}
String concurrency = listener.formatConcurrency();
if (concurrency != null)
factory.setConcurrency(concurrency);
return factory;
}
/**
* 自定义JmsTemplate,支持事务
*
*/
@Configuration
public class JmsTemplateConfiguration {
private final JmsProperties properties;
private final ObjectProvider<DestinationResolver> destinationResolver;
private final ObjectProvider<MessageConverter> messageConverter;
public JmsTemplateConfiguration(JmsProperties properties,
ObjectProvider<DestinationResolver> destinationResolver,
ObjectProvider<MessageConverter> messageConverter) {
this.properties = properties;
this.destinationResolver = destinationResolver;
this.messageConverter = messageConverter;
}
/**
* 配置队列生产者的JmsTemplate
* @param connectionFactory
* @return
*/
@Bean(name="jmsQueueTemplate")
@Primary
public JmsTemplate jmsQueueTemplate(ConnectionFactory connectionFactory) {
//设置创建连接的工厂
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
//设置P2P队列消息类型
jmsTemplate.setPubSubDomain(false);
DestinationResolver destinationResolver = (DestinationResolver) this.destinationResolver.getIfUnique();
if (destinationResolver != null) {
jmsTemplate.setDestinationResolver(destinationResolver);
}
MessageConverter messageConverter = (MessageConverter) this.messageConverter.getIfUnique();
if (messageConverter != null) {
jmsTemplate.setMessageConverter(messageConverter);
}
//deliveryMode, priority, timeToLive 的开关,要生效,必须配置为true,默认false
jmsTemplate.setExplicitQosEnabled(true);
//DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久
jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
//默认不开启事务
System.out.println("默认是否开启事务:"+jmsTemplate.isSessionTransacted());
//如果不启用事务,则会导致XA事务失效;
//作为生产者如果需要支持事务,则需要配置SessionTransacted为true
jmsTemplate.setSessionTransacted(true);
return jmsTemplate;
}
/**
* 配置发布订阅生产者的JmsTemplate
*/
@Bean(name="jmsTopicTemplate")
public JmsTemplate jmsTopicTemplate(ConnectionFactory connectionFactory) {
//设置创建连接的工厂
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
//设置发布订阅消息类型
jmsTemplate.setPubSubDomain(true);
DestinationResolver destinationResolver = (DestinationResolver) this.destinationResolver.getIfUnique();
if (destinationResolver != null) {
jmsTemplate.setDestinationResolver(destinationResolver);
}
MessageConverter messageConverter = (MessageConverter) this.messageConverter.getIfUnique();
if (messageConverter != null) {
jmsTemplate.setMessageConverter(messageConverter);
}
//deliveryMode, priority, timeToLive 的开关,要生效,必须配置为true,默认false
jmsTemplate.setExplicitQosEnabled(true);
//DeliveryMode.NON_PERSISTENT=1:非持久 ; DeliveryMode.PERSISTENT=2:持久
jmsTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
//默认不开启事务
System.out.println("默认是否开启事务:"+jmsTemplate.isSessionTransacted());
//如果不启用事务,则会导致XA事务失效;
//作为生产者如果需要支持事务,则需要配置SessionTransacted为true
jmsTemplate.setSessionTransacted(true);
return jmsTemplate;
}
}
@Configuration
public class JmsMessageConfiguration {
//通过name 引用前面定义的JmsTemplate
@Autowired
@Qualifier(value="jmsQueueTemplate")
private JmsTemplate jmsQueueTemplate;
@Autowired
@Qualifier(value="jmsTopicTemplate")
private JmsTemplate jmsTopicTemplate;
/** * 定义点对点队列 */
@Bean
public Queue queue() {
return new ActiveMQQueue("my.queue");
}
/** 定义一个主题 */
@Bean
public Topic topic() {
return new ActiveMQTopic("my.topic");
}
/** 创建处理队列消息模板 */
@Bean(name="jmsQueueMessagingTemplate")
public JmsMessagingTemplate jmsQueueMessagingTemplate() {
return new JmsMessagingTemplate(jmsQueueTemplate);
}
/** 创建处理发布订阅消息模板 */
@Bean(name="jmsTopicMessagingTemplate")
public JmsMessagingTemplate jmsTopicMessagingTemplate() {
return new JmsMessagingTemplate(jmsTopicTemplate);
}
}
@Service
public class SendService {
@Autowired
private Queue queue;
@Autowired
@Qualifier(value="jmsQueueMessagingTemplate")
private JmsMessagingTemplate jmsQueueMessagingTemplate;
@Autowired
private Topic topic;
@Autowired
@Qualifier(value="jmsTopicMessagingTemplate")
private JmsMessagingTemplate jmsTopicMessagingTemplate;
//使用事务。
@Transactional(propagation=Propagation.REQUIRED,rollbackFor=ArithmeticException.class)
public void send(){
//发送队列消息
jmsQueueMessagingTemplate.convertAndSend(this.queue, "生产者辛苦生产的点对点消息成果");
System.out.println("生产者:辛苦生产的点对点消息成果");
//发送发布订阅消息
jmsTopicMessagingTemplate.convertAndSend(this.topic, "生产者辛苦生产的发布订阅消息成果");
System.out.println("生产者:辛苦生产的发布订阅消息成果");
System.out.println(1/0);
}
}
@JmsListener(destination = "so55038881")
// 会自动设置Session,还可以获取到JmsMessage。
public void listen(String in, Session session) {
System.out.println(in + ":" + session);
}
//通过@Autowired 把 jmsTemplate 注入。
@Autowired
public MyBean(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
每个@JmsListener 都会产生一个container。每个container 产生一个session。每个session 都有自己的id
JmsPoolSession { ActiveMQSession {id=ID:DESKTOP-F4Q4JRF-52518-1666324884549-1:1:1,started=true} java.lang.Object@150e4fcb } JmsPoolSession { ActiveMQSession {id=ID:DESKTOP-F4Q4JRF-52518-1666324884549-1:2:1,started=true} java.lang.Object@3aa2221d }
- 1
- 2
使用ActiveMQConnectionFactoryCustomizer修改 ConnectionFactory的 RedeliveryPolicyMap 属性。
下面代码配置了2个ActiveMQConnectionFactoryCustomizer,都被执行了。其中一个设置了指定queue的 RedeliveryPolicy。
@Bean
public ActiveMQConnectionFactoryCustomizer myCustomizer() {
return new ActiveMQConnectionFactoryCustomizer() {
/**
* Customize the {@link ActiveMQConnectionFactory}.
*
* @param factory the factory to customize
*/
@Override
public void customize(ActiveMQConnectionFactory factory) {
System.out.println("Customizer 1");
}
};
}
@Bean
public ActiveMQConnectionFactoryCustomizer myCustomizer2() {
return new ActiveMQConnectionFactoryCustomizer() {
/**
* Customize the {@link ActiveMQConnectionFactory}.
*
* @param factory the factory to customize
*/
@Override
public void customize(ActiveMQConnectionFactory factory) {
//设置了队列的 policy。
ActiveMQQueue q = new ActiveMQQueue("queue.dlq.test");
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setMaximumRedeliveries(3);
factory.getRedeliveryPolicyMap().put(q,policy);
System.out.println("Customizer 2");
}
};
}
也可以在DefaultJmsListenerContainerFactory 创建时设置。
用于存储不同destination 对应的不同 重投递策略。
Key:ActiveMQDestination,通过physicalName equals。
Value:RedeliveryPolicy
return physicalName.equals(d.physicalName);