消息丢失:下图是消息从生产者发送到消费者接收的关系图。通过图片可以看出,消息在生产者、MQ、消费者这三个环节都有可能丢失。

配置
package com.qiangesoft.rabbitmq.producer;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the exchange, queue and binding used when sending messages.
 *
 * @author qiangesoft
 * @date 2024-05-08
 */
@Configuration
public class MessageConfig {

    /** Name of the direct exchange messages are published to. */
    public static final String EXCHANGE = "simple.exchange";

    /** Name of the queue bound to {@link #EXCHANGE}. */
    public static final String QUEUE = "simple.queue";

    /** Routing key that links {@link #EXCHANGE} to {@link #QUEUE}. */
    public static final String ROUTING_KEY = "simple";

    /**
     * Declares the direct exchange named {@link #EXCHANGE}, marked durable.
     *
     * @return the exchange definition
     */
    @Bean
    public DirectExchange simpleExchange() {
        ExchangeBuilder durableExchange = ExchangeBuilder.directExchange(EXCHANGE).durable(true);
        return durableExchange.build();
    }

    /**
     * Declares the queue named {@link #QUEUE} as durable and lazy.
     *
     * @return the queue definition
     */
    @Bean
    public Queue simpleQueue() {
        // Lazy mode is requested to cope with message backlog (per the original author's note).
        QueueBuilder lazyDurableQueue = QueueBuilder.durable(QUEUE).lazy();
        return lazyDurableQueue.build();
    }

    /**
     * Binds the queue to the exchange using {@link #ROUTING_KEY}.
     *
     * @param simpleQueue    the queue bean to bind
     * @param simpleExchange the exchange bean to bind to
     * @return the binding definition
     */
    @Bean
    public Binding simpleBinding(Queue simpleQueue, DirectExchange simpleExchange) {
        return BindingBuilder
                .bind(simpleQueue)
                .to(simpleExchange)
                .with(ROUTING_KEY);
    }
}
背景:生产者发送消息时,出现了网络故障,导致与MQ的连接中断。
解决方案:配置连接超时时间、重试机制。
spring:
  rabbitmq:
    # Timeout for establishing the connection to the MQ
    connection-timeout: 1s
    template:
      # Retry mechanism for the template (original note: connection retry)
      retry:
        enabled: true
        # Initial wait time after a failure
        initial-interval: 1000ms
        # Back-off multiplier: next wait = previous wait * multiplier
        multiplier: 1
        # Maximum number of attempts
        max-attempts: 3
背景:生产者发送的消息可能没有到达交换机,或者到达交换机后没有被路由到任何队列,而生产者对此并不知情。
解决方案:配置Publisher Confirm机制、Publisher Return机制。
spring:
  rabbitmq:
    # Enable publisher confirms and choose the confirm type; verifies the message reached the exchange
    publisher-confirm-type: correlated
    # Enable publisher returns; verifies the message reached a queue
    publisher-returns: true
package com.qiangesoft.rabbitmq.producer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
/**
* 生产者
*
* @author qiangesoft
* @date 2024-05-08
*/
@Slf4j
@RequestMapping("/producer")
@RestController
public class ProducerController {
@Autowired
public RabbitTemplate rabbitTemplate;
@GetMapping("/send")
public void send(String content) {
CorrelationData correlation = getCorrelationData();
Message message = MessageBuilder
.withBody(content.getBytes(StandardCharsets.UTF_8))
.setMessageId(UUID.randomUUID().toString())
// 消息持久化
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
// 正常发送
rabbitTemplate.convertAndSend(MessageConfig.EXCHANGE, MessageConfig.ROUTING_KEY, message, correlation);
}
private static CorrelationData getCorrelationData() {
// 异步回调返回回执,开启publisher confirm机制【确保消息到达交换机】
CorrelationData correlation = new CorrelationData();
correlation.getFuture().addCallback(new ListenableFutureCallback<>() {
@Override
public void onFailure(Throwable ex) {
log.error("消息发送异常,ID:{},原因:{}", correlation.getId(), ex.getMessage());
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
log.info("触发【publisher confirm】机制");
if (result.isAck()) {
log.info("消息发送成功到达交换机,ID:{}", correlation.getId());
} else {
// 消息发送失败
log.error("消息发送失败未到达交换机,ID:{},原因:{}", correlation.getId(), result.getReason());
}
}
});
return correlation;
}
}
package com.qiangesoft.rabbitmq.producer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
/**
* 消息路由失败回退配置
*
* @author qiangesoft
* @date 2024-05-08
*/
@Slf4j
@Configuration
public class ReturnsCallbackConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 消息路由失败退回,设置ReturnsCallback【消息到达交换机没有达到队列】
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("触发【publisher return】机制");
log.error("消息投递失败未到达队列,应答码:{},原因:{},交换机:{},路由键:{},消息:{}", returned.getReplyCode(), returned.getReplyText(),
returned.getExchange(), returned.getRoutingKey(), returned.getMessage());
}
});
}
}
/**
 * Declares the direct exchange named {@code EXCHANGE}, marked durable.
 *
 * @return the exchange definition
 */
