目录
延迟-意即 非实时,之前我们讨论大部分的案例都是生产者将消息发送至Broker,消费者立即进行了消费,若消费者消费能力小于生产者生产能力,产生了消息堆积,也会产生延迟。但这种延迟不是我们主观要求的延迟。
此次涉及到的延迟-是在生产者发消息时,就明确预知的,会产生延迟消费,而且延迟的时间也是设定好的。消息会暂存到queue中,等待预设时间到达后,再即时触发消费。
所有发出的消息不想立即消费的场景,例如:
2.1 用户下单后30M未支付会取消订单
2.2 用户下单后2M消息提醒其支付
2.3 用户注册后2天内未登录,进行提醒
.....
当然我们可以通过定时扫描的方式,来实现,但定时扫描会存在控制不精准,若扫描的数据量大对性能有影响。
优雅的方式可以引入延迟队列。
实现延迟队列的话,我们考虑两种方式,第一种是通过TTL ,第二种通过rabbitmq的插件,TTL这部分已经在死信队列的篇章介绍过了,与之前死信队列的TTL实现是一致的:
RabbitMQ初步到精通-第六章-RabbitMQ之死信队列_Mr-昊哥的博客-CSDN博客
如上图,我们将队列1 TTL 设置为10s,且队列1不再有消费者进行消费,那生产者生产的消息都会在队列1中暂存10S,然后投递到死信交换机,再路由到队列2,最后被消费者2成功消费,这样就实现了 消息的延迟消费。而且通过队列设置TTL,消息的延迟都是准确的。
问题:
若我们的业务,并非单一的失效时间,存在多种失效时间,或失效的时间不是固定的,这样就会比较麻烦,我们不可能为每一种失效时间再去增加一个队列吧,队列3-TTL20s,队列4-TTL30s,队列5-TTL50s ...
那我们试着从消息入手,在消息发出的时候就设定好TTL。
这次是不是万无一失呢,队列1不再设置失效时间,发送的四条消息设置不同的时间,通过时间到期,自动转到队列2,消费者2成功去消费。
问题:
若按消息设置失效期,则会存在失效时间不准的情况。例如msg1 TTL 60s ,msg2 ttl 10s ,理论上是msg2先失效,结果是 msg1 60s失效后,再msg2失效。
结论呢,RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。原因是,rabbitmq在等到消息投递给消费者的时候判断当前消息是否过期。
那还有没有更好的办法,装插件,使用延迟的交换机,来实现。
3.3.1 插件的安装
rabbitmq-delayed-message-exchange
官网下载插件:Community Plugins — RabbitMQ
安装完成后:面板展示:
3.3.2 使用延迟插件架构
这次又回到最经典的模式了,生产者->交换机->队列->消费者,
只是交换机是一个特殊的 延迟交换机而已。
延迟交换机面板:
核心的不同是声明交换机的时候:
1. 绑定direct类型 与 延迟交换机参数:
- Map
argMap = new HashMap<>(); - argMap.put("x-delayed-type", "direct");
2. 声明交换机的时候:
channel.exchangeDeclare(PLUGINS_EXCHANGE, "x-delayed-message", true, false, argMap );
代码验证不再对TTL进行验证,可以参考前一章节 死信队列的内容
针对插件验证:
生产者:
-
- /**
- * @author rabbit
- * @version 1.0.0
- * @Description -
- * @createTime 2022/07/27 19:34:00
- */
- public class PluginsDelayProducer {
-
- public static String PLUGINS_EXCHANGE = "plugins.exchange";
- public static String PLUGINS_ROUTING_KEY = "plugins";
- public static String PLUGINS_QUEUE = "plugins.queue";
-
- //生产者
- public static void main(String[] args) throws Exception {
- //1、获取connection
- Connection connection = RabbitCommonConfig.getConnection();
- //2、创建channel
- Channel channel = connection.createChannel();
- List
delayedTimes = Arrays.asList(5, 2, 3, 4, 1); - for (Integer delayedTime : delayedTimes) {
- sendMsg(channel, delayedTime);
- }
- //4、关闭管道和连接
- channel.close();
- connection.close();
- }
-
- private static void sendMsg(Channel channel, Integer delayedTime) throws IOException, InterruptedException {
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- String content = String.format("消息时间:[%s],延时[%d]s", sdf.format(new Date()), delayedTime);
- byte[] msg = content.getBytes(StandardCharsets.UTF_8);
- Map
headers = new HashMap<>(); - headers.put("x-delay", delayedTime * 1000);
- AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(headers).build();
- channel.basicPublish(PLUGINS_EXCHANGE, PLUGINS_ROUTING_KEY, properties, msg);
- System.out.println("消息发送完成:" + content);
- }
-
- }
消费者:
-
- /**
- * @author rabbit
- * @version 1.0.0
- * @Description
- * @createTime 2022/11/17 16:53:00
- */
- public class PluginsDelayConsumer {
-
- public static String PLUGINS_EXCHANGE = "plugins.exchange";
- public static String PLUGINS_ROUTING_KEY = "plugins";
- public static String PLUGINS_QUEUE = "plugins.queue";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- //1、获取连对象、
- Connection connection = RabbitCommonConfig.getConnection();
- //2、创建channel
- Channel channel = connection.createChannel();
-
- // 延迟交换机参数
- Map
delayParams = getNormalAndDeadParams(); -
- // 4.声明一个队列与交换机及绑定关系
- handleQueueAndBinding(channel, PLUGINS_QUEUE, delayParams, PLUGINS_EXCHANGE, PLUGINS_ROUTING_KEY);
-
- channel.basicQos(1);
-
- //5.开启监听Queue
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- String content = sdf.format(new Date());
- System.out.println("延迟消费者接收消息: " + new String(body, "UTF-8") + "当前时间: " + content);
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- };
-
- channel.basicConsume(PLUGINS_QUEUE, false, consumer);
-
- System.out.println("延迟消费者启动接收消息......");
-
- //5、键盘录入,让程序不结束!
- System.in.read();
-
- //6、释放资源
- channel.close();
- connection.close();
-
- }
-
- private static Map
getNormalAndDeadParams() { - Map
argMap = new HashMap<>(); - argMap.put("x-delayed-type", "direct");
- return argMap;
- }
-
- /**
- * 处理队列与绑定关系
- *
- * @param channel
- * @param deadQueueName
- * @param o
- * @param exchangeName
- * @param routingKey
- * @throws IOException
- */
- private static void handleQueueAndBinding(Channel channel, String deadQueueName, Map
o, String exchangeName, String routingKey) throws IOException { - channel.exchangeDeclare(PLUGINS_EXCHANGE, "x-delayed-message", true, false, o);
- channel.queueDeclare(PLUGINS_QUEUE, true, false, false, new HashMap<>());
- channel.queueBind(PLUGINS_QUEUE, PLUGINS_EXCHANGE, PLUGINS_ROUTING_KEY);
- }
-
- }
总的来说实现延迟队列有3种形式:
1. 队列TTL
2.消息TTL
3. 安装延迟插件
使用TTL会有限制且不通用,架构也相对复杂,但也有一些业务失效时间是明确的也可以使用。
使用插件会相对简单,但有些公司,中间件是独立管理的,安装插件还需要沟通,也不一定能够同意安装。
所已,还是因地制宜。适合的就是最好的!