ConcucrrentSkipListMap来存储id - 消息内容 的映射,在回调函数中用correlationData.getId()来获取对应的消息;也可以通过备份交换机进行处理ConfirmCallback 回调函数(重心在判断是否找到exchange) ,生产者的消息只要进入了broker就会触发,如果没找到对应的exchange,则ack=false;如果找到了对应的exchange,则ack=trueReturnCallback退回模式(重心在判断是否找到对应的queue),使用这个之前需要先设置mandatory=true;触发条件是:找到了exchange,但没找到routingkey对应的queue,才会执行(找到了queue则不执行)也可以用连接工厂 而不是写死在配置文件中

/**
* 消息只要被 rabbitmq broker 接收到就会触发 confirmCallback 回调 。
* @param correlationData 回调的相关数据。
* @param ack ack为真,nack为假
* @param cause 一个可选的原因,用于nack,如果可用,否则为空。
*/
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("消息发送异常!correlationData={} ,ack={}", correlationData.getId(), ack);
} else {
log.info("消息已成功推送到mq队列,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
}
}
}
此时如果直接@Autowired一个RabbitTemplate对象,是没有装配这个回调的,需要手动调用:
rabbitTemplate.setConfirmCallback(RabbitTemplate.ConfirmCallback confirmCallback)
由于ConfirmCallback本身是一个接口,根据多态性,这里传入其实现类即可
在3.2中有一个实现类,直接传进入就好,为了可以区分不同的配置,我们可以采用如下方式:
在配置类中,创建名为myRabbitTemplate的Bean对象,逻辑是根据连接工厂(后面写)新new一个RabbitTemplate对象进行设置属性
不直接操作原生的rabbitTemplate对象是为了解耦
@Bean(name = "myRabbitTemplate")
public RabbitTemplate wwRabbitTemplate(@Qualifier("myConnectionFactory") ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setConfirmCallback(new ConfirmCallbackService());
return rabbitTemplate;
}
使用时直接用@Resource
@Resource(name = "myRabbitTemplate")
private RabbitTemplate rabbitTemplate;
/**
* 如果消息未能投递到目标 queue 里将触发回调 returnCallback ,一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。
*/
@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("returnedMessage:replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
}
}
当然配置信息其实不应该这样写死,因为能会使用非集群的多个mq,因此在5号标题中引出连接工厂
spring.rabbitmq.host=12...
#spring.rabbitmq.port=5672 默认
spring.rabbitmq.username=ad..
spring.rabbitmq.password=.
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
@Bean(name = "myRabbitTemplate")
public RabbitTemplate wwRabbitTemplate(@Qualifier("myConnectionFactory") ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
/**
* 确保消息发送失败后可以重新返回到队列中
* 注意:yml需要配置 publisher-returns: true
*/
// 将两种不同确认模式的回调接口实例传入
rabbitTemplate.setConfirmCallback(new ConfirmCallbackService());
//找不到routingkey时的处理
rabbitTemplate.setMandatory(Boolean.TRUE);//退回函数生效
rabbitTemplate.setReturnCallback(new ReturnCallbackService());
return rabbitTemplate;
}

我现在有很多个mq,其中有一个的配置如下(写在配置类中)
@Bean(name = "myConnectionFactory")
public ConnectionFactory myConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setHost(host);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setUsername(userName);
connectionFactory.setPassword(password);
connectionFactory.setPublisherConfirms(true); //发布确认回调
connectionFactory.setPublisherReturns(true); //退回
return connectionFactory;
}
使用时用@Resource注入即可
@Bean(name = "myRabbitTemplate")
public RabbitTemplate wwRabbitTemplate(@Qualifier("myConnectionFactory") ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
/**
* 确保消息发送失败后可以重新返回到队列中
* 注意:yml需要配置 publisher-returns: true
*/
// 将两种不同确认模式的回调接口实例传入
rabbitTemplate.setConfirmCallback(new ConfirmCallbackService());
//找不到routingkey时的处理
rabbitTemplate.setMandatory(Boolean.TRUE);//退回函数生效
rabbitTemplate.setReturnCallback(new ReturnCallbackService());
return rabbitTemplate;
}
首先,Spring整合RabbitMQ中的回调实现是异步、独立的
并且这个ConfirmCallback不是原生的那个,是Spring中的,是一个内部接口,不要混淆了

