当消息不能重投递或者消息过期,会被移到死信队列中,由管理员消费。
可以进行以下操作:

消息重投递的常用场景:
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
policy.setMaximumRedeliveries(2);
重新投递 次数超过 MaximumRedeliveries ,则会进入死信队列。
默认情况,有一个死信队列:AcitveMQ.DLQ,所有的消息都投递到此队列,包括过期消息,重投递失败消息。
重新投递 次数超过 MaximumRedeliveries ,则会进入死信队列。
默认情况,有一个死信队列:AcitveMQ.DLQ,所有的消息都投递到此队列,包括过期消息,重投递失败消息。
配置个性化死信队列。
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ."
useQueueForQueueMessages="true"
processExpired="false"
processNonPersistent="false"/>
deadLetterStrategy>
policyEntry>
policyEntries>
policyMap>
destinationPolicy>
参考:
重投递策略,投递策略 ,https://activemq.apache.org/per-destination-policies, https://activemq.apache.org/message-redelivery-and-dlq-handling
@Bean
public ActiveMQConnectionFactoryCustomizer myCustomizer2() {
return new ActiveMQConnectionFactoryCustomizer() {
/**
* Customize the {@link ActiveMQConnectionFactory}.
*
* @param factory the factory to customize
*/
@Override
public void customize(ActiveMQConnectionFactory factory) {
//设置了队列的 policy。
ActiveMQQueue q = new ActiveMQQueue("queue.dlq.test");
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setMaximumRedeliveries(3);
factory.getRedeliveryPolicyMap().put(q,policy);
System.out.println("Customizer 2");
}
};
}
@JmsListener(destination = "queue.dlq.test" ,id = "test")
public void consume(String param, Session session) {
try {
System.out.println(session.getAcknowledgeMode());
System.out.println(param);
//回滚,则重投递
session.rollback();
}catch (Exception ex){
}
}
由于默认是开启事务的,因此抛出异常,会自动触发回滚。
@JmsListener(destination = "queue.dlq.test2",id="test2")
public void consume2(String param, Session session) {
System.out.println(param);
throw new RuntimeException("xxxx");
}
//AbstractMessageListenerContainer
protected void doExecuteListener(Session session, Message message) throws JMSException {
if (!isAcceptMessagesWhileStopping() && !isRunning()) {
if (logger.isWarnEnabled()) {
logger.warn("Rejecting received message because of the listener container " +
"having been stopped in the meantime: " + message);
}
rollbackIfNecessary(session);
throw new MessageRejectedWhileStoppingException();
}
try {
invokeListener(session, message);
}
//JMSException,RuntimeException,Error 这3类异常会回滚。
catch (JMSException | RuntimeException | Error ex) {
//自动回滚
rollbackOnExceptionIfNecessary(session, ex);
throw ex;
}
//自动提交
commitIfNecessary(session, message);
}
其他的
Exception都会被包装成ListenerExecutionFailedException,它是JMSException的子类,所以所有异常都会导致回滚。