• RabbitMQ消息可靠性(二)-- 消费者消息确认


    一、消费者消息确认是什么?

    在这种机制下,消费者在接收到消息后,需要向 RabbitMQ 发送确认信息,告知 RabbitMQ 已经接收到该消息,并已经处理完毕。如果 RabbitMQ 没有接收到确认信息,则会将该消息重新加入队列,等待其他消费者继续处理。

    消费者消息确认机制能够保证消息不会因为消费者宕机或其他原因而丢失,从而保证了消息的可靠性和稳定性。

    RabbitMQ 支持两种消费者消息确认机制:自动确认和手动确认。在自动确认模式下,消费者在接收到消息后,RabbitMQ 会自动将该消息标记为已经确认。在手动确认模式下,消费者需要向 RabbitMQ 显式地发送确认信息,才能完成消息的确认。

    二、代码实现

    1.修改application.yml 配置

    1. spring:
    2. rabbitmq:
    3. listener:
    4. simple:
    5. # RabbitMQ开启手动确认
    6. acknowledge-mode: manual

    而SpringAMQP则允许配置三种确认模式:

    1. manual:手动ack,需要在业务代码结束后,调用api发送ack。
    2. auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
    3. none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

    2.消费者确认

    生产者发送一笔需要消费的订单到Direct Exchange直连交换机

    1. @GetMapping("/sendDirectMessage")
    2. @ApiOperation(value = "sendDirectMessage")
    3. @ApiOperationSupport(order = 1)
    4. public String sendDirectMessage(@RequestParam String orderNo){
    5. //设置消息唯一ID
    6. String uniqueId = "MQ"+ DateUtils.dateTimeNow("yyyyMMddHHmmss")+ RandomUtil.randomNumbers(4);
    7. CorrelationData correlationData = new CorrelationData(uniqueId);
    8. log.info("------生产者发送消息,消息唯一id {},订单编号 {}-------",uniqueId,orderNo);
    9. rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting",orderNo,correlationData);
    10. return "ok";
    11. }

    下面是消费者的处理逻辑
    这里的消息序号是系统自动生成的,还需要注意的是,在手动确认模式下,如果消费者在处理消息时发生了异常或错误的时候

    需要确保将该消息重新加入队列或者删除队列之后将该信息保存至数据库中记录下来,否则该消息将被认为已经成功处理并确认。因此,在编写消费者代码时,需要谨慎处理异常情况,避免因为异常而导致消息丢失或重复处理等问题。

    1. /**
    2. * 消费者,用于消费队列信息
    3. */
    4. @Component
    5. @Slf4j
    6. public class DirectConsumer {
    7. @Resource
    8. RedisService redisService;
    9. @RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
    10. public void process(Message message, Channel channel) {
    11. // 消息序号
    12. long deliveryTag = message.getMessageProperties().getDeliveryTag();
    13. //取出消息唯一标识
    14. String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
    15. // 取出订单编码
    16. String orderNo = new String(message.getBody(), StandardCharsets.UTF_8);
    17. log.info("------消费者收到消息,消息唯一id {},订单编号 {}-------",messageId,orderNo);
    18. try {
    19. //消费者在消费消息之前,先去redis中查看消息状态是否已被消费
    20. if (redisService.setCacheMapIfAbsent("rabbit-tag", messageId, Boolean.FALSE)){
    21. //删除过期订单.......
    22. //消费完消息后,设置key的值为true
    23. redisService.setCacheMapValue("rabbit-tag", messageId, Boolean.TRUE);
    24. channel.basicAck(deliveryTag,false);
    25. log.info("------订单处理完毕,订单编号 {}--------", orderNo);
    26. }else {
    27. //如果从redis中获取消息的value是TRUE,表示已消费,直接发送确认信号,避免重复消费
    28. if (Boolean.TRUE.equals(redisService.getCacheMapValue("rabbit-tag",messageId))) {
    29. /**
    30. * TODO 手动确认消息
    31. * tag:消息序号
    32. * multiple:消息的标识,是否确认多条,false只确认当前一个消息收到,true确认所有consumer获得的消息(成功消费,消息从队列中删除
    33. */
    34. channel.basicAck(deliveryTag, false);
    35. log.info("--------订单已经被消费过了,订单编号 {}-------", orderNo);
    36. }
    37. }
    38. } catch (Exception e) {
    39. e.printStackTrace();
    40. try {
    41. /**
    42. * TODO 消费者消费消息异常,手动否认信息,将消息退回到队列中
    43. * tag:消息序号
    44. * multiple:消息的标识,是否确认多条,false只确认当前一个消息收到,true确认所有consumer获得的消息(成功消费,消息从队列中删除
    45. * requeue:是否要退回到队列
    46. */
    47. channel.basicNack(deliveryTag, true, false);
    48. redisService.setCacheMapValue("rabbit-tag", messageId, Boolean.FALSE);
    49. log.error("------------订单消费失败,已从队列删除.订单编号 {}, 原因 {}--------",orderNo, e.getMessage());
    50. } catch (IOException ex) {
    51. ex.printStackTrace();
    52. }
    53. }
    54. log.info("------消费者处理完毕-------");
    55. }
    56. }
  • 相关阅读:
    00后整顿职场!因面试时公司让填“紧急联系人”,觉得侵犯隐私,举报公司消防措施不合理,导致公司被停业整顿!...
    胆固醇-聚乙二醇-荧光素 Cholesterol-PEG-FITC Fluorescein-PEG-CLS概述
    vue路由守卫
    项目管理5大过程组对应47个过程的内容解析
    字典序问题
    用信号量实现进程互斥, 同步, 前驱关系
    Rust 错误处理
    Spring-依赖注入DI
    deckGL自定义图层学习笔记
    网站流量不是很多,还有这几十种途径让网站变现转化
  • 原文地址:https://blog.csdn.net/bbj12345678/article/details/132907988