一、死信队列
死信队列,英文缩写:DLX。Dead Letter Exchange (死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
1.1消息在什么情况下会成为死信?
1.队列消息长度到达限制;
⒉消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
3.原队列存在消息过期设置,消息到达超时时间未被消费;
1.2如何绑定死信交换机?
1.2.1 broker设置:
给队列设置参数: x-dead-letter-exchange和x-dead-letter-routing-key
1.2.2 代码设置
1.2.2.1 消息过期进入死信队列
/**
* 正常队列
* @return
*/
@Bean
public Binding normalBinding(){
return BindingBuilder.bind(normalQueue()).to(topicExchange()).with("normal.#");
}
@Bean
public Queue normalQueue(){
Queue queue = QueueBuilder.durable("normal.queue").ttl(5000).deadLetterRoutingKey("dlx.hello").deadLetterExchange("dlx.exchange").build();
return queue;
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("normal.exchange");
}
/**
* 死信队列
* @return
*/
@Bean
public Queue dlxQueue(){
Queue queue = QueueBuilder.durable("dlx.queue").build();
queue.setActualName("dlx.queue");
return queue;
}
@Bean
public TopicExchange dlxTopicExchange(){
return new TopicExchange("dlx.exchange");
}
@Bean
public Binding dlxBinding(){
return BindingBuilder.bind(dlxQueue()).to(dlxTopicExchange()).with("dlx.#");
}
1.2.2.2 队列的消息超过最大长度进入死信队列
Queue queue = QueueBuilder.durable("normal.queue").maxLength(5).deadLetterRoutingKey("dlx.hello").deadLetterExchange("dlx.exchange").build();
public void testDlx(){
String msg = "这是发送的信息";
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("normal.exchange","normal.test",msg+i);
}
}
1.2.2.3 消费者拒收进入死信队列(注入设置requeue为false,不然会重试无法进入死信队列)
@RabbitListener(queues = {"normal.queue"})
public void normalReceieveMsg(Message message,Channel channel) throws IOException {
System.out.println("正常队列接收到消息->"+messageConverter.fromMessage(message));
System.out.println("拒收进入死信队列");
long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicNack(deliveryTag,true,false);
}
二、延迟队列
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
使用TTL加死信队列实现
三、消息追踪
在使用任何消息中间件的过程中,难免会出现某条消息异常丢失的情况。对于RabbitMQ而言,可能是因为生产者或消费者与RabbitMQ断开了连接,而它们与RabbitMQ又采用了不同的确认机制;也有可能是因为交换器与队列之间不同的转发策略;甚至是交换器并没有与任何队列进行绑定,生产者又不感知或者没有采取相应的措施;另外RabbitMQ本身的集群策略也可能导致消息的丢失。这个时候就需要有一个较好的机制跟踪记录消息的投递过程,以此协助开发和运维人员进行问题的定位。
在RabbitMQ中可以使用Firehose和rabbitmq_tracing插件功能来实现消息追踪。
1、
进入rabbitmq
rabbitmqctl trace_on
2、
启用rabbitmq_tracing插件
rabbitmq-plugins enable rabbitmq_tracing