• RabbitMQ初步到精通-第四章-RabbitMQ工作模式-Routing


    第四章-RabbitMQ工作模式-Routing

    1.模式介绍

    1.1 模式

    路由模式-继续还是和Exchange打交道,上节提到的Exchange类型为Fanout,此次声明的类型为direct 与默认Exchange一致。但还有最核心的一点,上节未使用Routing key,此次模式中,会依赖于此Key。

    如下图:

    消费发送的时候,会指定routing key ,例如指定 black,那么消费发送至 X -则会根据routing key路由消息至队列Q2,被消费者C2进行消费。

    若此时有个key ,比如blue 同时指向了 Q1 和 Q2,那消息也会同时传送到Q1与Q2,这与广播模式很像了。

    1.2 场景

    业务中需要根据不同的条件对所产生的消息进行不同的独立消费,可以使用此模式。

    例如系统中产生不同level的日志,根据level的不同上送到不同的queue,进行相应的消费。

    info类型指定 info的queue,debug类型指定debug的queue,error类型的指定error的queue

    实际运用不多,有实际生产运用的小伙伴可以留言,学习下。

    1.3 模拟

    三个重点 1.声明direct类型的 交换机 2. 绑定好交换机与queue 3. 发消息指定路由键

    2.代码验证

    梦总会醒的,小明觉得复制水,不现实,搞个现实的开关吧。因为有天小丽对他说,洗水澡很没意思,想洗牛奶澡,要小明一定帮她实现。小明自己想了想觉得可以试试,但自己也想搞个新奇的,不喜欢牛奶,喜欢红酒,洗个红酒澡吧,一边洗,一边喝。醉洗 -- o(* ̄︶ ̄*)o。

    2.1 生产者

    1. /**
    2. * @author rabbit
    3. * @version 1.0.0
    4. * @Description 一个生产者,一个交换机,两个队列,两个消费者
    5. * 声明一个DIRECT类型的exchange,并且根据Routingkey绑定指定的队列
    6. * 生产者在创建DIRECT类型的exchange后绑定相应的队列,并且指定Routingkey。在发送消息是也要指定消息的Routingkey
    7. *

    8. * 另:TOPIC模式
    9. * 生产者创建Topic的exchange并且并且指定队列,这次绑定可以通过*和#匹配关键字,对指定RoutingKey内容进行匹配。
    10. * *(星号)可以代替一个单词。
    11. * #(哈希)可以替代零个或多个单词。
    12. * channel.queueBind("topics-queue-1", "topics-exchange", "zhang.*");
    13. * channel.basicPublish("topics-exchange", "zhang.sna", null, "张三".getBytes());
    14. * @createTime 2022/07/27 19:34:00
    15. */
    16. public class WaterProducer {
    17. public static final String PUBSUB_QUEUE_1 = "SolarWaterHeater-RedWine";
    18. public static final String PUBSUB_QUEUE_2 = "SolarWaterHeater-Milk";
    19. //生产者
    20. public static void main(String[] args) throws Exception {
    21. //1、获取connection
    22. Connection connection = RabbitCommonConfig.getConnection();
    23. //2、创建channel
    24. Channel channel = connection.createChannel();
    25. for (int i = 1; i <= 10; i++) {
    26. sendMsg(channel, i);
    27. Thread.sleep(1000);
    28. }
    29. //4、关闭管道和连接
    30. channel.close();
    31. connection.close();
    32. }
    33. private static void sendMsg(Channel channel, int k) throws IOException {
    34. //3、创建exchange并且指定类型
    35. channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);
    36. //4、绑定队列 routing-queue-error routing-queue-info
    37. channel.queueBind(PUBSUB_QUEUE_2, "routing-exchange", "MILK");
    38. channel.queueBind(PUBSUB_QUEUE_1, "routing-exchange", "REDWINE");
    39. /**
    40. * 参数1:指定exchange,使用“”。默认的exchange
    41. * 参数2:指定路由的规则,使用具体的队列名称。exchange为""时,消息直接发送到队列中
    42. * 参数3:指定传递的消息携带的properties
    43. * 参数4:指定传递的消息,byte[]类型
    44. */
    45. //5、发送消息并且指定接收的队列的routingkey
    46. channel.basicPublish("routing-exchange", "MILK", null, ("牛奶-" + k+"升").getBytes());
    47. channel.basicPublish("routing-exchange", "REDWINE", null, ("红酒-" + k+"升").getBytes());
    48. System.out.println("水龙头分别放牛奶和红酒成功!" + k + "升");
    49. }
    50. }

    2.2 消费者

    小明:

    1. /**
    2. * @author rabbit
    3. * @version 1.0.0
    4. * @Description Routing 模式 一个生产者,一个交换机,两个队列,两个消费者
    5. * @createTime 2022/07/27 19:36:00
    6. */
    7. public class XMShowerConsumer {
    8. public static final String PUBSUB_QUEUE_1 = "SolarWaterHeater-RedWine";
    9. //消费者
    10. public static void consumer() throws Exception {
    11. //1、获取连对象、
    12. Connection connection = RabbitCommonConfig.getConnection();
    13. //2、创建channel
    14. Channel channel = connection.createChannel();
    15. channel.basicQos(1);
    16. //3、创建队列-helloworld
    17. /**
    18. * 参数1:queue 指定队列名称
    19. * 参数2:durable 是否开启持久化(true)
    20. * 参数3:exclusive 是否排外(conn.close()-》当前对列自动删除,当前队列只能被一个 消费者消费)
    21. * 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除
    22. * 参数5:arguments 指定队列携带的信息
    23. */
    24. channel.queueDeclare(PUBSUB_QUEUE_1, true, false, false, null);
    25. //4.开启监听Queue
    26. DefaultConsumer consumer = new DefaultConsumer(channel) {
    27. @Override
    28. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    29. System.out.println("小明洗澡用: " + new String(body, "UTF-8"));
    30. try {
    31. Thread.sleep(1000);
    32. } catch (InterruptedException e) {
    33. e.printStackTrace();
    34. }
    35. //手动ACK(接收信息,指定是否批量操作)
    36. channel.basicAck(envelope.getDeliveryTag(), false);
    37. }
    38. };
    39. /**
    40. * 参数1:queue 指定消费哪个队列
    41. * 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)
    42. * 参数1:cancelCallback 指定消费回调
    43. *
    44. */
    45. //3.关闭自动ACK
    46. channel.basicConsume(PUBSUB_QUEUE_1, false, consumer);
    47. System.out.println("小明开始洗红酒澡......");
    48. //5、键盘录入,让程序不结束!
    49. System.in.read();
    50. //6、释放资源
    51. channel.close();
    52. connection.close();
    53. }
    54. public static void main(String[] args) throws Exception {
    55. XMShowerConsumer.consumer();
    56. }
    57. }

    小丽:

    1. /**
    2. * @author rabbit
    3. * @version 1.0.0
    4. * @Description Routing 模式 一个生产者,一个交换机,两个队列,两个消费者
    5. * @createTime 2022/07/27 19:36:00
    6. */
    7. public class XLShowerConsumer {
    8. public static final String PUBSUB_QUEUE_2 = "SolarWaterHeater-Milk";
    9. //消费者
    10. public static void consumer() throws Exception {
    11. //1、获取连对象、
    12. Connection connection = RabbitCommonConfig.getConnection();
    13. //2、创建channel
    14. Channel channel = connection.createChannel();
    15. channel.basicQos(1);
    16. //3、创建队列-helloworld
    17. /**
    18. * 参数1:queue 指定队列名称
    19. * 参数2:durable 是否开启持久化(true)
    20. * 参数3:exclusive 是否排外(conn.close()-》当前对列自动删除,当前队列只能被一个 消费者消费)
    21. * 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除
    22. * 参数5:arguments 指定队列携带的信息
    23. */
    24. channel.queueDeclare(PUBSUB_QUEUE_2, true, false, false, null);
    25. //4.开启监听Queue
    26. DefaultConsumer consumer = new DefaultConsumer(channel) {
    27. @Override
    28. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    29. System.out.println("小丽洗澡用: " + new String(body, "UTF-8"));
    30. try {
    31. Thread.sleep(1000);
    32. } catch (InterruptedException e) {
    33. e.printStackTrace();
    34. }
    35. //手动ACK(接收信息,指定是否批量操作)
    36. channel.basicAck(envelope.getDeliveryTag(), false);
    37. }
    38. };
    39. /**
    40. * 参数1:queue 指定消费哪个队列
    41. * 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)
    42. * 参数1:cancelCallback 指定消费回调
    43. *
    44. */
    45. //3.关闭自动ACK
    46. channel.basicConsume(PUBSUB_QUEUE_2, false, consumer);
    47. System.out.println("小丽开始洗牛奶澡......");
    48. //5、键盘录入,让程序不结束!
    49. System.in.read();
    50. //6、释放资源
    51. channel.close();
    52. connection.close();
    53. }
    54. public static void main(String[] args) throws Exception {
    55. XLShowerConsumer.consumer();
    56. }
    57. }

    2.3 结论验证

    生产者:

    1. 水龙头分别放牛奶和红酒成功!1
    2. 水龙头分别放牛奶和红酒成功!2
    3. 水龙头分别放牛奶和红酒成功!3
    4. 水龙头分别放牛奶和红酒成功!4
    5. 水龙头分别放牛奶和红酒成功!5
    6. 水龙头分别放牛奶和红酒成功!6
    7. 水龙头分别放牛奶和红酒成功!7
    8. 水龙头分别放牛奶和红酒成功!8
    9. 水龙头分别放牛奶和红酒成功!9
    10. 水龙头分别放牛奶和红酒成功!10

    小明:

    1. 小明洗澡用: 红酒-1
    2. 小明洗澡用: 红酒-2
    3. 小明洗澡用: 红酒-3
    4. 小明洗澡用: 红酒-4
    5. 小明洗澡用: 红酒-5
    6. 小明洗澡用: 红酒-6
    7. 小明洗澡用: 红酒-7
    8. 小明洗澡用: 红酒-8
    9. 小明洗澡用: 红酒-9
    10. 小明洗澡用: 红酒-10

    小丽

    1. 小丽洗澡用: 牛奶-1
    2. 小丽洗澡用: 牛奶-2
    3. 小丽洗澡用: 牛奶-3
    4. 小丽洗澡用: 牛奶-4
    5. 小丽洗澡用: 牛奶-5
    6. 小丽洗澡用: 牛奶-6
    7. 小丽洗澡用: 牛奶-7
    8. 小丽洗澡用: 牛奶-8
    9. 小丽洗澡用: 牛奶-9
    10. 小丽洗澡用: 牛奶-10

    通过开关控制,分别给热水器里注入红酒和牛奶,红酒和牛奶存储到各自独立的水槽中,供两个人洗澡使用。

    3.总结

    核心点:

    3.1 声明Direct 类型的Exchange

    3.2 绑定好对应的 Queue

    3.3 发送消息指定 Routing key

    核心代码:

    channel.exchangeDeclare("routing-exchange", BuiltinExchangeType.DIRECT);
    channel.queueBind(PUBSUB_QUEUE_2, "routing-exchange", "MILK");
    channel.basicPublish("routing-exchange", "MILK", null, ("牛奶-" + k+"升").getBytes());

    另:TOPIC模式

    topic模式不再单独展开

    topic模式最核心的内容是引入了通配符 * ,# 

    * *(星号)可以代替一个单词。 
    * #(哈希)可以替代零个或多个单词。
    

    另外 声明的 Exchage 类型为topic 。

    看下模拟情况:

    1.首先声明Topic类型的 交换机

    2.使用带通配符的 routing key 做好绑定关系

    3. 发消息时,指定好对应的routing key ,按预期会将消息流转到对应的queue中。

  • 相关阅读:
    Python+Pytest+Allure+Yaml+Jenkins+GitLab接口自动化测试框架详解
    ensp实操浮动静态路由
    Pinpoint--基础--02--架构设计
    分布式数据库Mongodb——实验一
    UE5中一机一码功能
    Hello World分析
    OpenMP 入门
    web前端期末大作业——HTML+CSS+JavaScript仿王者荣耀游戏网站设计与制作
    图像超分综述:超长文一网打尽图像超分的前世今生 (附核心代码)
    CPU的睿频、超线程、SIMD指令集等特性对密码算法性能的影响
  • 原文地址:https://blog.csdn.net/blucastle/article/details/127929796