• 【MQ工作队列模式】


    1 、模式介绍
    Work Queues :与入门程序的简单模式相比,多了一个或一些消费端,
    多个消费端共同消费同一个队列中的消息。
    应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处
    理的速度。
    小结 :
    1 、在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关
    系是 竞争 的关系
    2 Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任
    务处理的速度。例如:短信服务部署多个,
    只需要有一个节点成功发送即可。

     

    2 、代码实现
    Work Queues 与入门程序的简单模式的代码几乎是一样的。可以完全复
    制,并多复制一个消费者进行多
    个消费者同时对消费消息的测试。
    1 、生产者
    生产者代码 Producer_WorkQueues:
    1. package com.dxw.producer;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.ConnectionFactory;
    5. import java.io.IOException;
    6. import java.util.concurrent.TimeoutException;
    7. /**
    8. * 生产者:发送消息
    9. */
    10. public class Producer_WorkQueues {
    11. public static void main(String[] args) throws
    12. IOException, TimeoutException {
    13. //1、创建连接工厂
    14. ConnectionFactory factory = new
    15. ConnectionFactory();
    16. //2、设置参数
    17. factory.setHost("localhost");//ip 默认localhost
    18. factory.setPort(5672);//端口 默认5672
    19. factory.setVirtualHost("/dxw");//虚拟机 默认/
    20. factory.setUsername("dxw");//用户名 默认guest
    21. factory.setPassword("1234");//密码 默认guest
    22. //3、创建连接
    23. Connection connection = factory.newConnection();
    24. //4、创建Channel
    25. Channel channel = connection.createChannel();
    26. //5、创建队列
    27. /*
    28. * 参数解释:
    29. * queueDeclare(String queue,
    30. * boolean durable,
    31. * boolean exclusive,
    32. * boolean autoDelete,
    33. * Map arguments)
    34. * 1. queue:队列名称
    35. * 如果没有一个名字叫hello_world的队列,则会创建该队
    36. 列,如果有则不会创建
    37. * 2. durable:是否持久化,当mq重启之后,队列中消息还在
    38. * 3. exclusive:
    39. * 是否独占。只能有一个消费者监听这队列
    40. * 当Connection关闭时,是否删除队列
    41. * 4. autoDelete:是否自动删除。当没有Consumer时,自动
    42. 删除掉
    43. * 5. arguments:参数。
    44. */
    45. channel.queueDeclare("work_queues",true,false,false,null)
    46. ;
    47. //6、发送消息
    48. /*
    49. * 参数解释:
    50. * basicPublish(String exchange,
    51. * String routingKey,
    52. * BasicProperties props,
    53. * byte[] body)
    54. * 1. exchange:交换机名称。简单模式下交换机会使用默认的
    55. ""
    56. * 2. routingKey:路由名称
    57. * 3. props:配置信息
    58. * 4. body:发送消息数据
    59. 启动生产者观察控制台
    60. 2、消费者
    61. 第一个消费者代码Consumer_WorkQueues1:
    62. */
    63. for(int i=1;i<=10;i++){
    64. String body = i+"hello rabbitmq~~~";
    65. channel.basicPublish("","work_queues",null,body.getBytes(
    66. ));
    67. }
    68. //7、释放资源
    69. //channel.close();
    70. //connection.close();
    71. }
    72. }
    启动生产者观察控制台

     

    2 、消费者
    第一个消费者代码 Consumer_WorkQueues1:
    1. package com.dxw.consumer;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. /**
    6. * 消费者:接收消息
    7. */
    8. public class Consumer_WorkQueues1 {
    9. public static void main(String[] args) throws
    10. IOException, TimeoutException {
    11. //1、创建连接工厂
    12. ConnectionFactory factory = new
    13. ConnectionFactory();
    14. //2. 设置参数
    15. factory.setHost("localhost");//ip 默认值 localhost
    16. factory.setPort(5672); //端口 默认值 5672
    17. factory.setVirtualHost("/dxw");//虚拟机 默认/
    18. factory.setUsername("dxw");//用户名 默认guest
    19. factory.setPassword("1234");//密码 默认guest
    20. //3. 创建连接 Connection
    21. Connection connection = factory.newConnection();
    22. //4. 创建Channel
    23. Channel channel = connection.createChannel();
    24. //5、创建队列
    25. /*
    26. * 参数解释:
    27. * queueDeclare(String queue,
    28. * boolean durable,
    29. * boolean exclusive,
    30. * boolean autoDelete,
    31. * Map arguments)
    32. * 1. queue:队列名称
    33. * 如果没有一个名字叫hello_world的队列,则会创建该
    34. 队列,如果有则不会创建
    35. * 2. durable:是否持久化,当mq重启之后,队列中消息还在
    36. * 3. exclusive:
    37. * 是否独占。只能有一个消费者监听这队列
    38. * 当Connection关闭时,是否删除队列
    39. * 4. autoDelete:是否自动删除。当没有Consumer时,自
    40. 动删除掉
    41. * 5. arguments:参数。
    42. */
    43. channel.queueDeclare("work_queues",true,false,false,null)
    44. ;
    45. //6、接收消息
    46. Consumer consumer = new DefaultConsumer(channel){
    47. /*
    48. 回调方法,当收到消息后,会自动执行该方法
    49. 1. consumerTag:标识
    50. 2. envelope:获取一些信息,交换机,路由key...
    51. 3. properties:配置信息
    52. 4. body:数据
    53. */
    54. @Override
    55. public void handleDelivery(String consumerTag,
    56. Envelope envelope, AMQP.BasicProperties properties, byte[]
    57. body) throws IOException {
    58. /*System.out.println("consumerTag:"+consumerTag);
    59. System.out.println("Exchange:"+envelope.getExchange());
    60. System.out.println("RoutingKey:"+envelope.getRoutingKey(
    61. ));
    62. System.out.println("properties:"+properties);*/
    63. System.out.println("body:"+new
    64. String(body));
    65. }
    66. };
    67. /*
    68. * 参数解释:
    69. * basicConsume(String queue, boolean autoAck,
    70. Consumer callback)
    71. * 1. queue:队列名称
    72. * 2. autoAck:是否自动确认
    73. * 3. callback:回调对象
    74. */
    75. channel.basicConsume("work_queues",true,consumer);
    76. //关闭资源?不要
    77. }
    78. }
    第二个消费者代码 Consumer_WorkQueues2:
    1. package com.dxw.consumer;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. /**
    6. * 消费者:接收消息
    7. */
    8. public class Consumer_WorkQueues2 {
    9. public static void main(String[] args) throws
    10. IOException, TimeoutException {
    11. //1、创建连接工厂
    12. ConnectionFactory factory = new
    13. ConnectionFactory();
    14. //2. 设置参数
    15. factory.setHost("localhost");//ip 默认值 localhost
    16. factory.setPort(5672); //端口 默认值 5672
    17. factory.setVirtualHost("/dxw");//虚拟机 默认/
    18. factory.setUsername("dxw");//用户名 默认guest
    19. factory.setPassword("1234");//密码 默认guest
    20. //3. 创建连接 Connection
    21. Connection connection = factory.newConnection();
    22. //4. 创建Channel
    23. Channel channel = connection.createChannel();
    24. //5、创建队列
    25. /*
    26. * 参数解释:
    27. * queueDeclare(String queue,
    28. * boolean durable,
    29. * boolean exclusive,
    30. * boolean autoDelete,
    31. * Map arguments)
    32. * 1. queue:队列名称
    33. * 如果没有一个名字叫hello_world的队列,则会创建该
    34. 队列,如果有则不会创建
    35. * 2. durable:是否持久化,当mq重启之后,队列中消息还在
    36. * 3. exclusive:
    37. * 是否独占。只能有一个消费者监听这队列
    38. * 当Connection关闭时,是否删除队列
    39. * 4. autoDelete:是否自动删除。当没有Consumer时,自
    40. 动删除掉
    41. * 5. arguments:参数。
    42. */
    43. channel.queueDeclare("work_queues",true,false,false,null)
    44. ;
    45. //6、接收消息
    46. Consumer consumer = new DefaultConsumer(channel){
    47. /*
    48. 回调方法,当收到消息后,会自动执行该方法
    49. 1. consumerTag:标识
    50. 2. envelope:获取一些信息,交换机,路由key...
    51. 3. properties:配置信息
    52. 4. body:数据
    53. */
    54. @Override
    55. public void handleDelivery(String consumerTag,
    56. Envelope envelope, AMQP.BasicProperties properties, byte[]
    57. body) throws IOException {
    58. /*System.out.println("consumerTag:"+consumerTag);
    59. System.out.println("Exchange:"+envelope.getExchange());
    60. System.out.println("RoutingKey:"+envelope.getRoutingKey(
    61. ));
    62. System.out.println("properties:"+properties);*/
    63. System.out.println("body:"+new
    64. String(body));
    65. }
    66. };
    67. /*
    68. * 参数解释:
    69. * basicConsume(String queue, boolean autoAck,
    70. Consumer callback)
    71. * 1. queue:队列名称
    72. * 2. autoAck:是否自动确认
    73. * 3. callback:回调对象
    74. 注意:先启动两个消费者,然后再启动生产者,然后观察两个生产者控制台输出
    75. 3、Pub/Sub订阅模式
    76. 1、模式介绍
    77. 在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
    78. ⚫ P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是
    79. 发给X(交换机)
    80. ⚫ C:消费者,消息的接收者,会一直等待消息到来
    81. ⚫ Queue:消息队列,接收消息、缓存消息
    82. ⚫ Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方
    83. 面,知道如何处理消息,例如递交给某个特别队 列、递交给所有队列、
    84. */
    85. channel.basicConsume("work_queues",true,consumer);
    86. //关闭资源?不要
    87. }
    88. }
    注意 : 先启动两个消费者 , 然后再启动生产者 , 然后观察两个生产者控制台输出

     

  • 相关阅读:
    7个设计师必备的Figma汉化插件,高效设计超简单!
    pmp考试如何复习
    Python例题练习1
    对象、数组深拷贝与浅拷贝
    hello world的本质是什么?
    Web前端—移动Web第一天(平面转换、渐变、综合案例--播客网页设计)
    HbuildX使用、HBuilder X 快捷键
    java集合类史上最细讲解 - HashMap篇
    DS5上ARM编译器样例工程改为GCC编译
    基于Python实现的神经网络分类MNIST数据集
  • 原文地址:https://blog.csdn.net/m0_72254454/article/details/127908512