• Spring boot实现Activemq死信队列


    死信队列是什么

    当消息不能重投递或者消息过期,会被移到死信队列中,由管理员消费。

    可以进行以下操作:

    • delete:删除记录
    • retry:重新投递
    • copy: 复制到一个选择的队列中。
    • move:移动到一个选择的队列中。

    image-20221021142625060

    什么情况下消息会重投递

    消息重投递的常用场景:

    • 事务回滚
    • 事务提交前close
    • A client is using CLIENT_ACKNOWLEDGE on a Session and calls recover() on that Session.
    重新投递策略
    RedeliveryPolicy policy = connection.getRedeliveryPolicy();
    policy.setInitialRedeliveryDelay(500);
    policy.setBackOffMultiplier(2);
    policy.setUseExponentialBackOff(true);
    policy.setMaximumRedeliveries(2);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    死信队列

    重新投递 次数超过 MaximumRedeliveries ,则会进入死信队列。

    默认情况,有一个死信队列:AcitveMQ.DLQ,所有的消息都投递到此队列,包括过期消息,重投递失败消息。

    Spring整合

    Activemq服务端配置

    重新投递 次数超过 MaximumRedeliveries ,则会进入死信队列。

    默认情况,有一个死信队列:AcitveMQ.DLQ,所有的消息都投递到此队列,包括过期消息,重投递失败消息。

    配置个性化死信队列。

    <destinationPolicy>
        <policyMap>
            <policyEntries>
                <policyEntry queue=">">
                    <deadLetterStrategy>
                      <individualDeadLetterStrategy queuePrefix="DLQ."
                      useQueueForQueueMessages="true"
                      processExpired="false"
                      processNonPersistent="false"/>
                    deadLetterStrategy>
                policyEntry>
            policyEntries>
        policyMap>
    destinationPolicy>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    参考:

    重投递策略投递策略 ,https://activemq.apache.org/per-destination-policies, https://activemq.apache.org/message-redelivery-and-dlq-handling

    可以单独设置重投递策略

        @Bean
        public ActiveMQConnectionFactoryCustomizer myCustomizer2() {
            return new ActiveMQConnectionFactoryCustomizer() {
    
                /**
                 * Customize the {@link ActiveMQConnectionFactory}.
                 *
                 * @param factory the factory to customize
                 */
                @Override
                public void customize(ActiveMQConnectionFactory factory) {
    
                    //设置了队列的 policy。
                    ActiveMQQueue q = new ActiveMQQueue("queue.dlq.test");
                    RedeliveryPolicy policy = new RedeliveryPolicy();
                    policy.setMaximumRedeliveries(3);
                    factory.getRedeliveryPolicyMap().put(q,policy);
    
                    System.out.println("Customizer 2");
                }
            };
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    触发重投递

    代码回滚

     @JmsListener(destination = "queue.dlq.test" ,id = "test")
        public void consume(String param, Session session) {
    
            try {
                System.out.println(session.getAcknowledgeMode());
                System.out.println(param);
                //回滚,则重投递
                session.rollback();
            }catch (Exception ex){
    
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    抛出异常自动回滚

    由于默认是开启事务的,因此抛出异常,会自动触发回滚。

    @JmsListener(destination = "queue.dlq.test2",id="test2")
        public void consume2(String param, Session session) {
            System.out.println(param);
            throw new RuntimeException("xxxx");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    //AbstractMessageListenerContainer
    protected void doExecuteListener(Session session, Message message) throws JMSException {
    		if (!isAcceptMessagesWhileStopping() && !isRunning()) {
    			if (logger.isWarnEnabled()) {
    				logger.warn("Rejecting received message because of the listener container " +
    						"having been stopped in the meantime: " + message);
    			}
    			rollbackIfNecessary(session);
    			throw new MessageRejectedWhileStoppingException();
    		}
    
    		try {
    			invokeListener(session, message);
    		}
         //JMSException,RuntimeException,Error  这3类异常会回滚。
    		catch (JMSException | RuntimeException | Error ex) {
          //自动回滚
    			rollbackOnExceptionIfNecessary(session, ex);
    			throw ex;
    		}
         //自动提交
    		commitIfNecessary(session, message);
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    其他的Exception 都会被包装成ListenerExecutionFailedException,它是JMSException的子类,所以所有异常都会导致回滚。

  • 相关阅读:
    C++【内存管理】
    修正TiKnob的指示箭头显示问题
    良/恶性乳腺肿瘤预测(逻辑回归分类器)
    Java基础面试题
    [SpringBoot] AOP-AspectJ 切面技术
    Flink 集群部署
    新手小白学Java|零基础入门笔记|原来学Java可以这么简单
    卷积网络知识--深度可分离卷积
    【Sentinel】Sentinel与gateway的限流算法
    无人艇轨迹跟踪的预设性能抗扰控制研究
  • 原文地址:https://blog.csdn.net/demon7552003/article/details/127464734