RabbitMQ的消息确认有两种。 一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。 第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。
org.springframework.boot
spring-boot-starter-amqp
spring:
rabbitmq:
#连接地址
host: 192.168.56.10
#端口号
port: 5672
#账号
username: guest
#密码
password: guest
#虚拟主机
virtual-host: /
#开启发送端消息抵达broker的确认
publisher-confirm-type: correlated
#开启发送端消息抵达队列的确认
publisher-returns: true
template:
#mandatory 默认为FALSE,指定消息在没有被队列接收时是否强行退回还是直接丢弃,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,会直接将消息扔掉
mandatory: true
listener:
simple:
# 设置消费端手动 ack
acknowledge-mode: manual
package com.itheima.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author: XuXin
* @date: 2023/9/26
*/
@Configuration
public class MessageConverterConfig {
/**
* 使用json序列化机制,进行消息转换
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
发送消息确认:用来确认生产者 producer 将消息发送到 broker ,broker 上的交换机 exchange 再投递给队列 queue的过程中,消息是否成功投递。
消息从 producer 到 rabbitmq broker有一个 confirmCallback 确认模式。
消息从 exchange 到 queue 投递失败有一个 returnCallback 退回模式。
我们可以利用这两个Callback来确保消息的100%送达。
package com.itheima.producer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
* @author: XuXin
* @date: 2023/9/22
*/
@Slf4j
@Component
public class ConfirmCallback implements RabbitTemplate.ConfirmCallback {
/**
* 不管消息是否成功到达交换机都会被调用
*
* @param correlationData 当前消息的唯一关联数据(这个是消息的唯一id)
* @param ack 消息是否成功收到 只要消息抵达broker就ack=true
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info(" 回调id:" + correlationData);
if (ack) {
log.info("消息成功发送");
} else {
log.info("消息发送失败:" + cause);
}
}
}
实现接口 ConfirmCallback ,重写其confirm()方法,方法内有三个参数correlationData、ack、cause。
correlationData:对象内部只有一个 id 属性,用来表示当前消息的唯一性。
ack:消息投递到broker 的状态,true表示成功。
cause:表示投递失败的原因。
但消息被 broker 接收到只能表示已经到达 MQ服务器,并不能保证消息一定会被投递到目标 queue 里。所以接下来需要用到 returnCallback。
package com.itheima.producer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
* @author: XuXin
* @date: 2023/9/25
*/
@Slf4j
@Component
public class ReturnCallback implements RabbitTemplate.ReturnsCallback {
/**
* @param re 只有在交换机到达队列失败的时候才会被触发,当这个回调函数被调用的时候说明交换机的消息没有顺利的到达队列
* message 投递失败的消息详细信息
* replyCode 回复的状态码
* replyText回复的文本内容
* exchange 当时这个消息发给哪个交换机
* routingKey 当时这个消息发给哪个路由键
*/
@Override
public void returnedMessage(ReturnedMessage re) {
log.info("Returned: " + re.getMessage() + "\nreplyCode: " + re.getReplyCode()
+ "\nreplyText: " + re.getReplyText() + "\nexchange: "
+ re.getExchange() + "\nroutingKey: " + re.getRoutingKey());
}
}
实现接口ReturnCallback,重写 returnedMessage() 方法,方法有五个参数message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。
在rabbitTemplate中设置 Confirm 和 Return 回调,我们通过setDeliveryMode()对消息做持久化处理,为了后续测试创建一个 CorrelationData对象,添加一个id 为10000000000。
package com.itheima.config;
import com.itheima.producer.ConfirmCallback;
import com.itheima.producer.ReturnCallback;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
/**
* @author xx
* @date 2023/9/22
* springboot自动读取yml文件自动配置,这里可删
* 定义完成后 rabbitmq服务器会自动创建交换机和队列以及绑定关系
* 在Spring启动时,利用Spring Bean管理工厂BeanFactory接口,实现动态创建交换机、队列、交换机和队列的绑定关系,让我们无需进行重复的编码工作。
*/
@Slf4j
@Configuration
public class RabbitMQConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ConfirmCallback confirmCallback;
@Autowired
private ReturnCallback returnCallback;
/**
* 队列
*/
public static final String QUEUE1 = "atguigu";
public static final String QUEUE2 = "atguigu.news";
public static final String QUEUE3 = "atguigu.emps";
public static final String QUEUE4 = "gulixueyuan.news";
/**
* 定义交换机名称
*/
public static final String EXCHANGE_DIRECT_NAME = "exchange.direct";
public static final String EXCHANGE_FANOUT_NAME = "exchange.fanout";
public static final String EXCHANGE_TOPIC_NAME = "exchange.topic";
/**
* 设置路由key
* #匹配0个或多个单词,*匹配一个单词
*/
public static final String ROUTINGKEY1 = "atguigu";
public static final String ROUTINGKEY2 = "atguigu.news";
public static final String ROUTINGKEY3 = "atguigu.emps";
public static final String ROUTINGKEY4 = "gulixueyuan.news";
public static final String ROUTINGKEY5 = "atguigu.#";
public static final String ROUTINGKEY6 = "*.news";
/**
* 定义 直连交换机
*/
@Bean("directExchange")
public DirectExchange directExchange() {
//参数 交换机名称
return new DirectExchange(EXCHANGE_DIRECT_NAME, true, false);
}
/**
* 定义 扇形交换机
*/
@Bean("fanoutExchange")
public FanoutExchange fanoutExchange() {
//参数 交换机名称
return new FanoutExchange(EXCHANGE_FANOUT_NAME, true, false);
}
/**
* 定义 主题交换机
*/
@Bean("topicExchange")
public TopicExchange topicExchange() {
//参数 交换机名称
return new TopicExchange(EXCHANGE_TOPIC_NAME, true, false);
}
/**
* 创建队列
* 参数一:队列名称
* 参数二durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
* 参数三exclusive:默认也是false,是否独占队列
* 参数四autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
*/
@Bean("queue1")
public Queue queue1() {
return new Queue(QUEUE1, true, false, false);
}
@Bean("queue2")
public Queue queue2() {
return new Queue(QUEUE2, true, false, false);
}
@Bean("queue3")
public Queue queue3() {
return new Queue(QUEUE3, true, false, false);
}
@Bean("queue4")
public Queue queue4() {
return new Queue(QUEUE4, true, false, false);
}
/**
* 队列绑定交换机
*
* @param queue 队列注入到容器的id,也就是方法名 Queue1
* @param directExchange 交换机注入到容器的id,也就是方法名 directExchange
* @return
*/
@Bean
public Binding bindingQueue1DirectExchange(@Qualifier("queue1") Queue queue, @Qualifier("directExchange") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(ROUTINGKEY1);
}
@Bean
public Binding bindingQueue2DirectExchange(@Qualifier("queue2") Queue queue, @Qualifier("directExchange") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(ROUTINGKEY2);
}
@Bean
public Binding bindingQueue3DirectExchange(@Qualifier("queue3") Queue queue, @Qualifier("directExchange") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(ROUTINGKEY3);
}
@Bean
public Binding bindingQueue4DirectExchange(@Qualifier("queue4") Queue queue, @Qualifier("directExchange") DirectExchange directExchange) {
return BindingBuilder.bind(queue).to(directExchange).with(ROUTINGKEY4);
}
@Bean
public Binding bindingQueue1FanoutExchange(@Qualifier("queue1") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean
public Binding bindingQueue2FanoutExchange(@Qualifier("queue2") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean
public Binding bindingQueue3FanoutExchange(@Qualifier("queue3") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean
public Binding bindingQueue4FanoutExchange(@Qualifier("queue4") Queue queue, @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
@Bean
public Binding bindingQueue1TopicExchange(@Qualifier("queue1") Queue queue, @Qualifier("topicExchange") TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with(ROUTINGKEY5);
}
@Bean
public Binding bindingQueue2TopicExchange(@Qualifier("queue2") Queue queue, @Qualifier("topicExchange") TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with(ROUTINGKEY5);
}
@Bean
public Binding bindingQueue3TopicExchange(@Qualifier("queue3") Queue queue, @Qualifier("topicExchange") TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with(ROUTINGKEY5);
}
@Bean
public Binding bindingQueue4TopicExchange(@Qualifier("queue4") Queue queue, @Qualifier("topicExchange") TopicExchange topicExchange) {
return BindingBuilder.bind(queue).to(topicExchange).with(ROUTINGKEY6);
}
/**
* 定制rabbitTemplate
* 1、服务收到消息就回调
* 1、spring.rabbitmq.publisher-confirms=true
* 2、设置确认回调ConfirmCallback
* 2、消息正确抵达队列进行回调
* 1、spring.rabbitmq.publisher-returns=true
* spring.rabbitmq.template.mandatory= true
* 2、设置确认回调ReturnCallback
*
* 3、消费端确认(保证每个消息被正确消费,此时才可以broker删除这个消息)。
* listener.simple.acknowledge-mode=manual
* 1、默认是自动确认的,只要消息接收到,客户端会自动确认,服务端就会移除这个消息
* 问题:
* 我们收到很多消息,自动回复给服务器ack,只有一个消息处理成功,宕机了。发生消息丢失;
* 消费者手动确认。只要我们没有明确告诉MQ,货物被签收。没有Ack,消息就一 直是unacked状态。即使Consumer宕机,
* 消息也不会丢失,会重新变为Ready,下次有新的Consumer连接进来就发给他
* 2、如何签收:
* channel.basicAck(deliveryTag, false) ;签收;业务成功完成就应该签收
* channel.basicNack(deliveryTag, false, true);拒签;业务失败,拒签|
*/
@PostConstruct
public void initRabbitTemplate() {
//设置确认回调
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnsCallback(returnCallback);
}
}
package com.itheima.producer;
import com.itheima.config.RabbitConfig;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* @author: XuXin
* @date: 2023/9/25
* 1、发送消息,如果发送的消息是个对象,我们会使用序列化机制,将对象写出去。对象必须实现Serializable
*/
@Component
public class MsgProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg(String routingKey, Object content) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
//把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列A
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_DIRECT_NAME, routingKey, content, correlationId);
}
public void sendMsg(String routingKey, Object content, String uuid) {
CorrelationData correlationId = new CorrelationData(uuid);
//把消息放入ROUTINGKEY_A对应的队列当中去,对应的是队列A
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_DIRECT_NAME, routingKey, content, correlationId);
}
}
package com.itheima.receiver;
import com.itheima.domain.Book;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author: XuXin
* @date: 2023/9/25
*/
@Slf4j
@Component
@RabbitListener(queues = "atguigu.news")
public class MsgReceiver {
/**
* queues: 声明需要监听的所有队列
* org.springframework-amqp.core.Message
* 参数可以写一下类型
* 1、Message message: 原生消息详细信息。头+体
* 2、T<发送的消息的类型> OrderReturnReasonEntity content;
* 3、Channel channel: 当前传输数据的通道
*
* Queue: 可以很多人都来监听。只要收到消息,队列删除消息,而且只能有一个收到此消息
* 场景:
* 1)、订单服务启动多个;同一个消息,只能有一个客户端收到
* 2)、只有一个消息完全处理完,方法运行结束,我们就可以接收到下一-个消息
*
* @param message
* @param content
* @throws IOException
*/
@RabbitHandler
public void processHandler(Message message, Book content, Channel channel) throws IOException {
try {
log.info("收到消息:{}", content);
//TODO 具体业务
//是deliveryTagchannel内按顺序自增的
long deliveryTag = message.getMessageProperties().getDeliveryTag();
//签收消息,非批量模式
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error("消息即将再次返回队列处理...");
//退货 requeue=false丢弃 requeue=true发回服务器,服务器重新入队。
//long deliveryTag, boolean multiple, boolean requeue
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
监听消息必须要有@EnableRabbit注解,如果只是创建交换机,队列以及绑定不需要此注解。
消费消息有三种回执方法,我们来分析一下每种方法的含义。
1、basicAck
2、basicNack
3、basicReject
package com.itheima.controller;
import com.itheima.config.RabbitConfig;
import com.itheima.domain.Book;
import com.itheima.producer.MsgProducer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author: XuXin
* @date: 2023/9/25
*/
@Slf4j
@RestController
public class TestRabbitMq {
@Autowired
private MsgProducer msgProducer;
@GetMapping("/sendMq")
public String sendMq() {
System.out.println("执行");
Book book = new Book();
book.setName("cook");
book.setType("eat");
msgProducer.sendMsg(RabbitConfig.ROUTINGKEY2,book);
return "ok";
}
}
@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {
try {
log.info("消费者 2 号收到:{}", msg);
int a = 1 / 0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}