延时队列就是用来存放需要在指定时间被处理的元素的队列,延时属性,可以理解为就是有ttl时间的死信队列,就是需要我们可以控制时间处理消息。
特点:需要在某个事件发生之后或者之前的指定时间点完成某一项任务
依赖
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-webartifactId>
dependency>
<dependency>
<groupId>com.alibabagroupId>
<artifactId>fastjsonartifactId>
<version>1.2.47version>
dependency>
<dependency>
<groupId>org.projectlombokgroupId>
<artifactId>lombokartifactId>
dependency>
<dependency>
<groupId>io.springfoxgroupId>
<artifactId>springfox-swagger2artifactId>
<version>2.9.2version>
dependency>
<dependency>
<groupId>io.springfoxgroupId>
<artifactId>springfox-swagger-uiartifactId>
<version>2.9.2version>
dependency>
<dependency>
<groupId>org.springframework.amqpgroupId>
<artifactId>spring-rabbit-testartifactId>
<scope>testscope>
dependency>
配置MQ服务器,用户密码啥的
添加Swagger 配置类
package com.feng.springbootrabbitmq.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.service.Contact;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
/**
* @Author Feng
* @Date 2022/11/25 16:50
* @Version 1.0
* @Description Swagger配置类
*/
@Configuration
@EnableSwagger2
public class SwaggerConfig {
@Bean
public Docket webApiConfig() {
return new Docket(DocumentationType.SWAGGER_2)
.groupName("webApi")
.apiInfo(webApiInfo())
.select()
.build();
}
private ApiInfo webApiInfo() {
return new ApiInfoBuilder()
.title("rabbitmq 接口文档")
.description("本文档描述了 rabbitmq 微服务接口定义")
.version("1.0")
.contact(new Contact("feng", "http://feng.com", "308266103@qq.com"))
.build();
}
}
创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是direct,创建一个死信队列 QD,它们的绑定关系如下:
因为我们没有消费者直接去正常队列中进行消费,所以消息TTL后直接进入死信队列,再由消费者消费
package com.feng.springbootrabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @Author Feng
* @Date 2022/11/25 17:06
* @Version 1.0
* @Description 具有存活时间的队列配置类
*/
@Configuration
public class TtlQueueConfig {
//普通交换机名称
public static final String NORMAL_EXCHANGE = "X";
//死信交换机名称
public static final String DEAD_EXCHANGE = "Y";
//普通队列名称
public static final String NORMAL_QUEUE_A = "QA";
public static final String NORMAL_QUEUE_B = "QB";
//死信队列名称
public static final String DEAD_QUEUE = "QD";
//声明普通交换机
@Bean("xExchange")//别名,不设置默认是方法名
public DirectExchange xExchange() {
return new DirectExchange(NORMAL_EXCHANGE);
}
//声明死信交换机
@Bean("yExchange")//别名,不设置默认是方法名
public DirectExchange yExchange() {
return new DirectExchange(DEAD_EXCHANGE);
}
//声明普通队列,有TTL 10S
@Bean("queueA")//别名,不设置默认是方法名
public Queue queueA() {
Map<String, Object> arguments = new HashMap<>(3);
//过期TTL
arguments.put("x-message-ttl", 10000);
//设置队列的死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//设置死信交换机的RoutingKey
arguments.put("x-dead-letter-routing-key", "YD");
return QueueBuilder
//是否持久化
.durable(NORMAL_QUEUE_A)
.withArguments(arguments)
.build();
}
@Bean("queueB")//别名,不设置默认是方法名
public Queue queueB() {
Map<String, Object> arguments = new HashMap<>(3);
//过期TTL
arguments.put("x-message-ttl", 40000);
//设置队列的死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//设置死信交换机的RoutingKey
arguments.put("x-dead-letter-routing-key", "YD");
return QueueBuilder
//是否持久化
.durable(NORMAL_QUEUE_B)
.withArguments(arguments)
.build();
}
//声明死信队列``````````````````````````````
@Bean("queueD")//别名,不设置默认是方法名
public Queue queueD() {
return QueueBuilder
//是否持久化
.durable(DEAD_QUEUE)
.build();
}
// 声明队列 A 绑定 X 交换机
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
// 声明队列 B 绑定 X 交换机
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XB");
}
// 声明队列 D 绑定 Y交换机
@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queueA,
@Qualifier("yExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("YD");
}
}
package com.feng.springbootrabbitmq.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Date;
/**
* @Author Feng
* @Date 2022/11/25 17:53
* @Version 1.0
* @Description 发送延迟消息接口
*/
@RestController
@RequestMapping("/ttl")
@Slf4j
public class SendMsgController {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
*/
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable("message") String msg){
log.info("当前时间:{},发送一条信息给两个TTL队列,内容是:{}",new Date().toString(),msg);
rabbitTemplate.convertAndSend("X","XA","消息来自TTL为10秒的队列:"+msg);
rabbitTemplate.convertAndSend("X","XB","消息来自TTL为40秒的队列:"+msg);
}
}
package com.feng.springbootrabbitmq.consumer;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* @Author Feng
* @Date 2022/11/25 18:46
* @Version 1.0
* @Description TTL队列消费者
*
* 消息是通过监听的方式进行消费的
*/
@Component
@Slf4j
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receviD(Message message, Channel channel){
String msg = new String(message.getBody());
log.info("当前时间:{},收到死信队列的消息,内容是:{}",new Date().toString(),msg);
}
}
报错信息
原因
springboot 版本过高,使swagger 异常,或者说是引入的swagger版本过高导致的问题,或者说是springboot2.6.0更新以后引起的问题。springboot2.6.x之后将spring MVC默认路径匹配策略从ANT_PATH_MATCHER模式改为PATH_PATTERN_PARSER模式导致出错,解决方法是切换会原先的ANT_PATH_MATCHER模式,这个路径通配规则就是一种MVC对路径匹配的要求,在2.6之后更加严格,假如我们定义了一个’/hello‘接口,默认情况下,我们可以按照/hello来访问页面,也可以按照/hello.do这样带有“.do”后缀的接口来访问资源。但是在2.6之后就不行了,它严格了
解决方案
配置文件将SpringMvc的通配路径改回原来的
spring.mvc.pathmatch.matching-strategy=ant_path_matcher
测试结果
浏览器输入:http://localhost:8080/ttl/sendMsg/嘻嘻嘻
在这里新增了一个队列 QC,绑定关系如下,该队列不设置TTL 时间,我们由生产者指定消息过期时间,不用频繁去对列里进行指定
package com.feng.springbootrabbitmq.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @Author Feng
* @Date 2022/11/25 17:06
* @Version 1.0
* @Description 具有存活时间的队列配置类
*/
@Configuration
public class TtlQueueConfig {
//普通交换机名称
public static final String NORMAL_EXCHANGE = "X";
//死信交换机名称
public static final String DEAD_EXCHANGE = "Y";
//普通队列名称
public static final String NORMAL_QUEUE_A = "QA";
public static final String NORMAL_QUEUE_B = "QB";
//通用普通队列
public static final String NORMAL_QUEUE_C = "QC";
//死信队列名称
public static final String DEAD_QUEUE = "QD";
//声明普通交换机
@Bean("xExchange")//别名,不设置默认是方法名
public DirectExchange xExchange() {
return new DirectExchange(NORMAL_EXCHANGE);
}
//声明死信交换机
@Bean("yExchange")//别名,不设置默认是方法名
public DirectExchange yExchange() {
return new DirectExchange(DEAD_EXCHANGE);
}
//声明普通队列,有TTL 10S
@Bean("queueA")//别名,不设置默认是方法名
public Queue queueA() {
Map<String, Object> arguments = new HashMap<>(3);
//过期TTL
arguments.put("x-message-ttl", 10000);
//设置队列的死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//设置死信交换机的RoutingKey
arguments.put("x-dead-letter-routing-key", "YD");
return QueueBuilder
//是否持久化
.durable(NORMAL_QUEUE_A)
.withArguments(arguments)
.build();
}
@Bean("queueB")//别名,不设置默认是方法名
public Queue queueB() {
Map<String, Object> arguments = new HashMap<>(3);
//过期TTL
arguments.put("x-message-ttl", 40000);
//设置队列的死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//设置死信交换机的RoutingKey
arguments.put("x-dead-letter-routing-key", "YC");
return QueueBuilder
//是否持久化
.durable(NORMAL_QUEUE_B)
.withArguments(arguments)
.build();
}
//声明死信队列
@Bean("queueD")//别名,不设置默认是方法名
public Queue queueD() {
return QueueBuilder
//是否持久化
.durable(DEAD_QUEUE)
.build();
}
//通用普通队列
@Bean("queueC")//别名,不设置默认是方法名
public Queue queueC() {
Map<String, Object> arguments = new HashMap<>(3);
//设置队列的死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//设置死信交换机的RoutingKey
arguments.put("x-dead-letter-routing-key", "XC");
return QueueBuilder
//是否持久化
.durable(NORMAL_QUEUE_C)
.withArguments(arguments)
.build();
}
// 声明队列 A 绑定 X 交换机
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
// 声明队列 B 绑定 X 交换机
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
// 声明队列 C 绑定 X 交换机
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
// 声明队列 D 绑定 Y交换机
@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange xExchange) {
return BindingBuilder.bind(queueD).to(xExchange).with("YD");
}
}
package com.feng.springbootrabbitmq.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Date;
/**
* @Author Feng
* @Date 2022/11/25 17:53
* @Version 1.0
* @Description 发送延迟消息接口
*/
@RestController
@RequestMapping("/ttl")
@Slf4j
public class SendMsgController {
@Resource
private RabbitTemplate rabbitTemplate;
/**
* 发送消息
*/
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable("message") String msg){
log.info("当前时间:{},发送一条信息给两个TTL队列,内容是:{}",new Date().toString(),msg);
rabbitTemplate.convertAndSend("X","XA","消息来自TTL为10秒的队列:"+msg);
rabbitTemplate.convertAndSend("X","XB","消息来自TTL为40秒的队列:"+msg);
}
/**
* 有指定TTL的发消息
*/
@GetMapping("/sendExpirationMsg/{message}/{ttl}")
public void sendMsg(@PathVariable("message") String msg,@PathVariable("ttl") String ttl){
log.info("当前时间:{},发送一条时长是:{}毫秒的信息给通用TTL队列,内容是:{}",new Date().toString(),ttl,msg);
rabbitTemplate.convertAndSend("X","XC",msg,message->{
//在生产端设置发送消息的延迟时长
message.getMessageProperties().setExpiration(ttl);
return message;
});
}
}
测试
http://localhost:8080/ttl/sendExpirationMsg/come on baby1/20000
http://localhost:8080/ttl/sendExpirationMsg/come on baby1/2000
问题思考
因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行,因为这是队列的特性,先进先出,解决方案使用插件
由于上面的问题,如果不能实现在消息粒度上的TTL,并使其在设置的TTL时间及时死亡,就无法设计成一个通用的延时队列。那如何解决呢,接下来我们就去解决该问题。
github下载链接(版本3.8.0):
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez
使用教程:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
与所有第 3 方插件一样,该.ez文件必须放在节点的插件目录 中,并且可以被 RabbitMQ 进程的有效用户读取,要找出插件目录是什么,可以使用下面的命令:
rabbitmq-plugins directories -s
下载 rabbitmq_delayed_message_exchange 插件,放置到 RabbitMQ 的插件目录: /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
安装插件
#安装
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重启MQ服务
systemctl restart rabbitmq-server
成功截图
禁用插件,但请注意,所有未发送的延迟消息都将丢失。
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
原有消息TTL是在队列里进行的,然后因为队列特性,所以没法做到时间排序
现在是在交换机里进行时间的TTL,再分发给队列
这里新增了一个队列delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下:
在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中。
配置类代码
package com.feng.springbootrabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @Author Feng
* @Date 2022/11/26 14:14
* @Version 1.0
* @Description 插件实现延迟队列的配置类
*/
@Configuration
public class DelayedQueueConfig {
//延迟交换机名
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
//普通队列名
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
//路由Key
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
//声明队列
@Bean
public Queue delayQueue(){
return new Queue(DELAYED_QUEUE_NAME);
}
//声明延迟交换机, CustomExchange:自定义交换机类
@Bean
public CustomExchange delayExchange(){
Map<String, Object> arguments = new HashMap<>();
//表示是direct的路由方式
arguments.put("x-delayed-type", "direct");
//x-delayed-message:表示这个交换机是延迟交换机
return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,arguments);
}
//绑定,因为是自定义的交换机,所以最后需要 noargs()因为with返回的不是Bind
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayQueue") Queue queue,
@Qualifier("delayExchange") CustomExchange delayedExchange) {
return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
生产者
package com.feng.springbootrabbitmq.controller;
import com.feng.springbootrabbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.Date;
import static com.feng.springbootrabbitmq.config.DelayedQueueConfig.DELAYED_ROUTING_KEY;
/**
* @Author Feng
* @Date 2022/11/25 17:53
* @Version 1.0
* @Description 发送延迟消息接口
*/
@RestController
@RequestMapping("/ttl")
@Slf4j
public class SendMsgController {
/**
* 基于插件的延迟的生产者
*/
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME,
DELAYED_ROUTING_KEY, message,
correlationData -> {
//设置消息TTL
correlationData.getMessageProperties().setDelay(delayTime);
return correlationData;
});
log.info(" 当 前 时 间 : {}, 发 送 一 条 延 迟 {} 毫秒的信息给队列 delayed.queue:{}", new Date(), delayTime, message);
}
}
消费者
package com.feng.springbootrabbitmq.consumer;
import com.feng.springbootrabbitmq.config.DelayedQueueConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* @Author Feng
* @Date 2022/11/26 14:47
* @Version 1.0
* @Description 基于插件的延迟队列消费者
*/
@Component
@Slf4j
public class DelayQueueConsumer {
//监听消息进行消费
@RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
public void receiveDelayQueue(Message message){
String msg = new String(message.getBody());
log.info("当前时间:{},收到延时队列的消息:{}", new Date().toString(), msg);
}
}
测试
http://localhost:8080/ttl/sendDelayMsg/come on baby1/20000
http://localhost:8080/ttl/sendDelayMsg/come on baby2/2000
使用 RabbitMQ 来实现延时队列可以很好的利用RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为
单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景