• rabbitmq消息投递失败


    RabbitMQ 中,消息投递失败可能会发生在多个阶段,比如从生产者到交换机、从交换机到队列、从队列到消费者等。处理消息投递失败需要采取适当的措施来确保消息的可靠性和系统的健壮性。以下是处理不同阶段消息投递失败的方法:

    1. 从生产者到交换机的投递失败

    当生产者发送消息到交换机时,如果交换机不存在或者消息被交换机拒绝(比如 mandatory 参数设置为 true 而没有合适的队列),可以通过以下方式处理:

    使用 Confirm 模式

    生产者可以使用 RabbitMQ 的 Confirm 模式来确保消息成功发送到交换机。

    import com.rabbitmq.client.*;
    
    public class Producer {
        private final static String EXCHANGE_NAME = "example_exchange";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    
                channel.confirmSelect(); // Enable publisher confirmations
    
                String message = "Hello World!";
                channel.basicPublish(EXCHANGE_NAME, "routing_key", null, message.getBytes("UTF-8"));
    
                if (channel.waitForConfirms()) {
                    System.out.println(" [x] Message sent successfully");
                } else {
                    System.out.println(" [x] Message delivery failed");
                }
            }
        }
    }
    

    2. 从交换机到队列的投递失败

    如果消息发送到交换机后没有合适的队列绑定,可以使用 mandatory 参数和 ReturnListener 来处理未路由的消息。

    import com.rabbitmq.client.*;
    
    public class Producer {
        private final static String EXCHANGE_NAME = "example_exchange";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                channel.exchangeDeclare(EXCHANGE_NAME, "direct");
    
                channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
                    String message = new String(body, "UTF-8");
                    System.out.println(" [x] Message returned: " + message);
                    // 处理未路由的消息
                });
    
                String message = "Hello World!";
                channel.basicPublish(EXCHANGE_NAME, "invalid_routing_key", true, null, message.getBytes("UTF-8"));
            }
        }
    }
    

    3. 从队列到消费者的投递失败

    在消息从队列投递到消费者时,如果消费者无法处理消息,消费者可以使用 NackReject 来处理失败的消息。

    使用手动消息确认

    消费者可以手动确认消息,如果处理失败,可以选择 Nack 消息并重新入队,或者 Reject 消息并将其投递到死信队列(Dead Letter Queue)。

    import com.rabbitmq.client.*;
    
    public class Consumer {
        private final static String QUEUE_NAME = "task_queue";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);
                System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
                channel.basicQos(1); // Fair dispatch
    
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println(" [x] Received '" + message + "'");
    
                    try {
                        // 处理消息
                        processMessage(message);
                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    } catch (Exception e) {
                        // 处理失败,重新入队或投递到死信队列
                        channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
                    }
                };
    
                boolean autoAck = false; // Explicitly acknowledge message
                channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
            }
        }
    
        private static void processMessage(String message) throws Exception {
            // 消息处理逻辑
            if (message.contains("error")) {
                throw new Exception("Processing error");
            }
            System.out.println(" [x] Processed '" + message + "'");
        }
    }
    

    4. 使用死信队列(DLQ)

    配置死信队列,当消息在队列中被拒绝、过期或者达到最大重试次数时,将其投递到死信队列以便后续处理。

    配置示例
    import com.rabbitmq.client.*;
    
    import java.util.HashMap;
    import java.util.Map;
    
    public class DLQExample {
    
        private static final String MAIN_QUEUE = "main_queue";
        private static final String DLQ_QUEUE = "dlq_queue";
        private static final String EXCHANGE_NAME = "exchange";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
    
                // Declare DLQ
                channel.queueDeclare(DLQ_QUEUE, true, false, false, null);
    
                // Declare main queue with DLQ settings
                Map<String, Object> argsMap = new HashMap<>();
                argsMap.put("x-dead-letter-exchange", "");
                argsMap.put("x-dead-letter-routing-key", DLQ_QUEUE);
                channel.queueDeclare(MAIN_QUEUE, true, false, false, argsMap);
    
                // Declare exchange and bind main queue
                channel.exchangeDeclare(EXCHANGE_NAME, "direct");
                channel.queueBind(MAIN_QUEUE, EXCHANGE_NAME, "routing_key");
    
                String message = "Test Message";
                channel.basicPublish(EXCHANGE_NAME, "routing_key", null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
    

    总结

    为了处理 RabbitMQ 中的消息投递失败,应该综合使用以下策略:

    1. 使用 Confirm 模式和 ReturnListener 确保消息从生产者正确发送到交换机和队列。
    2. 使用手动消息确认机制处理消费者无法处理的消息。
    3. 配置死信队列处理无法处理或过期的消息。
    4. 确保消息处理逻辑具有幂等性,以防止重复处理导致的数据不一致。

    通过这些方法,可以提高消息系统的可靠性和健壮性,确保消息不会丢失或重复处理。

  • 相关阅读:
    前端框架 网络请求 Fetch Axios
    2.一步一步教你使用pycharm运行起第一个Django项目
    zabbix监控系统
    C# 中结构体的复制
    【上传图片,文件,视频功能合集】vue-elementul简单实现上传文件,上传图片,上传视频功能【详细注释,简单易用】
    基于JavaSwing开发植物连连看游戏 课程设计 大作业源码
    upload-labs 16/17关
    Monaco-Editor 多人协作 编辑器
    Python进阶教程:pandas数据分析实践示例总结
    【systemd】简单的服务创建练习
  • 原文地址:https://blog.csdn.net/Casual_Lei/article/details/140429639