• RabbitMQ_消息确认机制


    消息确认机制分为消息发送确认机制消息消费确认机制

    消息发送确认机制

    消息发送确认机制:消息由producer发送后,确认其是否到达broker,又是否被exchange转发至对应queue的机制

    该机制分为两部分:producer---broker,exchange---queue

    前者的实现依靠ConfirmCallback机制,后者的实现依靠ReturrnsCallback机制

    ConfirmCallback

    实现ConfirmCallback接口,并重写confirm方法

    confirm方法参数含义:

    correlationData:CorrelationData类只有一个 id 属性 用于唯一标识该消息
    public CorrelationData() {
        this.id = UUID.randomUUID().toString();
    }
    
    ack:消息是否成功传输到 broker (true表示成功传输 false表示传输失败)
    
    cause:传输失败的原因

    当消息传输至broker后就会触发ConfirmCallback回调,无论传输是否成功,可根据传输的结果进行后续处理

    1. @Component
    2. // ConfirmCallback 用于确认消息是否到达 broker(rabbitmq服务器)
    3. // 实现 ConfirmCallback 接口 重写confirm()方法
    4. public class ConfirmCallbackComponent implements RabbitTemplate.ConfirmCallback {
    5. /*
    6. correlationData:CorrelationData类只有一个 id 属性 用于唯一标识该消息
    7. public CorrelationData() {
    8. this.id = UUID.randomUUID().toString();
    9. }
    10. ack:消息是否成功传输到 broker (true表示成功传输 false表示传输失败)
    11. cause:传输失败的原因
    12. */
    13. @Override
    14. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    15. if (!ack) {
    16. System.out.println("消息发送异常");
    17. } else {
    18. System.out.println("消息发送成功" + " correlationData=" + correlationData.getId() + " ack=" + ack + " cause=" + cause);
    19. }
    20. }
    21. }

    ReturrnsCallback

    实现ReturnsCallback接口,并重写returnedMessage方法

    当消息转发失败后就会触发ReturrnsCallback,会将消息返回给生产者,同时会返回与消息转发失败的相关信息(包含在参数returned内),可对此采取后续处理

    1. @Component
    2. // 实现ReturnCallback接口 重写returnedMessage()方法
    3. public class ReturnsCallbackComponent implements RabbitTemplate.ReturnsCallback {
    4. @Override
    5. public void returnedMessage(ReturnedMessage returned) {
    6. System.out.println("ReturnCallback: replyCode=" + returned.getReplyCode() + " replyText=" + returned.getReplyText() + " message= " + returned.getMessage() + " exchange=" + returned.getExchange() + " routingKey=" + returned.getRoutingKey());
    7. }
    8. }

    配置文件

    注:生产者端配置文件

    ConfirmCallback

    1. publisher-confirm-type: correlated
    2. #NONE:
    3. #禁用发布确认 是默认值。
    4. #CORRELATED:
    5. #发布消息后 交换机会触发回调方法。
    6. #SIMPLE:
    7. #有两种效果:
    8. #1:和CORRELATED一样会触发回调方法
    9. #2:发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果
    10. #根据返回结果来判定下一步的逻辑: waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel 则接下来无法发送消息到 broker

    ReturnsCallback

    1. template:
    2. mandatory: true # 设置当交换机分发消息失败时 将消息返回至生产者(否则直接丢弃)
    3. publisher-returns: true # 允许消息返回至生产者

    消息消费确认机制

    生产者Service

    此处需要调用ConfirmCallback接口与ReturnsCallback接口的实现类实例

    1. @Service
    2. public class WorkService {
    3. @Autowired
    4. private RabbitTemplate rabbitTemplate;
    5. @Autowired
    6. private ConfirmCallbackComponent confirmCallbackComponent;
    7. @Autowired
    8. private ReturnsCallbackComponent returnsCallbackComponent;
    9. public void sendMessage(String exchange, String routingKey, Object msg) {
    10. // 消息被手动ack时的处理
    11. rabbitTemplate.setConfirmCallback(confirmCallbackComponent);
    12. // 消息重返队列时的处理
    13. rabbitTemplate.setReturnsCallback(returnsCallbackComponent);
    14. // 发送消息
    15. rabbitTemplate.convertAndSend(exchange, routingKey, msg,
    16. // 是否持久化消息
    17. message -> {
    18. message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
    19. return message;
    20. },
    21. // 实现ConfirmCallback接口 重写其confirm方法时 作为confirm方法的参数
    22. new CorrelationData(UUID.randomUUID().toString()));
    23. }
    24. }

    消费者Service

    deliveryTag:表示消息投递序号 接收消息后deliveryTag++
    手动确认模式下 我们可以对指定deliveryTag的消息进行ack、nack、reject等操作
    
    multiple:是否批量确认消息 值为true则会一次性ack所有小于当前消息deliveryTag的消息
    举个栗子:
            假设已发送三条消息 deliveryTag分别是1、2、3 但均未被确认
            此时发送第四条消息 其deliveryTag为4 且该消息被确认
            若multiple被设置为true 则会将1、2、3、4的消息全部进行确认
    
    requeue:消息是否重入队列 true为重入
    
    方法参数: 
    basicAck: deliveryTag multiple
    basicReject: deliveryTag requeue
    basicNack: deliveryTag multiple requeue
    
    1. @Service
    2. @RabbitListener(queues = "work_confirm_queue")
    3. public class WorkerService {
    4. @RabbitHandler
    5. public void workerMessage(String msg, Channel channel, Message message) throws IOException {
    6. /*
    7. deliveryTag:表示消息投递序号 接收消息后deliveryTag++
    8. 手动确认模式下 我们可以对指定deliveryTag的消息进行ack、nack、reject等操作
    9. multiple:是否批量确认消息 值为true则会一次性ack所有小于当前消息deliveryTag的消息
    10. 举个栗子:
    11. 假设已发送三条消息 deliveryTag分别是1、2、3 但均未被确认
    12. 此时发送第四条消息 其deliveryTag为4 且该消息被确认
    13. 若multiple被设置为true 则会将1、2、3、4的消息全部进行确认
    14. requeue:消息是否重入队列 true为重入
    15. 方法参数:
    16. basicAck: deliveryTag multiple
    17. basicReject: deliveryTag requeue
    18. basicNack: deliveryTag multiple requeue
    19. */
    20. try {
    21. System.out.println("worker收到消息: " + msg);
    22. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 确认消息
    23. } catch (Exception e) {
    24. // 判断消息是否已重返过队列
    25. if (message.getMessageProperties().getRedelivered()) {
    26. System.out.println("worker再次接收消息失败 队列拒绝消息的重返 " + msg);
    27. channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息重返
    28. } else {
    29. System.out.println("worker接收消息失败 消息将返回队列 " + msg);
    30. channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // 消息重入队列
    31. }
    32. }
    33. }
    34. }

    配置文件

    注:消费者端配置文件

    1. listener:
    2. simple:
    3. prefetch: 1 # 消费者一次性可以消费的最大消息数
    4. acknowledge-mode: manual # 开启手动应答
    5. # none 一律视为应答
    6. # manual 手动应答
    7. # auto 自动应答(与none区别在于有应答条件)
    8. retry:
    9. enabled: true # 开启重试
    10. max-attempts: 10 # 最大重试数(若使用try-catch 则该设置失效)
    11. initial-interval: 1000ms # 重试间隔

    测试

    关于sleep方法:单元测试运行完毕后即关闭,而调用方法与进行通信需要时间,为了确保能收到消费者端的应答,需要保证信道处于开启状态,故sleep

    1. @Autowired
    2. WorkService workService;
    3. @Test
    4. void workQueuesOrders() throws InterruptedException {
    5. workService.sendMessage("", "work_confirm_queue", "hello");
    6. TimeUnit.SECONDS.sleep(5);
    7. }
  • 相关阅读:
    Promise
    自动化测试-友好的第三方库
    腾讯Mini项目课程前置学习笔记(第一轮)
    多进程间通信学习之无名管道
    ChatGPT,AIGC 数据库应用 Mysql 常见优化30例
    Compositional Minimax Optimization学习之路
    Java开发面试--nacos专区
    一起Talk Android吧(第四百零六回:管理画布canvas)
    新课程标准培养学生“高考物理关键能力”的实践研究课题文献综述
    【C语言程序设计】实验 3
  • 原文地址:https://blog.csdn.net/Mudrock__/article/details/128150973