Springboot 版本: 2.7.0
死信,即无法被正常消费的消息。 正常情况下到达中间件队列中的消息都应该也都会被消费者消费,但如果因为一些原因导致消息没有被消费,那么这个消息就被认为是死信了。
配置文件:
spring:
rabbitmq:
host: localhost
port: 5672
username: admin
password: admin
virtual-host: my_vhost
# 消息确认(ACK)
publisher-confirm-type: CORRELATED #correlated #确认消息已发送到交换机(Exchange)
publisher-returns: true #确认消息已发送到队列(Queue)
配置类:配置交换机和队列
@Slf4j
@Configuration
public class RabbitConfiguration {
public final static String TOPIC_EXCHANGE = "myExchange";
public final static String QUEUE_NAME = "myQueue";
public final static String DEAD_EXCHANGE = "myDeadExchange";
public final static String DEAD_QUEUE = "myDeadQueue";
@Bean
public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jsonConverter());
template.setExchange(TOPIC_EXCHANGE);
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息:{}发送成功", correlationData.getId());
} else {
log.error("消息:{}发送失败,失败原因为:{}", correlationData.getId(), cause);
}
});
template.setMandatory(true);
template.setReturnsCallback(returned -> {
log.error("消息:{}路由失败, 失败原因为:{}", returned.getMessage().toString(), returned.getReplyText());
});
return template;
}
// 申明一个常规的交换机
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE, true, false);
}
// 申明一个常规使用的队列
@Bean
public Queue queue() {
// 常规队列与死信交换机的绑定关系
Map<String, Object> queueParams = new HashMap<>(3);
queueParams.put("x-dead-letter-exchange", DEAD_EXCHANGE);
queueParams.put("x-dead-letter-routing-key","my.dead.letter");
//设置队列长度为5
queueParams.put("x-max-length", 5);
return new Queue(QUEUE_NAME, true, false, false, deadLetterParams);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(topicExchange()).with("my.test.*");
}
@Bean
public Jackson2JsonMessageConverter jsonConverter() {
return new Jackson2JsonMessageConverter();
}
// 申明一个死信交换机
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(DEAD_EXCHANGE, true, false);
}
// 申明一个死信队列
@Bean
public Queue deadQueue() {
return new Queue(DEAD_QUEUE);
}
// 绑定死信交换机和死信队列
@Bean
public Binding deadBinding() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("my.dead.letter");
}
}
运行项目,会自动生成2个Exchange和2个Queue, 如下图所示:
D: durable, 表示持久的。
DLX: x-dead-letter-exchange = myDeadExchange,表示这个队列关联到的死信交换机。
DLK: x-dead-letter-routing-key = my.dead.letter,表示这个队列的死信 routingKey, 根据这个routingKey将死信路由到相应的死信队列。
生产者代码:设置太ttl=10s,没有消费者。
@Component
public class Publisher {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Jackson2JsonMessageConverter jsonConverter;
public void send(){
CorrelationData correlationData = new CorrelationData();
MessageProperties messageProperties = new MessageProperties();
//ttl 10s
messageProperties.setExpiration("10000");
Message message = jsonConverter.toMessage(new User("Kleven", 18), messageProperties);
rabbitTemplate.convertAndSend("my.test.message", message, correlationData);
}
}
结果:生产者发布运行10s后,由于没有消费者消息,消息过期进入死信队列。
生成者:发送10条消息,因为配置类通过 “x-max-length” 参数限制了队列长度为5,所以有5条消息会进入死信队列。
@Component
public class PublisherForMaxQueueLength {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(){
int age = 18;
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("my.test.message", new User("kleven", age++), new CorrelationData());
}
}
}
结果:前5条信息(age = 18 - 22)进入死信队列。
Lim: x-max-length = 5,代表队列长度为5。
生产者:生成5个消息,年龄从16开始。
@Component
public class PublisherForReject{
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(){
int age = 16;
for (int i = 0; i < 5; i++) {
rabbitTemplate.convertAndSend("my.test.message", new User("kleven", age++), new CorrelationData());
}
}
}
消费者:用户年龄大于18才能被消费。
@Slf4j
@Component
public class Consumer {
// 正常消费
@RabbitListener(queues = "myQueue", messageConverter = "jsonConverter", ackMode = "MANUAL")
public void normalConsumer(@Payload User user, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
int age = user.getAge();
if (age >= 18) {
log.info("正常消费者消费 -> {}", user);
channel.basicAck(deliveryTag, false);
} else {
channel.basicNack(deliveryTag, false, false);
}
}
// 死信消费
@RabbitListener(queues = "myDeadQueue", messageConverter = "jsonConverter")
public void deadConsumer(User user) {
log.info("死信消费者消费 -> {}", user);
}
}
结果:年龄18,19,20被正常消费,16,17被当做死信处理。