✨✨个人主页:沫洺的主页
📚📚系列专栏: 📖 JavaWeb专栏📖 JavaSE专栏 📖 Java基础专栏📖vue3专栏
📖MyBatis专栏📖Spring专栏📖SpringMVC专栏📖SpringBoot专栏
📖Docker专栏📖Reids专栏📖MQ专栏📖SpringCloud专栏
💖💖如果文章对你有所帮助请留下三连✨✨
在 RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现。
死信就是消息在特定场景下的一种表现形式,这些场景包括:
- 消息被拒绝访问,即 RabbitMQ返回 basicNack 的信号时。 或者拒绝basicReject
- 消费者发生异常,超过重试次数 。
- 消息的 TTL 时间过期。
- 消息队列达到最大长度。
上述场景经常产生死信,即消息在这些场景中时,被称为死信。
死信队列就是用于储存死信的消息队列,在死信队列中,有且只有死信构成,不会存在其余类型的消息。
死信队列在 RabbitMQ 中并不会单独存在,往往死信队列都会绑定这一个普通的消息队列,当所绑定的消息队列中,有消息变成死信了,那么这个消息就会重新被交换机路由到指定的死信队列中去,我们可以通过对这个死信队列进行监听,从而手动的去对这一消息进行补偿。
死信队列基本使用,只需要在声明业务队列的时候,绑定指定的死信交换机和RoutingKey即可。
//定义业务队列 @Bean public Queue directQueue() { return QueueBuilder.durable(QNAME) .deadLetterExchange(DENAME) //通过这两个配置,使我们的业务队列与死信交换机有关系了 .deadLetterRoutingKey(DKEY) .ttl(60*1000) //设置消息多久过期,消息超过这个时间就直接给死信交换机,让它处理 .maxLength(1000) //设置队列最大容量,超过这个容量就直接给死信交换机,让它处理 .build(); }环境搭建
消费者
@Component public class DeadConsumer { //死信 private static final String DENAME = "211-DeadExchage-死信"; private static final String DQNAME = "211-DeadQueue-死信"; private static final String DKEY = "211-DeadQueue-RoutingKey"; //业务 private static final String ENAME = "211-DirectExchage-业务"; private static final String QNAME = "211-DirectQueue-业务"; private static final String KEY = "211-DirectQueue-RoutingKey"; //region 队列和交换机的注册 //定义死信交换机,本质就是普通的直连交换机 @Bean public DirectExchange deadExchange() { return new DirectExchange(DENAME, true, false); } //定义死信队列 @Bean public Queue deadQueue() { return QueueBuilder.durable(DQNAME).build(); } //创建死信队列和死信交换机的绑定关系 @Bean public Binding binding1() { return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(DKEY); } //定义业务交换机 @Bean public DirectExchange directExchange() { return new DirectExchange(ENAME, true, false); } //定义业务队列 @Bean public Queue directQueue() { return QueueBuilder.durable(QNAME) .deadLetterExchange(DENAME) //通过这两个配置,使我们的业务队列与死信交换机有关系了 .deadLetterRoutingKey(DKEY) .ttl(60*1000) //设置消息多久过期,消息超过这个时间就直接给死信交换机,让它处理 .maxLength(5) //设置队列最大容量,超过这个容量就直接给死信交换机,让它处理 .build(); } //创建业务队列和业务交换机的绑定关系 @Bean public Binding binding2() { return BindingBuilder.bind(directQueue()).to(directExchange()).with(KEY); } //endregion //业务消费者 @RabbitHandler @RabbitListener(queues = QNAME) public void process1(UserRegisterOk userRegisterOk) { System.out.println(DateUtil.format(DateUtil.date(),"HH:mm:ss") +"消费者接收消息:" + userRegisterOk.getName() + "," + userRegisterOk.getAge()); } }生产者
@Component public class DeadProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(){ UserRegisterOk userRegisterOk1 = UserRegisterOk.builder().name("张三").age(18).build(); //要将对象序列化,转成字符串,使用消息转换器MessageConverter rabbitTemplate.convertAndSend( "211-DirectExchage-业务", "211-DirectQueue-RoutingKey", userRegisterOk1); System.out.println(DateUtil.format(DateUtil.date(),"HH:mm:ss") +"生产者生产消息-->张三发送成功"); } }启动类
@SpringBootApplication public class App4 { public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(App4.class, args); DeadProducer producer = context.getBean(DeadProducer.class); producer.sendMessage(); } }场景描述: 当没有消费者时,消息的 TTL 时间过期
ttl(60*1000) //设置消息多久过期,消息超过这个时间就直接给死信交换机,让它处理
死信队列一般是没有消费者的,都是通过人工干预去处理
场景描述: 没有消费者时,消息队列达到最大长度
生产者发送8条消息,消息队列最大长度为5,多出的3条会放到死信队列里
maxLength(5) //设置队列最大容量,超过这个容量就直接给死信交换机,让它处理
场景描述: 手动应答下,消息被拒绝访问
#开启消费者应答模式为 auto自动应答 manual手动应答 spring.rabbitmq.listener.simple.acknowledge-mode=manual #开启消费者自动重试机制,也就是消费者函数只要抛出异常,就会触发重试 spring.rabbitmq.listener.simple.retry.enabled=false还是生产者发送8条消息,区别是对象年龄不同
@Component public class DeadProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(){ for (int i=1;i<=8;i++){ UserRegisterOk userRegisterOk1 = UserRegisterOk.builder().name("张三").age(18+i).build(); //要将对象序列化,转成字符串,使用消息转换器MessageConverter rabbitTemplate.convertAndSend( "211-DirectExchage-业务", "211-DirectQueue-RoutingKey", userRegisterOk1); System.out.println(DateUtil.format(DateUtil.date(),"HH:mm:ss") +"生产者生产消息-->张三发送成功"); } } }消费者判断对象的年龄,符合就应答,不符合就拒绝应答
//业务消费者 @RabbitHandler @RabbitListener(queues = QNAME) public void process1(UserRegisterOk userRegisterOk,Message message, Channel channel) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); if(userRegisterOk.getAge()>23){ System.out.println(DateUtil.format(DateUtil.date(),"HH:mm:ss") +"消费者接收消息:" + userRegisterOk.getName() + "," + userRegisterOk.getAge()+"年龄符合"); channel.basicAck(deliveryTag,false); }else { System.out.println(DateUtil.format(DateUtil.date(),"HH:mm:ss") +"消费者接收消息:" + userRegisterOk.getName() + "," + userRegisterOk.getAge()+"年龄不符合"); channel.basicNack(deliveryTag,false,false); } }场景描述: 消费者发生异常,超过重试次数
重试5次
#开启消费者应答模式为 auto自动应答 manual手动应答 spring.rabbitmq.listener.simple.acknowledge-mode=auto #开启消费者自动重试机制,也就是消费者函数只要抛出异常,就会触发重试 spring.rabbitmq.listener.simple.retry.enabled=true #设置重试最大次数 spring.rabbitmq.listener.simple.retry.max-attempts=5 #设置重试时间最大间隔 spring.rabbitmq.listener.simple.retry.max-interval=8000ms #设置重试时间间隔 spring.rabbitmq.listener.simple.retry.initial-interval=1000ms #设置重试时间间隔的倍数 spring.rabbitmq.listener.simple.retry.multiplier=2模拟消费者异常
//业务消费者 @RabbitHandler @RabbitListener(queues = QNAME) public void process1(UserRegisterOk userRegisterOk) { System.out.println(DateUtil.format(DateUtil.date(),"HH:mm:ss") +"消费者接收消息:" + userRegisterOk.getName() + "," + userRegisterOk.getAge()); int i=1/0; }5次之后就不再消费了,将消息放到死信队列里
#超过重试次数后,进入死信队列,默认为false #重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列) spring.rabbitmq.listener.simple.default-requeue-rejected = false