• RabbitMQ(四)交换机Exchanges


    RabbitMQ(四)交换机Exchanges


    5 交换机

    5.1 Exchanges

    5.1.1 Exchanges 概念

    ​ RabbitMQ消息传递模型的核心思想:生产者生产的消息不会直接发送到队列上,生产者只能将消息发送到交换机(exchange),交换机的工作内容一方面接收来自生产者的消息,另一方面,将接收到的消息推入队列。另一方面将消息推入队列需要知道消息的处理策略,这就是由交换机的类型来决定了。

    5.1.2 Exchanges 的类型

    ​ 总共有以下几种类型

    ​ 直接类型(direct),主题(topic),标题(headers),扇出(fanout)

    Exchanges类型别名routingKey策略
    Fanout(扇出)广播模式空字符串将接收到的消息广播到它知道的所有队列中
    Direct(直接)直接模式black…消息支取到它绑定的routingKey队列中,当绑定的多个队列的Key都相同,那表现就与fanout类型相似了
    Topic(主题)主题模式单词组合,以点分开
    *(星号)可以代替一个单词,#(井号)可以替代零个或多个单词
    可以通过通配符的方式来发送消息
    当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
    如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是 direct 了

    5.1.3 默认Exchanges

    ​ 我们之前在发送消息的时候,之所以能实现的原因是使用了默认交换机,通过空字符串""进行标识。

    /**
     * 1. 交换机
     * 2. 队列名
     * 3. 参数 消息持久化(保存在磁盘上)
     * 4. 消息
     */
    channel.basicPublish("",QueueName.Ack_Queue.getName(), MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    第一个参数就是交换机的名称,空字符串则表示默认或无名交换机。

    UI管理界面中可以看到

    在这里插入图片描述

    5.2 临时队列

    ​ 我们之前去使用一个队列,都会有一个队列的名称,那如果我们连接rabbitMQ的时候都需要一个随机名称的空队列,那么我们可以创建一个具有随机名称的队列,但当我们一点断开了消费者的连接,队列就会被自动删除

    ​ 创建临时队列的方式如下

    /**
     *  声明一个临时队列
     *  生成一个零时队列,队列的名称是随机的
     *  当消费者断开与队列的连接时,队列就自动删除
     */
    String queueName = channel.queueDeclare().getQueue();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    创建出来 在UI中可以看到

    在这里插入图片描述

    5.3 绑定(bindings)

    ​ bingding是exchange和queue之间的桥梁,它告诉我们exchange和哪个队列进行了绑定关系,它是一个绑定的关系值。

    在这里插入图片描述

    5.4 Fanout 扇出模式

    ​ fanout这种类型非常的简单,它是将接收到的所有消息广播到它知道的所有的队列中。

    在这里插入图片描述

    5.4.1 Fanout 实战结构图

    在这里插入图片描述

    Logs和临时队列的绑定关系如下图

    在这里插入图片描述

    5.4.2 Fanout 实战代码

    1. 消费者1 ReceiveLogs01

      /**
       * 消费者负责消费消息
       */
      public class ReceiveLogs01 {
      
        public static void main(String[] args) throws Exception{
          Channel channel = RabbitMqUtil.getChannel();
          //声明一个交换机
          channel.exchangeDeclare(ExchangeName.Log_Exchange.getName(), "fanout");
          /**
               *  声明一个临时队列
               *  生成一个零时队列,队列的名称是随机的
               *  当消费者断开与队列的连接时,队列就自动删除
               */
          String queueName = channel.queueDeclare().getQueue();
      
          //绑定交换机与队列
          channel.queueBind(queueName,ExchangeName.Log_Exchange.getName(), "");
          //等待接收消息
          DeliverCallback deliverCallback=(consumerTag,message)->{
            System.out.println("打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
          };
          System.out.println("消费者A-正在接收消息");
          channel.basicConsume(queueName,true,deliverCallback,(consumerTag,message)->{});
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
    2. 消费者2 ReceiveLogs02

      /**
       * 消费者负责消费消息
       */
      public class ReceiveLogs02 {
      
        public static void main(String[] args) throws Exception{
          Channel channel = RabbitMqUtil.getChannel();
          //声明一个交换机
          channel.exchangeDeclare(ExchangeName.Log_Exchange.getName(), "fanout");
          /**
               *  声明一个临时队列
               *  生成一个零时队列,队列的名称是随机的
               *  当消费者断开与队列的连接时,队列就自动删除
               */
          String queueName = channel.queueDeclare().getQueue();
      
          //绑定交换机与队列
          channel.queueBind(queueName,ExchangeName.Log_Exchange.getName(), "");
          //等待接收消息
          DeliverCallback deliverCallback=(consumerTag,message)->{
            System.out.println("打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
          };
          System.out.println("消费者B-正在接收消息");
          channel.basicConsume(queueName,true,deliverCallback,(consumerTag,message)->{});
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
    3. 生产者 EmitLog

      /**
       * 发消息给交换机
       */
      public class EmitLog {
      
        public static void main(String[] args)throws Exception {
          Channel channel = RabbitMqUtil.getChannel();
          channel.exchangeDeclare(ExchangeName.Log_Exchange.getName(), "fanout");
          Scanner scanner = new Scanner(System.in);
      
          while (scanner.hasNext()){
            String message = scanner.next();
            channel.basicPublish(ExchangeName.Log_Exchange.getName(), "",null,message.getBytes("UTF-8"));
            System.out.println("生产者发出消息: "+message);
          }
      
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18

    5.4.3 实战截图

    1. 生产者

      在这里插入图片描述

    2. 消费者A

      在这里插入图片描述

    3. 消费者B

      在这里插入图片描述

    5.5 Direct 直接模式

    ​ 在上一个部分我们将信息分享给所有的消费者,但如果我们希望将消息纷发给不同的消费者的话,fanout这种交换类型就不能给我们带来很大的灵活性,这里我们就要使用Direct类型进行替换,这种工作方式的策略是:消息只会去到它绑定的routingKey队列中去。

    在这里插入图片描述

    5.5.1 Direct 实战结构图

    在这里插入图片描述

    Direct_Logs绑定队列关系如下

    在这里插入图片描述

    5.5.2 Direct 实战代码

    1. 消费者1 ReceiveLogsDirect01

      public class ReceiveLogsDirect01 {
      
          public static void main(String[] args) throws Exception{
              Channel channel = RabbitMqUtil.getChannel();
              //声明交换机
              channel.exchangeDeclare(ExchangeName.Direct_Logs.getName(), BuiltinExchangeType.DIRECT);
              //声明一个队列
              channel.queueDeclare(QueueName.Console_Queue.getName(), false,false,false,null);
              channel.queueBind(QueueName.Console_Queue.getName(), ExchangeName.Direct_Logs.getName(),"info");
              channel.queueBind(QueueName.Console_Queue.getName(), ExchangeName.Direct_Logs.getName(),"warning");
      
              //接收消息
              //等待接收消息
              DeliverCallback deliverCallback=(consumerTag, message)->{
                  System.out.println("打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
              };
      
              System.out.println("消费者A-正在接收消息 (接收info,warning)");
              //消费者接收消息
              channel.basicConsume(QueueName.Console_Queue.getName(), true,deliverCallback,consumerTag->{});
      
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
    2. 消费者2 ReceiveLogsDirect02

      public class ReceiveLogsDirect02 {
      
          public static void main(String[] args) throws Exception{
              Channel channel = RabbitMqUtil.getChannel();
              //声明交换机
              channel.exchangeDeclare(ExchangeName.Direct_Logs.getName(), BuiltinExchangeType.DIRECT);
              //声明一个队列
              channel.queueDeclare(QueueName.Disk_Queue.getName(), false,false,false,null);
              channel.queueBind(QueueName.Disk_Queue.getName(), ExchangeName.Direct_Logs.getName(),"error");
      //        channel.queueBind(QueueName.Disk_Queue.getName(), ExchangeName.Direct_Logs.getName(),"warning");
      
              //接收消息
              //等待接收消息
              DeliverCallback deliverCallback=(consumerTag, message)->{
                  System.out.println("打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
              };
      
              System.out.println("消费者B-正在接收消息 (接收error)");
              //消费者接收消息
              channel.basicConsume(QueueName.Disk_Queue.getName(), true,deliverCallback,consumerTag->{});
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
    3. 生产者 EmitLogTwo

      public class EmitLogTwo {
      
        public static void main(String[] args)throws Exception {
          Channel channel = RabbitMqUtil.getChannel();
          channel.exchangeDeclare(ExchangeName.Direct_Logs.getName(), BuiltinExchangeType.DIRECT);
          Scanner scanner = new Scanner(System.in);
      
          while (true){
            System.out.print("请输入你需要发送的日志类型:");
            String logType = scanner.next();
            if("warning".equals(logType)||"info".equals(logType)||"error".equals(logType)){
              System.out.print("请输入你需要发送的消息:");
              String message = scanner.next();
              channel.basicPublish(ExchangeName.Direct_Logs.getName(), logType,null,message.getBytes("UTF-8"));
              System.out.println("生产者发出消息: "+message);
            }else {
              System.out.println("类型输入错误!请重新输入");
              continue;
            }
          }
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22

    5.5.3 实战截图

    1. 生产者

      在这里插入图片描述

    2. 消费者A

      在这里插入图片描述

    3. 消费者B

      在这里插入图片描述

    5.5.4 多重绑定

    ​ 当exchange的类型为direct,但是它绑定的多个队列的key都相同,这种情况下虽然绑定的类型是direct,但是它的表现就和fanout有点类似了,和广播模式差不多,发送的消息会被相同绑定key的接收到。

    5.5.5 没有绑定关系的routingKey

    ​ 当exchange向一个没有绑定queue(队列)的routingKey发送消息时,由于没有绑定关系,所以消息最终会被丢弃。

    5.6 Topic 主题模式

    ​ 尽管direct交换机解决了部分纷发的局限性,topic类型在direct的基础上,可以在绑定routingkey的基础上可以使用通配符,更加的灵活。

    在这里插入图片描述

    Topic的要求

    ​ topic交换机的绑定key不可以随意填写,必须满足一定的要求

    1. 必须是一个单词列表,以点号隔开

    2. 可以使用通配符

      *(星号)可以代替一个单词

      #(井号)可以替代零个或多个单词

    5.6.1 Topic 实战结构图

    在这里插入图片描述

    Topic_Logs绑定队列关系如下

    在这里插入图片描述

    5.6.2 Topic 实战代码

    1. 消费者1 ReceiveLogsTopic01

      /**
       * 主题模式消费者
       */
      public class ReceiveLogsTopic01 {
      
        public static void main(String[] args)throws Exception{
          Channel channel = RabbitMqUtil.getChannel();
      
          //声明交换机
          channel.exchangeDeclare(ExchangeName.Topic_Logs.getName(), BuiltinExchangeType.TOPIC);
          //声明队列
          channel.queueDeclare(QueueName.Q1_Queue.getName(), false,false,false,null);
          //绑定队列
          channel.queueBind(QueueName.Q1_Queue.getName(), ExchangeName.Topic_Logs.getName(), "*.orange.*");
      
          System.out.println("主题模式消费者T1正在等待消息... (*.orange.*)");
      
          //接收消息
          channel.basicConsume(QueueName.Q1_Queue.getName(),true,(consumerTag,message)->{
            System.out.println("打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
          },consumerTag->{});
        }
      }
      
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
    2. 消费者2 ReceiveLogsTopic02

      /**
       * 主题模式消费者
       * 通配符 *表示一个单词,#代表一个或多个单
       */
      public class ReceiveLogsTopic02 {
      
        public static void main(String[] args)throws Exception{
          Channel channel = RabbitMqUtil.getChannel();
      
          //声明交换机
          channel.exchangeDeclare(ExchangeName.Topic_Logs.getName(), BuiltinExchangeType.TOPIC);
          //声明队列
          channel.queueDeclare(QueueName.Q2_Queue.getName(), false,false,false,null);
          //绑定队列
          channel.queueBind(QueueName.Q2_Queue.getName(), ExchangeName.Topic_Logs.getName(), "*.*.rabbit");
          channel.queueBind(QueueName.Q2_Queue.getName(), ExchangeName.Topic_Logs.getName(), "lazy.#");
      
          System.out.println("主题模式消费者T2正在等待消息... (*.*.rabbit)(lazy.#)");
      
          //接收消息
          channel.basicConsume(QueueName.Q2_Queue.getName(),true,(consumerTag,message)->{
            System.out.println("打印接收到的消息:"+new String(message.getBody(),"UTF-8"));
          },consumerTag->{});
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
    3. 生产者 EmitLogThree

      public class EmitLogThree {
      
        public static void main(String[] args)throws Exception {
          Channel channel = RabbitMqUtil.getChannel();
          channel.exchangeDeclare(ExchangeName.Topic_Logs.getName(), BuiltinExchangeType.TOPIC);
          Scanner scanner = new Scanner(System.in);
      
          while (true){
            System.out.print("请输入你需要发送的routingKey:");
            String logType = scanner.next();
            System.out.print("请输入你需要发送的消息:");
            String message = scanner.next();
            channel.basicPublish(ExchangeName.Topic_Logs.getName(), logType,null,message.getBytes("UTF-8"));
            System.out.println("生产者发出消息: "+message);
          }
      
        }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18

    5.6.3 实战截图

    1. 生产者

      在这里插入图片描述

    2. 消费者A

      在这里插入图片描述

    3. 消费者B

      在这里插入图片描述

    注意:当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像 fanout 了
    如果队列绑定键当中没有#和*出现,那么该队列绑定类型就像是 direct 了

  • 相关阅读:
    循环神经网络理论知识+卷积神经网络模型
    【蓝桥杯单片机】二、独立按键和矩阵按键
    力扣L12--- 125验证回文串(java版)-2024年3月15日
    记录一个音频PCM数据由双声道转单声道出错问题
    2023版:深度比较几种.NET Excel导出库的性能差异
    【c++】刷题常用技巧
    element-plus中图片预览插件源码改动
    【计算机网络】子网掩码、子网划分
    一天吃透Java面试题
    【李航统计学习笔记】第九章:EM算法
  • 原文地址:https://blog.csdn.net/qq_27331467/article/details/126131135