• RabbitMQ的Confirm机制


    1.消息的可靠性

    RabbitMQ提供了Confirm的确认机制。

    Confirm机制用于确认消息是否已经发送给了交换机

    2.Java的实现 

    1.导入依赖

    1. <dependency>
    2. <groupId>com.rabbitmqgroupId>
    3. <artifactId>amqp-clientartifactId>
    4. <version>5.6.0version>
    5. dependency>

    2.Confirm机制的生产者

    1. package com.qf.mq2302.hello;
    2. import com.qf.mq2302.utils.MQUtils;
    3. import com.rabbitmq.client.Channel;
    4. import com.rabbitmq.client.Connection;
    5. public class Send {
    6. //声明队列名字
    7. public static final String QUEUE_NAME="queueA";
    8. public static void main(String[] args) throws Exception {
    9. //1.获取连接对象
    10. Connection conn = MQUtils.getConnection();
    11. //2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上
    12. Channel channel = conn.createChannel();
    13. //3 开启confirm
    14. channel.confirmSelect();
    15. //3.声明了一个队列
    16. /**
    17. * queue – the name of the queue
    18. * durable – true代表创建的队列是持久化的(当mq重启后,该队列依然存在)
    19. * exclusive – 该队列是不是排他的 (该对立是否只能由当前创建该队列的连接使用)
    20. * autoDelete – 该队列是否可以被mq服务器自动删除
    21. * arguments – 队列的其他参数,可以为null
    22. */
    23. // channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    24. String message = "Hello doubleasdasda!";
    25. //生产者如何发送消息,使用下面的方法即可
    26. /**
    27. * exchange – 交换机的名字 ,如果是空串,说明是把消息发给了默认交换机
    28. * routingKey – 路由的key,当发送消息给默认交换机时,routingkey代表队列的名字
    29. * other properties - 消息的其他属性,可以为null
    30. * body – 消息的内容,注意,要是有 字节数组
    31. */
    32. channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    33. System.out.println(" [x] Sent '" + message + "'");
    34. //检查消息是否发送成功了
    35. try {
    36. /**
    37. * 判断是否发送到交换机上,如果发送到了返回true,
    38. * 如果因为交换机名字错了,发送不到交换机,则会抛出异常,会自动关闭channel
    39. */
    40. if (channel.waitForConfirms()) {
    41. //如果返回true,代表交换机成功接收到了消息
    42. System.out.println("消息已经成功发送给了交换机");
    43. //关闭资源
    44. channel.close();
    45. }else {
    46. System.out.println("消息发送给交换机失败了");
    47. //关闭资源
    48. channel.close();
    49. }
    50. } catch (InterruptedException e) {
    51. System.out.println("消息发送给交换机失败了");
    52. System.out.println("失败的消息为:"+message);
    53. }
    54. conn.close();
    55. }
    56. }

    3.confirm 机制的消费者

     

    1. package com.qf.mq2302.hello;
    2. import com.qf.mq2302.utils.MQUtils;
    3. import com.rabbitmq.client.Channel;
    4. import com.rabbitmq.client.Connection;
    5. import com.rabbitmq.client.DeliverCallback;
    6. import com.rabbitmq.client.Delivery;
    7. import java.io.IOException;
    8. public class Recv {
    9. private final static String QUEUE_NAME="hello-queue";
    10. public static void main(String[] args) throws Exception {
    11. //1.获取连接对象
    12. Connection conn = MQUtils.getConnection();
    13. //2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上
    14. Channel channel = conn.createChannel();
    15. /**
    16. * 第一个参数队列名称
    17. * 第二个参数,耐用性
    18. * 第三个参数排外性
    19. * 第四个参数是否自动删除
    20. * 第五个参数,可以定义什么类型的队列
    21. */
    22. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    23. //3.该消费者收到消息之后的处理逻辑,写在DeliverCallback对象中
    24. DeliverCallback deliverCallback =new DeliverCallback() {
    25. @Override
    26. public void handle(String consumerTag, Delivery message) throws IOException {
    27. System.out.println(consumerTag);
    28. //从Delivery对象中可以获取到生产者,发送的消息的字节数组
    29. byte[] body = message.getBody();
    30. String msg = new String(body, "utf-8");
    31. //在这里写消费者的业务逻辑,例如,发送邮件
    32. System.out.println(msg);
    33. }
    34. };
    35. //4.让当前消费者开始消费(QUEUE_NAME)队列中的消息
    36. /**
    37. * queue – the name of the queue
    38. * autoAck – true 代表当前消费者是不是自动确认模式。true代表自动确认。
    39. * deliverCallback – 当有消息发送给该消费者时,消费者如何处理消息的逻辑
    40. * cancelCallback – 当消费者被取消掉时,如果要执行代码,写到这里
    41. */
    42. channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag -> {});
    43. }
    44. }

    3.整合springboot实现

    1.导入依赖

    1. <dependency>
    2. <groupId>org.springframework.bootgroupId>
    3. <artifactId>spring-boot-starter-amqpartifactId>
    4. dependency>

    2.yml配置文件

    1. spring:
    2. rabbitmq:
    3. host: 8.140.244.227
    4. port: 6786
    5. username: test
    6. password: test
    7. virtual-host: /test
    8. publisher-confirm-type: correlated #在springboot 项目下开启生产者的confirm机制

    3.RabbitMQ配置文件

    1. package com.qf.bootmq2302.config;
    2. import org.springframework.amqp.core.Message;
    3. import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    4. import org.springframework.amqp.rabbit.connection.CorrelationData;
    5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    6. import org.springframework.context.annotation.Bean;
    7. import org.springframework.context.annotation.Configuration;
    8. @Configuration
    9. public class RabbitConfig {
    10. @Bean
    11. public RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory){
    12. RabbitTemplate rabbitTemplate = new RabbitTemplate();
    13. //设置连接工厂对象
    14. rabbitTemplate.setConnectionFactory(cachingConnectionFactory);
    15. rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    16. @Override
    17. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    18. System.out.println("correlationData:"+correlationData.getId());
    19. System.out.println("correlationData:"+new String(correlationData.getReturnedMessage().getBody()));
    20. //通过id可以去redis 里取 value消息
    21. //代表消息是否发送给交换机成功,发送失败false ,发送成功 true
    22. System.out.println("ack:"+ack);
    23. //代表错误的原因
    24. System.out.println("cause:"+cause);
    25. }
    26. });
    27. return rabbitTemplate;
    28. }
    29. }

    4.生产者写一个Controller

    1. @Autowired
    2. RabbitTemplate rabbitTemplate;
    3. @GetMapping("/test1")
    4. public String test1(String msg,String routkey){
    5. System.out.println(msg);
    6. String exchangeName = "";//默认交换机
    7. String routingkey = routkey;//队列名字
    8. //创建一个 CorrelationData 对象
    9. CorrelationData correlationData = new CorrelationData();
    10. correlationData.setId("001");
    11. Message message = new Message(msg.getBytes(), null);
    12. correlationData.setReturnedMessage(message);
    13. //要把消息的内容和消息的编号 存放到redis中, key=消息编号,value=消息内容
    14. //key = bootmq:failmessage:001
    15. //生产者发送消息
    16. //第四个参数,可以携带自定义的correlationData
    17. rabbitTemplate.convertAndSend(exchangeName,routingkey,msg,correlationData);
    18. return "ok";
    19. }

    5.消费者写一个接收队列消息

    1. @RabbitListener(queues = "queueA")
    2. public void getMsg1(Map data, Channel channel,Message message) throws IOException {
    3. System.out.println(data);
    4. //手动ack//若开启手动ack,不给手动ack,就按照 prefetch: 1 #等价于basicQos(1)的量,就这么多,不会多给你了,因为你没有确认。确认一条,就给你一条
    5. channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    6. }

    6.消费者的配置文件

    1. spring:
    2. rabbitmq:
    3. host: 8.140.244.227
    4. port: 6786
    5. username: test
    6. password: test
    7. virtual-host: /test
    8. #手动ACK
    9. listener:
    10. simple:
    11. acknowledge-mode: manual # 手动ack
    12. prefetch: 1 #等价于basicQos(1)

  • 相关阅读:
    锂热电池检测设备 你一定没见过这种检测方式!
    Linux基础操作命令详解
    共享购模式:数据驱动的消费增值新体验
    强大的JTAG边界扫描(1):基本原理介绍
    神经元模型图手工制作,神经元模型图手工模型
    漏洞复现----48、Airflow dag中的命令注入(CVE-2020-11978)
    node js 快速构建部署 Wiki 风格的文档网站
    使用 FHE 实现加密大语言模型
    springboot缓存
    Zookeeper从入门到精通
  • 原文地址:https://blog.csdn.net/qq_53374893/article/details/132767156