上一篇文章《SpringBoot案例》
概述
过期时间 TTL 表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对 消息和队列 设置 TTL,目前有两种方法可以设置
如果上述两种方式同时使用,则消息的过期时间以两者之间 TTL 较小的那个数值为准。消息队列的生存时间一旦超过设置的 TLL 值,就称为 dead message 被投递到死信队列,消费者将无法再收到该消息
队列过期时间 - 代码测试
config
包下,新建TTLRabbitMQConfiguration.java类package com.vinjcent.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TTLRabbitMQConfiguration {
// 1.声明注册direct模式交换机
@Bean
public DirectExchange ttl_directExchange(){
return new DirectExchange("ttl_order_exchange", true, false);
}
// 2.声明队列
@Bean
public Queue ttl_accountQueue(){
// 设置过期时间
Map<String, Object> args = new HashMap<>();
// 根据参数说明设置对应参数
args.put("x-message-ttl",5000);
return new Queue("account.ttl.queue", true, false, false, args);
}
// 3.完成绑定关系(队列)
@Bean
public Binding ttl_accountBinding(){
return BindingBuilder.bind(ttl_accountQueue()).to(ttl_directExchange()).with("ttl");
}
}
service
包下的OrderServiceImpl.java新增 ttl 函数package com.vinjcent.rabbitmq.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
@SuppressWarnings("all")
@Service
public class OrderServiceImpl {
@Autowired
RabbitTemplate rabbitTemplate;
// fanout模式
public void createOrderFanout(String userId, String productId, int num){
//......
}
// direct模式
public void createOrderDirect(String userId, String productId, int num){
//......
}
// topic
public void createOrderTopic(String userId, String productId, int num){
//......
}
// ttl
public void createOrderTtl(String userId, String productId, int num){
// 1.根据商品id查询库存是否充足
// 2.保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("订单生产成功: " + orderId);
// 3.通过MQ来完成消息的分发
// 参数1: 交换机 参数2: 路由key/queue队列名称 参数3: 消息内容
String exchangeName = "ttl_order_exchange";
// 路由给三个消息队列推送消息
String routingKey = "ttl";
/*
* #.account.#
* *.express.#
* sms.#
*/
rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
}
}
@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {
@Autowired
OrderServiceImpl orderService;
@Test
void contextLoads() {
orderService.createOrderFanout("1","1",12);
}
@Test
void testDirect() {
orderService.createOrderDirect("1","1",12);
}
@Test
void testTopic() {
orderService.createOrderTopic("1","1",12);
}
// 选择该测试用例
@Test
void testTtl() {
orderService.createOrderTtl("1","1",12);
}
}
队列消息过期时间 - 代码测试
config
包下,在TTLRabbitMQConfiguration.java类添加以下内容package com.vinjcent.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TTLRabbitMQConfiguration {
// 1.声明注册fanout模式交换机
@Bean
public DirectExchange ttl_directExchange(){
return new DirectExchange("ttl_order_exchange", true, false);
}
// 2.声明队列
@Bean
public Queue ttl_accountQueue(){
// 设置过期时间
Map<String, Object> args = new HashMap<>();
// 根据参数说明设置对应参数
args.put("x-message-ttl",5000);
return new Queue("account.ttl.queue", true, false, false, args);
}
@Bean
public Queue ttlMessage_accountQueue(){
return new Queue("account.ttl.message.queue", true, false, false);
}
@Bean
public Binding ttl_accountBinding(){
// ...
}
// 绑定操作
@Bean
public Binding ttlMessage_accountBinding(){
return BindingBuilder.bind(ttlMessage_accountQueue()).to(ttl_directExchange()).with("ttlmessage");
}
}
service
包下的OrderServiceImpl.java新增 ttlMessage 函数package com.vinjcent.rabbitmq.service;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
@SuppressWarnings("all")
@Service
public class OrderServiceImpl {
@Autowired
RabbitTemplate rabbitTemplate;
// fanout模式
public void createOrderFanout(String userId, String productId, int num){
//......
}
// direct模式
public void createOrderDirect(String userId, String productId, int num){
//......
}
// topic
public void createOrderTopic(String userId, String productId, int num){
//......
}
// ttl
public void createOrderTtl(String userId, String productId, int num){
//......
}
// ttlmessage
public void createOrderTtlMessage(String userId, String productId, int num){
// 1.根据商品id查询库存是否充足
// 2.保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("订单生产成功: " + orderId);
// 3.通过MQ来完成消息的分发
// 参数1: 交换机 参数2: 路由key/queue队列名称 参数3: 消息内容
String exchangeName = "ttl_order_exchange";
// 路由给三个消息队列推送消息
String routingKey = "ttlmessage";
// 给消息设置过期时间
MessagePostProcessor postProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 参数类型是字符串
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId, postProcessor);
}
}
package com.vinjcent.rabbitmq;
import com.vinjcent.rabbitmq.service.OrderServiceImpl;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {
@Autowired
OrderServiceImpl orderService;
@Test
void contextLoads() {
orderService.createOrderFanout("1","1",12);
}
@Test
void testDirect() {
orderService.createOrderDirect("1","1",12);
}
@Test
void testTopic() {
orderService.createOrderTopic("1","1",12);
}
@Test
void testTtl() {
orderService.createOrderTtl("1","1",12);
}
// 使用该测试用例
@Test
void testTtlMessage() {
orderService.createOrderTtlMessage("1","1",12);
}
}
两者的区别在于,过期的消息队列,不会立即删除掉队列里的消息,而是将该队列中的消息放在死信队列中;而过期消息的消息队列则会立马删除掉该指定的消息
当两者同时存在时,谁的有效期时间短,就以该较短的时间作为标准
概述
DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换机,也有人称之为私信邮箱。当消息在一个队列中变成了死信(Dead Message)之后,它能被重新发送到另一个交换机中,这个交换机就是 DLX,绑定 DLX 的队列就称之为死信队列
消息变成死信,有如下原因
DLX 也是一个正常的交换机,和一般的交换机没有区别,它能在任何队列上被指定,实际上就是设置某一个队列的属性。当这个队列中存在死信时,Rabbitmq 就会自动将这个消息重新发布到设置的 DLX 上去,进而被路由到另外一个队列,即死信队列
想要使用死信队列,只需要在定义队列的时候设置队列参数x-dead-letter-exchange
指定交换机即可
config
包下,添加DeadRabbitMQConfiguration.java类(定义死信交换机和死信队列
)package com.vinjcent.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class DeadRabbitMQConfiguration {
// 1.声明注册fanout模式交换机
@Bean
public DirectExchange deadDirect(){
return new DirectExchange("ttl_dead_exchange", true, false);
}
// 2.声明队列
@Bean
public Queue deadQueue(){
return new Queue("direct.dead.queue", true);
}
// 3.完成绑定关系(队列)
// 路由
@Bean
public Binding deadBinding(){
return BindingBuilder.bind(deadQueue()).to(deadDirect()).with("dead");
}
}
service
包下的TTLRabbitMQConfiguration.java类(为该队列和交换机绑定死信参数
)
package com.vinjcent.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class TTLRabbitMQConfiguration {
// 1.声明注册fanout模式交换机
@Bean
public DirectExchange ttl_directExchange(){
return new DirectExchange("ttl_order_exchange",true,false);
}
// 2.声明队列
@Bean
public Queue ttl_accountQueue(){
// 设置过期时间
Map<String, Object> args = new HashMap<>();
// 根据参数说明设置对应参数
args.put("x-message-ttl",5000);
// 配置死信交换机
args.put("x-dead-letter-exchange","ttl_dead_exchange");
// 配置死信队列的路由key
args.put("x-dead-letter-routing-key","dead"); // direct模式需要配置路由key,而fanout模式不需要
return new Queue("account.ttl.queue",true,false,false,args);
}
// 3.完成绑定关系(队列)
// direct模式比fanout模式多了一个路由key
@Bean
public Binding ttl_accountBinding(){
return BindingBuilder.bind(ttl_accountQueue()).to(ttl_directExchange()).with("ttl");
}
}
service
包下的OrderServiceImpl.java类下的createOrderTtl函数package com.vinjcent.rabbitmq.service;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
@SuppressWarnings("all")
@Service
public class OrderServiceImpl {
@Autowired
RabbitTemplate rabbitTemplate;
// fanout模式
public void createOrderFanout(String userId, String productId, int num){
//......
}
// direct模式
public void createOrderDirect(String userId, String productId, int num){
//......
}
// topic
public void createOrderTopic(String userId, String productId, int num){
//......
}
// ttl
public void createOrderTtl(String userId, String productId, int num){
// 1.根据商品id查询库存是否充足
// 2.保存订单
String orderId = UUID.randomUUID().toString();
System.out.println("订单生产成功: " + orderId);
// 3.通过MQ来完成消息的分发
// 参数1: 交换机 参数2: 路由key/queue队列名称 参数3: 消息内容
String exchangeName = "ttl_order_exchange";
// 路由给三个消息队列推送消息
String routingKey = "ttl";
/*
* #.account.#
* *.express.#
* sms.#
*/
rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
}
// ttlmessage
public void createOrderTtlMessage(String userId, String productId,int num){
//......
}
}
配置的死信队列参数可在web界面查看
package com.vinjcent.rabbitmq;
import com.vinjcent.rabbitmq.service.OrderServiceImpl;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
class SpringbootOrderRabbitmqProducerApplicationTests {
@Autowired
OrderServiceImpl orderService;
@Test
void contextLoads() {
orderService.createOrderFanout("1","1",12);
}
@Test
void testDirect() {
orderService.createOrderDirect("1","1",12);
}
@Test
void testTopic() {
orderService.createOrderTopic("1","1",12);
}
// 使用该测试用例
@Test
void testTtl() {
orderService.createOrderTtl("1","1",12);
}
@Test
void testTtlMessage() {
orderService.createOrderTtlMessage("1","1",12);
}
}
报错原因:由于已创建的队列,在原来的基础上进行代码的修改,这个队列不会被更改或者覆盖,只能通过删除之后重新运行
在实际开发中,并不推荐直接从线上直接删除,这样做的风险比较大。可以通过重新创建一个队列,将要删除的队列的死信队列与之绑定,将死信队列里的信息用生产者发送到新的队列当中,实现转换/迁移的过程
对于参数的设置,可以在web界面查看异常队列规则,从而限制队列消息的推入
下一篇文章《内存磁盘的监控》