• RabbitMQ初步到精通-第四章-RabbitMQ工作模式-PUB/SUB


    第四章-RabbitMQ工作模式-PUB/SUB

    1.模式介绍

    1.1 模式

    此模式称为发布订阅模式,从此模式开始,我们就不再使用默认的交换机了,开始定义我们自己的交换机。

    此发布订阅模式,使用的交换机类型为Fanout。定义好交换机,消息的传输路径变为,从Producer发出,发送至Fanout类型的交换机,交换机将消息分别推送给和自己绑定的Queue上,

    消费者再去消费对应的Queue,此处与前面模式一致。

     1.2 应用场景

    此种模式用在什么场景呢,凡是场景中触发一个动作后,后面的流程都需要根据此动作做出反应,而后面的流程一般>1个,流程之间也没有先后顺序,还需要快速执行,即可以使用此模式。

    举个栗子:

    1. 用户注册成功后发送通知,既要发短信,又发邮件,还发站内信,发信息这些流程也不存在先后顺序,即可以使用这种模式。用户注册后,推送消息到mq, 发短信、发邮件、发站内信分别去消费消息实现信息发送。

    2. 支付系统中用户支付成功后,后续要有一系列动作,例如要上B端账,调风控、结算、发消息等逻辑,也同样可以使用此模式应用其中。

    1.2 模拟

    首先注意到我们的Exchange 类型 是 fanout,

    其次在发送的时候 routingkey -填与不填都不再影响 Exchange的分发路由了。

     

    2.验证代码

    还是举例 小明洗澡的例子。小明一天在看科幻小说,突然睡着了,梦里梦见自己研究出了一种热水器转换开关,这种开关可以将流过的水复制一份。心想太棒了,这以后能省不少水。

    于是改造了下自己的太阳能热水器,热水器内部变成了两个水槽,加入此转换器后,分别给两个水槽上水,水管出水1L,两个水槽都能充满1L。改造成功后,便开始拉着自己的女朋友开始了洗澡实验。

    2.1 生产者

    1. /**
    2. * @author rabbit
    3. * @version 1.0.0
    4. * @Description 发布订阅模式一个生产者,一个交换机,两个队列,两个消费者
    5. * 声明一个Fanout类型的exchange,并且将exchange和queue绑定在一起,绑定规则直接绑定。
    6. * 生产者创建一个exchange并且指定类型,和一个或多个队列绑定在一起。当生产者发送消息是会发送到exchange中,再由exchange到绑定的队列中
    7. * @createTime 2022/07/27 19:34:00
    8. */
    9. public class WaterProducer {
    10. public static final String PUBSUB_QUEUE_1 = "SolarWaterHeaterXM";
    11. public static final String PUBSUB_QUEUE_2 = "SolarWaterHeaterXL";
    12. //生产者
    13. public static void main(String[] args) throws Exception {
    14. //1、获取connection
    15. Connection connection = RabbitCommonConfig.getConnection();
    16. //2、创建channel
    17. Channel channel = connection.createChannel();
    18. for (int i = 1; i <= 10; i++) {
    19. sendMsg(channel, i);
    20. Thread.sleep(100);
    21. }
    22. //4、关闭管道和连接
    23. channel.close();
    24. connection.close();
    25. }
    26. private static void sendMsg(Channel channel, int k) throws IOException {
    27. //3、通过channel创建自己的exchange 并且绑定队列
    28. /**
    29. * 参数1:exchange的名称
    30. * 参数2:指定exchange的类型
    31. * FANOUT-Publish/Subscribe
    32. * DIRECT-Routing
    33. * TOPIC-Topics
    34. */
    35. channel.exchangeDeclare("publish-exchange", BuiltinExchangeType.FANOUT);
    36. channel.queueBind(PUBSUB_QUEUE_1, "publish-exchange", "");
    37. channel.queueBind(PUBSUB_QUEUE_2, "publish-exchange", "");
    38. //4、发送消息到exchange
    39. String msg = k + "升";
    40. /**
    41. * 参数1:指定exchange,使用“”。默认的exchange
    42. * 参数2:指定路由的规则,使用具体的队列名称。exchange为""时,消息直接发送到队列中
    43. * 参数3:指定传递的消息携带的properties
    44. * 参数4:指定传递的消息,byte[]类型
    45. */
    46. channel.basicPublish("publish-exchange", "", null, msg.getBytes());
    47. System.out.println("水龙头放水成功!" + k + "升");
    48. }
    49. }

    2.2 消费者

    小明:

    1. /**
    2. * @author rabbit
    3. * @version 1.0.0
    4. * @Description 发布订阅模式 一个生产者,一个交换机,两个队列,两个消费者
    5. * @createTime 2022/07/27 19:36:00
    6. */
    7. public class XMShowerConsumer {
    8. public static final String PUBSUB_QUEUE_1 = "SolarWaterHeaterXM";
    9. //消费者
    10. public static void main(String[] args) throws Exception {
    11. //1、获取连对象、
    12. Connection connection = RabbitCommonConfig.getConnection();
    13. //2、创建channel
    14. Channel channel = connection.createChannel();
    15. channel.basicQos(1);
    16. //3、创建队列
    17. /**
    18. * 参数1:queue 指定队列名称
    19. * 参数2:durable 是否开启持久化(true)
    20. * 参数3:exclusive 是否排外(conn.close()-》当前对列自动删除,当前队列只能被一个 消费者消费)
    21. * 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除
    22. * 参数5:arguments 指定队列携带的信息
    23. */
    24. channel.queueDeclare(PUBSUB_QUEUE_1, true, false, false, null);
    25. //4.开启监听Queue
    26. DefaultConsumer consumer = new DefaultConsumer(channel) {
    27. @Override
    28. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    29. System.out.println("小明洗澡用水: " + new String(body, "UTF-8"));
    30. try {
    31. Thread.sleep(1000);
    32. } catch (InterruptedException e) {
    33. e.printStackTrace();
    34. }
    35. //手动ACK(接收信息,指定是否批量操作)
    36. channel.basicAck(envelope.getDeliveryTag(),false);
    37. }
    38. };
    39. /**
    40. * 参数1:queue 指定消费哪个队列
    41. * 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)
    42. * 参数1:cancelCallback 指定消费回调
    43. *
    44. */
    45. //3.关闭自动ACK
    46. channel.basicConsume(PUBSUB_QUEUE_1,false,consumer);
    47. System.out.println("小明使用热水器中的XM水槽开始洗澡......");
    48. //5、键盘录入,让程序不结束!
    49. System.in.read();
    50. //6、释放资源
    51. channel.close();
    52. connection.close();
    53. }
    54. }

    小丽:

    1. /**
    2. * @author rabbit
    3. * @version 1.0.0
    4. * @Description 发布订阅模式 一个生产者,一个交换机,两个队列,两个消费者
    5. * @createTime 2022/07/27 19:36:00
    6. */
    7. public class XLShowerConsumer {
    8. public static final String PUBSUB_QUEUE_2 = "SolarWaterHeaterXL";
    9. //消费者
    10. public static void main(String[] args) throws Exception {
    11. //1、获取连对象、
    12. Connection connection = RabbitCommonConfig.getConnection();
    13. //2、创建channel
    14. Channel channel = connection.createChannel();
    15. channel.basicQos(1);
    16. //3、创建队列-helloworld
    17. /**
    18. * 参数1:queue 指定队列名称
    19. * 参数2:durable 是否开启持久化(true)
    20. * 参数3:exclusive 是否排外(conn.close()-》当前对列自动删除,当前队列只能被一个 消费者消费)
    21. * 参数4:autoDelete 如果这个队列没有其他消费者在消费,队列自动删除
    22. * 参数5:arguments 指定队列携带的信息
    23. */
    24. channel.queueDeclare(PUBSUB_QUEUE_2, true, false, false, null);
    25. //4.开启监听Queue
    26. DefaultConsumer consumer = new DefaultConsumer(channel) {
    27. @Override
    28. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    29. System.out.println("小丽洗澡用水: " + new String(body, "UTF-8"));
    30. try {
    31. Thread.sleep(1000);
    32. } catch (InterruptedException e) {
    33. e.printStackTrace();
    34. }
    35. //手动ACK(接收信息,指定是否批量操作)
    36. channel.basicAck(envelope.getDeliveryTag(), false);
    37. }
    38. };
    39. /**
    40. * 参数1:queue 指定消费哪个队列
    41. * 参数1:deliverCallback 指定是否ACK(true:收到消息会立即告诉RabbiMQ,false:手动告诉)
    42. * 参数1:cancelCallback 指定消费回调
    43. *
    44. */
    45. //3.关闭自动ACK
    46. channel.basicConsume(PUBSUB_QUEUE_2, false, consumer);
    47. System.out.println("小丽使用热水器中的XL水槽开始洗澡......");
    48. //5、键盘录入,让程序不结束!
    49. System.in.read();
    50. //6、释放资源
    51. channel.close();
    52. connection.close();
    53. }
    54. }

    2.3 结果

    生产者:

    1. 水龙头放水成功!1
    2. 水龙头放水成功!2
    3. 水龙头放水成功!3
    4. 水龙头放水成功!4
    5. 水龙头放水成功!5
    6. 水龙头放水成功!6
    7. 水龙头放水成功!7
    8. 水龙头放水成功!8
    9. 水龙头放水成功!9
    10. 水龙头放水成功!10

    消费者小明:

    1. 小明洗澡用水: 1
    2. 小明洗澡用水: 2
    3. 小明洗澡用水: 3
    4. 小明洗澡用水: 4
    5. 小明洗澡用水: 5
    6. 小明洗澡用水: 6
    7. 小明洗澡用水: 7
    8. 小明洗澡用水: 8
    9. 小明洗澡用水: 9
    10. 小明洗澡用水: 10

    消费者小丽:

    1. 小丽洗澡用水: 1
    2. 小丽洗澡用水: 2
    3. 小丽洗澡用水: 3
    4. 小丽洗澡用水: 4
    5. 小丽洗澡用水: 5
    6. 小丽洗澡用水: 6
    7. 小丽洗澡用水: 7
    8. 小丽洗澡用水: 8
    9. 小丽洗澡用水: 9
    10. 小丽洗澡用水: 10

    最终实现了,生产者出水10L,两个人都使用了10L水洗了澡,刚洗完,小明就睡醒了,原来是一个梦。

    3. 总结

    从此模式开始,我们接触到了Exchange的创建,及绑定,以及使用了Exchange的类型 Fanout。

    而以前我们在simple模式及work模式中,用到的默认Exchange类型都是Direct类型。

    核心代码-声明Exchange:

    channel.exchangeDeclare("publish-exchange", BuiltinExchangeType.FANOUT);

    核心代码-绑定queue

    channel.queueBind(PUBSUB_QUEUE_1, "publish-exchange", "");

    核心代码-发送给Fanout的Exchange

    channel.basicPublish("publish-exchange", "", null, msg.getBytes());

  • 相关阅读:
    MATLAB 多信号显示方案
    java 转换excel日期格式
    搭建分布式事务组件 seata 的Server 端和Client 端详解(小白都能看懂)
    c# 逆变 / 协变
    Java多线程-----线程安全
    【Android】AndroidStudio自动下载的Gradle大东西从系统盘里怎样转移出去
    .net core 读取 appsettings.json 值
    this.$emit使用方法【前端技术】
    LP Wizard生成带不规则焊盘的封装
    Video generation models as world simulators-视频生成模型作为世界模拟器
  • 原文地址:https://blog.csdn.net/blucastle/article/details/127929285