• Spring boot实现Activemq死信队列


    死信队列是什么

    当消息不能重投递或者消息过期,会被移到死信队列中,由管理员消费。

    可以进行以下操作:

    • delete:删除记录
    • retry:重新投递
    • copy: 复制到一个选择的队列中。
    • move:移动到一个选择的队列中。

    image-20221021142625060

    什么情况下消息会重投递

    消息重投递的常用场景:

    • 事务回滚
    • 事务提交前close
    • A client is using CLIENT_ACKNOWLEDGE on a Session and calls recover() on that Session.
    重新投递策略
    RedeliveryPolicy policy = connection.getRedeliveryPolicy();
    policy.setInitialRedeliveryDelay(500);
    policy.setBackOffMultiplier(2);
    policy.setUseExponentialBackOff(true);
    policy.setMaximumRedeliveries(2);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    死信队列

    重新投递 次数超过 MaximumRedeliveries ,则会进入死信队列。

    默认情况,有一个死信队列:AcitveMQ.DLQ,所有的消息都投递到此队列,包括过期消息,重投递失败消息。

    Spring整合

    Activemq服务端配置

    重新投递 次数超过 MaximumRedeliveries ,则会进入死信队列。

    默认情况,有一个死信队列:AcitveMQ.DLQ,所有的消息都投递到此队列,包括过期消息,重投递失败消息。

    配置个性化死信队列。

    <destinationPolicy>
        <policyMap>
            <policyEntries>
                <policyEntry queue=">">
                    <deadLetterStrategy>
                      <individualDeadLetterStrategy queuePrefix="DLQ."
                      useQueueForQueueMessages="true"
                      processExpired="false"
                      processNonPersistent="false"/>
                    deadLetterStrategy>
                policyEntry>
            policyEntries>
        policyMap>
    destinationPolicy>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    参考:

    重投递策略投递策略 ,https://activemq.apache.org/per-destination-policies, https://activemq.apache.org/message-redelivery-and-dlq-handling

    可以单独设置重投递策略

        @Bean
        public ActiveMQConnectionFactoryCustomizer myCustomizer2() {
            return new ActiveMQConnectionFactoryCustomizer() {
    
                /**
                 * Customize the {@link ActiveMQConnectionFactory}.
                 *
                 * @param factory the factory to customize
                 */
                @Override
                public void customize(ActiveMQConnectionFactory factory) {
    
                    //设置了队列的 policy。
                    ActiveMQQueue q = new ActiveMQQueue("queue.dlq.test");
                    RedeliveryPolicy policy = new RedeliveryPolicy();
                    policy.setMaximumRedeliveries(3);
                    factory.getRedeliveryPolicyMap().put(q,policy);
    
                    System.out.println("Customizer 2");
                }
            };
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    触发重投递

    代码回滚

     @JmsListener(destination = "queue.dlq.test" ,id = "test")
        public void consume(String param, Session session) {
    
            try {
                System.out.println(session.getAcknowledgeMode());
                System.out.println(param);
                //回滚,则重投递
                session.rollback();
            }catch (Exception ex){
    
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    抛出异常自动回滚

    由于默认是开启事务的,因此抛出异常,会自动触发回滚。

    @JmsListener(destination = "queue.dlq.test2",id="test2")
        public void consume2(String param, Session session) {
            System.out.println(param);
            throw new RuntimeException("xxxx");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    //AbstractMessageListenerContainer
    protected void doExecuteListener(Session session, Message message) throws JMSException {
    		if (!isAcceptMessagesWhileStopping() && !isRunning()) {
    			if (logger.isWarnEnabled()) {
    				logger.warn("Rejecting received message because of the listener container " +
    						"having been stopped in the meantime: " + message);
    			}
    			rollbackIfNecessary(session);
    			throw new MessageRejectedWhileStoppingException();
    		}
    
    		try {
    			invokeListener(session, message);
    		}
         //JMSException,RuntimeException,Error  这3类异常会回滚。
    		catch (JMSException | RuntimeException | Error ex) {
          //自动回滚
    			rollbackOnExceptionIfNecessary(session, ex);
    			throw ex;
    		}
         //自动提交
    		commitIfNecessary(session, message);
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    其他的Exception 都会被包装成ListenerExecutionFailedException,它是JMSException的子类,所以所有异常都会导致回滚。

  • 相关阅读:
    docker常见命令
    44--Django-项目实战-全栈开发-基于django+drf+vue+elementUI企业级项目开发流程-支付宝二次封装、支付成功页面以及后台设计
    一种基于区块链的物联网架构设计
    因斯布鲁克大学团队量子计算硬件突破了二进制
    ORB-SLAM2从理论到代码实现(十五):KeyFrameDatabase类
    std::format 如何实现编译期格式检查
    卖出看涨期权的例子:Selling a Call Option
    Mysql里常见的问题
    用最清爽的方式开发dotNet
    excel的vlookup函数用法
  • 原文地址:https://blog.csdn.net/demon7552003/article/details/127464734