• RabbitMQ系列【4】六大工作模式


    有道无术,术尚可求,有术无道,止于术。

    前言

    工作模式指的是消息发送及接受的策略。

    RabbitMQ 支持六种模式,官网图示:
    在这里插入图片描述

    简单模式

    接下里我们使用RabbitMQ JAVA 客户端,实现一个简单的发送/接收消息案例。
    在这里插入图片描述
    RabbitMQ Java Client GitHub 地址

    1. 添加依赖

    客户端5.X 版本,需要JDK 1.8+、RabbitMQ 服务端3.X 支持。

    <dependency>
        <groupId>com.rabbitmqgroupId>
        <artifactId>amqp-clientartifactId>
        <version>5.16.0version>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2. 发送消息

    首先需要创建连接工厂,配置服务端地址信息:

            // 1. 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    客户端和服务端进行通信时,会使用Connection建立 TCP 连接。然后每一次通信时,会在Connection内部建立的逻辑连接Channel,这样可以大量减少TCP 连接开销。
    在这里插入图片描述
    所以接下来需要创建ConnectionChannel

            // 2. 新建连接
            Connection connection = factory.newConnection();
            // 3. 新建通道
            Channel channel = connection.createChannel();
    
    • 1
    • 2
    • 3
    • 4

    接着声明队列:

    channel.queueDeclare("hello", false, false, false, null);
    
    • 1

    发送消息:

    channel.basicPublish("", "hello", null, "hello world".getBytes());
    
    • 1

    完整代码如下:

    public class Producer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1. 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            // 2. 新建连接
            Connection connection = factory.newConnection();
            // 3. 新建通道
            Channel channel = connection.createChannel();
            /*
             * 4. 声明队列,参数:
             *  1.队列名称
             *  2.队列里面的消息是否持久化 默认消息存储在内存中
             *  3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
             *  4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
             *  5.其他参数
             */
            channel.queueDeclare("hello", false, false, false, null);
            /*
             * 5. 发送消息,参数:
             *  1.发送到那个交换机,(为空表示使用默认交换机)
             *  2.路由的 key ,默认交换隐式绑定到每个队列,路由KEY等于队列名称。无法显式绑定到默认交换或从默认交换解除绑定。它也不能被删除。
             *  3.其他的参数信息
             *  4.发送消息的消息体
             */
            channel.basicPublish("", "hello", null, "hello world".getBytes());
            System.out.println("消息发送完毕");
            // 6. 关闭连接
            channel.close();
            connection.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    3. 接收消息

    代码如下:

    public class Consumer {
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1. 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            // 2. 创建连接及通道
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();   //与rabbitmq通讯的通道
            // 3. 绑定队列
            channel.queueDeclare("hello", false, false, false, null);
            // 4. 处理消息的回调对象
            DeliverCallback deliverCallback = (consumerTag, delivery) ->
            {
                String message = new String(delivery.getBody());
                System.out.println(message);
            };
            // 4. 取消消息处理的回调对象
            CancelCallback cancelCallback = s -> {
                System.out.println("消息消费被中断");
            };
            // 5. 接收消息,把消息传递到回调对象处理,参数:
            /*  1.消费哪个队列
                2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
                3.消费者未成功消费的回调
             */
            channel.basicConsume("hello", true, deliverCallback, cancelCallback);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    4. 测试

    运行Consumer,再运行Producer,成功接收到消息:
    在这里插入图片描述

    工作队列模式

    Work Queues是官方中提出的第二种工作模式,一个生产者发送消息,有多个消费者来监听任务,但是只有一个消费者能收到消息:
    在这里插入图片描述

    工作队列/任务队列的主要思想是把任务封装为消息并将其发送到队列。在后台运行的工作进程将接收到任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。从而能实现异步多线程任务。

    这个概念在web应用程序中特别有用,因为在短的HTTP请求窗口中不可能处理复杂的任务。

    案例演示

    复制一份Consumer类,分别启动:
    在这里插入图片描述
    Producer类中,分别发送aaa、bbb、ccc、ddd四个消息:
    在这里插入图片描述
    第一个消费者,收到了aaa、ccc:
    在这里插入图片描述
    第二个消费者,收到了bbb、ddd:
    在这里插入图片描述
    总结:在该模式下,一个消息只能被一个线程消费,采用的是轮询接收。

    发布/订阅模式

    发布/订阅模式中,消息的发送者通过消息通道广播出去,让订阅改消息主题的订阅者消费。

    就是一个生产者发送的消息会被多个消费者获取,所以也叫广播模式、一对多模式。
    在这里插入图片描述
    流程示意图如下

    1. 生产者将消息发送到交换机
    2. 交换机将信息发给所有绑定的队列
    3. 绑定队列的消费者收到消息

    该模式需要指定一个Exchange交换机,起本身只负责转发消息,不具备存储消息的能力。一方面,接收生产者发送的消息。另一方面,需要知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。

    到底如何操作,取决于Exchange的类型。有常见以下几种类型:

    • Fanout:广播。将消息交给所有绑定到交换机的队列
    • Direct:定向。把消息交给符合指定routing key的队列
    • Topic:通配符。把消息交给符合routing pattern(路由模式)的队列
    • Header:通过消息内容中的headers属性来进行匹配

    按照流程,编写生产者代码:

    public class Producer {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1. 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            // 2. 新建连接
            Connection connection = factory.newConnection();
            // 3. 新建通道
            Channel channel = connection.createChannel();
            /*
             * 4. 声明一个交换机
             * exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean , Map arguments)
             *  参数:
             *  exchange=》交换机名称
             *  type=》指定交换机的类型为FANOUT广播模式
             *  durable=》是否持久化
             *  autoDelete=》自动删除
             *  internal=》内部使用,一般为false
             *  arguments=》参数
             */
            String exchangeName = "fanoutExchange";
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
            // 5. 创建队列
            String fanoutQueueOne = "fanoutQueueOne";
            String fanoutQueueTwo = "fanoutQueueTwo";
            channel.queueDeclare(fanoutQueueOne, true, false, false, null);
            channel.queueDeclare(fanoutQueueTwo, true, false, false, null);
            // 6. 绑定队列和交换机,参数(队列名称、交换机名称,路由键(绑定规则,如果交换机的类型为fanout,routingKey为“”)
            channel.queueBind(fanoutQueueOne, exchangeName, "");
            channel.queueBind(fanoutQueueTwo, exchangeName, "");
            // 7. 发送消息
            String body = "广播消息";
            channel.basicPublish(exchangeName, "", null, body.getBytes(StandardCharsets.UTF_8));
            // 8. 关闭连接
            channel.close();
            connection.close();
            System.out.println("消息发送完毕");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    运行程序,发送消息,在控制台中,可以看到创建的交换机、绑定关系如下图:
    在这里插入图片描述
    在创建的两个队列中,可以看到都收到了消息,并处于就绪状态:
    在这里插入图片描述

    创建消费者,并复制代码,运行两个消费者:

    public class FanoutConsumerOne {
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1. 创建连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");
            // 2. 创建连接及通道
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();   //与rabbitmq通讯的通道
            // 3. 绑定队列,队列和交换机绑定
            String fanoutQueueOne = "fanoutQueueOne";
            String exchangeName = "fanoutExchange";
            channel.queueBind(fanoutQueueOne, exchangeName, "");
            // 4. 处理消息的回调对象
            DeliverCallback deliverCallback = (consumerTag, delivery) ->
            {
                String message = new String(delivery.getBody());
                System.out.println(message);
            };
            // 4. 取消消息处理的回调对象
            CancelCallback cancelCallback = s -> {
                System.out.println("消息消费被中断");
            };
            // 5. 接收消息,把消息传递到回调对象处理
            channel.basicConsume(fanoutQueueOne, true, deliverCallback, cancelCallback);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    可以看到,每个消息都会被所有消费者接收:
    在这里插入图片描述

    在这里插入图片描述

    路由模式

    官网的路由(Routing)模式流程示意图如下:
    在这里插入图片描述
    该模式也需要加入交换机,指定其类型为direct。队列在绑定交换机时要指定routing key(路由键),消息会转发到符合routing key 的队列。交换机根据routingKey进行完全匹配,如果匹配失败则丢弃消息。

    流程说明

    1. 生产者P,向Exchange发送消息,发送消息时,会指定一个 routing key
    2. 交换机接收生产者的消息,然后把消息递交给与 routing key 完全匹配的队列。
    3. 消费者C1,其所在队列指定了需要 routing key 为 error 的消息。
    4. 消费者C2,其所在队列指定了需要 routing key 为 info、error、warning 的消息。

    生产者核心代码

           // 4. 声明一个交换机
            String exchangeName = "directExchange";
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
            // 5. 创建队列
            String directQueueInfo = "directQueueInfo";
            String directQueueError = "directQueueError";
            channel.queueDeclare(directQueueInfo, true, false, false, null);
            channel.queueDeclare(directQueueError, true, false, false, null);
            // 6. 绑定队列和交换机,参数(队列名称、交换机名称,路由键)
            channel.queueBind(directQueueInfo, exchangeName, "info");
            channel.queueBind(directQueueError, exchangeName, "error");
            // 7. 发送消息
            channel.basicPublish(exchangeName, "info", null, "INFO日志".getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(exchangeName, "error", null, "ERROR日志".getBytes(StandardCharsets.UTF_8));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    消费者代码

            // 3. 绑定队列
            String directQueueError = "directQueueError";
            // 队列和交换机绑定
            String exchangeName = "directExchange";
            channel.queueBind(directQueueError, exchangeName, "error");
            // 4. 处理消息的回调对象
            DeliverCallback deliverCallback = (consumerTag, delivery) ->
            {
                String message = new String(delivery.getBody());
                System.out.println(message);
            };
            // 4. 取消消息处理的回调对象
            CancelCallback cancelCallback = s -> {
                System.out.println("消息消费被中断");
            };
            // 5. 接收消息,把消息传递到回调对象处理,参数:
            channel.basicConsume(directQueueError, true, deliverCallback, cancelCallback);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    再复制代码创建另外一个消费者,绑定路由键为info,分别启动后运行发送消息。

    结果如下所示,绑定了error路由的队列只收到ERROR日志,info路由的队列只收到INFO日志。
    在这里插入图片描述在这里插入图片描述

    主题模式

    在这里插入图片描述

    主题Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型的Routing key 可以使用通配符

    Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: order.insert

    通配符规则

    • #:匹配一个或多个词

    • *:匹配一个词

    举例

    • item.#:能够匹配item.spu.insert 或者 item.spu

    • item.*:只能匹配item.spu

    创建 Topic 类型的交换机,发送消息:

            // 4. 声明一个交换机类型为TOPIC
            String exchangeName = "topicExchange";
            channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
            // 5. 发送消息时,携带通配符KEY
            channel.basicPublish(exchangeName, "aa.error", null, "日志=》aa.error".getBytes(StandardCharsets.UTF_8));
            channel.basicPublish(exchangeName, "bbb.ccc.error", null, "日志=>bbb.ccc.error".getBytes(StandardCharsets.UTF_8));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    创建两个消费者,使用通配符绑定:

            // 3. 绑定队列
            String topicQueueError = "topicQueueError";
            // 队列和交换机绑定,*.error 只能接到aa.error、bb.error等前缀为一个单词的消息
            String exchangeName = "topicExchange";
            channel.queueBind(topicQueueError, exchangeName, "*.error");
    
    • 1
    • 2
    • 3
    • 4
    • 5
            // 3. 绑定队列
            String topicQueueError = "topicQueueError";
            // 队列和交换机绑定,#.error 只能接到aa.error、bb.cc.error等前缀为一个或多个单词的消息
            String exchangeName = "topicExchange";
            channel.queueBind(topicQueueError, exchangeName, "#.error");
    
    • 1
    • 2
    • 3
    • 4
    • 5

    运行代码,发送消息,运行结果如下:
    在这里插入图片描述

    在这里插入图片描述

    RPC 模式

    RPC 模式也就是使用队列实现远程过程调用。实际使用的很少,专业的RPC框架已经很多了~,这里就不介绍了。
    在这里插入图片描述

  • 相关阅读:
    Java之Map、三种Map遍历方式、HashMap、TreeMap、IO流、Properties、反射、获取类对象的三种方式、注解、元注解、工厂模式、装饰器
    每个人都应该去学写作
    双目立体匹配算法SGM步骤拆解
    从源码看webpack3打包流程
    C++类模板实战之手写精简版vector容器,详解版
    Linux系统中安装Docker
    显示控件——字符显示之文本显示
    MySQL(4)索引实践(2)
    网站一键设置置灰的方法
    PEP 257 - 文档字符串约定
  • 原文地址:https://blog.csdn.net/qq_43437874/article/details/127092078