RabbitMQ提供了Confirm的确认机制。
Confirm机制用于确认消息是否已经发送给了交换机
- <dependency>
- <groupId>com.rabbitmqgroupId>
- <artifactId>amqp-clientartifactId>
- <version>5.6.0version>
- dependency>
- package com.qf.mq2302.hello;
-
- import com.qf.mq2302.utils.MQUtils;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
-
- public class Send {
- //声明队列名字
- public static final String QUEUE_NAME="queueA";
-
- public static void main(String[] args) throws Exception {
-
- //1.获取连接对象
- Connection conn = MQUtils.getConnection();
-
- //2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上
- Channel channel = conn.createChannel();
-
- //3 开启confirm
- channel.confirmSelect();
- //3.声明了一个队列
- /**
- * queue – the name of the queue
- * durable – true代表创建的队列是持久化的(当mq重启后,该队列依然存在)
- * exclusive – 该队列是不是排他的 (该对立是否只能由当前创建该队列的连接使用)
- * autoDelete – 该队列是否可以被mq服务器自动删除
- * arguments – 队列的其他参数,可以为null
- */
- // channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- String message = "Hello doubleasdasda!";
-
- //生产者如何发送消息,使用下面的方法即可
- /**
- * exchange – 交换机的名字 ,如果是空串,说明是把消息发给了默认交换机
- * routingKey – 路由的key,当发送消息给默认交换机时,routingkey代表队列的名字
- * other properties - 消息的其他属性,可以为null
- * body – 消息的内容,注意,要是有 字节数组
- */
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
- System.out.println(" [x] Sent '" + message + "'");
-
- //检查消息是否发送成功了
- try {
- /**
- * 判断是否发送到交换机上,如果发送到了返回true,
- * 如果因为交换机名字错了,发送不到交换机,则会抛出异常,会自动关闭channel
- */
- if (channel.waitForConfirms()) {
- //如果返回true,代表交换机成功接收到了消息
- System.out.println("消息已经成功发送给了交换机");
- //关闭资源
- channel.close();
- }else {
- System.out.println("消息发送给交换机失败了");
- //关闭资源
- channel.close();
- }
- } catch (InterruptedException e) {
- System.out.println("消息发送给交换机失败了");
- System.out.println("失败的消息为:"+message);
- }
-
-
-
- conn.close();
- }
- }
- package com.qf.mq2302.hello;
-
- import com.qf.mq2302.utils.MQUtils;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.DeliverCallback;
- import com.rabbitmq.client.Delivery;
-
- import java.io.IOException;
-
- public class Recv {
- private final static String QUEUE_NAME="hello-queue";
-
- public static void main(String[] args) throws Exception {
- //1.获取连接对象
- Connection conn = MQUtils.getConnection();
-
- //2. 创建一个channel对象,对于MQ的大部分操作,都定义在了channel对象上
- Channel channel = conn.createChannel();
-
- /**
- * 第一个参数队列名称
- * 第二个参数,耐用性
- * 第三个参数排外性
- * 第四个参数是否自动删除
- * 第五个参数,可以定义什么类型的队列
- */
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-
- //3.该消费者收到消息之后的处理逻辑,写在DeliverCallback对象中
- DeliverCallback deliverCallback =new DeliverCallback() {
- @Override
- public void handle(String consumerTag, Delivery message) throws IOException {
- System.out.println(consumerTag);
- //从Delivery对象中可以获取到生产者,发送的消息的字节数组
- byte[] body = message.getBody();
- String msg = new String(body, "utf-8");
-
- //在这里写消费者的业务逻辑,例如,发送邮件
- System.out.println(msg);
-
- }
- };
-
- //4.让当前消费者开始消费(QUEUE_NAME)队列中的消息
- /**
- * queue – the name of the queue
- * autoAck – true 代表当前消费者是不是自动确认模式。true代表自动确认。
- * deliverCallback – 当有消息发送给该消费者时,消费者如何处理消息的逻辑
- * cancelCallback – 当消费者被取消掉时,如果要执行代码,写到这里
- */
- channel.basicConsume(QUEUE_NAME,true,deliverCallback,consumerTag -> {});
-
-
-
-
-
- }
-
-
-
-
-
- }
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-amqpartifactId>
- dependency>
- spring:
- rabbitmq:
- host: 8.140.244.227
- port: 6786
- username: test
- password: test
- virtual-host: /test
- publisher-confirm-type: correlated #在springboot 项目下开启生产者的confirm机制
- package com.qf.bootmq2302.config;
-
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class RabbitConfig {
-
-
- @Bean
- public RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory){
- RabbitTemplate rabbitTemplate = new RabbitTemplate();
-
- //设置连接工厂对象
- rabbitTemplate.setConnectionFactory(cachingConnectionFactory);
-
-
- rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- System.out.println("correlationData:"+correlationData.getId());
- System.out.println("correlationData:"+new String(correlationData.getReturnedMessage().getBody()));
-
- //通过id可以去redis 里取 value消息
-
-
- //代表消息是否发送给交换机成功,发送失败false ,发送成功 true
- System.out.println("ack:"+ack);
- //代表错误的原因
- System.out.println("cause:"+cause);
- }
- });
-
- return rabbitTemplate;
- }
-
-
-
- }
- @Autowired
- RabbitTemplate rabbitTemplate;
- @GetMapping("/test1")
- public String test1(String msg,String routkey){
- System.out.println(msg);
- String exchangeName = "";//默认交换机
- String routingkey = routkey;//队列名字
-
- //创建一个 CorrelationData 对象
- CorrelationData correlationData = new CorrelationData();
- correlationData.setId("001");
-
- Message message = new Message(msg.getBytes(), null);
- correlationData.setReturnedMessage(message);
-
- //要把消息的内容和消息的编号 存放到redis中, key=消息编号,value=消息内容
- //key = bootmq:failmessage:001
-
- //生产者发送消息
- //第四个参数,可以携带自定义的correlationData
- rabbitTemplate.convertAndSend(exchangeName,routingkey,msg,correlationData);
- return "ok";
- }
- @RabbitListener(queues = "queueA")
- public void getMsg1(Map
data, Channel channel,Message message) throws IOException { -
-
- System.out.println(data);
-
- //手动ack//若开启手动ack,不给手动ack,就按照 prefetch: 1 #等价于basicQos(1)的量,就这么多,不会多给你了,因为你没有确认。确认一条,就给你一条
- channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
-
- }
- spring:
- rabbitmq:
- host: 8.140.244.227
- port: 6786
- username: test
- password: test
- virtual-host: /test
- #手动ACK
- listener:
- simple:
- acknowledge-mode: manual # 手动ack
- prefetch: 1 #等价于basicQos(1)