• 536、RabbitMQ详细入门教程系列 -【消息与队列进阶 RabbitMQ(二)】 2022.07.29


    一、Priority优先级

    生活中尊卑有序、长幼有别,技术上自然存在优先级概念,比如滴滴打车那种加钱插队操作就可以使用消息优先级实现。上一节中提到了优先级属性priority,看过我前面Java队列的文章其中就有优先级队列ProrityBlockingQueue,当然这只是联想而已,RabbitMQ中实现消息优先级还是依赖于对象BasicProperties

    	AMQP.BasicProperties basicProperties = new AMQP.BasicProperties
    											  .Builder()
    											  .priority(9)
    											  .build();
    	channel.basicPublish(exchangeName,routingKey,basicProperties,message.getBytes());
    
    • 1
    • 2
    • 3
    • 4
    • 5

    数字越大优先级别越高,当然这个范围一般在0-9之间即可。测试时发送不同优先级消息到队列中消费查看结果即可,下图所示发现只是按照消息插入的顺序消费,优先级并未生效,你在搞什么飞机?稍安勿躁,若实现消息优先级则必须设置队列为优先级队列。

    在这里插入图片描述
    设置优先级队列操作在队列实例化时通过参数map实现,前面一直展示代码时queueDeclare()第五个参数都设置为null,其实该参数为Map集合,表示可以通过key-value的形式设置队列的其它属性。其中优先级key为x-max-priority,value表示优先级最大数值。通过控制台可以看到队列属性多了Pri标志,表示该队列为优先级队列。

            Map<String,Object> argumentMap = new HashMap<>();
            argumentMap.put("x-max-priority",10);
            channel.queueDeclare("queueName",true,false,false,argumentMap);
    
    • 1
    • 2
    • 3

    在这里插入图片描述
    再使用代码进行测试将会得到如下示例,消息生产是安装优先级倒序,即0-9进行投递。最后消费结果为9-0,表示优先级生效,且证明优先级策略为数值越大优先级越高
    在这里插入图片描述

    二、TTL 自动删除过期

    反正个人在学习这里时脑海中总是联想到Redis的TTL,实践场景个人理解可以用到具备一定时效性消息上。如上层业务调用短信交付模块功能,需要异步通过RabbitMQ通信交换消息,上层业务常见会Redis缓存生成校验码,且设置一定过期时效,若超过时限则用户接收到短信也是无用的。这时就可以考虑将Redis中的TTL与RabbitMQ中的TTL时效一致,保证用户接收到的验证短信都具备可用性
    自然,相对于Durable、Priority等特性需要队列与消息合作完成而言,TTL自动删除过期在这点上具有独立性。可以做如下三个方面设置:

    设置某个单独消息的过期时间
    设置某个队列的过期时间
    设置某个队列中消息过期时间(这样相当于统一设置队列中所有消息过期TTL)

    2.1 单消息TTL

    某条消息的TTL时间借助于BasicProperties 实例对象的expiration属性完成,单位为ms。其余的就不多赘述,测试时发送一条20s消息验证即可

    	AMQP.BasicProperties basicProperties = new AMQP.BasicProperties
    											  .Builder()
    											  .expiration("20000")
    											  .build();
    	channel.basicPublish(exchangeName,routingKey,basicProperties,message.getBytes());
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2.2 队列TTL

    某个队列长时间不进行操作后需要被删除则可以使用设置队列TTL时间实现,这个实现方式依旧采用Map方式,key为x-expires,value为过期时限,单位ms

            Map<String,Object> argumentMap = new HashMap<>();
            argumentMap.put("x-expires",2000);
            channel.queueDeclare("queueName",true,false,false,argumentMap);
    
    • 1
    • 2
    • 3

    最后通过面板可以看到队列属性中多了Exp标识,该标识标识队列为自动过期删除队列
    在这里插入图片描述

    2.3 队列消息TTL

    某个队列中所有消息过期时长一致,有必要在每个消息生产是设置单消息TTL?自然这时候就可以采用队列消息TTL策略实现,其实现不言而喻自然是依赖于队列实例化时设置

            Map<String,Object> argumentMap = new HashMap<>();
            argumentMap.put("x-message-ttl",2000);
            channel.queueDeclare("queueName",true,false,false,argumentMap);
    
    • 1
    • 2
    • 3

    通过控制面板可以观察到对列属性多了TTL标识,该标识表示队列中消息到期自动删除
    在这里插入图片描述

    三、MaxLength、MaxLengthBytes

    消息可以无限积压?可以传输任意大小消息?自然,RabbitMQ的设计考虑到这两点,可以通过在队列初始化的操作中设置。其实个人理解这两个属性作用意义不大,仁者见仁智者见智,知识完整性还是需要考虑的

    • key为x-max-length表示队列最大积压消息数量限制
    • key为x-max-length-bytes表示单消息最大字节数量
            Map<String,Object> argumentMap = new HashMap<>();
            argumentMap.put("x-max-length",1);
            argumentMap.put("x-max-length-bytes",1024);
            channel.queueDeclare("queueName",true,false,false,argumentMap);
    
    • 1
    • 2
    • 3
    • 4

    Lim标识标识队列限制最大消息积压数量,Lim B表示队列限制单消息最大字节数。两个策略可以同时设置,共同生效。测试发现当超过最大积压数量时会删除头部消息腾出空间存放新消息,这是RabbitMQ默认采用的策略drop-head
    在这里插入图片描述
    如果想实现拒绝新消息加入则可以使用参数x-overflow实现,key为x-overflow,value为reject-publish表示拒绝接收新消息加入队列。可以采用不同策略然后超过最大积压数量设置值,查看最后消费到的数据是否与策略实现理论结果一致

            argumentMap.put("x-overflow","reject-publish");
    
    • 1

    四、Dead-Letter 死信转发

    前面讲到单消息自动过期TTL策略实现使用BasicProperties类属性expiration即可,同时也说到了队列积压消息最大数量限制在队列实例化时依赖x-max-length属性实现,采用默认策略drop-head会删除头部消息。问题来了,这些操作都会导致消息丢失,这时候若想换个地方存储这些消息怎么办?可采用队列死信转发实现,场景如下:

    • 使用BasicProperties属性expiration设置的TTL到期自动删除消息
    • 使用x-max-length限制消息最大积压数量,且采用默认策略drop-head删除的头部消息
    • 后面讲消费者会讲到的消息确认机制中拒绝确认消息,且未将消息放回队列中删除的消息
            // 设置死信交换器
            channel.exchangeDeclare("deadExchange",BuiltinExchangeType.DIRECT, true,false,null);
            // 设置死信队列
            channel.queueDeclare("deadQueue",true,false,false,null);
            // 绑定死信交换器与队列
            channel.queueBind("deadQueue","deadExchange","deadBinding");
            // 设置正常队列的死信交换器
            Map<String,Object> argumentMap = new HashMap<>();
            argumentMap.put("x-dead-letter-exchange","deadExchange");
            argumentMap.put("x-dead-letter-routing-key","deadBinding");
            channel.queueDeclare("queueName",true,false,false,argumentMap);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    首先在控制面板上可以看到正常队列有DLX、DLK两个标识,表示该队列设置了死信交换器和死信路由routingKey。其次最终结果是TTL到期自动删除的消息转发到了deadQueue中。最后需要说明若不设置x-dead-letter-routing-key参数,死信交换器将采用消息自身携带的routingKey进行路由
    在这里插入图片描述

    五、参考链接

    [01] 消息与队列进阶 RabbitMQ(二)

  • 相关阅读:
    Docker中仓库、镜像和容器用法详解
    Nuxt.js的详细使用
    双周总结#002 - 红树林
    html与css知识点
    Spring Security 学习笔记
    文件的上传和下载
    Android 驾车出行路线规划
    异常情况下的生命周期
    详解KubeEdge EdgeMesh v1.15 边缘CNI特性
    java初探之代理模式
  • 原文地址:https://blog.csdn.net/youyouwuxin1234/article/details/126061656