死信:无法被消费的消息,一般情况下:生产者将消息投递到broker或者直接到queue中,消费者从queue取出消息进行消费,但是某些时候,由于特定原因导致queue中的某些消息无法被消费,这样的消息如果没有后续处理,就会成为死信消息,有了死信消息就产生了死信队列。
当消息消费发生异常时,将消息投入死信队列中。比如:用户在商城下单成功并点击去支付后,在指定时间未支付时自动失效。
1.消息TTL(存活时间)过期
2.队列达到最大长度(队列满了,无法再添加数据到mq中)
3.消息被拒绝(basic.reject或basic.nack)并且requeue = false(不放回队列中)
示例代码:
消费者1是最为重要的,需要声明交换机1,队列1,以及消费者2,队列2,并且需要当消息异常时,将死信消息转发到交换机2,再由交换机2转发到队列2.
消费者2只需要消费队列2的消息。
生产者需要向交换机1发送消息。
消费者1:
和以前不一样,这里需要在队列1,就是普通队列中,添加对死信的操作,因此为arguments参数进行了定义。
/**
* 死信
* 消费者01
*/
public class Consumer01 {
//正常交换机
public static final String NORMAL_EXCHANGE = "normal_exchange";
//死信交换机
public static final String DEAD_EXCHANGE = "dead_exchange";
//正常队列
public static final String NORMAL_QUEUE = "normal_queue";
//死信队列
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtiles.getChannel();
//声明普通交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
//声明普通队列
/**
* 普通队列需要指定对应的参数
* 才能够当消息成为死信后
* 转发到死信交换机
* 再发送到死信队列
* arguments
*/
Map<String, Object> arguments = new HashMap<>();
//过期时间 10s 单位毫秒
//也可以在生产者发送时,指定过期时间,一般使用生产者发送时指定
// arguments.put("x-message-ttl",10000);
//正常队列过期后设置死信交换机 x-dead-letter-exchange 固定格式
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//设置死信交换机的routingkey
arguments.put("x-dead-letter-routing-key","roukey2");
channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
//绑定普通交换机与队列
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"roukey1");
//声明死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
//绑定死信交换机与死信队列
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"roukey2");
//消息消费的时候如何处理消息
DeliverCallback deliverCallback = (consumerTag, message)->{
//消息有消息头、消息体
System.out.println("消费者01接收到的消息"+new String(message.getBody(),"UTF-8"));
};
//取消消息时的回调
CancelCallback cancelCallback = consumerTag->{
System.out.println("消息消费被终断");
};
channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);
}
}
生产者:
生产者同样做了变动,设置了消息变为死信的过期时间,并且作为参数传递了进去。
public class ProducerLog {
public static final String EXCHANGE_NAME = "normal_exchange";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtiles.getChannel();
//设置过期时间,变为死信 10s 单位毫秒
AMQP.BasicProperties properties = new AMQP.BasicProperties().
builder().expiration("10000").
build();
for (int i = 0; i < 10; i++) {
String message = "info:"+i;
channel.basicPublish(EXCHANGE_NAME,"roukey1",properties,message.getBytes("UTF-8"));
}
}
}
消费者2
只消费死信队列中的消息
public class Consumer02 {
//死信队列
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtiles.getChannel();
//消息消费的时候如何处理消息
DeliverCallback deliverCallback = (consumerTag, message)->{
//消息有消息头、消息体
System.out.println("消费者02接收到的消息"+new String(message.getBody(),"UTF-8"));
};
//取消消息时的回调
CancelCallback cancelCallback = consumerTag->{
System.out.println("消息消费被终断");
};
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
}
}
1.先启动消费者1,创建正常队列、死信队列、正常交换机、死信交换机
2.再关闭消费者1,交换机和队列创建后,依然存活,模拟消费者1,未能及时消费
3.启动生产者,生产者发送消息值正常交换机
4.现象:正常队列内信息条数为10,过了10秒后,变为0,死信队列消息条数由0变为10.
5.启动消费者2,死信队列消息被消费。
需要在消费1,声明正常队列时,加入如下参数即可
设置最大长度为6,超过6个,多余的就为死信,进入死信队列。
生产者删除,最大存活时间的设置。
注,之前队列已经创建,属性改变,之前队列需要删除
//设置正常队列长度的限制
arguments.put("x-max—letter",6);
需要在消费者1接收消息的回调函数中,进行改造,增加拒绝操作
//消息消费的时候如何处理消息
DeliverCallback deliverCallback = (consumerTag, message)->{
String mes = new String(message.getBody(), "UTF-8");
if (mes.equals("info:5")){
System.out.println("消费者01拒绝消息"+mes);
/**
* 拒绝消息
* 消息的标签
* 是否放回原队列
*/
channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
}else {
/**
* 应答消息
* 消息的标签
* 是否批量应答
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
//消息有消息头、消息体
System.out.println("消费者01接收到的消息"+mes);
}
};