• RabbitMQ工作模式-路由模式


    官方文档参考:https://www.rabbitmq.com/tutorials/tutorial-four-python.html
    在这里插入图片描述

    使用direct类型的Exchange,发N条消息并使用不同的routingKey,消费者定义队列并将队列routingKey、Exchange绑定。此时使用direct模式Exchange必须要routingKey完成匹配的情况下消息才会转发到对应的队列中被消费。

    样例使用日志分发为样例。即按日志不同的级别,分发到不同的队列。每个队列只处理自己的对应的级别日志。

    创建生产者

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.ThreadLocalRandom;
    
    public class Product {
    
      private static final String[] LOG_LEVEL = {"ERROR", "WARN", "INFO"};
    
      public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");
    
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        // 声明交换机,交换器和消息队列的绑定不需要在这里处理。
        channel.exchangeDeclare(
            "ex.routing",
            BuiltinExchangeType.DIRECT,
            // 持久的标识
            false,
            // 自动删除的标识
            false,
            // 属性
            null);
    
        for (int i = 0; i < 30; i++) {
          String level = LOG_LEVEL[ThreadLocalRandom.current().nextInt(0, LOG_LEVEL.length)];
          String dataMsg = "[" + level + "] 消息发送 :" + i;
          // 发送消息
          channel.basicPublish("ex.routing", level, null, dataMsg.getBytes(StandardCharsets.UTF_8));
        }
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    创建ERROR的消费者

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.nio.charset.StandardCharsets;
    
    public class ErrorConsumer {
    
      public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");
    
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        // 声明队列并绑定
        channel.exchangeDeclare(
            "ex.routing",
            BuiltinExchangeType.DIRECT,
            // 持久的标识
            false,
            // 自动删除的标识
            false,
            // 属性
            null);
    
        // 此也可以声明为临时队列,但是如果消息很重要,不要声明临时队列。
        channel.queueDeclare(
            "log.error",
            // 永久
            false,
            // 排他
            false,
            // 自动删除
            true,
            // 属性
            null);
    
        //消费者享有绑定到交换器的权力。
        channel.queueBind("log.error", "ex.routing", "ERROR");
    
        // 通过chanel消费消息
        channel.basicConsume(
            "log.error",
            (consumerTag, message) -> {
              System.out.println("ERROR收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
            },
            consumerTag -> {});
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    创建INFO级的消费者

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.nio.charset.StandardCharsets;
    
    public class InfoConsumer {
    
      public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");
    
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        // 声明队列并绑定
        channel.exchangeDeclare(
            "ex.routing",
            BuiltinExchangeType.DIRECT,
            // 持久的标识
            false,
            // 自动删除的标识
            true,
            // 属性
            null);
    
        // 此也可以声明为临时队列,但是如果消息很重要,不要声明临时队列。
        channel.queueDeclare(
            "log.info",
            // 永久
            false,
            // 排他
            false,
            // 自动删除
            false,
            // 属性
            null);
    
        //消费者享有绑定到交换器的权力。
        channel.queueBind("log.info", "ex.routing", "INFO");
    
        // 通过chanel消费消息
        channel.basicConsume(
            "log.info",
            (consumerTag, message) -> {
              System.out.println("INFO收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
            },
            consumerTag -> {});
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    创建WARN级别的消息者

    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import java.nio.charset.StandardCharsets;
    
    public class WarnConsumer {
    
      public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://root:123456@node1:5672/%2f");
    
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        // 声明队列并绑定
        channel.exchangeDeclare(
            "ex.routing",
            BuiltinExchangeType.DIRECT,
            // 持久的标识
            false,
            // 自动删除的标识
            false,
            // 属性
            null);
    
        // 此也可以声明为临时队列,但是如果消息很重要,不要声明临时队列。
        channel.queueDeclare(
            "log.warn",
            // 永久
            false,
            // 排他
            false,
            // 自动删除
            true,
            // 属性
            null);
    
        //消费者享有绑定到交换器的权力。
        channel.queueBind("log.warn", "ex.routing", "WARN");
    
        // 通过chanel消费消息
        channel.basicConsume(
            "log.warn",
            (consumerTag, message) -> {
              System.out.println("warn收到的消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
            },
            consumerTag -> {});
      }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50

    首先启动三个消费者:

    查看队列及交换机情况

    [root@nullnull-os ~]# rabbitmqctl list_exchanges --formatter pretty_table
    Listing exchanges for vhost / ...
    ┌────────────────────┬─────────┐
    │ name               │ type    │
    ├────────────────────┼─────────┤
    │ amq.fanout         │ fanout  │
    ├────────────────────┼─────────┤
    │ amq.rabbitmq.trace │ topic   │
    ├────────────────────┼─────────┤
    │ amq.headers        │ headers │
    ├────────────────────┼─────────┤
    │ amq.topic          │ topic   │
    ├────────────────────┼─────────┤
    │ amq.direct         │ direct  │
    ├────────────────────┼─────────┤
    │                    │ direct  │
    ├────────────────────┼─────────┤
    │ ex.routing         │ direct  │
    ├────────────────────┼─────────┤
    │ amq.match          │ headers │
    └────────────────────┴─────────┘
    [root@nullnull-os ~]# rabbitmqctl list_bindings --formatter pretty_table
    Listing bindings for vhost /...
    ┌─────────────┬─────────────┬──────────────────┬──────────────────┬─────────────┬───────────┐
    │ source_name │ source_kind │ destination_name │ destination_kind │ routing_key │ arguments │
    ├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
    │             │ exchange    │ log.info         │ queue            │ log.info    │           │
    ├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
    │             │ exchange    │ log.warn         │ queue            │ log.warn    │           │
    ├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
    │             │ exchange    │ log.error        │ queue            │ log.error   │           │
    ├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
    │ ex.routing  │ exchange    │ log.error        │ queue            │ ERROR       │           │
    ├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
    │ ex.routing  │ exchange    │ log.info         │ queue            │ INFO        │           │
    ├─────────────┼─────────────┼──────────────────┼──────────────────┼─────────────┼───────────┤
    │ ex.routing  │ exchange    │ log.warn         │ queue            │ WARN        │           │
    └─────────────┴─────────────┴──────────────────┴──────────────────┴─────────────┴───────────┘
    [root@nullnull-os ~]# 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39

    可以发现,交换器ex.routing 绑定了三个队列log.errorlog.info log.warn并指定了路由键。

    启动消费者,查看消息通否被正常消费。

    ERROR的消费者控制台输出

    ERROR收到的消息:[ERROR] 消息发送 :1
    ERROR收到的消息:[ERROR] 消息发送 :2
    ERROR收到的消息:[ERROR] 消息发送 :6
    ERROR收到的消息:[ERROR] 消息发送 :8
    ERROR收到的消息:[ERROR] 消息发送 :9
    ERROR收到的消息:[ERROR] 消息发送 :11
    ERROR收到的消息:[ERROR] 消息发送 :15
    ERROR收到的消息:[ERROR] 消息发送 :16
    ERROR收到的消息:[ERROR] 消息发送 :19
    ERROR收到的消息:[ERROR] 消息发送 :20
    ERROR收到的消息:[ERROR] 消息发送 :21
    ERROR收到的消息:[ERROR] 消息发送 :23
    ERROR收到的消息:[ERROR] 消息发送 :24
    ERROR收到的消息:[ERROR] 消息发送 :27
    ERROR收到的消息:[ERROR] 消息发送 :28
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    INFO的消费者控制台输出:

    INFO收到的消息:[INFO] 消息发送 :0
    INFO收到的消息:[INFO] 消息发送 :3
    INFO收到的消息:[INFO] 消息发送 :4
    INFO收到的消息:[INFO] 消息发送 :13
    INFO收到的消息:[INFO] 消息发送 :14
    INFO收到的消息:[INFO] 消息发送 :22
    INFO收到的消息:[INFO] 消息发送 :25
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    WARN的消费都控制台输出:

    warn收到的消息:[WARN] 消息发送 :5
    warn收到的消息:[WARN] 消息发送 :7
    warn收到的消息:[WARN] 消息发送 :10
    warn收到的消息:[WARN] 消息发送 :12
    warn收到的消息:[WARN] 消息发送 :17
    warn收到的消息:[WARN] 消息发送 :18
    warn收到的消息:[WARN] 消息发送 :26
    warn收到的消息:[WARN] 消息发送 :29
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    至此,验证已经完成。

  • 相关阅读:
    代码复现错误
    3. 一级缓存解析
    c++入门99题11-20
    我,在日本开密室逃脱,钱还没赚,人进“橘子”了……
    centos安装mysql5.7
    JVM:JIT实时编译器
    智慧公厕:科技赋予公共卫生新生命,提升城市管理品质
    哔哩哔哩面试经验分享
    【QT】信号和槽
    Nvidia AGX Orin MAX9296 GMSL 载板设计要点
  • 原文地址:https://blog.csdn.net/bug_null/article/details/132591454