• RabbitMQ消息确认机制-可靠抵达


    消息确认机制

    保证消息不丢失,可靠抵达,可以使用确认机制

    publisher confirmCallback 确认模式

    publisher returnCallback 未投递到queue 退回模式

    consumer ack 机制

    添加配置

    修改publisher服务中的application.yml文件

    1. spring:
    2. rabbitmq:
    3. publisher-confirms: true
    4. publisher-confirm-type: correlated
    5. publisher-returns: true
    6. template:
    7. mandatory: true

    说明:

    • publish-confirm: 开启publish-confirm功能,确认回调
    • publish-confirm-type:开启publisher-confirm,这里支持两种类型:
      • simple:同步等待confirm结果,直到超时
      • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
    • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
    • template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息

    publish-confirm 确认回调

    作用:确保消息进入交换机

    发送消息

    1. package com.hdb.pingmoweb.order;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.junit.Test;
    4. import org.junit.runner.RunWith;
    5. import org.springframework.amqp.rabbit.connection.CorrelationData;
    6. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    7. import org.springframework.beans.factory.annotation.Autowired;
    8. import org.springframework.boot.test.context.SpringBootTest;
    9. import org.springframework.test.context.junit4.SpringRunner;
    10. import java.util.UUID;
    11. @Slf4j
    12. @RunWith(SpringRunner.class)
    13. @SpringBootTest
    14. public class PingmowebOrderApplicationTests {
    15. @Test
    16. public void contextLoads() {
    17. }
    18. @Autowired
    19. private RabbitTemplate rabbitTemplate;
    20. @Test
    21. public void testSendMessageExchange() throws InterruptedException {
    22. // 1.消息体
    23. String message = "hello, spring amqp!";
    24. // 2.全局唯一的消息ID,需要封装到CorrelationData中
    25. CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    26. // 3.添加callback
    27. correlationData.getFuture().addCallback(
    28. result -> {
    29. if(result.isAck()){
    30. // 3.1.ack,消息成功
    31. log.info("消息发送成功, ID:{}", correlationData.getId());
    32. }else{
    33. // 3.2.nack,消息失败
    34. log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason());
    35. }
    36. },
    37. ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage())
    38. );
    39. // 4.发送消息
    40. rabbitTemplate.convertAndSend("itcast.direct", "red", message, correlationData);
    41. // 休眠一会儿,等待ack回执
    42. //Thread.sleep(2000);
    43. }
    44. }

    接收消息

    1. package com.hdb.pingmoweb.order.mq;
    2. import org.springframework.amqp.core.ExchangeTypes;
    3. import org.springframework.amqp.rabbit.annotation.Exchange;
    4. import org.springframework.amqp.rabbit.annotation.Queue;
    5. import org.springframework.amqp.rabbit.annotation.QueueBinding;
    6. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    7. import org.springframework.stereotype.Component;
    8. @Component
    9. public class SpringRabbitListener {
    10. @RabbitListener(bindings = @QueueBinding(
    11. value = @Queue(name = "direct.queue1"),
    12. exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    13. key = {"red","blue"}
    14. ))
    15. public void listenDirectQueue1(String msg){
    16. System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
    17. }
    18. @RabbitListener(bindings = @QueueBinding(
    19. value = @Queue(name = "direct.queue2"),
    20. exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    21. key = {"red"}
    22. ))
    23. public void listenDirectQueue2(String msg){
    24. System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
    25. }
    26. }

    publish-returns 消息进入队列回调

    配置RabbitTemplate,当routingKey找不到指定队列回调ReturanCallback

    1. package com.hdb.pingmoweb.order.mq;
    2. import lombok.extern.slf4j.Slf4j;
    3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    4. import org.springframework.beans.factory.annotation.Autowired;
    5. import org.springframework.context.annotation.Configuration;
    6. import javax.annotation.PostConstruct;
    7. @Slf4j
    8. @Configuration
    9. public class MyRabbitConfig {
    10. @Autowired
    11. RabbitTemplate rabbitTemplate;
    12. @PostConstruct
    13. public void initRabbitTemplate(){
    14. rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
    15. // 投递失败,记录日志
    16. log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
    17. replyCode, replyText, exchange, routingKey, message.toString());
    18. // 如果有业务需要,可以重发消息
    19. });
    20. }
    21. }

    消费者消息确认

    SpringAMQP则允许配置三种确认模式:

    manual:手动ack,需要在业务代码结束后,调用api发送ack。

    auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack

    none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

    • none 模式下,消息投递是不可靠的,可能丢失
    • auto 模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
    • manual 自己根据业务情况,判断什么时候该ack

    一般,我们都是使用默认的auto即可。

    auto ack

    修改consumer服务的application.yml文件,添加下面内容:

    1. spring:
    2. rabbitmq:
    3. listener:
    4. simple:
    5. acknowledge-mode: auto # 关闭ack

    消费者出现异常,消息不会被删除

    同时可以使用manual ack机制手动ack

    manual ack

    1. @RabbitListener(bindings = @QueueBinding(
    2. value = @Queue(name = "direct.queue2"),
    3. exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
    4. key = {"red"}
    5. ))
    6. public void listenDirectQueue2(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
    7. System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
    8. if (msg.contains("success")) {
    9. // RabbitMQ的ack机制中,第二个参数返回true,表示需要将这条消息投递给其他的消费者重新消费
    10. channel.basicAck(deliveryTag, false);
    11. } else {
    12. // 第三个参数true,表示这个消息会重新进入队列
    13. channel.basicNack(deliveryTag, false, true);
    14. }
    15. }
    • deliveryTag: 相当于消息的唯一标识,用于 mq 辨别是哪个消息被 ack/nak 了
    • channel: mq 和 consumer 之间的管道,通过它来 ack/nak
    • channel.basicAck(deliveryTag, false); 成功确认
    • channel.basicNack(deliveryTag, false, true); 失败返回队列
  • 相关阅读:
    【Python】sys.argv[ ]简单易懂
    抖音矩阵系统,抖音SEO源码,抖音矩阵系统定制。
    【CSP认证考试】202303-1:田地丈量解题思路+代码
    第一章:IDEA使用介绍
    java毕业设计调酒互动交流平台Mybatis+系统+数据库+调试部署
    (数据结构与算法)LeetCode刷题笔记2-0005:最长回文子串
    P1091 [NOIP2004 提高组] 合唱队形
    python、ruby、go、java写的端口扫描工具
    nginx去除serve请求头 aarch64
    GBase 8c 函数/存储过程参数(一)
  • 原文地址:https://blog.csdn.net/qq_29385297/article/details/127545272