@Bean
public DirectExchange simpleExchange() {
    ExchangeBuilder durableExchange = ExchangeBuilder.directExchange(EXCHANGE).durable(true);
    return durableExchange.build();
}
/**
 * Declares the queue named {@code QUEUE} as durable and lazy.
 *
 * @return the queue definition
 */
@Bean
public Queue simpleQueue() {
    // Lazy mode is requested to cope with message backlog (per the original author's note).
    QueueBuilder lazyDurableQueue = QueueBuilder.durable(QUEUE).lazy();
    return lazyDurableQueue.build();
}
// Build a persistent message from the UTF-8 bytes of content, with a random UUID as its id.
byte[] payload = content.getBytes(StandardCharsets.UTF_8);
String messageId = UUID.randomUUID().toString();
Message message = MessageBuilder.withBody(payload)
        .setMessageId(messageId)
        .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
        .build();
// Send to the configured exchange with the configured routing key.
rabbitTemplate.convertAndSend(MessageConfig.EXCHANGE, MessageConfig.ROUTING_KEY, message, correlation);
spring:
  rabbitmq:
    listener:
      simple:
        # Automatic acknowledgement
        acknowledge-mode: auto
spring:
  rabbitmq:
    listener:
      simple:
        # Consumer-side retry on failure
        retry:
          enabled: true
          # Initial wait after a failure: 1 second
          initial-interval: 1000ms
          # Back-off multiplier: next wait = multiplier * last-interval
          multiplier: 1
          # Maximum number of attempts
          max-attempts: 3
          # true = stateless, false = stateful; set to false if the business logic involves transactions
          stateless: true
package com.qiangesoft.rabbitmq.consumer;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Configuration for messages whose consumption failed: declares the exchange and
 * queue that receive them, plus the recoverer that republishes to that exchange.
 *
 * @author qiangesoft
 * @date 2024-05-08
 */
@Configuration
public class ErrorMessageConfig {

    /** Name of the exchange that receives failed messages. */
    public static final String EXCHANGE = "error.exchange";

    /** Name of the queue that stores failed messages. */
    public static final String QUEUE = "error.queue";

    /** Routing key that links {@link #EXCHANGE} to {@link #QUEUE}. */
    public static final String ROUTING_KEY = "error";

    /**
     * Declares the direct exchange named {@link #EXCHANGE}.
     *
     * @return the exchange definition
     */
    @Bean
    public DirectExchange errorMessageExchange() {
        DirectExchange errorExchange = new DirectExchange(EXCHANGE);
        return errorExchange;
    }

    /**
     * Declares the durable queue named {@link #QUEUE}.
     *
     * @return the queue definition
     */
    @Bean
    public Queue errorQueue() {
        final boolean durable = true;
        return new Queue(QUEUE, durable);
    }

    /**
     * Binds the error queue to the error exchange using {@link #ROUTING_KEY}.
     *
     * @param errorQueue           the error queue bean
     * @param errorMessageExchange the error exchange bean
     * @return the binding definition
     */
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange) {
        return BindingBuilder
                .bind(errorQueue)
                .to(errorMessageExchange)
                .with(ROUTING_KEY);
    }

    /**
     * Creates a {@link RepublishMessageRecoverer} that targets {@link #EXCHANGE}
     * with {@link #ROUTING_KEY}.
     *
     * @param rabbitTemplate template the recoverer uses to republish
     * @return the message recoverer
     */
    @Bean
    public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
        MessageRecoverer recoverer = new RepublishMessageRecoverer(rabbitTemplate, EXCHANGE, ROUTING_KEY);
        return recoverer;
    }
}