• RabbitMq(一)


    视频:尚硅谷RabbitMQ教程丨快速掌握MQ消息中间件_哔哩哔哩_bilibili

    涉及代码:

    https://github.com/user0819/rabbitMQ-hello

    https://github.com/user0819/rabbitmq-springboot

    一、概念

    1.1 MQ

    1.1.1 什么是MQ

    MQ(message queue),消息队列。

    是一种跨进程的通信机制,用于上下游传递消息。

    1.1.2 为什么要用MQ

    • 流量削峰
    • 应用解耦
    • 异步处理

    流量削峰

    在流量高峰期使用消息队列做缓冲,避免应用被激增的请求打挂,或者丢失了某部分的请求。

    应用解耦

    系统之间减少耦合关系。上游系统只需将消息丢进队列,无需关心下游系统何时处理及处理结果。

    异步处理

    有些服务间调用是异步的,例如 A 调用 B, A 需要知道 B 什么时候可以执行完。

    普通方式:

    • A 过一段时间去调用 B 的查询 api 查询。
    • A 提供一个 callback api, B 执行完之后调用 api 通知 A 服务。

    可以使用MQ, A 调用 B 服务后,只需要监听 B 处理完成的消息,能及时的得到异步处理成功的消息。

    1.1.3 几种主流MQ

    特性

    ActiveMQ

    RabbitMQ

    RocketMQ

    Kafka

    吞吐量

    万级

    万级

    十万级

    百万级

    时效性

    ms 级

    us级

    ms 级

    ms级

    可用性

    主从架构

    主从架构

    非常高

    分布式架构

    非常高

    分布式架构

    消息可靠性

    较低概率丢失数据

    基本不丢

    0 丢失

    0 丢失

    功能支持

    MQ 领域功能极其完备

    基于 erlang 开发

    并发能力很强

    性能极好

    延时很低

    MQ 功能较为完善

    分布式的

    扩展性好

    功能较为简单

    主要支持简单的 MQ 功能

    在大数据领域被大规模使用

    缺点

    维护变少

    高吞吐量场景使用较少

    商业版需收费

    学习成本高

    支持的客户端语言有限:主Java

    没有实现JMS等接口

    响应时间随队列/分区变长

    消费失败不支持重试

    代理宕机会导致消息乱序

    1.1.4 如何选择MQ

    1.Kafka

    基于Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。

    如果有日志采集功能, 首选 kafka。

    2.RocketMQ

    天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰。

    RoketMQ 在稳定性上可能更值得信赖,已经经历了阿里双十一多次考验

    3.RabbitMQ

    结合 erlang 语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便。

    如果数据量没有那么大,中小型公司优先选择功能比较完备的 RabbitMQ。

    1.2 AMQP

    AMQP 是什么

    AMQP,即Advanced Message Queuing Protocol。

    准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。

    RabbitMQ 就是 AMQP 协议的 Erlang 的实现 (当然 RabbitMQ 还支持 STOMP2、 MQTT3 等协议 )

    RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循的 AMQP 协议中相 应的概念。

    AMQP 协议 3 层

    • Module Layer: 协议最高层,主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。
    • Session Layer: 中间层,主要负责客户端命令发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。
    • TransportLayer: 最底层,主要传输二进制数据流,提供帧的处理、信道服用、错误检测和数据表示等。

    AMQP 模型的组件

    • 交换器 (Exchange):消息代理服务器中用于把消息路由到队列的组件。
    • 队列 (Queue):用来存储消息的数据结构,位于硬盘或内存中。
    • 绑定 (Binding):一套规则,告知交换器消息应该将消息投递给哪个队列。

    1.3 RabbitMQ

    1.3.1 四大核心概念

    • 生产者:产生数据发送消息
    • 交换机:接收来自生产者的消息,将消息推送到队列中
    • 队列:存储消息
    • 消费者:接收消费消息

    1.3.2 核心部分

    1.3.3 名词介绍

    • Broker:接收和分发消息的应用,RabbitMQ Server 就是Message Broker 。
    • Virtual host:一个虚拟的分组,类似于网络中的 namespace 概念。当多个用户使用同一个 RabbitMQ server时,可以划分出多个 vhost,每个用户在自己的vhost 创建 exchange/queue 等。
    • Connection:publisher/consumer 和 broker 之间的 TCP 连接 。
    • Channel:Channel 是在 connection 内部建立的逻辑连接,channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。
    • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。
    • Queue:消息最终被送到这里等待 consumer 取走 。
    • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。

    1.3.4 安装

    RabbitMQ使用Erlang语言开发等,所以必须先提供Erlang环境。

    • 安装Erlang
    • 安装RabbitMq

    官方安装说明:Downloading and Installing RabbitMQ — RabbitMQ

    Linux安装:

    Installing on RPM-based Linux (RedHat Enterprise Linux, CentOS, Fedora, openSUSE) — RabbitMQ

    Linux安装RabbitMQ详细教程_m0_67392273的博客-CSDN博客_linux安装rabbitmq

    1. 先准备rpm安装包
      1. erlang-25.0.3-1.el8.x86_64.rpm (https://github.com/rabbitmq/erlang-rpm/releases/tag/v25.0.3)
      2. socat-1.7.4.1-1.el8.x86_64.rpm (http://www.rpmfind.net/linux/rpm2html/search.php?query=socat(x86-64))
      3. rabbitmq-server-3.10.7-1.el8.noarch.rpm (https://www.rabbitmq.com/install-rpm.html#package-dependencies)
    2. 执行rpm安装命令
      1. [root@VM-4-16-centos rabbitmq]# rpm -ivh erlang-25.0.3-1.el8.x86_64.rpm
      2. [root@VM-4-16-centos rabbitmq]# rpm -ivh socat-1.7.4.1-1.el8.x86_64.rpm
      3. [root@VM-4-16-centos rabbitmq]# rpm -ivh rabbitmq-server-3.10.7-1.el8.noarch.rpm
    3. 开启管理界面命令
      rabbitmq-plugins enable rabbitmq_management
    4. 处理只能本地访问问题
      1. cd /etc/rabbitmq/
      2. vim rabbitmq.config
      3. [{rabbit,[{loopback_users,[]}]}].
    5. 启动服务
      1. # 启动rabbitmq命令:
      2. systemctl start rabbitmq-server
      3. # 查看状态命令:
      4. systemctl status rabbitmq-server
      5. # 停止命令:
      6. systemctl stop rabbitmq-server

    也可通过yum安装

    1. yum install erlang.x86_64
    2. yum install rabbitmq-server.noarch

    MAC安装

    The Homebrew RabbitMQ Formula — RabbitMQ

    使用homebrew安装

    brew install rabbitmq

    启动

    brew services start rabbitmq

    前台启动:

    CONF_ENV_FILE="/opt/homebrew/etc/rabbitmq/rabbitmq-env.conf" /opt/homebrew/opt/rabbitmq/sbin/rabbitmq-server

    安装后路径: /opt/homebrew/opt/rabbitmq

    Windows安装

    Installing on Windows — RabbitMQ

    进入后下载erlang和rabbitmq的exe安装包即可

    开启管理页面

    安装完成后,需要开启管理页面方便后期查看管理

    1. //开启管理页面(http://127.0.0.1:15672)
    2. rabbitmq-plugins enable rabbitmq_management

    1.3.5 创建用户

    1. //创建用户
    2. rabbitmqctl add_user admin admin
    3. //设置角色
    4. rabbitmqctl set_user_tags admin administrator
    5. //设置用户权限
    6. rabbitmqctl set_permissions -p '/' admin '.*' '.*' '.*'
    7. //查看用户角色
    8. rabbitmqctl list_users

    二、简单示例

    maven依赖:

    1. com.rabbitmq
    2. amqp-client
    3. 5.8.0

    生产者:

    1. public static void main(String[] args) throws IOException, TimeoutException {
    2. //连接工厂
    3. ConnectionFactory factory = new ConnectionFactory();
    4. factory.setHost("127.0.0.1");
    5. factory.setPort(5672);
    6. factory.setUsername("admin");
    7. factory.setPassword("admin");
    8. //连接
    9. Connection connection = factory.newConnection();
    10. //信道
    11. Channel channel = connection.createChannel();
    12. //声明队列
    13. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    14. //发送消息
    15. channel.basicPublish("", QUEUE_NAME, null, "hello, world".getBytes(StandardCharsets.UTF_8));
    16. System.out.println("生成消息结束");
    17. }

    消费者:

    1. public static void main(String[] args) throws IOException, TimeoutException {
    2. Channel channel = RabbitMQUtil.getChannel();
    3. //分发回调
    4. DeliverCallback deliverCallback = (consumerTag, message) -> {
    5. System.out.println(consumerTag);
    6. System.out.println(new String(message.getBody()));
    7. };
    8. //取消回调
    9. CancelCallback cancelCallback = consumerTag -> System.out.println("消息取消消费");
    10. //接收消息
    11. channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    12. }

    三、Work Queues

    工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。

    相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。

    当有多个工作线程时,这些工作线程将一起处理这些任务。

    3.1 轮训分发消息示例

    1、启动两个工作线程

    1. public static void main(String[] args) throws IOException, TimeoutException {
    2. Channel channel = RabbitMQUtil.getChannel();
    3. DeliverCallback deliverCallback = (consumerTag, message) -> {
    4. System.out.println("接收到消息:" + consumerTag + " " + new String(message.getBody()));
    5. };
    6. CancelCallback cancelCallback = consumerTag -> System.out.println("消息取消消费");
    7. //发送消息
    8. System.out.println("worker3等待消费");
    9. channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    10. }

    2、启动一个发送线程

    1. public static void main(String[] args) throws IOException, TimeoutException {
    2. Channel channel = RabbitMQUtil.getChannel();
    3. //声明队列
    4. channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    5. //发送消息
    6. Scanner scanner = new Scanner(System.in);
    7. while(scanner.hasNext()){
    8. String next = scanner.next();
    9. channel.basicPublish("", QUEUE_NAME, null, next.getBytes(StandardCharsets.UTF_8));
    10. System.out.println("消息发送成功");
    11. }
    12. }

    3、结果展示

    发送线程生成AA、BB、CC、DD,woker1处理AA、CC,worker2处理BB、DD。

    即消息会轮循的分发给多个工作线程处理。

    3.2 消息应答

    3.2.1. 概念

    消费者完成一个任务可能需要一段时间,如果消费者处理一个长的任务时只完成了部分突然挂掉,会发生什么情况?

    • 默认情况下,RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。
    • 在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息,以及后续发送给该消费者的消息。

    为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制:

    消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理,rabbitmq 可以把该消息删除了。

    3.2.2. 自动应答

    消息发送后立即被认为已经传送成功。

    这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了。

    另一方面这种模式消费者没有对传递的消息数量进行限制,使得消费者接收太多来不及处理的消息。这些消息的积压最终使得内存耗尽,最终崩溃。

    所以这种模式,仅适用在消费者高效处理消息的情况下使用。

    3.2.3. 手动消息应答的方法

    • Channel.basicAck(用于肯定确认) :告诉RabbitMQ 该消息成功处理,可以将其丢弃
    • Channel.basicNack(用于否定确认) :告诉RabbitMQ 该消息没有处理成功,不可以丢弃
    • Channel.basicReject(用于否定确认) :告诉RabbitMQ 该消息直接拒绝,可以将其丢弃

    3.2.4. Multiple 的解释

    手动应答的好处是可以批量应答并且减少网络拥堵。

    channel.basicAck(deliveryTag, false);

    multiple解释:

    • true 代表批量应答 channel 上未应答的消息
    • false 单个应答

    3.2.5. 消息自动重新入队

    如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认。

    RabbitMQ 将了解到消息未完全处理,并将对其重新排队。

    如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。

    这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

    3.2.6. 消息手动应答代码

    默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改为手动应答。

    消费者代码:

    1. public static void main(String[] args) throws IOException, TimeoutException {
    2. Channel channel = RabbitMQUtil.getChannel();
    3. DeliverCallback deliverCallback = (consumerTag, message) -> {
    4. System.out.println("接收到消息:" + consumerTag + " " + new String(message.getBody()));
    5. SleepUtil.sleep(3);
    6. //手动应答消息
    7. channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
    8. System.out.println("完成消费:" + new String(message.getBody()));
    9. };
    10. CancelCallback cancelCallback = consumerTag -> System.out.println("消息取消消费");
    11. System.out.println("Consumer1等待消费");
    12. channel.basicConsume(QUEUE_NAME, false, deliverCallback, cancelCallback);
    13. }

    3.3.7. 预取值

    消息的发送是异步发送的,所以在任何时候channel 上肯定不止只有一个消息。

    因此这里就存在一个未确认的消息缓冲区,开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。

    通过使用 basicQos() 方法设置预取计数值。

    • 该值定义通道上允许的未确认消息的最大数量
    • 一旦数量达到配置的数量, RabbitMQ 将停止在通道上传递更多消息
    • 增加预取将提高向消费者传递消息的速度

    预取值为 1 是最保守的。当然这将使吞吐量变得很低。

    channel.basicQos(1);

    设置了预取值后,实际上也实现了不公平分发,而不再采用轮训分发。

    3.3 RabbitMQ持久化

    3.3.1. 概念

    默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息。

    确保消息不会丢失需要做两件事:将队列和消息都标记为持久化。

    3.3.2. 队列如何实现持久化

    之前创建的队列都是非持久化的,rabbitmq 重启的话,该队列就会被删除掉。

    如果要队列实现持久化,需要在声明队列的时候把 durable 参数设置为持久化。

    但是需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误。

    1. //队列持久化
    2. //Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map arguments) throws IOException;
    3. channel.queueDeclare( QUEUE_NAME, true, false, false, null );

    3.3.3. 消息实现持久化

    要想让消息实现持久化,需要在消息生产者修改代码, 添加MessageProperties.PERSISTENT_TEXT_PLAIN这个属性。

    1. //消息持久化
    2. channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));

    将消息标记为持久化并不能完全保证不会丢失消息:

    尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。

    如果需要更强有力的持久化策略,则需要发布确认

    四、发布确认

    4.1 发布确认原理

    生产者将信道设置成 confirm 模式,在该信道上面发布的消息会被指派唯一的 ID(从 1 开始)。

    消息被投递到所有匹配的队列后,broker 会发送一个确认给生产者(包含消息唯一 ID),使得生产者知道消息到达队列。

    如果消息和队列是可持久化的,那么确认消息在消息写入磁盘后发出。

    • broker 回传给生产者的确认消息中 delivery-tag 域包含了消息的序列号。
    • broker 可以设置basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

    confirm 模式是异步的,生产者可以在等信道返回确认的同时继续发送下一条消息。

    当消息得到确认之后,生产者可以通过回调方法来处理该确认消息。

    如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息, 生产者同样可以在回调方法中处理该 nack 消息。

    4.2 发布确认策略

    4.2.1 开启发布确认

    发布确认默认是没有开启的,需要调用 channel 上的confirmSelect()方法。

    channel.confirmSelect();

    4.2.2 单个确认

    同步确认发布的方式,发布一个消息之后只有它被确认发布,后续的消息才能继续发布。

    waitForConfirmsOrDie(long)方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没被确认,那么方法将抛出异常。

    最大的缺点是: 发布速度特别的慢。

    1. /**
    2. * 单个确认
    3. * 每发送一条,就等待确认结果
    4. *
    5. * @throws Exception 异常
    6. */
    7. public static void sendMessageSingleConfirm() throws Exception {
    8. Connection connection = RabbitMQUtil.getConnection();
    9. Channel channel = RabbitMQUtil.getChannel(connection);
    10. String queueName = "SINGLE_CONFIRM_QUEUE";
    11. channel.queueDeclare(queueName, true, false, false, null);
    12. channel.confirmSelect();
    13. long start = System.currentTimeMillis();
    14. for (int i = 1; i <= MESSAGE_COUNT; i++) {
    15. String message = "message" + i;
    16. channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
    17. boolean flag = channel.waitForConfirms();
    18. System.out.println("消息【" + message + "】发送确认" + (flag ? "成功" : "失败"));
    19. }
    20. long end = System.currentTimeMillis();
    21. System.out.println("单个确认发送" + MESSAGE_COUNT + "条消息执行时间:" + (end - start) + "ms");
    22. RabbitMQUtil.closeChannel(channel);
    23. RabbitMQUtil.closeConnection(connection);
    24. }

    4.2.3 批量确认

    与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量。

    缺点是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。

    这种方案仍然是同步的,也一样阻塞消息的发布。

    1. /**
    2. * 批量确认
    3. * 每发送n条,就等待确认n条结果
    4. *
    5. * @throws Exception 异常
    6. */
    7. public static void sendMessageBatchConfirm() throws Exception {
    8. Connection connection = RabbitMQUtil.getConnection();
    9. Channel channel = RabbitMQUtil.getChannel(connection);
    10. String queueName = "BATCH_CONFIRM_QUEUE";
    11. channel.queueDeclare(queueName, true, false, false, null);
    12. channel.confirmSelect();
    13. long start = System.currentTimeMillis();
    14. for (int i = 1; i <= MESSAGE_COUNT; i++) {
    15. String message = "message" + i;
    16. channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
    17. if (i % 100 == 0) {
    18. boolean flag = channel.waitForConfirms();
    19. System.out.println("消息【" + (i - 100) + "~" + i + "】发送确认" + (flag ? "成功" : "失败"));
    20. }
    21. }
    22. long end = System.currentTimeMillis();
    23. System.out.println("批量确认发送" + MESSAGE_COUNT + "条消息执行时间:" + (end - start) + "ms");
    24. RabbitMQUtil.closeChannel(channel);
    25. RabbitMQUtil.closeConnection(connection);
    26. }

    4.2.4 异步确认

    生产者发送完消息后,不会阻塞等待确认消息是否发布成功。而是提供回调方法,在回调方法中处理成功或失败的消息。

    1. /**
    2. * 异步确认
    3. * 提供一个回调方法,客户端不主动等待结果,而是等服务端主动回调
    4. *
    5. * @throws Exception 异常
    6. */
    7. public static void sendMessageSyncConfirm() throws Exception {
    8. Connection connection = RabbitMQUtil.getConnection();
    9. Channel channel = RabbitMQUtil.getChannel(connection);
    10. String queueName = "SYNC_CONFIRM_QUEUE";
    11. channel.queueDeclare(queueName, true, false, false, null);
    12. channel.confirmSelect();
    13. ConcurrentSkipListMap messageMap = new ConcurrentSkipListMap<>();
    14. ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
    15. if (multiple) {
    16. //返回的是小于等于当前序列号的未确认消息
    17. ConcurrentNavigableMap confirmed = messageMap.headMap(sequenceNumber, true);
    18. //清除该部分未确认消息
    19. confirmed.clear();
    20. } else {
    21. //只清除当前序列号的消息
    22. messageMap.remove(sequenceNumber);
    23. }
    24. };
    25. ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
    26. String message = messageMap.get(sequenceNumber);
    27. System.out.println("发布的消息" + message + "未被确认,序列号" + sequenceNumber);
    28. };
    29. channel.addConfirmListener(ackCallback, nackCallback);
    30. long start = System.currentTimeMillis();
    31. for (int i = 1; i <= MESSAGE_COUNT; i++) {
    32. String message = "message" + i;
    33. messageMap.put(channel.getNextPublishSeqNo(), message);
    34. channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8));
    35. }
    36. System.out.println(messageMap.size());
    37. long end = System.currentTimeMillis();
    38. System.out.println("异步确认发送" + MESSAGE_COUNT + "条消息执行时间:" + (end - start) + "ms");
    39. SleepUtil.sleep(10);
    40. //todo 这会儿还没发出去的消息即发送失败的消息,可以重试发送
    41. System.out.println(messageMap.size());
    42. RabbitMQUtil.closeChannel(channel);
    43. RabbitMQUtil.closeConnection(connection);
    44. }

    4.2.5 速度比较

    • 单独确认:同步等待确认,简单,但吞吐量非常有限。
    • 批量确认:批量同步等待确认,简单,合理的吞吐量,一旦出现问题但很难推断出是哪条消息出现了问题。
    • 异步确认: 最佳性能和资源使用,在出现错误的情况下可以很好地控制。

    五、交换机

    5.1 exchange

    5.1.1 概念

    RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列,生产者只能将消息发送到交换机(exchange)。

    交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。

    交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列,还是把它们路由到许多队列中,还是说应该丢弃它们。

    5.1.2 类型

    总共有以下类型:

    • 扇出(fanout):将消息路由到所有和交换机绑定的队列中
    • 直接(direct):将消息路由到和交换机绑定指定routingKey的队列中
    • 主题(topic) :将消息路由到和交换机绑定routingKey符合规则的队列中
    • 标题(headers)

    5.1.3 默认交换机

    在本教程的前面部分我们对 exchange 一无所知,但仍然能够将消息发送到队列。

    之前能实现的原因是因为我们使用的是默认交换,我们通过空字符串(“”)进行标识。

    channel.basicPublish("", QUEUE_NAME, null, "hello, world".getBytes(StandardCharsets.UTF_8));

    第一个参数是交换机的名称。空字符串表示默认或无名称交换机。

    默认交换机是direct类型,和所有队列都通过队列名作为routingKey。

    Default exchange

    The default exchange is implicitly bound to every queue, with a routing key equal to the queue name. It is not possible to explicitly bind to, or unbind from the default exchange. It also cannot be deleted.

    5.2 临时队列

    队列的名称至关重要-需要指定消费者去消费哪个队列的消息。

    做测试时,每当连接到 RabbitMq都需要一个全新的空队列,为此可以创建一个随机名称的队列。

    其次一旦断开了消费者的连接,队列将被自动删除。

    创建临时队列的方式如下:

    String queueName = channel.queueDeclare().getQueue()

    创建出来之后长成这样

    5.3 绑定

    绑定(binding) 是 exchange 和 queue 之间的桥梁,它表示exchange 和哪个队列进行了绑定关系。

    比如说下面这张图是 X 与 Q1 和 Q2 进行了绑定。

    交换机和队列是通过routingKey绑定的。

    5.4 fanout

    5.4.1 fanout概念

    Fanout 这种类型非常简单,它将接收的消息广播到绑定的所有队列中。

    5.4.2 fanout示例

    消费者1/消费者2:

    1. public class Consumer1 {
    2. private static final String EXCHANGE_NAME = "FANOUT_PUBLISH_EXCHANGE";
    3. /**
    4. * fanout扇出模式,其实就是广播模式
    5. * 会把消息路由到所有与它绑定的队列中
    6. * 所以在这个模式中,routingKey其实没有作用
    7. */
    8. public static void main(String[] args) throws IOException, TimeoutException {
    9. Connection connection = RabbitMQUtil.getConnection();
    10. Channel channel = connection.createChannel();
    11. //声明交换机和队列(其实应该放在生产者侧,但为了简便放在这)
    12. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT.getType());
    13. String queueName = channel.queueDeclare().getQueue();
    14. //fanout模式中routingKey无作用
    15. channel.queueBind(queueName, EXCHANGE_NAME, "456");
    16. //消费确认
    17. DeliverCallback deliverCallback = (consumerTag, message) -> {
    18. System.out.println("Consumer1接收到消息:" + consumerTag + " " + new String(message.getBody()));
    19. };
    20. CancelCallback cancelCallback = consumerTag -> System.out.println("消息取消消费");
    21. channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    22. }
    23. }

    生产者:

    1. public class Producer {
    2. private static final String EXCHANGE_NAME = "FANOUT_PUBLISH_EXCHANGE";
    3. /**
    4. * fanout扇出模式,其实就是广播模式
    5. * 会把消息路由到所有与它绑定的队列中
    6. * 所以在这个模式中,routingKey其实没有作用
    7. */
    8. public static void main(String[] args) throws IOException, TimeoutException {
    9. Connection connection = RabbitMQUtil.getConnection();
    10. Channel channel = connection.createChannel();
    11. //声明交换机
    12. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT.getType());
    13. //发送消息
    14. Scanner scanner = new Scanner(System.in);
    15. while (scanner.hasNextLine()){
    16. String message = scanner.nextLine();
    17. channel.basicPublish(EXCHANGE_NAME,"", null, message.getBytes(StandardCharsets.UTF_8));
    18. }
    19. }
    20. }

    结果:

    不管生产者生产什么消息,消费者1和消费者2都能收到。

    5.5 direct

    5.5.1 direct概念

    消息只去到它绑定的 routingKey 队列中去。

    在上面这张图中,我们可以看到 X 绑定了两个队列,绑定类型是 direct。

    队列Q1 绑定键为 orange, 队列 Q2 绑定键有两个:一个绑定键为 black,另一个绑定键为 green.

    在这种绑定情况下,生产者发布消息到 exchange 上:

    • 绑定键为 orange 的消息会被发布到队列 Q1
    • 绑定键为 blackgreen 和的消息会被发布到队列 Q2
    • 其他绑定键的消息将被丢弃。

    5.5.2 direct示例

    消费者1

    1. public class Consumer1 {
    2. private static final String EXCHANGE_NAME = "DIRECT_PUBLISH_EXCHANGE";
    3. /**
    4. * direct直接模式,其实就是普通路由模式
    5. * 会把消息路由到绑定该routingKey的队列中
    6. * 在这个模式中,直接根据消息的routingKey,将消息路由到绑定该routingKey的队列中
    7. */
    8. public static void main(String[] args) throws IOException, TimeoutException {
    9. Connection connection = RabbitMQUtil.getConnection();
    10. Channel channel = connection.createChannel();
    11. //声明交换机和队列(其实应该放在生产者侧,但为了简便放在这)
    12. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT.getType());
    13. String queueName = channel.queueDeclare().getQueue();
    14. //direct模式中routingKey很重要
    15. channel.queueBind(queueName, EXCHANGE_NAME, "router1");
    16. //消费确认
    17. DeliverCallback deliverCallback = (consumerTag, message) -> {
    18. System.out.println("Consumer1接收到消息:" + consumerTag + " " + new String(message.getBody()));
    19. };
    20. CancelCallback cancelCallback = consumerTag -> System.out.println("消息取消消费");
    21. channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    22. }
    23. }

    消费者2:

    channel.queueBind(queueName, EXCHANGE_NAME, "router2"); channel.queueBind(queueName, EXCHANGE_NAME, "router22");

    生产者:

    1. public class Producer {
    2. private static final String EXCHANGE_NAME = "DIRECT_PUBLISH_EXCHANGE";
    3. /**
    4. * direct直接模式,其实就是普通路由模式
    5. * 会把消息路由到绑定该routingKey的队列中
    6. * 在这个模式中,直接根据消息的routingKey,将消息路由到绑定该routingKey的队列中
    7. */
    8. public static void main(String[] args) throws IOException, TimeoutException {
    9. Connection connection = RabbitMQUtil.getConnection();
    10. Channel channel = connection.createChannel();
    11. //声明交换机
    12. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT.getType());
    13. //发送消息
    14. channel.basicPublish(EXCHANGE_NAME, "router1", null, "message1".getBytes(StandardCharsets.UTF_8));
    15. channel.basicPublish(EXCHANGE_NAME, "router2", null, "message2".getBytes(StandardCharsets.UTF_8));
    16. channel.basicPublish(EXCHANGE_NAME, "router22", null, "message3".getBytes(StandardCharsets.UTF_8));
    17. channel.basicPublish(EXCHANGE_NAME, "router3", null, "message4".getBytes(StandardCharsets.UTF_8));
    18. }
    19. }

    结果:

    消费者1和2各自收到对应routingKey的消息

    5.6 topic

    5.6.1 topic概念

    发送到类型是 topic 交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以点号分隔开。

    这些单词可以是任意单词,比如:"stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit".这种类型的。然这个单词列表最多不能超过 255 个字节。

    在这个规则列表中,其中有两个替换符需要注意的

    • *(星号)可以代替一个单词
    • #(井号)可以替代零个或多个单词

    下图绑定关系如下:

    • Q1-->绑定的是:
      • 中间带 orange 带 3 个单词的字符串(*.orange.*)
    • Q2-->绑定的是:
      • 最后一个单词是 rabbit 的 3 个单词(*.*.rabbit)
      • 第一个单词是 lazy 的多个单词(lazy.#)

    看看他们之间数据接收情况是怎么样的

    • quick.orange.rabbit 被队列 Q1Q2 接收到
    • lazy.orange.elephant 被队列 Q1Q2 接收到
    • quick.orange.fox 被队列 Q1 接收到
    • lazy.brown.fox 被队列 Q2 接收到
    • lazy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次
    • quick.brown.fox 不匹配任何绑定不会被任何队列接收到,会被丢弃
    • quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃
    • lazy.orange.male.rabbit 是四个单词但匹配 Q2

    注意:

    • 当一个队列绑定键是#,那么这个队列将接收所有数据,就是 fanout
    • 当队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct

    5.6.2 topic示例

    消费者1

    1. /**
    2. * topic主题模式
    3. * 交换机和队列通过通配符的routingKey绑定
    4. * 消息的routingKey可能会匹配到多个绑定关系,那该消息就会被路由到多个队列中
    5. * topic模式就是通过routingKey实现模糊匹配
    6. */
    7. public static void main(String[] args) throws IOException, TimeoutException {
    8. Connection connection = RabbitMQUtil.getConnection();
    9. Channel channel = connection.createChannel();
    10. //声明交换机和队列(其实应该放在生产者侧,但为了简便放在这)
    11. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC.getType());
    12. String queueName = channel.queueDeclare().getQueue();
    13. //topic模式中routingKey是通配符:#代表零或多个单词,*代表单个单词
    14. channel.queueBind(queueName, EXCHANGE_NAME, "*.*.orange");
    15. channel.queueBind(queueName, EXCHANGE_NAME, "orange.#");
    16. //消费确认
    17. DeliverCallback deliverCallback = (consumerTag, message) -> {
    18. System.out.println("Consumer1接收到消息:" + consumerTag + " " + new String(message.getBody()));
    19. };
    20. CancelCallback cancelCallback = consumerTag -> System.out.println("消息取消消费");
    21. channel.basicConsume(queueName, true, deliverCallback, cancelCallback);
    22. }

    消费者2

    1. //topic模式中routingKey是通配符:#代表零或多个单词,*代表单个单词
    2. channel.queueBind(queueName, EXCHANGE_NAME, "lazy.*");
    3. channel.queueBind(queueName, EXCHANGE_NAME, "*.rabbit.#");

    生产者:

    1. public class Producer {
    2. private static final String EXCHANGE_NAME = "TOPIC_PUBLISH_EXCHANGE";
    3. /**
    4. * direct直接模式,其实就是普通路由模式
    5. * 会把消息路由到指定绑定该routingKey的队列中
    6. * 在这个模式中,routingKey直接决定消息入哪个队列
    7. */
    8. public static void main(String[] args) throws IOException, TimeoutException {
    9. Connection connection = RabbitMQUtil.getConnection();
    10. Channel channel = connection.createChannel();
    11. //声明交换机
    12. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC.getType());
    13. //发送消息
    14. //1.2都会收到 (注意匹配到了Consumer2的两条规则,但只会发送一次消息)
    15. channel.basicPublish(EXCHANGE_NAME, "lazy.rabbit.orange", null, "message1".getBytes(StandardCharsets.UTF_8));
    16. //2会收到
    17. channel.basicPublish(EXCHANGE_NAME, "lazy.123456", null, "message2".getBytes(StandardCharsets.UTF_8));
    18. //1会收到
    19. channel.basicPublish(EXCHANGE_NAME, "orange.123456", null, "message3".getBytes(StandardCharsets.UTF_8));
    20. //1会收到
    21. channel.basicPublish(EXCHANGE_NAME, "*.*.orange", null, "message4".getBytes(StandardCharsets.UTF_8));
    22. //都不会收到
    23. channel.basicPublish(EXCHANGE_NAME, "lazy**", null, "message4".getBytes(StandardCharsets.UTF_8));
    24. }
    25. }

    结果:

  • 相关阅读:
    后台管理----搜索,重置功能
    大数据技术在金融行业反洗钱业务的应用分析
    ExtJS - UI组件 - Grid
    【vue3+ts后台管理】登录页面完成
    【用户画像】ClickHouse中的SQL操作、副本介绍和配置、分片集群环境配置和使用
    软件著作权申请材料及申请流程?
    MySQL - 为什么InnoDB选择B+树索引?Change buffer?
    axios delete请求如何发给springboot
    谁懂啊!自制的科普安全手册居然火了
    oracle-long类型转clob类型及clob类型字段的导出导入
  • 原文地址:https://blog.csdn.net/guaituo0129/article/details/126441236