• RabbitMq优先级队列实现


    1.springboot配置

    设置prefetch=2,就是对应图上的unached的值。acknowledge-mode=manual是设置模式。其他人都是设置simple.acknowledge-mode=manual,而我的是设置direct.acknowledge-mode才好用。

    配置参数参考地址:
    https://blog.csdn.net/hzzdecsdn/article/details/123548022

    spring.rabbitmq.listener.simple.concurrency=1
    spring.rabbitmq.listener.simple.max-concurrency=2
    spring.rabbitmq.listener.simple.prefetch=2
    spring.rabbitmq.listener.direct.acknowledge-mode=manual
    # spring.rabbitmq.listener.simple.acknowledge-mode=manual
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述

    2.代码初始化队列

    代码中的 args.put(“x-max-priority”, 10); 即为设为带优先级队列的标志。本人项目中是用RabbitAdmin初始化的队列。下面的代码,这里只提供个参考。

        private void bindQ(TopicExchange topicExchange, String queuename, String routingKey, RabbitAdmin rabbitAdmin) {
            Map args = new HashMap<>();
            if (queuename.contains("hky")) {
                args.put("x-max-priority", 10);
            }
            Queue queue = new Queue(queuename, true, false, false, args);
    
            rabbitAdmin.declareQueue(queue);
    
            rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(topicExchange).with(routingKey));
    
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    如此初始化出来的队列,就会多出来个Pri的标记。说明创建优先级队列成功了。
    在这里插入图片描述
    当然,也可以用控制台创建优先级队列,如下图:
    在这里插入图片描述

    3.代码中设置优先级——生产者

    生产者发送消息,设置优先级。代码中的setPriority(5)即为优先级信息。越大就越提前执行。
    比如我设置了发了3条优先级为5的消息,消息状态为Ready状态,接着发了一个优先级为10的消息,那么Ready中的消息会把优先级为10的排到最前面,最先消费,这样就实现了插队。
    (注意:必须是Ready状态,如果都在unacked状态,说明springboot配置prefetch=2未生效)

    rabbitTemplate.convertAndSend("hky", "syp1032.hky", msg, message -> {
        message.getMessageProperties().setPriority(5);
        return message;
    });
    
    • 1
    • 2
    • 3
    • 4

    关于发消息这里,我看只要写优先级队列的人,都用amqpTemplate,主要基本都一个模子刻出来的,有点无语。。。像下面这种的写法的,如果是这种方式的,我没用过,但应该也是可以的吧。

    amqpTemplate.convertAndSend(queue, payload, message -> {
    	message.getMessageProperties().setPriority(priority);
    	return message;
    });
    
    • 1
    • 2
    • 3
    • 4

    4.消费者

    @Component
    public class TestReceiverListener {
    
    //    @RabbitListener(queues = {"syp1032.hky","syp1032.hkyfast"}, ackMode = "MANUAL", concurrency = "1")
        @RabbitListener(queues = "syp1032.hky")
        @Transactional(rollbackOn = Throwable.class)
        public void process(MessageRequestDto dto, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws Exception {
    
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
    
                e.printStackTrace();
            }
            String parameter = dto.getParameter();
            System.out.println(parameter);
    //        try {
    //            channel.basicAck(deliveryTag, false);
    //        } catch (IOException e) {
    //            // TODO Auto-generated catch block
    //            e.printStackTrace();
    //        }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    MessageRequestDto是自己封装的消息对象,用自己的即可没啥好说的。代码也很简单,就是睡10秒,然后打印消息内容。

    @RabbitListener注解中的ackMode = “MANUAL” 这部分很迷,因为前期框架不是我弄的,如果设置了它,就不自动ack了,就需要代码中手动确认,进行如下调用

    channel.basicAck(deliveryTag, false);

    当然,如果配置了acknowledge-mode=manual也不自动ack了,也可以代码中手动使用channel的basicAck方法来手动ack。虽然我这里没出现这种情况就是了。

    不ack会出现像下图那样,消息就卡住了,消费不了了。
    在这里插入图片描述

  • 相关阅读:
    【软考】5.1 七层模型/局域网/TCP-IP协议
    Docker--容器挂载
    一个新工具 nolyfill
    第 394 场 LeetCode 周赛题解
    Spring Boot 自动装配
    数理统计笔记2:总体均值的抽样分布
    蛋白类产品改性 荧光标记;偶连抗体多肽或者聚合物或其他小分子偶连蛋白文章介绍
    vue+leaflet集成echarts实现动态轨迹图
    喷码机如何与封箱机配合?
    使用mysql语句对分组结果进行再次筛选
  • 原文地址:https://blog.csdn.net/qq_20741133/article/details/126724406