比如未付款订单,超过一定时间后,系统自动取消订单并释放占有物品。
spring的schedule定时任务轮询数据库
缺点:消耗系统内存,增加了数据库的压力、存在较大时间误差
解决:rabbitmq的消息TTL和死信Exchange结合
消息的TTL就是消息的存活时间
RabbitMQ可以对队列和消息分别设置TTL
延时队列实现-1
延时队列实现-2
使用延时队列-1,给整个消息队列设置过期时间
1、声明一个队列,并且指定TTL
2、要给队列设置超时时间,需要在声明队列时配置x-message-ttl属性
3、声明交换机,将ttl与交换机绑定
注意,这个队列设定了死信交换机为dl.ttl.direct
- package com.hdb.pingmoweb.order.mq;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.*;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import javax.annotation.PostConstruct;
-
- @Slf4j
- @Configuration
- public class MyRabbitConfig {
-
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- @PostConstruct
- public void initRabbitTemplate(){
- rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
- // 投递失败,记录日志
- log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
- replyCode, replyText, exchange, routingKey, message.toString());
- // 如果有业务需要,可以重发消息
- });
- }
-
- @Bean
- public Queue ttlQueue(){
- return QueueBuilder.durable("ttl.queue")
- .withArgument("x-message-ttl",60000)
- .withArgument("x-dead-letter-exchange","dl.ttl.direct")
- // 指定队列名称,并持久化
- //.ttl(10000) // 设置队列的超时时间,10秒
- //.deadLetterExchange("dl.ttl.direct") // 指定死信交换机
- .build();
- }
-
- @Bean
- public DirectExchange ttlExchange(){
- return new DirectExchange("ttl.direct");
- }
-
- @Bean
- public Binding ttlBinding(){
- return BindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");
- }
-
- }
4、接收超时死信的死信交换机
在consumer服务的SpringRabbitListener中,定义一个新的消费者,并且声明 死信交换机、死信队列
- package com.hdb.pingmoweb.order.mq;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.annotation.Exchange;
- import org.springframework.amqp.rabbit.annotation.Queue;
- import org.springframework.amqp.rabbit.annotation.QueueBinding;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- @Slf4j
- @Component
- public class SpringRabbitListener {
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue(name = "dl.ttl.queue", durable = "true"),
- exchange = @Exchange(name = "dl.ttl.direct"),
- key = "ttl"
- ))
- public void listenDlQueue(String msg) {
- log.info("接收到 dl.ttl.queue的延迟消息:{}", msg);
- }
-
- }
5、发送消息,但是不要指定TTL
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- @RequestMapping("/ttl/{msg}")
- public R testTTLMsg(@PathVariable String msg){
- // 消息ID,需要封装到CorrelationData中
- CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
- // 发送消息
- rabbitTemplate.convertAndSend("ttl.direct", "ttl", msg, correlationData);
- // 记录日志
- log.debug("发送消息成功");
- return R.ok();
- }