延迟消息是指的消息发送出去后并不想立即就被消费,而是需要等(指定的)一段时间后才触发消费。
例如下面的业务场景:在支付宝上面买电影票,锁定了一个座位后系统默认会帮你保留15分钟时间,如果15分钟后还没付款那么不好意思系统会自动把座位释放掉。怎么实现类似的功能呢?
你在公司的协同办公系统上面预约了一个会议,邀请汪产品和陈序员今晚22点准时参加会有。系统还比较智能,除了默认发会议邀请的邮件告知参会者以外,到了今晚21:45分的时候(提前15分钟)就会通知提醒参会人员做好参会准备,会议马上开始...
同样的,这也可以通过轮询“会议预定表”来实现,比如我每分钟跑一次定时任务看看当前有哪些会议即将开始了。当然也可以通过延迟消息来实现,预定会议以后系统投递一条延迟消息,而这条消息比较特殊不会立马被消费,而是延迟到指定时间后再触发消费动作(发通知提醒参会人准备)。不过遗憾的是,在AMQP协议和RabbitMQ中都没有相关的规定和实现。不过,我们似乎可以借助上一小节介绍的“死信队列”来变相的实现。
可以使用rabbitmq_delayed_message_exchange插件实现。
这里和TTL方式有个很大的不同就是TTL存放消息在死信队列(delayqueue)里,二基于插件存放消息在延时交换机里(x-delayed-message exchange)。
下载插件
下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
安装插件:
将插件拷贝到rabbitmq-server的安装路径:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.4/plugins
启用插件
rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重启rabbitmq-server
systemctl restart rabbitmq-server
SpringBoot代码案例
(1)xml配置文件与properties连接配置
- <dependencies>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-amqpartifactId>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-webartifactId>
- dependency>
-
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-testartifactId>
- <scope>testscope>
- dependency>
- <dependency>
- <groupId>org.springframework.amqpgroupId>
- <artifactId>spring-rabbit-testartifactId>
- <scope>testscope>
- dependency>
- dependencies>
- spring.application.name=delayed_exchange
- spring.rabbitmq.host=192.168.80.121
- spring.rabbitmq.virtual-host=/
- spring.rabbitmq.username=root
- spring.rabbitmq.password=123456
- spring.rabbitmq.port=5672
- # 设置手动确认消息
- #spring.rabbitmq.listener.simple.acknowledge-mode=manual
(2)SpringBootApplication主入口类
- package com.lagou.rabbit.demo;
-
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
-
- @SpringBootApplication
- public class Demo02RabbitmqDelayedApplication {
-
- public static void main(String[] args) {
- SpringApplication.run(Demo02RabbitmqDelayedApplication.class, args);
- }
-
- }
(3)RabbitMQ的对象配置
- package com.lagou.rabbit.demo.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.amqp.rabbit.annotation.EnableRabbit;
- import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.core.RabbitAdmin;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.ComponentScan;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.HashMap;
- import java.util.Map;
-
- @Configuration
- @EnableRabbit
- @ComponentScan("com.lagou.rabbit.demo")
- public class RabbitConfig {
-
- @Bean
- public Queue queue() {
- return new Queue("queue.delayed", true, false, false, null);
- }
-
- @Bean
- public Exchange exchange() {
- Map
arguments = new HashMap<>(); - // 使用x-delayed-type指定交换器的类型
- arguments.put("x-delayed-type", ExchangeTypes.DIRECT);
- // 使用x-delayed-message表示使用delayed exchange插件处理消息
- return new CustomExchange("ex.delayed", "x-delayed-message", true, false, arguments);
- }
-
- @Bean
- public Binding binding() {
- return BindingBuilder.bind(queue()).to(exchange()).with("key.delayed").noargs();
- }
-
- @Bean
- @Autowired
- public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
- return new RabbitAdmin(factory);
- }
-
- @Bean
- @Autowired
- public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
- return new RabbitTemplate(factory);
- }
-
- @Bean
- @Autowired
- public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
- SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
- factory.setConnectionFactory(connectionFactory);
- factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
- return factory;
- }
- }
(4)使用推消息模式接收延迟队列的广播
- package com.lagou.rabbit.demo.listener;
-
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- import com.rabbitmq.client.Channel;
-
- import java.io.IOException;
-
- @Component
- public class MyMeetingListener {
- @RabbitListener(queues = "queue.delayed")
- public void onMessage(Message message, Channel channel) throws IOException {
- System.out.println(new String(message.getBody(), message.getMessageProperties().getContentEncoding()));
-
- // 消息确认
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- }
- }
(5)开发RestController,用于向延迟队列发送消息,并指定延迟的时长
- package com.lagou.rabbit.demo.controller;
-
- import org.springframework.amqp.core.*;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.PathVariable;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- import java.nio.charset.StandardCharsets;
-
- @RestController
- public class DelayedController {
- @Autowired
- private AmqpTemplate amqpTemplate;
-
- @RequestMapping("/meeting/{second}")
- public String bookMeeting(@PathVariable Integer second) {
-
- // RabbitMQ只会检查队列头部的消息是否过期,如果过期就放到死信队列
- // 假如第一个过期时间很长,10s,第二个消息3s,则系统先看第一个消息,等到第一个消息过期,放到DLX
- // 此时才会检查第二个消息,但实际上此时第二个消息早已经过期了,但是并没有先于第一个消息放到DLX。
- // 插件rabbitmq_delayed_message_exchange帮我们搞定这个。
- MessageProperties messageProperties = MessagePropertiesBuilder.newInstance()
- // 设置消息的过期时间
- .setHeader("x-delay", (second - 10) * 1000)
- .setContentEncoding("utf-8")
- .build();
-
- Message message = MessageBuilder.withBody("还有10s开始开会了".getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();
-
- // 如果不设置message的properties,也可以使用下述方法设置x-delay属性的值
- // rabbitTemplate.convertAndSend("ex.delayed", "key.delayed", message, msg -> {
- // 使用定制的属性x-delay设置过期时间,也就是提前5s提醒
- // 当消息转换完,设置消息头字段
- // msg.getMessageProperties().setHeader("x-delay", (seconds - 5) * 1000);
- // return msg;
- // });
-
- amqpTemplate.send("ex.delayed","key.delayed",message);
-
- return "会议订好了";
- }
- }
(6)访问,然后查看输出