• RabbitMQ初步到精通-第七章-RabbitMQ之延迟队列


    目录

    第七章-RabbitMQ之延迟队列

    1. 延迟队列概念

    2. 应用场景

    3. 架构模式

    3.1 队列TTL实现

    3.2 消息TTL实现

    3.3 插件实现

    4. 代码验证

    5. 总结


    第七章-RabbitMQ之延迟队列

    1. 延迟队列概念

    延迟-意即 非实时,之前我们讨论大部分的案例都是生产者将消息发送至Broker,消费者立即进行了消费,若消费者消费能力小于生产者生产能力,产生了消息堆积,也会产生延迟。但这种延迟不是我们主观要求的延迟。

    此次涉及到的延迟-是在生产者发消息时,就明确预知的,会产生延迟消费,而且延迟的时间也是设定好的。消息会暂存到queue中,等待预设时间到达后,再即时触发消费。

    2. 应用场景

    所有发出的消息不想立即消费的场景,例如:

    2.1 用户下单后30M未支付会取消订单

    2.2 用户下单后2M消息提醒其支付

    2.3 用户注册后2天内未登录,进行提醒

    .....

    当然我们可以通过定时扫描的方式,来实现,但定时扫描会存在控制不精准,若扫描的数据量大对性能有影响。

    优雅的方式可以引入延迟队列。

    3. 架构模式

    实现延迟队列的话,我们考虑两种方式,第一种是通过TTL ,第二种通过rabbitmq的插件,TTL这部分已经在死信队列的篇章介绍过了,与之前死信队列的TTL实现是一致的:

    RabbitMQ初步到精通-第六章-RabbitMQ之死信队列_Mr-昊哥的博客-CSDN博客

    3.1 队列TTL实现

     如上图,我们将队列1 TTL 设置为10s,且队列1不再有消费者进行消费,那生产者生产的消息都会在队列1中暂存10S,然后投递到死信交换机,再路由到队列2,最后被消费者2成功消费,这样就实现了 消息的延迟消费。而且通过队列设置TTL,消息的延迟都是准确的。

    问题:

    若我们的业务,并非单一的失效时间,存在多种失效时间,或失效的时间不是固定的,这样就会比较麻烦,我们不可能为每一种失效时间再去增加一个队列吧,队列3-TTL20s,队列4-TTL30s,队列5-TTL50s ... 

     那我们试着从消息入手,在消息发出的时候就设定好TTL。

    3.2 消息TTL实现

     这次是不是万无一失呢,队列1不再设置失效时间,发送的四条消息设置不同的时间,通过时间到期,自动转到队列2,消费者2成功去消费。

    问题:

    若按消息设置失效期,则会存在失效时间不准的情况。例如msg1 TTL 60s ,msg2 ttl 10s ,理论上是msg2先失效,结果是 msg1 60s失效后,再msg2失效。

    结论呢,RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。原因是,rabbitmq在等到消息投递给消费者的时候判断当前消息是否过期。

    那还有没有更好的办法,装插件,使用延迟的交换机,来实现。

    3.3 插件实现

    3.3.1 插件的安装

    rabbitmq-delayed-message-exchange

    官网下载插件:Community Plugins — RabbitMQ


    安装完成后:面板展示:

    3.3.2 使用延迟插件架构

     这次又回到最经典的模式了,生产者->交换机->队列->消费者,

    只是交换机是一个特殊的 延迟交换机而已。

    延迟交换机面板:

     核心的不同是声明交换机的时候:

    1. 绑定direct类型 与 延迟交换机参数:

    1. Map argMap = new HashMap<>();
    2. argMap.put("x-delayed-type", "direct");

    2. 声明交换机的时候:

    channel.exchangeDeclare(PLUGINS_EXCHANGE, "x-delayed-message", true, false, argMap );
    

    4. 代码验证

    代码验证不再对TTL进行验证,可以参考前一章节 死信队列的内容

    针对插件验证:

    生产者:

    1. /**
    2. * @author rabbit
    3. * @version 1.0.0
    4. * @Description -
    5. * @createTime 2022/07/27 19:34:00
    6. */
    7. public class PluginsDelayProducer {
    8. public static String PLUGINS_EXCHANGE = "plugins.exchange";
    9. public static String PLUGINS_ROUTING_KEY = "plugins";
    10. public static String PLUGINS_QUEUE = "plugins.queue";
    11. //生产者
    12. public static void main(String[] args) throws Exception {
    13. //1、获取connection
    14. Connection connection = RabbitCommonConfig.getConnection();
    15. //2、创建channel
    16. Channel channel = connection.createChannel();
    17. List delayedTimes = Arrays.asList(5, 2, 3, 4, 1);
    18. for (Integer delayedTime : delayedTimes) {
    19. sendMsg(channel, delayedTime);
    20. }
    21. //4、关闭管道和连接
    22. channel.close();
    23. connection.close();
    24. }
    25. private static void sendMsg(Channel channel, Integer delayedTime) throws IOException, InterruptedException {
    26. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    27. String content = String.format("消息时间:[%s],延时[%d]s", sdf.format(new Date()), delayedTime);
    28. byte[] msg = content.getBytes(StandardCharsets.UTF_8);
    29. Map headers = new HashMap<>();
    30. headers.put("x-delay", delayedTime * 1000);
    31. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(headers).build();
    32. channel.basicPublish(PLUGINS_EXCHANGE, PLUGINS_ROUTING_KEY, properties, msg);
    33. System.out.println("消息发送完成:" + content);
    34. }
    35. }

    消费者:

    1. /**
    2. * @author rabbit
    3. * @version 1.0.0
    4. * @Description
    5. * @createTime 2022/11/17 16:53:00
    6. */
    7. public class PluginsDelayConsumer {
    8. public static String PLUGINS_EXCHANGE = "plugins.exchange";
    9. public static String PLUGINS_ROUTING_KEY = "plugins";
    10. public static String PLUGINS_QUEUE = "plugins.queue";
    11. public static void main(String[] args) throws IOException, TimeoutException {
    12. //1、获取连对象、
    13. Connection connection = RabbitCommonConfig.getConnection();
    14. //2、创建channel
    15. Channel channel = connection.createChannel();
    16. // 延迟交换机参数
    17. Map delayParams = getNormalAndDeadParams();
    18. // 4.声明一个队列与交换机及绑定关系
    19. handleQueueAndBinding(channel, PLUGINS_QUEUE, delayParams, PLUGINS_EXCHANGE, PLUGINS_ROUTING_KEY);
    20. channel.basicQos(1);
    21. //5.开启监听Queue
    22. DefaultConsumer consumer = new DefaultConsumer(channel) {
    23. @Override
    24. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    25. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    26. String content = sdf.format(new Date());
    27. System.out.println("延迟消费者接收消息: " + new String(body, "UTF-8") + "当前时间: " + content);
    28. channel.basicAck(envelope.getDeliveryTag(), false);
    29. }
    30. };
    31. channel.basicConsume(PLUGINS_QUEUE, false, consumer);
    32. System.out.println("延迟消费者启动接收消息......");
    33. //5、键盘录入,让程序不结束!
    34. System.in.read();
    35. //6、释放资源
    36. channel.close();
    37. connection.close();
    38. }
    39. private static Map getNormalAndDeadParams() {
    40. Map argMap = new HashMap<>();
    41. argMap.put("x-delayed-type", "direct");
    42. return argMap;
    43. }
    44. /**
    45. * 处理队列与绑定关系
    46. *
    47. * @param channel
    48. * @param deadQueueName
    49. * @param o
    50. * @param exchangeName
    51. * @param routingKey
    52. * @throws IOException
    53. */
    54. private static void handleQueueAndBinding(Channel channel, String deadQueueName, Map o, String exchangeName, String routingKey) throws IOException {
    55. channel.exchangeDeclare(PLUGINS_EXCHANGE, "x-delayed-message", true, false, o);
    56. channel.queueDeclare(PLUGINS_QUEUE, true, false, false, new HashMap<>());
    57. channel.queueBind(PLUGINS_QUEUE, PLUGINS_EXCHANGE, PLUGINS_ROUTING_KEY);
    58. }
    59. }

    5. 总结

    总的来说实现延迟队列有3种形式:

    1. 队列TTL

    2.消息TTL

    3. 安装延迟插件

    使用TTL会有限制且不通用,架构也相对复杂,但也有一些业务失效时间是明确的也可以使用。

    使用插件会相对简单,但有些公司,中间件是独立管理的,安装插件还需要沟通,也不一定能够同意安装。

    所已,还是因地制宜。适合的就是最好的!

  • 相关阅读:
    爬虫-获取数据bs4
    使用 matlab 的 Robotics ToolBox 完成5自由雄克机械臂的建模与运动学仿真,并计算8个点的运动轨迹,绘制运动动画
    (6)SpringMVC中使用CharacterEncodingFilter编码过滤器处理请求和响应的乱码问题
    C++多态基础
    Vue框架--理解MVVM
    MATLAB画三维曲面(surf,mesh)以及不规则meshgrid
    Web APIs 第03天上
    Golang学习笔记—结构体
    【老生谈算法】matlab实现遗传算法在调节控制系统参数中的应用——遗传算法
    Mysql的group_concat函数长度限制
  • 原文地址:https://blog.csdn.net/blucastle/article/details/127945348