• RabbitMQ的工作队列有哪些?带你玩转工作队列(可学习、可复习、可面试)


    🎁作者简介:在校大学生一枚,Java领域新星创作者,Java、Python正在学习中,期待和大家一起学习一起进步~
    💗个人主页:我是一棵卷心菜的个人主页
    🔶本文专栏:RabbitMQ学习
    📕自我提醒:多学多练多思考,编程能力才能节节高!

    一、准备工作

    • 为了方便后续代码的简洁性以及可读性,我们先准备一个工具类,代码如下:
    public class RabbitMqUtils {
        //创建连接的方法
        public static Connection getConnection() throws IOException, TimeoutException {
            //创建一个连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置一些参数
            factory.setHost("192.168.205.131");
            factory.setUsername("cabbage");
            factory.setPassword("cabbage");
            factory.setPort(5672);
            return factory.newConnection();
        }
        //得到一个连接的 channel
        public static Channel getChannel() throws Exception {
            return getConnection().createChannel();
        }
        //关闭channel
        public static void closeChannel() throws Exception {
            getChannel().close();
        }
        //关闭连接
        public static void closeConnection() throws IOException, TimeoutException {
            getConnection().close();
        }
    }
    

    二、channel方法讲解

    • 在学习工作队列的过程中,我们还会用到很多方法以及方法参数的使用,很有必要弄清楚!

    1、queueDeclare

    • 源码:
    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                     Map<String, Object> arguments) throws IOException;
    
    • 参数讲解:
    参数含义注意点
    queue队列名称
    durable队列中的消息是否持久化队列的数据默认是存放到内存中的,但rabbitmq重启会丢失队列,如果想重启之后数据还存在就要使队列持久化 ( 设置durable为true ),保存到Erlang自带的数据库中,当rabbitmq重启之后会读取该数据库。
    autoDelete当最后一个消费者断开连接之后队列是否自动被删除如果生产者声明了一个queue,此队列从来没有消费者连接过,那么 即使consumers = 0,队列也不会自动删除
    arguments其它参数
    exclusive该队列是否只供一个消费者进行消费 or 当connection关闭时是否删除队列如果队列是排外的,会对当前队列加锁,exclusive等于true的话用于一个队列只能有一个消费者来消费的场景

    注意:

    rabbitmq消息持久化有两个步骤:一是队列持久化;二是消息持久化。当durable=true时,如果只设置该值,是把队列进行持久化(重启后消息还是会丢失);所以我们还应该在发送消息时对消息进行持久化。代码如下:

    //发送消息
    channel.basicPublish("交换机名称", "路由类型", MessageProperties.PERSISTENT_TEXT_PLAIN, "消息主体".getBytes());
    

    MessageProperties.PERSISTENT_TEXT_PLAIN就是对消息进行持久化,但仍需注意的是,即使设置了这两种操作,消息也不是100%会被持久化的!

    2、basicPublish

    • 源码:
    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
    
    • 参数讲解:
    参数含义
    exchange要将消息发送到交换机的名称
    routingKey路由key
    props一些配置信息
    body消息内容

    3、basicConsume

    • 源码:
    String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
    
    • 参数讲解:
    参数含义
    queue消费队列的名称
    autoAck消费成功之后是否要自动应答
    deliverCallback当一个消息发送过来后的回调接口
    cancelCallback当一个消费者取消订阅时的回调接口

    注意:

    自动确认 autoAck = true 只要消息从队列中获取,无论消费者获取到消息后是否成功消费消息,都认为是消息已经成功消费。
    手动确认 autoAck = false 消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。


    三、简单模式

    在这里插入图片描述

    • 生产者代码:我们向消费者发送了消息你好,MessageQueue
    public class Producer {
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitMqUtils.getConnection();
            Channel channel = RabbitMqUtils.getChannel();
            channel.queueDeclare("hello", false, false, false, null);
            channel.basicPublish("", "hello", null, "你好,MessageQueue".getBytes());
            // 释放资源
            channel.close();
            connection.close();
        }
    }
    

    当我们执行生产者代码,就会产生一个带有数据的队列:

    在这里插入图片描述

    • 消费者代码:
    public class Consumer {
        public static void main(String[] args) throws Exception {
            Connection connection = RabbitMqUtils.getConnection();
            Channel channel = RabbitMqUtils.getChannel();
            // 声明 接收消息
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(new String(message.getBody()));
            };
            // 取消消息时的回调
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("消息消费被中断");
            };
            channel.basicConsume("hello", true, deliverCallback, cancelCallback);
        }
    }
    

    当我们执行完消费者代码,可以看到控制台界面和消息被消费

    在这里插入图片描述

    在这里插入图片描述


    四、Work模式

    在这里插入图片描述

    一个生产者对应多个消费者,但是一条消息只能由一个消费者消费。
    主要有两种模式:一是轮询模式的分发,一个消费者一条消息,按均分配;二是公平分发,根据消费者的消费能力进行公平分发,能力越高,消费能力越强,按劳分配。

    • 接下来主要介绍轮询模式,生产者代码:
    public class Producer {
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            channel.queueDeclare("work", false, false, false, null);
            for (int i = 1; i <= 10; i++) {
                Thread.sleep(i * 10);
                channel.basicPublish("", "work", null, (i + "I'am rabbitMQ").getBytes());
            }
            RabbitMqUtils.closeChannel();
            RabbitMqUtils.closeConnection();
        }
    }
    
    • 消费者1的代码:
    public class Work_1 {
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                try {
                    Thread.sleep(1000);
                    System.out.println(new String(message.getBody()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            };
            CancelCallback cancelCallback = (consumerTag) -> {
                System.out.println("消息消费被中断");
            };
    //        channel.basicQos(1);
            channel.basicConsume("work", false, deliverCallback, cancelCallback);
            RabbitMqUtils.closeChannel();
            RabbitMqUtils.closeConnection();
        }
    }
    
    • 消费者2的代码:
    public class Work_2 {
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            // 声明 接收消息
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                try {
                    Thread.sleep(5 * 1000);
                    System.out.println(new String(message.getBody()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //返回确定状态,由于我们是设置手动返回状态的,如果没有调用该方法,那么消息仍然在我们队列中,不会被删除
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            };
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("消息消费被中断");
            };
    //        channel.basicQos(1);
            channel.basicConsume("work", false, deliverCallback, cancelCallback);
            RabbitMqUtils.closeChannel();
            RabbitMqUtils.closeConnection();
        }
    }
    
    • 接着开始测试代码,首先让生产者发送10条消息到队列中,然后运行消费者代码,打开控制台,如图:

    在这里插入图片描述

    在这里插入图片描述

    我们可以发现,即使两个消费者的睡眠时间不同(处理消息的能力不一样),但是消费消息的数目确实一样的。

    分析运行结果:

    1、消费者1和消费者2获取到的消息内容是不同的,即同一个消息只能被一个消费者获取。
    2、两个消费者获取消息的条数是一样的。

    • 而对于公平分发,我们只需要在上述代码中添加下列代码:
    channel.basicQos(1);
    

    五、发布/订阅模式

    在这里插入图片描述

    P:在订阅模式中,多了一个Exchange角色,生产者不再发送到队列中,而是发送给交换机。
    Exchange:一方面接收生产者发送的消息,另一方面知道如何处理消息;而到底如何处理,取决于Exchange的类型。有常见的以下三种类型:

    • Fanout:广播,将消息交给所有绑定到交换机的队列
    • Direct:定向,把消息交给符合指定routing key的队列
    • Topic:通配符,把消息交给符合routing pattern的队列

    注意:交换机只负责转发消息,不具备存储信息的能力,因此如果没有任何队列与交换机绑定,或者没有符合路由规则的队列,那么信息会丢失!

    • 生产者代码:
    public class Producer {
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            /*
            String exchange:交换机名称
            BuiltinExchangeType type:交换机类型
            boolean durable:是否持久化
            boolean autoDelete:自否自动删除
            boolean internal:内部使用,一般为false
            Map arguments:参数
             */
            channel.exchangeDeclare("_fanout", BuiltinExchangeType.FANOUT, false, false, false, null);
            channel.queueDeclare("_fanout_1", false, false, false, null);
            channel.queueDeclare("_fanout_2", false, false, false, null);
            // 绑定队列和交换机
            /*
            String queue:队列名称
            String exchange:交换机名称
            String routingKey:路由键
                规则:如果交换机的类型为fanout,routingkey设置为""
             */
            channel.queueBind("_fanout_1", "_fanout", "");
            channel.queueBind("_fanout_2", "_fanout", "");
            channel.basicPublish("_fanout", "", null, "日志信息:张三调用了XXX方法,日志级别为info".getBytes());
            channel.close();
            RabbitMqUtils.closeConnection();
        }
    }
    
    • 消费者1代码:
    public class PubSun_1 {
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(new String(message.getBody()));
                System.out.println("获取信息,并打印到控制台");
            };
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("消息消费被中断");
            };
            channel.basicConsume("_fanout_1", true, deliverCallback, cancelCallback);
        }
    }
    
    • 消费者2代码:
    public class PubSun_2 {
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(new String(message.getBody()));
                System.out.println("获取信息,并保存到数据库");
            };
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("消息消费被中断");
            };
            channel.basicConsume("_fanout_2", true, deliverCallback, cancelCallback);
        }
    }
    
    • 运行生产者代码,可以看到控制台的打印信息如下:

    在这里插入图片描述

    在这里插入图片描述


    六、路由模式

    在这里插入图片描述

    在此模式下,队列与交换机不能任意绑定,而是要指定一个RoutingKey(路由key);生产者在向交换机发送消息时,也必须指定消息的RoutingKey。

    交换机不再把消息交给每一个绑定的队列,而是根据消息的路由key进行判断,只有队列的路由key与消息的路由key完全一致,才会接收到消息。

    • 生产者代码:
    public class Producer {
        // 发消息
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            channel.exchangeDeclare("routing", BuiltinExchangeType.DIRECT, false, false, false, null);
            channel.queueDeclare("_routing_1", false, false, false, null);
            channel.queueDeclare("_routing_2", false, false, false, null);
            // 绑定队列和交换机
            channel.queueBind("_routing_1", "routing", "error");
            channel.queueBind("_routing_2", "routing", "error");
            channel.queueBind("_routing_2", "routing", "info");
            channel.queueBind("_routing_2", "routing", "warning");
            channel.basicPublish("routing", "info", null, "日志信息:张三调用了XXX方法,日志级别为info".getBytes());
            channel.close();
            RabbitMqUtils.closeConnection();
        }
    }
    
    • 消费者1代码:
    public class Routing1 {
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(new String(message.getBody()));
                System.out.println("获取信息,并保存到数据库");
            };
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("消息消费被中断");
            };
            channel.basicConsume("_routing_1", true, deliverCallback, cancelCallback);
        }
    }
    
    • 消费者2代码:
    public class Routing2 {
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(new String(message.getBody()));
                System.out.println("获取信息,并打印到控制台");
            };
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("消息消费被中断");
            };
            channel.basicConsume("_routing_2", true, deliverCallback, cancelCallback);
        }
    }
    
    • 运行代码,查看控制台打印信息:

    在这里插入图片描述

    在这里插入图片描述

    因为生产者代码中发送消息的路由key是info,而info对应的是消费者2,所以消费者1没有消费消息,控制台为空。


    七、通配符模式

    在这里插入图片描述

    通配符模式通俗来讲就是模糊匹配。符号#匹配一个或多个词,符号*匹配一个词。例如com.#可以匹配com.it.cabbage或者com.cabbage等等com.*只能匹配com.cabbage

    Topic类型与Direct相比,都是可以根据路由key把消息路由到不同的队列。只不过Topic类型的交换机可以让队列绑定路由key的时候使用通配符

    • 生产者代码:
    public class Producer {
        // 发消息
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            channel.exchangeDeclare("topic", BuiltinExchangeType.TOPIC, false, false, false, null);
            channel.queueDeclare("_topic_1", false, false, false, null);
            channel.queueDeclare("_topic_2", false, false, false, null);
            // 绑定队列和交换机
            channel.queueBind("_topic_1", "topic", "*.error");
            channel.queueBind("_topic_1", "topic", "order.#");
            channel.queueBind("_topic_2", "topic", "*.*");
            channel.basicPublish("topic", "e.sayhello", null, "对你说hello".getBytes());
            channel.close();
            RabbitMqUtils.closeConnection();
        }
    }
    
    • 消费者1代码:
    public class Topic1 {
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(new String(message.getBody()));
            };
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("消息消费被中断");
            };
            channel.basicConsume("_topic_1", true, deliverCallback, cancelCallback);
        }
    }
    
    • 消费者2代码:
    public class Topic2 {
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtils.getChannel();
            DeliverCallback deliverCallback = (consumerTag, message) -> {
                System.out.println(new String(message.getBody()));
            };
            CancelCallback cancelCallback = consumerTag -> {
                System.out.println("消息消费被中断");
            };
            channel.basicConsume("_topic_2", true, deliverCallback, cancelCallback);
        }
    }
    
    • 运行代码,查看控制台打印信息:

    在这里插入图片描述

    在这里插入图片描述


    八、总结

    • 简单模式:一个生产者,一个消费者,不需要设置交换机。
    • Work模式:一个生产者,多个消费者(竞争关系),不需要设置交换机。
    • 发布/订阅模式:需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列。
    • 路由模式:需要设置类型为Direct的交换机,并且交换机和队列进行绑定,需要指定路由key,当发送消息到交换机后,交换机会将消息发送到绑定的队列。
    • 通配符模式:需要设置类型为Topic的交换机,并且交换机和队列进行绑定,需要指定通配符方式的路由key,当发送消息到交换机后,交换机会将消息发送到绑定的队列。

    感谢阅读,一起进步,嘻嘻~

  • 相关阅读:
    springboot基于JAVA的电影推荐系统的开发与实现毕业设计源码112306
    乘法逆元做法——约数之和
    万字 HashMap 详解,基础(优雅)永不过时
    记一次 Oracle 下的 SQL 优化过程
    hash模式和history模式
    taobao.logistics.dummy.send( 无需物流发货处理 )接口,淘宝r2接口,淘宝oAu2.0接口,淘宝订单发货接口
    对称和非对称加密
    Linux开发——shell操作(一)
    关于Request复用的那点破事儿。研究明白了,给你汇报一下。
    Qt vs2022使用QCefView控件与html通信
  • 原文地址:https://blog.csdn.net/weixin_59654772/article/details/126930910