这就要涉及到另外两个原生的方案:
异步、独立确认:不影响消息发布线程,且能准确定位具体是哪条消息出的问题
correlationData.getId()获取一个唯一的id,更加可靠Channel channel = connection.createChannel();
channel.queueDeclare("异步队列",true, false,false,null);
//1.线程安全有序的一个队列,并发
ConcurrentSkipListMap mqMap = new ConcurrentSkipListMap<>();
//2.回调
//2.1确认回调
ConfirmCallback ack = (deliveryTag,multiple)->{
//日志记录等操作,略
//去掉回调收到的第deliveryTag号消息
if(multiple) {//如果是批量,则批量删除
ConcurrentNavigableMap newMqMap = mqMap.headMap(deliveryTag);
newMqMap.clear();
}else{//单个直接删
mqMap.remove(deliveryTag);
}
//此时的mqMap中只剩下了尚未确认的消息
};
//2.2否认回调
ConfirmCallback nack = (deliveryTag,multiple)->{
//日志记录等操作,略
System.out.println(mqMap.get(deliveryTag));
/*收到了nack,消息不会重新入队,需要自行处理
一般都是需要重发的,所以nack时不去除mqMap中的元素
*/
};
/*** 生产者开启confirm确认模式****/
channel.confirmSelect();
//3.新的线程:发布确认的消息监听 监听ack和nack
//注意顺序, 前面是ack,后面是nack
channel.addConfirmListener(ack,nack);//还有另外一种构造形式,效果相同
//4.模拟发布消息
for(int i = 0 ; i < 1000 ; i++){
channel.basicPublish("",//交换机
"队列:"+i,//routingKey
MessageProperties.PERSISTENT_TEXT_PLAIN,//消息策略:消息持久化
("消息"+i).getBytes(StandardCharsets.UTF_8));//消息内容
//每次发布,mqMap都更新一个值
mqMap.put(channel.getNextPublishSeqNo(),"消息"+i);
}
由上述可知:我们可以从回调接口中获取CorrelationData参数,这个类中包含了一个很重要的信息————消息id,我们可以创建一个ConcucrrentSkipListMap来存储id - 消息内容 的映射,每次回调时:
ConfirmCallback:只要进入broker就回调,可以感知到exchange是否出错mandatory和ReturnCallback:可以感知到queue是否出错
他本质也是个普通的扇出交换机,所以区别只是在“消息失败后投递给他”
普通交换机 指向(alternate-exchange) 备份交换机

SpringBoot默认是自动应答(监听队列也是),同时提供的API也会应答
首先需要关闭自动应答

spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
当然,仍然可以通过配置类来进行低耦合配置
@Bean
public RabbitListenerContainerFactory> rabbitListenerContainerFactory(@Qualifier("myConnectionFactory") ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
注:如果@RabbitListener写在类上,就需要搭配@RabbitHandler使用


形参中有channel
// 确认 : 消息tag,不批量
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//拒绝(不重新入队): 消息tag,不批量
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
// 否定(可选重新入队):消息tag,不批量,重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
在使用方面,声明队列的时候,其实底层也是调用的channel.queueDeclare(…)中第二个参数durable设置为true

SpringBoot提供的API中是默认消息持久化的
channel.basicPublish的第三个参数,将策略设置为MessageProperties.PERSISTENT_TEXT_PLAIN

参考:

其他的关于RabbitMQ负载均衡部分省略
一般也都是用Redis,快
全局唯一ID + Redis

操作的是max-priority属性

