• RabbitMQ 一文读懂


    目录

    1、RabbitMQ 介绍

    应用场景

    其他消息队列

    选择RabbitMQ 原因

    2、AMQP消息队列其他相关知识

    什么是AMQP?

    什么是JMS?

    3、RabbitMQ 快速入门

    RabbitMQ的工作原理

    RabbitMQ 消息发送和接受流程梳理

    RabbitMQ 消息发送

    RabbitMQ 消息接受

    RabbitMQ 安装

    RabbitMQ 之Hello World

    第一步:创建Maven项目,添加RabbitMQ 消息队列依赖。

    第二步:创建RabbitMQ 消息生产者

    第三步: 创建RabbitMQ 消息消费者

    4、RabbitMQ 支持工作模式

    Work queues模式

    Publish/subscribe 模式

    Routing 路由模式

    Topics 模式

    Header 模式 

    RPC 模式

    5、RabbitMQ 高级特性

    RabbitMQ 消息有效期

    默认情况

    TTL(消息存活时间)

    TTL之单条消息过期

    TTL之队列消息过期

    六、RabbitMQ 死信队列

    RabbitMQ延迟队列

    RabbitMQ延迟队列实现延迟队列方式:

    RabbitMQ 集成插件实现

    基于插件实现延迟队列

    基于DXL实现延迟队列

    七、RabbitMQ 发送可靠性

    RabbitMQ 消息发送机制

     RabbitMQ 提供解决方案

      RabbitMQ 开启事务

    RabbitMQ 发送确认机制

    RabbitMQ 失败重试

    自带重试机制

    业务重试

     八、RabbitMQ 消费可靠性

    两种消费思路

    推(push)方式

    拉(pull)方式

    确保消费成功思路

    消息拒绝

    消息确认

    消息确认之自动确认

    消息确认之手动确认

     幂等性问题


    1、RabbitMQ 介绍

    MQ 全称为 Message Queue ,即消息队列, RabbitMQ是由 erlang 语言开发,基于 AMQP Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。RabbitMQ 官方地址: http://www.rabbitmq.com/

    应用场景

    1 、任务异步处理。
            将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。提高了应用程序的响应时间。
    2 、应用程序解耦合
            MQ相当于一个中介,生产方通过 MQ 与消费方交互,它将应用程序进行解耦合。

    其他消息队列

    ActiveMQ RabbitMQ ZeroMQ Kafka MetaMQ RocketMQ Redis

    选择RabbitMQ 原因

    1 、使得简单,功能强大。
    2 、基于 AMQP 协议。
    3 、社区活跃,文档完善。
    4 、高并发性能好,这主要得益于 Erlang 语言。
    5 Spring Boot 默认已集成 RabbitMQ

    2、AMQP消息队列其他相关知识

    什么是AMQP?

            AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。

    什么是JMS?

            JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

    3、RabbitMQ 快速入门

    RabbitMQ的工作原理

    组成部分说明如下:
    • Broker:消息队列服务进程,此进程包括两个部分:ExchangeQueue
    • Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
    • Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的消费方。
    • Producer:消息生产者,即生产方客户端,生产方客户端将消息发送到MQ
    • Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

    RabbitMQ 消息发送和接受流程梳理

    RabbitMQ 消息发送

    1 、生产者和 Broker 建立 TCP 连接。
    2 、生产者和 Broker 建立通道。
    3 、生产者通过通道消息发送给 Broker ,由 Exchange 将消息进行转发。
    4 Exchange 将消息转发到指定的 Queue (队列)

    RabbitMQ 消息接受

    1 、消费者和 Broker 建立 TCP 连接
    2 、消费者和 Broker 建立通道
    3 、消费者监听指定的 Queue (队列)
    4 、当有消息到达 Queue Broker 默认将消息推送给消费者。
    5 、消费者接收到消息。

    RabbitMQ 安装

    请参考:Docker 安装RabbitMQ

    RabbitMQ 之Hello World

    第一步:创建Maven项目,添加RabbitMQ 消息队列依赖。

    1. <dependency>
    2. <groupId>org.springframework.bootgroupId>
    3. <artifactId>spring-boot-starter-amqpartifactId>
    4. dependency>

    第二步:创建RabbitMQ 消息生产者

    1. package com.zzg.rabbitmq;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.ConnectionFactory;
    5. import java.io.IOException;
    6. import java.util.concurrent.TimeoutException;
    7. public class ProduceMS {
    8. //队列名称
    9. private static final String QUEUE = "helloworld";
    10. public static void main(String[] args) throws IOException, TimeoutException {
    11. Connection connection = null;
    12. Channel channel = null;
    13. ConnectionFactory factory = new ConnectionFactory();
    14. factory.setHost("192.168.43.10");
    15. factory.setPort(5672);
    16. factory.setUsername("guest");
    17. factory.setPassword("guest");
    18. factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
    19. //创建与RabbitMQ服务的TCP连接
    20. connection = factory.newConnection();
    21. //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
    22. channel = connection.createChannel();
    23. /*** 声明队列,如果Rabbit中没有此队列将自动创建
    24. * param1:队列名称
    25. * param2:是否持久化
    26. * param3:队列是否独占此连接
    27. * param4:队列不再使用时是否自动删除此队列
    28. * param5:队列参数
    29. * */
    30. channel.queueDeclare(QUEUE, true, false, false, null);
    31. String message = "Hello World RabbitMQ" + System.currentTimeMillis();
    32. /*** 消息发布方法
    33. * param1:Exchange的名称,如果没有指定,则使用Default Exchange
    34. * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
    35. * param3:消息包含的属性
    36. * param4:消息体
    37. * */
    38. /*** 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显 示绑定或解除绑定
    39. * 默认的交换机,routingKey等于队列名称
    40. * */
    41. channel.basicPublish("", QUEUE, null, message.getBytes());
    42. System.out.println("Send Message is:'" + message + "'");
    43. if (channel != null) {
    44. channel.close();
    45. }
    46. if (connection != null) {
    47. connection.close();
    48. }
    49. }
    50. }

    第三步: 创建RabbitMQ 消息消费者

    1. package com.zzg.rabbitmq;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. public class ConsumerMS {
    6. private static final String QUEUE = "helloworld";
    7. public static void main(String[] args) throws IOException, TimeoutException {
    8. ConnectionFactory factory = new ConnectionFactory();
    9. //设置RabbitMQ所在服务器的ip和端口
    10. factory.setHost("192.168.43.10");
    11. factory.setPort(5672);
    12. Connection connection = factory.newConnection();
    13. Channel channel = connection.createChannel();
    14. //声明队列
    15. channel.queueDeclare(QUEUE, true, false, false, null);
    16. //定义消费方法
    17. DefaultConsumer consumer = new DefaultConsumer(channel) {
    18. /*** 消费者接收消息调用此方法
    19. * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
    20. * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志 (收到消息失败后是否需要重新发送)
    21. * @param properties
    22. * @param body
    23. * @throws IOException
    24. */
    25. @Override
    26. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    27. //交换机
    28. String exchange = envelope.getExchange();
    29. //路由key
    30. String routingKey = envelope.getRoutingKey();
    31. //消息id
    32. long deliveryTag = envelope.getDeliveryTag();
    33. //消息内容
    34. String msg = new String(body, "utf-8");
    35. System.out.println("receive message.." + msg);
    36. }
    37. };
    38. /*** 监听队列String queue, boolean autoAck,Consumer callback
    39. * 参数明细
    40. * 1、队列名称
    41. * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 为false则需要手动回复
    42. * 3、消费消息的方法,消费者接收到消息后调用此方法
    43. */
    44. channel.basicConsume(QUEUE, true, consumer);
    45. }
    46. }

    4、RabbitMQ 支持工作模式

    RabbitMQ 有以下几种工作模式 :
    • Work queues
    • Publish/Subscribe
    • Routing
    • Topics
    • Header
    • RPC

    Work queues模式

            多个消费端消费同一个队列中的消息,队列采用轮询的方式将消息是平均发送给消费者;

    缺少图片

    特点:

    1、一条消息只会被一个消费端接收;

    2、队列采用轮询的方式将消息是平均发送给消费者的;

    3、消费者在处理完某条消息后,才会收到下一条消息

    Work Queues模式之生产者

    1. package com.zzg.rabbitmq.work.queues;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.ConnectionFactory;
    5. import java.io.IOException;
    6. import java.util.concurrent.TimeoutException;
    7. /*
    8. 生产者核心步骤
    9. 1、声明队列
    10. 2、创建连接
    11. 3、创建通道
    12. 4、通道声明队列
    13. 5、制定消息
    14. 6、发送消息,使用默认交换机
    15. */
    16. public class WorkQueueProduceMS {
    17. //声明队列
    18. private static final String QUEUE = "queue";
    19. public static void main(String[] args) {
    20. Connection connection = null;
    21. Channel channel = null;
    22. try {
    23. ConnectionFactory connectionFactory = new ConnectionFactory();
    24. connectionFactory.setHost("192.168.43.10");//mq服务ip地址
    25. connectionFactory.setPort(5672);//mq client连接端口
    26. connectionFactory.setUsername("guest");//mq登录用户名
    27. connectionFactory.setPassword("guest");//mq登录密码
    28. connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
    29. //创建与RabbitMQ服务的TCP连接
    30. connection = connectionFactory.newConnection();
    31. //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
    32. channel = connection.createChannel();
    33. //通道绑定队列
    34. /**
    35. * 声明队列,如果Rabbit中没有此队列将自动创建
    36. * param1:队列名称
    37. * param2:是否持久化
    38. * param3:队列是否独占此连接
    39. * param4:队列不再使用时是否自动删除此队列
    40. * param5:队列参数
    41. * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
    42. *
    43. */
    44. channel.queueDeclare(QUEUE, true, false, false, null);//通道绑定邮件队列
    45. for (int i = 0; i < 10; i++) {
    46. String message = new String("mq 发送消息。。。");
    47. /**
    48. * 消息发布方法
    49. * param1:Exchange的名称,如果没有指定,则使用Default Exchange
    50. * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
    51. * param3:消息包含的属性
    52. * param4:消息体
    53. * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
    54. * 默认的交换机,routingKey等于队列名称
    55. */
    56. //String exchange, String routingKey, BasicProperties props, byte[] body
    57. channel.basicPublish("", QUEUE, null, message.getBytes("utf-8"));
    58. System.out.println("mq消息发送成功!");
    59. }
    60. } catch (Exception e) {
    61. e.printStackTrace();
    62. } finally {
    63. try {
    64. channel.close();
    65. } catch (IOException e) {
    66. e.printStackTrace();
    67. } catch (TimeoutException e) {
    68. e.printStackTrace();
    69. }
    70. try {
    71. connection.close();
    72. } catch (IOException e) {
    73. e.printStackTrace();
    74. }
    75. }
    76. }
    77. }

    Work Queues模式之消费者

    1. package com.zzg.rabbitmq.work.queues;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. /*
    6. 消费者核心步骤
    7. 1、声明队列
    8. 2、创建连接
    9. 3、创建通道
    10. 4、通道声明队列
    11. 5、重写消息消费方法
    12. 6、执行消息方法
    13. */
    14. public class WorkQueueConsumerMS {
    15. private static final String QUEUE = "queue";
    16. public static void main(String[] args) {
    17. Connection connection = null;
    18. Channel channel = null;
    19. try {
    20. ConnectionFactory connectionFactory = new ConnectionFactory();
    21. connectionFactory.setHost("192.168.43.10");
    22. connectionFactory.setPort(5672);
    23. connection = connectionFactory.newConnection();
    24. channel = connection.createChannel();
    25. //通道绑定队列
    26. /**
    27. * 声明队列,如果Rabbit中没有此队列将自动创建
    28. * param1:队列名称
    29. * param2:是否持久化
    30. * param3:队列是否独占此连接
    31. * param4:队列不再使用时是否自动删除此队列
    32. * param5:队列参数
    33. * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
    34. *
    35. */
    36. channel.queueDeclare(QUEUE, true, false, false, null);//通道绑定邮件队列
    37. //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
    38. DefaultConsumer consumer = new DefaultConsumer(channel) {
    39. /**
    40. * 消费者接收消息调用此方法
    41. * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
    42. * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
    43. (收到消息失败后是否需要重新发送)
    44. * @param properties
    45. * @param body
    46. * @throws IOException
    47. * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
    48. */
    49. @Override
    50. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    51. //交换机
    52. String exchange = envelope.getExchange();
    53. //路由key
    54. String routingKey = envelope.getRoutingKey();
    55. envelope.getDeliveryTag();
    56. String msg = new String(body, "utf-8");
    57. System.out.println("mq收到的消息是:" + msg);
    58. }
    59. };
    60. System.out.println("消费者启动成功!");
    61. channel.basicConsume(QUEUE, true, consumer);
    62. } catch (IOException e) {
    63. e.printStackTrace();
    64. } catch (TimeoutException e) {
    65. e.printStackTrace();
    66. }
    67. }
    68. }

    生产端启动后,控制台打印信息如下:

    1. mq消息发送成功!
    2. mq消息发送成功!
    3. mq消息发送成功!
    4. mq消息发送成功!
    5. mq消息发送成功!
    6. mq消息发送成功!
    7. mq消息发送成功!
    8. mq消息发送成功!
    9. mq消息发送成功!

    RabbitMQ中的已有消息:

    缺失图片

    消费端启动后,控制台打印信息如下:

    1. 消费者启动成功!
    2. mq收到的消息是:mq 发送消息。。。
    3. mq收到的消息是:mq 发送消息。。。
    4. mq收到的消息是:mq 发送消息。。。
    5. mq收到的消息是:mq 发送消息。。。
    6. mq收到的消息是:mq 发送消息。。。
    7. mq收到的消息是:mq 发送消息。。。
    8. mq收到的消息是:mq 发送消息。。。
    9. mq收到的消息是:mq 发送消息。。。
    10. mq收到的消息是:mq 发送消息。。。
    11. mq收到的消息是:mq 发送消息。。。

    Publish/subscribe 模式

            这种模式又称为发布订阅模式,相对于Work queues模式,该模式多了一个交换机,生产端先把消息发送到交换机,再由交换机把消息发送到绑定的队列中,每个绑定的队列都能收到由生产端发送的消息。

    缺少图片

    特点

    1、每个消费者监听自己的队列;

    2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收
    到消息

    应用场景:用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法;

    publish/subscribe 模式之生产者

    1. package com.zzg.rabbitmq.publish.subscribe;
    2. import com.rabbitmq.client.BuiltinExchangeType;
    3. import com.rabbitmq.client.Channel;
    4. import com.rabbitmq.client.Connection;
    5. import com.rabbitmq.client.ConnectionFactory;
    6. import java.io.IOException;
    7. import java.util.concurrent.TimeoutException;
    8. /**
    9. * 生产端核心步骤:
    10. *

    11. * 1、声明队列,声明交换机
    12. *

    13. * 2、创建连接
    14. *

    15. * 3、创建通道
    16. *

    17. * 4、通道声明交换机
    18. *

    19. * 5、通道声明队列
    20. *

    21. * 6、通过通道使队列绑定到交换机
    22. *

    23. * 7、制定消息
    24. *

    25. * 8、发送消息
    26. */
    27. public class PublicSubProduceMS {
    28. //声明两个队列和一个交换机
    29. //Publish/subscribe发布订阅模式
    30. private static final String QUEUE_EMAIL = "queueEmail";
    31. private static final String QUEUE_SMS = "queueSms";
    32. private static final String EXCHANGE = "messageChange";
    33. public static void main(String[] args) {
    34. Connection connection = null;
    35. Channel channel = null;
    36. try {
    37. ConnectionFactory connectionFactory = new ConnectionFactory();
    38. connectionFactory.setHost("192.168.43.10");//mq服务ip地址
    39. connectionFactory.setPort(5672);//mq client连接端口
    40. connectionFactory.setUsername("guest");//mq登录用户名
    41. connectionFactory.setPassword("guest");//mq登录密码
    42. connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
    43. //创建与RabbitMQ服务的TCP连接
    44. connection = connectionFactory.newConnection();
    45. //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
    46. channel = connection.createChannel();
    47. //通道绑定交换机
    48. /**
    49. * 参数明细
    50. * 1、交换机名称
    51. * 2、交换机类型,fanout、topic、direct、headers
    52. */
    53. //Publish/subscribe发布订阅模式
    54. channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
    55. //通道绑定队列
    56. /**
    57. * 声明队列,如果Rabbit中没有此队列将自动创建
    58. * param1:队列名称
    59. * param2:是否持久化
    60. * param3:队列是否独占此连接
    61. * param4:队列不再使用时是否自动删除此队列
    62. * param5:队列参数
    63. * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
    64. *
    65. */
    66. channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列
    67. channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定短信队列
    68. //交换机和队列绑定
    69. /**
    70. * 参数明细
    71. * 1、队列名称
    72. * 2、交换机名称
    73. * 3、路由key
    74. */
    75. //Publish/subscribe发布订阅模式
    76. channel.queueBind(QUEUE_EMAIL, EXCHANGE, "");
    77. channel.queueBind(QUEUE_SMS, EXCHANGE, "");
    78. for (int i = 0; i < 10; i++) {
    79. String message = new String("mq 发送消息。。。");
    80. /**
    81. * 消息发布方法
    82. * param1:Exchange的名称,如果没有指定,则使用Default Exchange
    83. * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
    84. * param3:消息包含的属性
    85. * param4:消息体
    86. * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
    87. * 默认的交换机,routingKey等于队列名称
    88. */
    89. //String exchange, String routingKey, BasicProperties props, byte[] body
    90. //Publish/subscribe发布订阅模式
    91. channel.basicPublish(EXCHANGE, "", null, message.getBytes());
    92. System.out.println("mq消息发送成功!");
    93. }
    94. } catch (Exception e) {
    95. e.printStackTrace();
    96. } finally {
    97. try {
    98. channel.close();
    99. } catch (IOException e) {
    100. e.printStackTrace();
    101. } catch (TimeoutException e) {
    102. e.printStackTrace();
    103. }
    104. try {
    105. connection.close();
    106. } catch (IOException e) {
    107. e.printStackTrace();
    108. }
    109. }
    110. }
    111. }

    publish/subscribe 模式之消费者(邮件)

    1. package com.zzg.rabbitmq.publish.subscribe;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. /**
    6. * 消费者核心步骤
    7. * 1、声明队列,声明交换机
    8. *

    9. * 2、创建连接
    10. *

    11. * 3、创建通道
    12. *

    13. * 4、通道声明交换机
    14. *

    15. * 5、通道声明队列
    16. *

    17. * 6、通过通道使队列绑定到交换机
    18. *

    19. * 7、重写消息消费方法
    20. *

    21. * 8、执行消息方法
    22. */
    23. public class PublicSubEMailComsumerMS {
    24. //Publish/subscribe发布订阅模式
    25. private static final String QUEUE_EMAIL = "queueEmail";
    26. private static final String EXCHANGE = "messageChange";
    27. public static void main(String[] args) {
    28. Connection connection = null;
    29. Channel channel = null;
    30. try {
    31. ConnectionFactory connectionFactory = new ConnectionFactory();
    32. connectionFactory.setHost("192.168.43.10");
    33. connectionFactory.setPort(5672);
    34. connection = connectionFactory.newConnection();
    35. channel = connection.createChannel();
    36. //通道绑定交换机
    37. /**
    38. * 参数明细
    39. * 1、交换机名称
    40. * 2、交换机类型,fanout、topic、direct、headers
    41. */
    42. //Publish/subscribe发布订阅模式
    43. channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
    44. //通道绑定队列
    45. /**
    46. * 声明队列,如果Rabbit中没有此队列将自动创建
    47. * param1:队列名称
    48. * param2:是否持久化
    49. * param3:队列是否独占此连接
    50. * param4:队列不再使用时是否自动删除此队列
    51. * param5:队列参数
    52. * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
    53. *
    54. */
    55. channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列
    56. //交换机和队列绑定
    57. /**
    58. * 参数明细
    59. * 1、队列名称
    60. * 2、交换机名称
    61. * 3、路由key
    62. */
    63. //Publish/subscribe发布订阅模式
    64. channel.queueBind(QUEUE_EMAIL, EXCHANGE, "");
    65. //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
    66. DefaultConsumer consumer = new DefaultConsumer(channel) {
    67. /**
    68. * 消费者接收消息调用此方法
    69. * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
    70. * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
    71. (收到消息失败后是否需要重新发送)
    72. * @param properties
    73. * @param body
    74. * @throws IOException
    75. * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
    76. */
    77. @Override
    78. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    79. //交换机
    80. String exchange = envelope.getExchange();
    81. //路由key
    82. String routingKey = envelope.getRoutingKey();
    83. envelope.getDeliveryTag();
    84. String msg = new String(body, "utf-8");
    85. System.out.println("mq收到的消息是:" + msg);
    86. }
    87. };
    88. System.out.println("消费者启动成功!");
    89. channel.basicConsume(QUEUE_EMAIL, true, consumer);
    90. } catch (IOException e) {
    91. e.printStackTrace();
    92. } catch (TimeoutException e) {
    93. e.printStackTrace();
    94. }
    95. }
    96. }

    publish/subscribe 模式之消费者(短信)

    1. package com.zzg.rabbitmq.publish.subscribe;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. /**
    6. * 消费者核心步骤
    7. * 1、声明队列,声明交换机
    8. *

    9. * 2、创建连接
    10. *

    11. * 3、创建通道
    12. *

    13. * 4、通道声明交换机
    14. *

    15. * 5、通道声明队列
    16. *

    17. * 6、通过通道使队列绑定到交换机
    18. *

    19. * 7、重写消息消费方法
    20. *

    21. * 8、执行消息方法
    22. */
    23. public class PubSubSMSComsumerMS {
    24. //Publish/subscribe发布订阅模式
    25. private static final String QUEUE_SMS = "queueSms";
    26. private static final String EXCHANGE = "messageChange";
    27. public static void main(String[] args) {
    28. Connection connection = null;
    29. Channel channel = null;
    30. try {
    31. ConnectionFactory connectionFactory = new ConnectionFactory();
    32. connectionFactory.setHost("192.168.43.10");
    33. connectionFactory.setPort(5672);
    34. connection = connectionFactory.newConnection();
    35. channel = connection.createChannel();
    36. //通道绑定交换机
    37. /**
    38. * 参数明细
    39. * 1、交换机名称
    40. * 2、交换机类型,fanout、topic、direct、headers
    41. */
    42. //Publish/subscribe发布订阅模式
    43. channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
    44. //通道绑定队列
    45. /**
    46. * 声明队列,如果Rabbit中没有此队列将自动创建
    47. * param1:队列名称
    48. * param2:是否持久化
    49. * param3:队列是否独占此连接
    50. * param4:队列不再使用时是否自动删除此队列
    51. * param5:队列参数
    52. * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
    53. *
    54. */
    55. channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定短信队列
    56. //交换机和队列绑定
    57. /**
    58. * 参数明细
    59. * 1、队列名称
    60. * 2、交换机名称
    61. * 3、路由key
    62. */
    63. //Publish/subscribe发布订阅模式
    64. channel.queueBind(QUEUE_SMS, EXCHANGE, "");
    65. DefaultConsumer consumer = new DefaultConsumer(channel) {
    66. /**
    67. * 消费者接收消息调用此方法
    68. * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
    69. * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
    70. (收到消息失败后是否需要重新发送)
    71. * @param properties
    72. * @param body
    73. * @throws IOException
    74. * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
    75. */
    76. @Override
    77. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    78. //交换机
    79. String exchange = envelope.getExchange();
    80. //路由key
    81. String routingKey = envelope.getRoutingKey();
    82. envelope.getDeliveryTag();
    83. String msg = new String(body, "utf-8");
    84. System.out.println("mq收到的消息是:" + msg);
    85. }
    86. };
    87. System.out.println("消费者启动成功!");
    88. channel.basicConsume(QUEUE_SMS, true, consumer);
    89. } catch (IOException e) {
    90. e.printStackTrace();
    91. } catch (TimeoutException e) {
    92. e.printStackTrace();
    93. }
    94. }
    95. }
    publish/subscribework queues有什么区别?
    区别:
    1 work queues 不用定义交换机,而 publish/subscribe 需要定义交换机。
    2 publish/subscribe 的生产方是面向交换机发送消息, work queues 的生产方是面向队列发送消息 ( 底层使用默认交换机)
    3 publish/subscribe 需要设置队列和交换机的绑定, work queues 不需要设置,实质上 work queues 会将队列绑定到默认的交换机 。
    相同点:
    所以两者实现的发布 / 订阅的效果是一样的,多个消费端监听同一个队列不会重复消费消息。
    工作中用publish/subscribe还是work queues?
            建议使用 publish/subscribe ,发布订阅模式比工作队列模式更强大,并且发布订阅模式可以指定自己专用的交换机。

    Routing 路由模式

            Routing 模式又称路由模式,该种模式除了要绑定交换机外,发消息的时候还要制定routing key,即路由key,队列通过通道绑定交换机的时候,需要指定自己的routing key,这样,生产端发送消息的时候也会指定routing key,通过routing key就可以把相应的消息发送到绑定相应routing key的队列中去。

    缺失图片

    特点:

    1、每个消费者监听自己的队列,并且设置routingkey;
    2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列;

    应用场景:用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法;

    routing路由模式之生产者

    1. package com.zzg.rabbitmq.routing;
    2. import com.rabbitmq.client.BuiltinExchangeType;
    3. import com.rabbitmq.client.Channel;
    4. import com.rabbitmq.client.Connection;
    5. import com.rabbitmq.client.ConnectionFactory;
    6. import java.io.IOException;
    7. import java.util.concurrent.TimeoutException;
    8. /**
    9. * 生产者核心步骤
    10. * 1、声明队列,声明交换机
    11. *

    12. * 2、创建连接
    13. *

    14. * 3、创建通道
    15. *

    16. * 4、通道声明交换机
    17. *

    18. * 5、通道声明队列
    19. *

    20. * 6、通过通道使队列绑定到交换机并指定该队列的routingkey
    21. *

    22. * 7、制定消息
    23. *

    24. * 8、发送消息并指定routingkey
    25. */
    26. public class RoutingProduceMS {
    27. //声明两个队列和一个交换机
    28. //Routing 路由模式
    29. private static final String QUEUE_EMAIL = "queueEmail";
    30. private static final String QUEUE_SMS = "queueSms";
    31. private static final String EXCHANGE = "routingMessageChange";
    32. public static void main(String[] args) {
    33. Connection connection = null;
    34. Channel channel = null;
    35. try {
    36. ConnectionFactory connectionFactory = new ConnectionFactory();
    37. connectionFactory.setHost("192.168.43.10");//mq服务ip地址
    38. connectionFactory.setPort(5672);//mq client连接端口
    39. connectionFactory.setUsername("guest");//mq登录用户名
    40. connectionFactory.setPassword("guest");//mq登录密码
    41. connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
    42. //创建与RabbitMQ服务的TCP连接
    43. connection = connectionFactory.newConnection();
    44. //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
    45. channel = connection.createChannel();
    46. //通道绑定交换机
    47. /**
    48. * 参数明细
    49. * 1、交换机名称
    50. * 2、交换机类型,fanout、topic、direct、headers
    51. */
    52. //Routing 路由模式
    53. channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
    54. //通道绑定队列
    55. /**
    56. * 声明队列,如果Rabbit中没有此队列将自动创建
    57. * param1:队列名称
    58. * param2:是否持久化
    59. * param3:队列是否独占此连接
    60. * param4:队列不再使用时是否自动删除此队列
    61. * param5:队列参数
    62. * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
    63. *
    64. */
    65. channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列
    66. channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定短信队列
    67. //交换机和队列绑定
    68. /**
    69. * 参数明细
    70. * 1、队列名称
    71. * 2、交换机名称
    72. * 3、路由key
    73. */
    74. //Routing 路由模式
    75. channel.queueBind(QUEUE_EMAIL, EXCHANGE, QUEUE_EMAIL);
    76. channel.queueBind(QUEUE_SMS, EXCHANGE, QUEUE_SMS);
    77. //给email队列发消息
    78. for (int i = 0; i < 10; i++) {
    79. String message = new String("mq 发送email消息。。。");
    80. /**
    81. * 消息发布方法
    82. * param1:Exchange的名称,如果没有指定,则使用Default Exchange
    83. * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
    84. * param3:消息包含的属性
    85. * param4:消息体
    86. * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
    87. * 默认的交换机,routingKey等于队列名称
    88. */
    89. //String exchange, String routingKey, BasicProperties props, byte[] body
    90. //Routing 路由模式
    91. channel.basicPublish(EXCHANGE, QUEUE_EMAIL, null, message.getBytes());
    92. System.out.println("mq消息发送成功!");
    93. }
    94. //给sms队列发消息
    95. for (int i = 0; i < 10; i++) {
    96. String message = new String("mq 发送sms消息。。。");
    97. /**
    98. * 消息发布方法
    99. * param1:Exchange的名称,如果没有指定,则使用Default Exchange
    100. * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
    101. * param3:消息包含的属性
    102. * param4:消息体
    103. * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
    104. * 默认的交换机,routingKey等于队列名称
    105. */
    106. //String exchange, String routingKey, BasicProperties props, byte[] body
    107. //Routing 路由模式
    108. channel.basicPublish(EXCHANGE, QUEUE_SMS, null, message.getBytes());
    109. System.out.println("mq消息发送成功!");
    110. }
    111. } catch (Exception e) {
    112. e.printStackTrace();
    113. } finally {
    114. try {
    115. channel.close();
    116. } catch (IOException e) {
    117. e.printStackTrace();
    118. } catch (TimeoutException e) {
    119. e.printStackTrace();
    120. }
    121. try {
    122. connection.close();
    123. } catch (IOException e) {
    124. e.printStackTrace();
    125. }
    126. }
    127. }
    128. }

    routing路由模式之消费者(邮件)

    1. package com.zzg.rabbitmq.routing;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. /**
    6. * 消费者核心步骤
    7. * 1、声明队列,声明交换机
    8. *

    9. * 2、创建连接
    10. *

    11. * 3、创建通道
    12. *

    13. * 4、通道声明交换机
    14. *

    15. * 5、通道声明队列
    16. *

    17. * 6、通过通道使队列绑定到交换机并指定routingkey
    18. *

    19. * 7、重写消息消费方法
    20. *

    21. * 8、执行消息方法
    22. */
    23. public class RoutingEMailConsumerMS {
    24. //Routing 路由模式
    25. private static final String QUEUE_EMAIL = "queueEmail";
    26. private static final String EXCHANGE = "routingMessageChange";
    27. public static void main(String[] args) {
    28. Connection connection = null;
    29. Channel channel = null;
    30. try {
    31. ConnectionFactory connectionFactory = new ConnectionFactory();
    32. connectionFactory.setHost("192.168.43.10");
    33. connectionFactory.setPort(5672);
    34. connection = connectionFactory.newConnection();
    35. channel = connection.createChannel();
    36. //通道绑定交换机
    37. /**
    38. * 参数明细
    39. * 1、交换机名称
    40. * 2、交换机类型,fanout、topic、direct、headers
    41. */
    42. //Routing 路由模式
    43. channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
    44. //通道绑定队列
    45. /**
    46. * 声明队列,如果Rabbit中没有此队列将自动创建
    47. * param1:队列名称
    48. * param2:是否持久化
    49. * param3:队列是否独占此连接
    50. * param4:队列不再使用时是否自动删除此队列
    51. * param5:队列参数
    52. * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
    53. *
    54. */
    55. channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列
    56. //交换机和队列绑定
    57. /**
    58. * 参数明细
    59. * 1、队列名称
    60. * 2、交换机名称
    61. * 3、路由key
    62. */
    63. //Routing 路由模式
    64. channel.queueBind(QUEUE_EMAIL, EXCHANGE, QUEUE_EMAIL);
    65. //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
    66. DefaultConsumer consumer = new DefaultConsumer(channel) {
    67. /**
    68. * 消费者接收消息调用此方法
    69. * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
    70. * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
    71. (收到消息失败后是否需要重新发送)
    72. * @param properties
    73. * @param body
    74. * @throws IOException
    75. * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
    76. */
    77. @Override
    78. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    79. //交换机
    80. String exchange = envelope.getExchange();
    81. //路由key
    82. String routingKey = envelope.getRoutingKey();
    83. envelope.getDeliveryTag();
    84. String msg = new String(body, "utf-8");
    85. System.out.println("mq收到的消息是:" + msg);
    86. }
    87. };
    88. System.out.println("消费者启动成功!");
    89. channel.basicConsume(QUEUE_EMAIL, true, consumer);
    90. } catch (IOException e) {
    91. e.printStackTrace();
    92. } catch (TimeoutException e) {
    93. e.printStackTrace();
    94. }
    95. }
    96. }

    routing路由模式之消费者(短信)

    1. package com.zzg.rabbitmq.routing;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. /**
    6. * 消费者核心步骤
    7. * 1、声明队列,声明交换机
    8. *

    9. * 2、创建连接
    10. *

    11. * 3、创建通道
    12. *

    13. * 4、通道声明交换机
    14. *

    15. * 5、通道声明队列
    16. *

    17. * 6、通过通道使队列绑定到交换机并指定routingkey
    18. *

    19. * 7、重写消息消费方法
    20. *

    21. * 8、执行消息方法
    22. */
    23. public class RoutingSMSConsumerMS {
    24. //Routing 路由模式
    25. private static final String QUEUE_SMS = "queueSms";
    26. private static final String EXCHANGE = "routingMessageChange";
    27. public static void main(String[] args) {
    28. Connection connection = null;
    29. Channel channel = null;
    30. try {
    31. ConnectionFactory connectionFactory = new ConnectionFactory();
    32. connectionFactory.setHost("192.168.43.10");
    33. connectionFactory.setPort(5672);
    34. connection = connectionFactory.newConnection();
    35. channel = connection.createChannel();
    36. //通道绑定交换机
    37. /**
    38. * 参数明细
    39. * 1、交换机名称
    40. * 2、交换机类型,fanout、topic、direct、headers
    41. */
    42. //Routing 路由模式
    43. channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
    44. //通道绑定队列
    45. /**
    46. * 声明队列,如果Rabbit中没有此队列将自动创建
    47. * param1:队列名称
    48. * param2:是否持久化
    49. * param3:队列是否独占此连接
    50. * param4:队列不再使用时是否自动删除此队列
    51. * param5:队列参数
    52. * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
    53. *
    54. */
    55. channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定短信队列
    56. //交换机和队列绑定
    57. /**
    58. * 参数明细
    59. * 1、队列名称
    60. * 2、交换机名称
    61. * 3、路由key
    62. */
    63. //Routing 路由模式
    64. channel.queueBind(QUEUE_SMS, EXCHANGE, QUEUE_SMS);
    65. DefaultConsumer consumer = new DefaultConsumer(channel) {
    66. /**
    67. * 消费者接收消息调用此方法
    68. * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
    69. * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
    70. (收到消息失败后是否需要重新发送)
    71. * @param properties
    72. * @param body
    73. * @throws IOException
    74. * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
    75. */
    76. @Override
    77. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    78. //交换机
    79. String exchange = envelope.getExchange();
    80. //路由key
    81. String routingKey = envelope.getRoutingKey();
    82. envelope.getDeliveryTag();
    83. String msg = new String(body, "utf-8");
    84. System.out.println("mq收到的消息是:" + msg);
    85. }
    86. };
    87. System.out.println("消费者启动成功!");
    88. channel.basicConsume(QUEUE_SMS, true, consumer);
    89. } catch (IOException e) {
    90. e.printStackTrace();
    91. } catch (TimeoutException e) {
    92. e.printStackTrace();
    93. }
    94. }
    95. }
    Routing模式和Publish/subscibe有啥区别?
            Routing模式要求队列在绑定交换机时要指定 routingkey ,消息会转发到符合 routingkey 的队列。

    Topics 模式

            Topics 模式和Routing 路由模式最大的区别就是,Topics 模式发送消息和消费消息的时候是通过通配符去进行匹配的。

    缺失图片

    特点:

    1、每个消费者监听自己的队列,并且设置带统配符的routingkey

    2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列

    应用场景:用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法;

    topics模式之生产者

    1. package com.zzg.rabbitmq.topic;
    2. import com.rabbitmq.client.BuiltinExchangeType;
    3. import com.rabbitmq.client.Channel;
    4. import com.rabbitmq.client.Connection;
    5. import com.rabbitmq.client.ConnectionFactory;
    6. import java.io.IOException;
    7. import java.util.concurrent.TimeoutException;
    8. /**
    9. * 生产者核心步骤
    10. * 1、声明队列,声明交换机
    11. *

    12. * 2、创建连接
    13. *

    14. * 3、创建通道
    15. *

    16. * 4、通道声明交换机
    17. *

    18. * 5、通道声明队列
    19. *

    20. * 6、通过通道使队列绑定到交换机并指定该队列的routingkey(通配符)
    21. *

    22. * 7、制定消息
    23. *

    24. * 8、发送消息并指定routingkey(通配符)
    25. */
    26. public class TopicProduceMS {
    27. //声明两个队列和一个交换机
    28. //Topics 模式
    29. private static final String QUEUE_EMAIL = "queueEmail";
    30. private static final String QUEUE_SMS = "queueSms";
    31. private static final String EXCHANGE = "topicMessageChange";
    32. public static void main(String[] args) {
    33. Connection connection = null;
    34. Channel channel = null;
    35. try {
    36. ConnectionFactory connectionFactory = new ConnectionFactory();
    37. connectionFactory.setHost("192.168.43.10");//mq服务ip地址
    38. connectionFactory.setPort(5672);//mq client连接端口
    39. connectionFactory.setUsername("guest");//mq登录用户名
    40. connectionFactory.setPassword("guest");//mq登录密码
    41. connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
    42. //创建与RabbitMQ服务的TCP连接
    43. connection = connectionFactory.newConnection();
    44. //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
    45. channel = connection.createChannel();
    46. //通道绑定交换机
    47. /**
    48. * 参数明细
    49. * 1、交换机名称
    50. * 2、交换机类型,fanout、topic、direct、headers
    51. */
    52. //Topics 模式
    53. channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
    54. //通道绑定队列
    55. /**
    56. * 声明队列,如果Rabbit中没有此队列将自动创建
    57. * param1:队列名称
    58. * param2:是否持久化
    59. * param3:队列是否独占此连接
    60. * param4:队列不再使用时是否自动删除此队列
    61. * param5:队列参数
    62. * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
    63. *
    64. */
    65. channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列
    66. channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定短信队列
    67. //交换机和队列绑定
    68. /**
    69. * 参数明细
    70. * 1、队列名称
    71. * 2、交换机名称
    72. * 3、路由key
    73. */
    74. channel.queueBind(QUEUE_EMAIL, EXCHANGE, "inform.#.email.#");
    75. channel.queueBind(QUEUE_SMS, EXCHANGE, "inform.#.sms.#");
    76. //给email队列发消息
    77. for (int i = 0; i < 10; i++) {
    78. String message = new String("mq 发送email消息。。。");
    79. /**
    80. * 消息发布方法
    81. * param1:Exchange的名称,如果没有指定,则使用Default Exchange
    82. * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
    83. * param3:消息包含的属性
    84. * param4:消息体
    85. * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
    86. * 默认的交换机,routingKey等于队列名称
    87. */
    88. //String exchange, String routingKey, BasicProperties props, byte[] body
    89. channel.basicPublish(EXCHANGE, "inform.email", null, message.getBytes());
    90. System.out.println("mq email 消息发送成功!");
    91. }
    92. //给sms队列发消息
    93. for (int i = 0; i < 10; i++) {
    94. String message = new String("mq 发送sms消息。。。");
    95. /**
    96. * 消息发布方法
    97. * param1:Exchange的名称,如果没有指定,则使用Default Exchange
    98. * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
    99. * param3:消息包含的属性
    100. * param4:消息体
    101. * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
    102. * 默认的交换机,routingKey等于队列名称
    103. */
    104. //String exchange, String routingKey, BasicProperties props, byte[] body
    105. channel.basicPublish(EXCHANGE, "inform.sms", null, message.getBytes());
    106. System.out.println("mq sms 消息发送成功!");
    107. }
    108. //给email和sms队列发消息
    109. for (int i = 0; i < 10; i++) {
    110. String message = new String("mq 发送email sms消息。。。");
    111. /**
    112. * 消息发布方法
    113. * param1:Exchange的名称,如果没有指定,则使用Default Exchange
    114. * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
    115. * param3:消息包含的属性
    116. * param4:消息体
    117. * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
    118. * 默认的交换机,routingKey等于队列名称
    119. */
    120. //String exchange, String routingKey, BasicProperties props, byte[] body
    121. channel.basicPublish(EXCHANGE, "inform.email.sms", null, message.getBytes());
    122. System.out.println("mq email sms 消息发送成功!");
    123. }
    124. } catch (Exception e) {
    125. e.printStackTrace();
    126. } finally {
    127. try {
    128. channel.close();
    129. } catch (IOException e) {
    130. e.printStackTrace();
    131. } catch (TimeoutException e) {
    132. e.printStackTrace();
    133. }
    134. try {
    135. connection.close();
    136. } catch (IOException e) {
    137. e.printStackTrace();
    138. }
    139. }
    140. }
    141. }

    topics模式之消费者(邮件)

    1. package com.zzg.rabbitmq.topic;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. /**
    6. * 消费者核心步骤
    7. */
    8. public class TopicEMailConsumerMS {
    9. private static final String QUEUE_EMAIL = "queueEmail";
    10. private static final String EXCHANGE = "topicMessageChange";
    11. public static void main(String[] args) {
    12. Connection connection = null;
    13. Channel channel = null;
    14. try {
    15. ConnectionFactory connectionFactory = new ConnectionFactory();
    16. connectionFactory.setHost("192.168.43.10");
    17. connectionFactory.setPort(5672);
    18. connection = connectionFactory.newConnection();
    19. channel = connection.createChannel();
    20. //通道绑定交换机
    21. /**
    22. * 参数明细
    23. * 1、交换机名称
    24. * 2、交换机类型,fanout、topic、direct、headers
    25. */
    26. channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
    27. //通道绑定队列
    28. /**
    29. * 声明队列,如果Rabbit中没有此队列将自动创建
    30. * param1:队列名称
    31. * param2:是否持久化
    32. * param3:队列是否独占此连接
    33. * param4:队列不再使用时是否自动删除此队列
    34. * param5:队列参数
    35. * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
    36. *
    37. */
    38. channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列
    39. //交换机和队列绑定
    40. /**
    41. * 参数明细
    42. * 1、队列名称
    43. * 2、交换机名称
    44. * 3、路由key
    45. */
    46. channel.queueBind(QUEUE_EMAIL, EXCHANGE, "inform.#.email.#");
    47. //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
    48. DefaultConsumer consumer = new DefaultConsumer(channel) {
    49. /**
    50. * 消费者接收消息调用此方法
    51. * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
    52. * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
    53. (收到消息失败后是否需要重新发送)
    54. * @param properties
    55. * @param body
    56. * @throws IOException
    57. * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
    58. */
    59. @Override
    60. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    61. //交换机
    62. String exchange = envelope.getExchange();
    63. //路由key
    64. String routingKey = envelope.getRoutingKey();
    65. envelope.getDeliveryTag();
    66. String msg = new String(body, "utf-8");
    67. System.out.println("mq收到的消息是:" + msg);
    68. }
    69. };
    70. System.out.println("消费者启动成功!");
    71. channel.basicConsume(QUEUE_EMAIL, true, consumer);
    72. } catch (IOException e) {
    73. e.printStackTrace();
    74. } catch (TimeoutException e) {
    75. e.printStackTrace();
    76. }
    77. }
    78. }

    topics模式之消费者(短信)

    1. package com.zzg.rabbitmq.topic;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. public class TopicSMSConsumerMS {
    6. private static final String QUEUE_SMS = "queueSms";
    7. private static final String EXCHANGE = "topicMessageChange";
    8. public static void main(String[] args) {
    9. Connection connection = null;
    10. Channel channel = null;
    11. try {
    12. ConnectionFactory connectionFactory = new ConnectionFactory();
    13. connectionFactory.setHost("192.168.43.10");
    14. connectionFactory.setPort(5672);
    15. connection = connectionFactory.newConnection();
    16. channel = connection.createChannel();
    17. //通道绑定交换机
    18. /**
    19. * 参数明细
    20. * 1、交换机名称
    21. * 2、交换机类型,fanout、topic、direct、headers
    22. */
    23. channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
    24. //通道绑定队列
    25. /**
    26. * 声明队列,如果Rabbit中没有此队列将自动创建
    27. * param1:队列名称
    28. * param2:是否持久化
    29. * param3:队列是否独占此连接
    30. * param4:队列不再使用时是否自动删除此队列
    31. * param5:队列参数
    32. * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
    33. *
    34. */
    35. channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定邮件队列
    36. //交换机和队列绑定
    37. /**
    38. * 参数明细
    39. * 1、队列名称
    40. * 2、交换机名称
    41. * 3、路由key
    42. */
    43. channel.queueBind(QUEUE_SMS, EXCHANGE, "inform.#.sms.#");
    44. //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
    45. DefaultConsumer consumer = new DefaultConsumer(channel) {
    46. /**
    47. * 消费者接收消息调用此方法
    48. * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
    49. * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
    50. (收到消息失败后是否需要重新发送)
    51. * @param properties
    52. * @param body
    53. * @throws IOException
    54. * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
    55. */
    56. @Override
    57. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    58. //交换机
    59. String exchange = envelope.getExchange();
    60. //路由key
    61. String routingKey = envelope.getRoutingKey();
    62. envelope.getDeliveryTag();
    63. String msg = new String(body, "utf-8");
    64. System.out.println("mq收到的消息是:" + msg);
    65. }
    66. };
    67. System.out.println("消费者启动成功!");
    68. channel.basicConsume(QUEUE_SMS, true, consumer);
    69. } catch (IOException e) {
    70. e.printStackTrace();
    71. } catch (TimeoutException e) {
    72. e.printStackTrace();
    73. }
    74. }
    75. }
    使用Routing工作模式能否实现Topic模式?
            使用Routing 模式也可以实现本案例Topic模式,共设置三个 routingkey ,分别是 email sms all email 队列绑定 email 和all, sms 队列绑定 sms all ,这样就可以实现上边案例的功能,实现过程比 topics 复杂。
            Topic模式更多加强大,它可以实现 Routing publish/subscirbe 模式的功能

    Header 模式 

    header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列。

    案例:

            根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效。

            根据假设使用场景,需要一个生产端,两个消费端,不同的是,生产端声明交换机时,交换机的类型不同,是headers类型,生产端队列绑定交换机时,不使用routingkey,而是使用header中的 key/value(键值对)匹配队列,发送消息时也是使用header中的 key/value(键值对)匹配队列。

            消费端同样是声明交换机时,交换机的类型不同,是headers类型,消费端队列绑定交换机时,不使用routingkey,而是使用header中的 key/value(键值对)匹配队列,消费消息时也是使用header中的 key/value(键值对)匹配队列。

    Header模式之生产者

    1. package com.zzg.rabbitmq.header;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.Hashtable;
    5. import java.util.Map;
    6. import java.util.concurrent.TimeoutException;
    7. public class HeaderProduceMS {
    8. //声明两个队列和一个交换机
    9. //Header 模式
    10. private static final String QUEUE_EMAIL = "queueEmail";
    11. private static final String QUEUE_SMS = "queueSms";
    12. private static final String EXCHANGE = "HeaderMessageChange";
    13. public static void main(String[] args) {
    14. Connection connection = null;
    15. Channel channel = null;
    16. try {
    17. ConnectionFactory connectionFactory = new ConnectionFactory();
    18. connectionFactory.setHost("192.168.43.10");//mq服务ip地址
    19. connectionFactory.setPort(5672);//mq client连接端口
    20. connectionFactory.setUsername("guest");//mq登录用户名
    21. connectionFactory.setPassword("guest");//mq登录密码
    22. connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
    23. //创建与RabbitMQ服务的TCP连接
    24. connection = connectionFactory.newConnection();
    25. //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
    26. channel = connection.createChannel();
    27. //通道绑定交换机
    28. /**
    29. * 参数明细
    30. * 1、交换机名称
    31. * 2、交换机类型,fanout、topic、direct、headers
    32. */
    33. //Header 模式
    34. channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);
    35. //通道绑定队列
    36. /**
    37. * 声明队列,如果Rabbit中没有此队列将自动创建
    38. * param1:队列名称
    39. * param2:是否持久化
    40. * param3:队列是否独占此连接
    41. * param4:队列不再使用时是否自动删除此队列
    42. * param5:队列参数
    43. * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
    44. *
    45. */
    46. channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列
    47. channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定短信队列
    48. //交换机和队列绑定
    49. /**
    50. * 参数明细
    51. * 1、队列名称
    52. * 2、交换机名称
    53. * 3、路由key
    54. * 4、
    55. * String queue, String exchange, String routingKey, Map arguments
    56. */
    57. Map headers_email = new Hashtable();
    58. headers_email.put("inform_type", "email");
    59. Map headers_sms = new Hashtable();
    60. headers_sms.put("inform_type", "sms");
    61. channel.queueBind(QUEUE_EMAIL, EXCHANGE, "", headers_email);
    62. channel.queueBind(QUEUE_SMS, EXCHANGE, "", headers_sms);
    63. //给email队列发消息
    64. for (int i = 0; i < 10; i++) {
    65. String message = new String("mq 发送email消息。。。");
    66. Map headers = new Hashtable();
    67. headers.put("inform_type", "email");//匹配email通知消费者绑定的header
    68. /**
    69. * 消息发布方法
    70. * param1:Exchange的名称,如果没有指定,则使用Default Exchange
    71. * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
    72. * param3:消息包含的属性
    73. * param4:消息体
    74. * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
    75. * 默认的交换机,routingKey等于队列名称
    76. */
    77. //String exchange, String routingKey, BasicProperties props, byte[] body
    78. AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
    79. properties.headers(headers);
    80. //Email通知
    81. channel.basicPublish(EXCHANGE, "", properties.build(), message.getBytes());
    82. System.out.println("mq email 消息发送成功!");
    83. }
    84. //给sms队列发消息
    85. for (int i = 0; i < 10; i++) {
    86. String message = new String("mq 发送sms消息。。。");
    87. Map headers = new Hashtable();
    88. headers.put("inform_type", "sms");//匹配sms通知消费者绑定的header
    89. /**
    90. * 消息发布方法
    91. * param1:Exchange的名称,如果没有指定,则使用Default Exchange
    92. * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
    93. * param3:消息包含的属性
    94. * param4:消息体
    95. * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
    96. * 默认的交换机,routingKey等于队列名称
    97. */
    98. //String exchange, String routingKey, BasicProperties props, byte[] body
    99. AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
    100. properties.headers(headers);
    101. //sms通知
    102. channel.basicPublish(EXCHANGE, "", properties.build(), message.getBytes());
    103. System.out.println("mq sms 消息发送成功!");
    104. }
    105. } catch (Exception e) {
    106. e.printStackTrace();
    107. } finally {
    108. try {
    109. channel.close();
    110. } catch (IOException e) {
    111. e.printStackTrace();
    112. } catch (TimeoutException e) {
    113. e.printStackTrace();
    114. }
    115. try {
    116. connection.close();
    117. } catch (IOException e) {
    118. e.printStackTrace();
    119. }
    120. }
    121. }
    122. }

    Header模式之消费者(邮件)

    1. package com.zzg.rabbitmq.header;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.Hashtable;
    5. import java.util.Map;
    6. import java.util.concurrent.TimeoutException;
    7. public class HeaderEMailConsumerMS {
    8. private static final String QUEUE_EMAIL = "queueEmail";
    9. private static final String EXCHANGE = "HeaderMessageChange";
    10. public static void main(String[] args) {
    11. Connection connection = null;
    12. Channel channel = null;
    13. try {
    14. ConnectionFactory connectionFactory = new ConnectionFactory();
    15. connectionFactory.setHost("192.168.43.10");
    16. connectionFactory.setPort(5672);
    17. connection = connectionFactory.newConnection();
    18. channel = connection.createChannel();
    19. //通道绑定交换机
    20. /**
    21. * 参数明细
    22. * 1、交换机名称
    23. * 2、交换机类型,fanout、topic、direct、headers
    24. */
    25. channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);
    26. //通道绑定队列
    27. /**
    28. * 声明队列,如果Rabbit中没有此队列将自动创建
    29. * param1:队列名称
    30. * param2:是否持久化
    31. * param3:队列是否独占此连接
    32. * param4:队列不再使用时是否自动删除此队列
    33. * param5:队列参数
    34. * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
    35. *
    36. */
    37. channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列
    38. //交换机和队列绑定
    39. /**
    40. * 参数明细
    41. * 1、队列名称
    42. * 2、交换机名称
    43. * 3、路由key
    44. * 4、
    45. * String queue, String exchange, String routingKey, Map arguments
    46. */
    47. Map headers_email = new Hashtable();
    48. headers_email.put("inform_email", "email");
    49. channel.queueBind(QUEUE_EMAIL, EXCHANGE, "", headers_email);
    50. //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
    51. DefaultConsumer consumer = new DefaultConsumer(channel) {
    52. /**
    53. * 消费者接收消息调用此方法
    54. * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
    55. * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
    56. (收到消息失败后是否需要重新发送)
    57. * @param properties
    58. * @param body
    59. * @throws IOException
    60. * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
    61. */
    62. @Override
    63. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    64. //交换机
    65. String exchange = envelope.getExchange();
    66. //路由key
    67. String routingKey = envelope.getRoutingKey();
    68. envelope.getDeliveryTag();
    69. String msg = new String(body, "utf-8");
    70. System.out.println("mq收到的消息是:" + msg);
    71. }
    72. };
    73. System.out.println("消费者启动成功!");
    74. channel.basicConsume(QUEUE_EMAIL, true, consumer);
    75. } catch (IOException e) {
    76. e.printStackTrace();
    77. } catch (TimeoutException e) {
    78. e.printStackTrace();
    79. }
    80. }
    81. }

    Header模式之消费者(短信)

    1. package com.zzg.rabbitmq.header;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.Hashtable;
    5. import java.util.Map;
    6. import java.util.concurrent.TimeoutException;
    7. public class HeaderSMSConsumerMS {
    8. private static final String QUEUE_SMS = "queueSms";
    9. private static final String EXCHANGE = "HeaderMessageChange";
    10. public static void main(String[] args) {
    11. Connection connection = null;
    12. Channel channel = null;
    13. try {
    14. ConnectionFactory connectionFactory = new ConnectionFactory();
    15. connectionFactory.setHost("192.168.43.10");
    16. connectionFactory.setPort(5672);
    17. connection = connectionFactory.newConnection();
    18. channel = connection.createChannel();
    19. //通道绑定交换机
    20. /**
    21. * 参数明细
    22. * 1、交换机名称
    23. * 2、交换机类型,fanout、topic、direct、headers
    24. */
    25. channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);
    26. //通道绑定队列
    27. /**
    28. * 声明队列,如果Rabbit中没有此队列将自动创建
    29. * param1:队列名称
    30. * param2:是否持久化
    31. * param3:队列是否独占此连接
    32. * param4:队列不再使用时是否自动删除此队列
    33. * param5:队列参数
    34. * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments
    35. *
    36. */
    37. channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定邮件队列
    38. //交换机和队列绑定
    39. /**
    40. * 参数明细
    41. * 1、队列名称
    42. * 2、交换机名称
    43. * 3、路由key
    44. * 4、
    45. * String queue, String exchange, String routingKey, Map arguments
    46. */
    47. Map headers_email = new Hashtable();
    48. headers_email.put("inform_email", "sms");
    49. channel.queueBind(QUEUE_SMS, EXCHANGE, "", headers_email);
    50. //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
    51. DefaultConsumer consumer = new DefaultConsumer(channel) {
    52. /**
    53. * 消费者接收消息调用此方法
    54. * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
    55. * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
    56. (收到消息失败后是否需要重新发送)
    57. * @param properties
    58. * @param body
    59. * @throws IOException
    60. * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
    61. */
    62. @Override
    63. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    64. //交换机
    65. String exchange = envelope.getExchange();
    66. //路由key
    67. String routingKey = envelope.getRoutingKey();
    68. envelope.getDeliveryTag();
    69. String msg = new String(body, "utf-8");
    70. System.out.println("mq收到的消息是:" + msg);
    71. }
    72. };
    73. System.out.println("消费者启动成功!");
    74. channel.basicConsume(QUEUE_SMS, true, consumer);
    75. } catch (IOException e) {
    76. e.printStackTrace();
    77. } catch (TimeoutException e) {
    78. e.printStackTrace();
    79. }
    80. }
    81. }

    RPC 模式

    缺失图片

     RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:

    1、客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。

    2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。

    3、服务端将RPC方法 的结果发送到RPC响应队列。

    4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

    5、RabbitMQ 高级特性

    RabbitMQ 消息有效期

            大家有没有设想过RabbitMQ 消息长时间没有处理,消息是否会过期?,带着上述疑问开始本章节的学习。

    默认情况

            默认情况下,消息是不会过期的,也就是我们平日里在消息发送时,如果不设置任何消息过期的相关参数,那么消息是不会过期的,即使消息没被消费掉,也会一直存储在队列中。

    TTL(消息存活时间)

    TTL(Time-To-Live),消息存活的时间,即消息的有效期。如果我们希望消息能够有一个存活时间,那么我们可以通过设置 TTL 来实现这一需求。如果消息的存活时间超过了 TTL 并且还没有被消息,此时消息就会变成死信,关于死信以及死信队列我们在下一个章节会说到。

    TTL 设置方式

    1. 在声明队列的时候,我们可以在队列属性中设置消息的有效期,这样所有进入该队列的消息都会有一个相同的有效期。

    2. 在发送消息的时候设置消息的有效期,这样不同的消息就具有不同的有效期。

    思考:如果队列声明了过期时间且消息也设置过期时间,以谁的为准?

              以时间短为准。

    TTL之单条消息过期

    第一步:创建Maven项目,添加Web 和RabbitMQ jar包依赖

    1. <dependency>
    2. <groupId>org.springframework.bootgroupId>
    3. <artifactId>spring-boot-starter-webartifactId>
    4. dependency>
    5. <dependency>
    6. <groupId>org.springframework.bootgroupId>
    7. <artifactId>spring-boot-starter-amqpartifactId>
    8. dependency>

    第二步:添加RabbitMQ 配置对象

    1. package com.zzg.config;
    2. import org.springframework.amqp.core.Binding;
    3. import org.springframework.amqp.core.BindingBuilder;
    4. import org.springframework.amqp.core.DirectExchange;
    5. import org.springframework.amqp.core.Queue;
    6. import org.springframework.context.annotation.Bean;
    7. import org.springframework.context.annotation.Configuration;
    8. @Configuration
    9. public class QueueConfig {
    10. public static final String TTL_QUEUE_DEMO = "ttl_queue_demo";
    11. public static final String TTL_EXCHANGE_DEMO = "ttl_exchange_demo";
    12. public static final String TTL_ROUTING_KEY = "ttl_routing_key";
    13. @Bean
    14. Queue queue() {
    15. return new Queue(TTL_QUEUE_DEMO, true, false, false);
    16. }
    17. @Bean
    18. DirectExchange directExchange() {
    19. return new DirectExchange(TTL_EXCHANGE_DEMO, true, false);
    20. }
    21. @Bean
    22. Binding binding() {
    23. return BindingBuilder.bind(queue())
    24. .to(directExchange())
    25. .with(TTL_ROUTING_KEY);
    26. }
    27. }

     此配置对象主要解决:

    1. 首先配置一个消息队列,new 一个 Queue:第一个参数是消息队列的名字;第二个参数表示消息是否持久化;第三个参数表示消息队列是否排他,一般我们都是设置为 false,即不排他;第四个参数表示如果该队列没有任何订阅的消费者的话,该队列会被自动删除,一般适用于临时队列。

    2. 配置一个 DirectExchange 交换机。

    3. 将交换机和队列绑定到一起。

    第三步:添加消息过期Controller ,通过PostMan 工具模拟请求

    1. package com.zzg.controller;
    2. import com.zzg.config.QueueConfig;
    3. import org.springframework.amqp.core.Message;
    4. import org.springframework.amqp.core.MessageBuilder;
    5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    6. import org.springframework.beans.factory.annotation.Autowired;
    7. import org.springframework.web.bind.annotation.GetMapping;
    8. import org.springframework.web.bind.annotation.RequestMapping;
    9. import org.springframework.web.bind.annotation.RestController;
    10. @RestController
    11. @RequestMapping("/ttl")
    12. public class MessageTTLController {
    13. @Autowired
    14. RabbitTemplate rabbitTemplate;
    15. @GetMapping("/message")
    16. public void hello() {
    17. Message message = MessageBuilder.withBody("Hello RabbitMQ".getBytes())
    18. // 设置消息过期时间
    19. .setExpiration("10000")
    20. .build();
    21. rabbitTemplate.convertAndSend(QueueConfig.TTL_QUEUE_DEMO, message);
    22. }
    23. }

    TTL之队列消息过期

    仅仅需要修改RabbitMQ 配置对象和消息过期Controller

    第一:修改RabbitMQ 配置对象,设置队列过期 时间

    1. @Bean
    2. Queue queue() {
    3. Map<String, Object> args = new HashMap<>();
    4. // 消息队列设置过期时间
    5. args.put("x-message-ttl", 10000);
    6. return new Queue(TTL_QUEUE_DEMO, true, false, false, args);
    7. }

    其他都不需要修改,直接复用。

    第二步:修改消息过期Controller,移除单条消息过期时间

    1. package com.zzg.controller;
    2. import com.zzg.config.QueueConfig;
    3. import org.springframework.amqp.core.Message;
    4. import org.springframework.amqp.core.MessageBuilder;
    5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    6. import org.springframework.beans.factory.annotation.Autowired;
    7. import org.springframework.web.bind.annotation.GetMapping;
    8. import org.springframework.web.bind.annotation.RequestMapping;
    9. import org.springframework.web.bind.annotation.RestController;
    10. @RestController
    11. @RequestMapping("/ttl")
    12. public class MessageTTLController {
    13. @Autowired
    14. RabbitTemplate rabbitTemplate;
    15. @GetMapping("/message")
    16. public void hello() {
    17. // Message message = MessageBuilder.withBody("Hello RabbitMQ".getBytes())
    18. // // 设置消息过期时间
    19. // .setExpiration("10000")
    20. // .build();
    21. Message message = MessageBuilder.withBody("Hello RabbitMQ".getBytes())
    22. .build();
    23. rabbitTemplate.convertAndSend(QueueConfig.TTL_QUEUE_DEMO, message);
    24. }
    25. }

    思考:如果将消息过期时间设置为0,这表示消息不能立马消费则立即被丢掉。

    六、RabbitMQ 死信队列

    死信交换机,Dead-Letter-Exchange 即 DLX。

    死信交换机用来接收死信消息(Dead Message)的,那什么是死信消息呢?一般消息变成死信消息有如下几种情况:

    • 消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false

    • 消息过期

    • 队列达到最大长度

    当消息在一个队列中变成了死信消息后,此时就会被发送到 DLX,绑定 DLX 的消息队列则称为死信队列。

    DLX 本质上也是一个普普通通的交换机,我们可以为任意队列指定 DLX,当该队列中存在死信时,RabbitMQ 就会自动的将这个死信发布到 DLX 上去,进而被路由到另一个绑定了 DLX 的队列上(即死信队列)。

    死信队列实践

    第一步:创建Maven项目,添加Web 和RabbitMQ jar包依赖

    1. <dependency>
    2. <groupId>org.springframework.bootgroupId>
    3. <artifactId>spring-boot-starter-webartifactId>
    4. dependency>
    5. <dependency>
    6. <groupId>org.springframework.bootgroupId>
    7. <artifactId>spring-boot-starter-amqpartifactId>
    8. dependency>

    第二步:添加RabbitMQ 配置对象 (普通队列和死信队列)

    1. package com.zzg.config;
    2. import org.springframework.amqp.core.Binding;
    3. import org.springframework.amqp.core.BindingBuilder;
    4. import org.springframework.amqp.core.DirectExchange;
    5. import org.springframework.amqp.core.Queue;
    6. import org.springframework.context.annotation.Bean;
    7. import org.springframework.context.annotation.Configuration;
    8. import java.util.HashMap;
    9. import java.util.Map;
    10. /**
    11. * 普通消息队列配置
    12. */
    13. @Configuration
    14. public class RabbitMQConfig {
    15. public static final String ROUTING_QUEUE_DEMO = "routing_queue_demos";
    16. public static final String ROUTING_EXCHANGE_DEMO = "routing_exchange_demos";
    17. public static final String ROUTING_KEY = "routing_keys";
    18. @Bean
    19. Queue queue() {
    20. Map args = new HashMap<>();
    21. //设置消息过期时间
    22. args.put("x-message-ttl", 0);
    23. //设置死信交换机
    24. args.put("x-dead-letter-exchange", RabbitMQDSLConfig.DLX_EXCHANGE_NAME);
    25. //设置死信 routing_key
    26. args.put("x-dead-letter-routing-key", RabbitMQDSLConfig.DLX_ROUTING_KEY);
    27. return new Queue(ROUTING_QUEUE_DEMO, true, false, false, args);
    28. }
    29. @Bean
    30. DirectExchange directExchange() {
    31. return new DirectExchange(ROUTING_EXCHANGE_DEMO, true, false);
    32. }
    33. @Bean
    34. Binding binding() {
    35. return BindingBuilder.bind(queue())
    36. .to(directExchange())
    37. .with(ROUTING_KEY);
    38. }
    39. }

    温馨提示:为普通消息队列配置死信 队列

    @Bean
    Queue queue() {
        Map args = new HashMap<>();
        //设置消息过期时间
        args.put("x-message-ttl", 0);
        //设置死信交换机
        args.put("x-dead-letter-exchange", RabbitMQDSLConfig.DLX_EXCHANGE_NAME);
        //设置死信 routing_key
        args.put("x-dead-letter-routing-key", RabbitMQDSLConfig.DLX_ROUTING_KEY);
        return new Queue(ROUTING_QUEUE_DEMO, true, false, false, args);
    }
    

    两个参数:

    • x-dead-letter-exchange:配置死信交换机。

    • x-dead-letter-routing-key:配置死信 routing_key

    发送到这个消息队列上的消息,如果发生了 nack、reject 或者过期等问题,就会被发送到 DLX 上,进而进入到与 DLX 绑定的消息队列上。

    1. package com.zzg.config;
    2. import org.springframework.amqp.core.Binding;
    3. import org.springframework.amqp.core.BindingBuilder;
    4. import org.springframework.amqp.core.DirectExchange;
    5. import org.springframework.amqp.core.Queue;
    6. import org.springframework.context.annotation.Bean;
    7. import org.springframework.context.annotation.Configuration;
    8. /**
    9. * RabbitMQ 死信队列配置
    10. */
    11. @Configuration
    12. public class RabbitMQDSLConfig {
    13. public static final String DLX_EXCHANGE_NAME = "dlx_exchange_names";
    14. public static final String DLX_QUEUE_NAME = "dlx_queue_names";
    15. public static final String DLX_ROUTING_KEY = "dlx_routing_keys";
    16. /**
    17. * 配置死信交换机
    18. *
    19. * @return
    20. */
    21. @Bean
    22. DirectExchange dlxDirectExchange() {
    23. return new DirectExchange(DLX_EXCHANGE_NAME, true, false);
    24. }
    25. /**
    26. * 配置死信队列
    27. * @return
    28. */
    29. @Bean
    30. Queue dlxQueue() {
    31. return new Queue(DLX_QUEUE_NAME);
    32. }
    33. /**
    34. * 绑定死信队列和死信交换机
    35. * @return
    36. */
    37. @Bean
    38. Binding dlxBinding() {
    39. return BindingBuilder.bind(dlxQueue())
    40. .to(dlxDirectExchange())
    41. .with(DLX_ROUTING_KEY);
    42. }
    43. }

    第三步:配置死信队列监听器

    1. package com.zzg.components;
    2. import com.zzg.config.RabbitMQDSLConfig;
    3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    4. import org.springframework.stereotype.Component;
    5. /**
    6. * 死信队列处理组件
    7. */
    8. @Component
    9. public class DSLComponent {
    10. @RabbitListener(queues = RabbitMQDSLConfig.DLX_QUEUE_NAME)
    11. public void handler(String message){
    12. System.out.println("dlx msg = " + message);
    13. }
    14. }

    第四步:编写Controller,实现死信队列功能

    1. package com.zzg.controller;
    2. import com.zzg.config.RabbitMQConfig;
    3. import org.springframework.amqp.core.Message;
    4. import org.springframework.amqp.core.MessageBuilder;
    5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    6. import org.springframework.beans.factory.annotation.Autowired;
    7. import org.springframework.web.bind.annotation.GetMapping;
    8. import org.springframework.web.bind.annotation.RequestMapping;
    9. import org.springframework.web.bind.annotation.RestController;
    10. @RestController
    11. @RequestMapping("/ttl")
    12. public class MessageTTLController {
    13. @Autowired
    14. RabbitTemplate rabbitTemplate;
    15. @GetMapping("/message")
    16. public void hello() {
    17. Message message = MessageBuilder.withBody("Hello RabbitMQ".getBytes())
    18. .build();
    19. rabbitTemplate.convertAndSend(RabbitMQConfig.ROUTING_QUEUE_DEMO, message);
    20. }
    21. }

    RabbitMQ延迟队列

    延迟队列场景:

    •         在电商项目中,当我们下单之后,一般需要 20 分钟之内或者 30 分钟之内付款,否则订单就会进入异常处理逻辑中,被取消,那么进入到异常处理逻辑中,就可以当成是一个延迟队列。
    •         用户下单外卖以后,距离超时时间还有 10 分钟时提醒外卖小哥即将超时。

    RabbitMQ延迟队列实现延迟队列方式:

    • 利用 RabbitMQ 自带的消息过期和私信队列机制,实现定时任务。

    • 使用 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件来实现定时任务,这种方案较简单。

    RabbitMQ 集成插件实现

    我们需要下载 rabbitmq_delayed_message_exchange 插件,这是一个 GitHub 上的开源项目,我们直接下载即可。

    https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

    选择适合自己的版本,我这里选择3.9.0 版。

    下载完成后,通过SFTP 上传CentOS服务器,再将此文件拷贝至RabbitMQ 容器中。

    docker cp /home/rabbitmq_delayed_message_exchange-3.9.0.ez 4b0032:/plugins
    

    第一个参数是宿主机上的文件地址,第二个参数是拷贝到容器的位置

    再执行如下命令进入到 RabbitMQ 容器中:

     docker exec -it 4b0032 /bin/bash

    进入到容器之后,执行如下命令启用插件:

     rabbitmq-plugins enable rabbitmq_delayed_message_exchange

    启用成功之后通过如下命令查看所有安装的插件:

    rabbitmq-plugins list

    MobeXterm 操作详细截图:

    1. [root@localhost home]# docker cp /home/rabbitmq_delayed_message_exchange-3.9.0.ez 4b0032:/plugins
    2. [root@localhost home]# docker exec -it 4b0032 /bin/bash
    3. root@4b0032878886:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
    4. Enabling plugins on node rabbit@4b0032878886:
    5. rabbitmq_delayed_message_exchange
    6. The following plugins have been configured:
    7. rabbitmq_delayed_message_exchange
    8. rabbitmq_management
    9. rabbitmq_management_agent
    10. rabbitmq_prometheus
    11. rabbitmq_web_dispatch
    12. Applying plugin configuration to rabbit@4b0032878886...
    13. The following plugins have been enabled:
    14. rabbitmq_delayed_message_exchange
    15. started 1 plugins.
    16. root@4b0032878886:/# rabbitmq-plugins list
    17. Listing plugins with pattern ".*" ...
    18. Configured: E = explicitly enabled; e = implicitly enabled
    19. | Status: * = running on rabbit@4b0032878886
    20. |/
    21. [ ] rabbitmq_amqp1_0 3.9.11
    22. [ ] rabbitmq_auth_backend_cache 3.9.11
    23. [ ] rabbitmq_auth_backend_http 3.9.11
    24. [ ] rabbitmq_auth_backend_ldap 3.9.11
    25. [ ] rabbitmq_auth_backend_oauth2 3.9.11
    26. [ ] rabbitmq_auth_mechanism_ssl 3.9.11
    27. [ ] rabbitmq_consistent_hash_exchange 3.9.11
    28. [E*] rabbitmq_delayed_message_exchange 3.9.0
    29. [ ] rabbitmq_event_exchange 3.9.11
    30. [ ] rabbitmq_federation 3.9.11
    31. [ ] rabbitmq_federation_management 3.9.11
    32. [ ] rabbitmq_jms_topic_exchange 3.9.11
    33. [E*] rabbitmq_management 3.9.11
    34. [e*] rabbitmq_management_agent 3.9.11
    35. [ ] rabbitmq_mqtt 3.9.11
    36. [ ] rabbitmq_peer_discovery_aws 3.9.11
    37. [ ] rabbitmq_peer_discovery_common 3.9.11
    38. [ ] rabbitmq_peer_discovery_consul 3.9.11
    39. [ ] rabbitmq_peer_discovery_etcd 3.9.11
    40. [ ] rabbitmq_peer_discovery_k8s 3.9.11
    41. [E*] rabbitmq_prometheus 3.9.11
    42. [ ] rabbitmq_random_exchange 3.9.11
    43. [ ] rabbitmq_recent_history_exchange 3.9.11
    44. [ ] rabbitmq_sharding 3.9.11
    45. [ ] rabbitmq_shovel 3.9.11
    46. [ ] rabbitmq_shovel_management 3.9.11
    47. [ ] rabbitmq_stomp 3.9.11
    48. [ ] rabbitmq_stream 3.9.11
    49. [ ] rabbitmq_stream_management 3.9.11
    50. [ ] rabbitmq_top 3.9.11
    51. [ ] rabbitmq_tracing 3.9.11
    52. [ ] rabbitmq_trust_store 3.9.11
    53. [e*] rabbitmq_web_dispatch 3.9.11
    54. [ ] rabbitmq_web_mqtt 3.9.11
    55. [ ] rabbitmq_web_mqtt_examples 3.9.11
    56. [ ] rabbitmq_web_stomp 3.9.11
    57. [ ] rabbitmq_web_stomp_examples 3.9.11
    58. root@4b0032878886:/# exit

    基于插件实现延迟队列

    第一:创建Maven项目,添加Web和RabbitMQ 依赖。

               省略...

    第二:添加RabbitMQ 配置对象 (延迟队列)

    1. package com.zzg.config;
    2. import org.springframework.amqp.core.Binding;
    3. import org.springframework.amqp.core.BindingBuilder;
    4. import org.springframework.amqp.core.CustomExchange;
    5. import org.springframework.amqp.core.Queue;
    6. import org.springframework.context.annotation.Bean;
    7. import org.springframework.context.annotation.Configuration;
    8. import java.util.HashMap;
    9. import java.util.Map;
    10. /**
    11. * 延迟队列配置对象
    12. */
    13. @Configuration
    14. public class RabbitMQDelayedConfig {
    15. public static final String QUEUE_NAME = "delay_queue";
    16. public static final String EXCHANGE_NAME = "delay_exchange";
    17. public static final String EXCHANGE_TYPE = "x-delayed-message";
    18. @Bean
    19. Queue queue() {
    20. return new Queue(QUEUE_NAME, true, false, false);
    21. }
    22. @Bean
    23. CustomExchange customExchange() {
    24. Map args = new HashMap<>();
    25. args.put("x-delayed-type", "direct");
    26. return new CustomExchange(EXCHANGE_NAME, EXCHANGE_TYPE, true, false,args);
    27. }
    28. @Bean
    29. Binding binding() {
    30. return BindingBuilder.bind(queue())
    31. .to(customExchange()).with(QUEUE_NAME).noargs();
    32. }
    33. }

    延迟队列使用交换机为CustomExchange,这是一个 Spring 中提供的交换机,创建 CustomExchange 时有五个参数,含义分别如下:

    • 交换机名称。

    • 交换机类型,这个地方是固定的。

    • 交换机是否持久化。

    • 如果没有队列绑定到交换机,交换机是否删除。

    • 其他参数。

    最后一个 args 参数中,指定了交换机消息分发的类型,这个类型就是大家熟知的 direct、fanout、topic 以及 header 几种,用了哪种类型,将来交换机分发消息就按哪种方式来。

    第三步:创建延迟队列的消费者

    1. package com.zzg.components;
    2. import com.zzg.config.RabbitMQDelayedConfig;
    3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    4. import org.springframework.stereotype.Component;
    5. /**
    6. * 延迟队列处理组件
    7. */
    8. @Component
    9. public class DelayedComponent {
    10. @RabbitListener(queues = RabbitMQDelayedConfig.QUEUE_NAME)
    11. public void handleMsg(String message) {
    12. System.out.println("delayed msg = " + message);
    13. }
    14. }

    第四步:创建Controller,并且在头部设置消息延迟时间

    1. @GetMapping("/delayed")
    2. public void delayed() {
    3. Message message = MessageBuilder.withBody("Delayed RabbitMQ".getBytes())
    4. // 消息头中设置消息的延迟时间。
    5. .setHeader("x-delay", 3000)
    6. .build();
    7. rabbitTemplate.convertAndSend(RabbitMQDelayedConfig.EXCHANGE_NAME, RabbitMQDelayedConfig.QUEUE_NAME, message);
    8. }

    基于DXL实现延迟队列

    延迟队列实现的思路也很简单,就是我们所说的 DLX(死信交换机)+TTL(消息超时时间)。

    功能需求说明: 

            假如一条消息需要延迟 30 分钟执行,我们就设置这条消息的有效期为 30 分钟,同时为这条消息配置死信交换机和死信 routing_key,并且不为这个消息队列设置消费者,那么 30 分钟后,这条消息由于没有被消费者消费而进入死信队列,此时我们有一个消费者就在“蹲点”这个死信队列,消息一进入死信队列,就立马被消费了。

     功能代码

    第一步:创建Maven项目,添加Web 和RabbitMQ依赖

            省略...

    第二步: 添加RabbitMQ配置对象(普通队列和死信队列)

    1. package com.zzg.config;
    2. import org.springframework.amqp.core.Binding;
    3. import org.springframework.amqp.core.BindingBuilder;
    4. import org.springframework.amqp.core.DirectExchange;
    5. import org.springframework.amqp.core.Queue;
    6. import org.springframework.context.annotation.Bean;
    7. import org.springframework.context.annotation.Configuration;
    8. import java.util.HashMap;
    9. import java.util.Map;
    10. /**
    11. * 普通消息队列配置
    12. */
    13. @Configuration
    14. public class RabbitMQConfig {
    15. public static final String ROUTING_QUEUE_DEMO = "routing_queue_demos";
    16. public static final String ROUTING_EXCHANGE_DEMO = "routing_exchange_demos";
    17. public static final String ROUTING_KEY = "routing_keys";
    18. @Bean
    19. Queue queue() {
    20. Map args = new HashMap<>();
    21. //设置消息过期时间
    22. args.put("x-message-ttl", 1000 * 3);
    23. //设置死信交换机
    24. args.put("x-dead-letter-exchange", RabbitMQDSLConfig.DLX_EXCHANGE_NAME);
    25. //设置死信 routing_key
    26. args.put("x-dead-letter-routing-key", RabbitMQDSLConfig.DLX_ROUTING_KEY);
    27. return new Queue(ROUTING_QUEUE_DEMO, true, false, false, args);
    28. }
    29. @Bean
    30. DirectExchange directExchange() {
    31. return new DirectExchange(ROUTING_EXCHANGE_DEMO, true, false);
    32. }
    33. @Bean
    34. Binding binding() {
    35. return BindingBuilder.bind(queue())
    36. .to(directExchange())
    37. .with(ROUTING_KEY);
    38. }
    39. }
    1. package com.zzg.config;
    2. import org.springframework.amqp.core.Binding;
    3. import org.springframework.amqp.core.BindingBuilder;
    4. import org.springframework.amqp.core.DirectExchange;
    5. import org.springframework.amqp.core.Queue;
    6. import org.springframework.context.annotation.Bean;
    7. import org.springframework.context.annotation.Configuration;
    8. /**
    9. * RabbitMQ 死信队列配置
    10. */
    11. @Configuration
    12. public class RabbitMQDSLConfig {
    13. public static final String DLX_EXCHANGE_NAME = "dlx_exchange_names";
    14. public static final String DLX_QUEUE_NAME = "dlx_queue_names";
    15. public static final String DLX_ROUTING_KEY = "dlx_routing_keys";
    16. /**
    17. * 配置死信交换机
    18. *
    19. * @return
    20. */
    21. @Bean
    22. DirectExchange dlxDirectExchange() {
    23. return new DirectExchange(DLX_EXCHANGE_NAME, true, false);
    24. }
    25. /**
    26. * 配置死信队列
    27. * @return
    28. */
    29. @Bean
    30. Queue dlxQueue() {
    31. return new Queue(DLX_QUEUE_NAME);
    32. }
    33. /**
    34. * 绑定死信队列和死信交换机
    35. * @return
    36. */
    37. @Bean
    38. Binding dlxBinding() {
    39. return BindingBuilder.bind(dlxQueue())
    40. .to(dlxDirectExchange())
    41. .with(DLX_ROUTING_KEY);
    42. }
    43. }

    第三步:为死信队列配置消息监听器

    1. package com.zzg.components;
    2. import com.zzg.config.RabbitMQDSLConfig;
    3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    4. import org.springframework.stereotype.Component;
    5. /**
    6. * 死信队列处理组件
    7. */
    8. @Component
    9. public class DSLComponent {
    10. @RabbitListener(queues = RabbitMQDSLConfig.DLX_QUEUE_NAME)
    11. public void handler(String message){
    12. System.out.println("dlx msg = " + message);
    13. }
    14. }

    第四步:添加Controller, 触发消息发送

    1. @GetMapping("/message")
    2. public void hello() {
    3. Message message = MessageBuilder.withBody("Hello RabbitMQ".getBytes())
    4. .build();
    5. rabbitTemplate.convertAndSend(RabbitMQConfig.ROUTING_QUEUE_DEMO, message);
    6. }

    控制台输出:

    1. 2022-08-02 14:30:25.100 INFO 6960 --- [nio-8087-exec-3] o.s.web.servlet.DispatcherServlet : Completed initialization in 4 ms
    2. dlx msg = Hello RabbitMQ

    七、RabbitMQ 发送可靠性

    温馨提示:本章节内容主要讨论如何确保消息生产者将消息发送成功,并不涉及消息消费的问题。

    RabbitMQ 消息发送机制

            RabbitMQ 中的消息发送引入了 Exchange(交换机)的概念,消息的发送首先到达交换机上,然后再根据既定的路由规则,由交换机将消息路由到不同的 Queue(队列)中,再由不同的消费者去消费。        

    要确保消息发送的可靠性,主要从两方面去确认:

    1. 消息成功到达 Exchange

    2. 消息成功到达 Queue

    如果能确认这两步,那么我们就可以认为消息发送成功了。

    如果这两步中任一步骤出现问题,那么消息就没有成功送达,此时我们可能要通过重试等方式去重新发送消息,多次重试之后,如果消息还是不能到达,则可能就需要人工介入了。

    经过上面的分析,我们可以确认,要确保消息成功发送,我们只需要做好三件事就可以了:

    1. 确认消息到达 Exchange。

    2. 确认消息到达 Queue。

    3. 开启定时任务,定时投递那些发送失败的消息。

     RabbitMQ 提供解决方案

    保证消息的成功发送的三个步骤,前两个RabbitQ 提供了对应的解决办法,第三个步骤需要我们自己实现。

    如何确保消息成功到达 RabbitMQ?RabbitMQ 给出了两种方案:

    1. 开启事务机制

    2. 发送方确认机制

    温馨提示

    两种不同的方案,不可以同时开启,只能选择其中之一,如果两者同时开启,则会报如下错误:

    ​ 

      RabbitMQ 开启事务

    第一步:创建Maven项目,添加Web 和RabbitMQ依赖

            省略...

    第二步: 添加RabbitMQ配置对象(普通队列+ 事务)

    1. package com.zzg.config;
    2. import org.springframework.amqp.core.Binding;
    3. import org.springframework.amqp.core.BindingBuilder;
    4. import org.springframework.amqp.core.DirectExchange;
    5. import org.springframework.amqp.core.Queue;
    6. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    7. import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
    8. import org.springframework.context.annotation.Bean;
    9. import org.springframework.context.annotation.Configuration;
    10. import java.util.HashMap;
    11. import java.util.Map;
    12. /**
    13. * 普通消息队列配置
    14. */
    15. @Configuration
    16. public class RabbitMQConfig {
    17. public static final String ROUTING_QUEUE_DEMO = "routing_queue";
    18. public static final String ROUTING_EXCHANGE_DEMO = "routing_exchange";
    19. public static final String ROUTING_KEY = "routing_key_transactional";
    20. @Bean
    21. Queue queue() {
    22. return new Queue(ROUTING_QUEUE_DEMO, true, false, false);
    23. }
    24. @Bean
    25. DirectExchange directExchange() {
    26. return new DirectExchange(ROUTING_EXCHANGE_DEMO, true, false);
    27. }
    28. @Bean
    29. Binding binding() {
    30. return BindingBuilder.bind(queue())
    31. .to(directExchange())
    32. .with(ROUTING_KEY);
    33. }
    34. @Bean
    35. RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
    36. return new RabbitTransactionManager(connectionFactory);
    37. }
    38. }

    第三步:在消息生产者上面做两件事:添加事务注解并设置通信信道为事务模式:

    1. package com.zzg.service;
    2. import com.zzg.config.RabbitMQConfig;
    3. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    4. import org.springframework.beans.factory.annotation.Autowired;
    5. import org.springframework.stereotype.Service;
    6. import org.springframework.transaction.annotation.Transactional;
    7. @Service
    8. public class MessageServiceImpl {
    9. @Autowired
    10. RabbitTemplate rabbitTemplate;
    11. @Transactional
    12. public void send(){
    13. rabbitTemplate.setChannelTransacted(true);
    14. rabbitTemplate.convertAndSend(RabbitMQConfig.ROUTING_EXCHANGE_DEMO,RabbitMQConfig.ROUTING_QUEUE_DEMO,"transactional Rabbitmq!".getBytes());
    15. int i = 1 / 0;
    16. }
    17. }

    这里注意两点:

    1. 发送消息的方法上添加 @Transactional 注解标记事务。

    2. 调用 setChannelTransacted 方法设置为 true 开启事务模式。

    这就 OK 了。

    在上面的案例中,我们在结尾来了个 1/0 ,这在运行时必然抛出异常,我们可以尝试运行该方法,发现消息并未发送成功。

    当我们开启事务模式之后,RabbitMQ 生产者发送消息会多出四个步骤:

    1. 客户端发出请求,将信道设置为事务模式。

    2. 服务端给出回复,同意将信道设置为事务模式。

    3. 客户端发送消息。

    4. 客户端提交事务。

    5. 服务端给出响应,确认事务提交。

    上面的步骤,除了第三步是本来就有的,其他几个步骤都是平白无故多出来的。所以大家看到,事务模式其实效率有点低,这并非一个最佳解决方案。我们可以想想,什么项目会用到消息中间件?一般来说都是一些高并发的项目,这个时候并发性能尤为重要。

    所以,RabbitMQ 还提供了发送方确认机制(publisher confirm)来确保消息发送成功,这种方式,性能要远远高于事务模式,一起来看下。

    RabbitMQ 发送确认机制

    移除刚刚关于事务的代码,然后在 application.yml中配置开启消息发送方确认机制。

    1. spring:
    2. rabbitmq:
    3. host: 192.168.43.10
    4. port: 5672
    5. username: guest
    6. password: guest
    7. virtual-host: /
    8. publisher-confirm-type: correlated
    9. publisher-returns: true

    publisher-confirm-type:配置消息到达交换器的确认回调,publisher-returns:配置消息到达队列的回调。

    publisher-confirm-type属性的配置有三个取值:

    1. none:表示禁用发布确认模式,默认即此。

    2. correlated:表示成功发布消息到交换器后会触发的回调方法。

    3. simple:类似 correlated,并且支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法的调用。

    RabbitMQ配置对象,开启两个监听 

    1. package com.zzg.config;
    2. import org.springframework.amqp.core.*;
    3. import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    4. import org.springframework.amqp.rabbit.connection.CorrelationData;
    5. import org.springframework.amqp.rabbit.core.RabbitTemplate;
    6. import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
    7. import org.springframework.beans.factory.annotation.Autowired;
    8. import org.springframework.context.annotation.Bean;
    9. import org.springframework.context.annotation.Configuration;
    10. import javax.annotation.PostConstruct;
    11. import java.util.HashMap;
    12. import java.util.Map;
    13. /**
    14. * 普通消息队列配置
    15. */
    16. @Configuration
    17. public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
    18. public static final String ROUTING_QUEUE_DEMO = "routing_queue";
    19. public static final String ROUTING_EXCHANGE_DEMO = "routing_exchange";
    20. public static final String ROUTING_KEY = "routing_key_transactional";
    21. @Autowired
    22. RabbitTemplate rabbitTemplate;
    23. @Bean
    24. Queue queue() {
    25. return new Queue(ROUTING_QUEUE_DEMO, true, false, false);
    26. }
    27. @Bean
    28. DirectExchange directExchange() {
    29. return new DirectExchange(ROUTING_EXCHANGE_DEMO, true, false);
    30. }
    31. @Bean
    32. Binding binding() {
    33. return BindingBuilder.bind(queue())
    34. .to(directExchange())
    35. .with(ROUTING_KEY);
    36. }
    37. @PostConstruct
    38. public void initRabbitTemplate() {
    39. rabbitTemplate.setConfirmCallback(this);
    40. rabbitTemplate.setReturnCallback(this);
    41. }
    42. @Override
    43. public void confirm(CorrelationData correlationData, boolean ack, String s) {
    44. if (ack) {
    45. System.out.println(correlationData.getId() +":消息成功到达交换器");
    46. }else{
    47. System.out.println(correlationData.getId() +":消息发送失败");
    48. }
    49. }
    50. @Override
    51. public void returnedMessage(Message message, int i, String s, String s1, String s2) {
    52. System.out.println(message.getMessageProperties().getMessageId() +", 消息未成功路由到队列");
    53. }
    54. }

    关于这个配置类,我说如下几点:

    1. 定义配置类,实现 RabbitTemplate.ConfirmCallback 和 RabbitTemplate.ReturnsCallback 两个接口,这两个接口,前者的回调用来确定消息到达交换器,后者则会在消息路由到队列失败时被调用。

    2. 定义 initRabbitTemplate 方法并添加 @PostConstruct 注解,在该方法中为 rabbitTemplate 分别配置这两个 Callback。

    这就可以了。

    尝试将消息发送到一个不存在的交换机中,代码如下:

    1. @GetMapping("/message")
    2. public void hello() {
    3. // Message message = MessageBuilder.withBody("Hello RabbitMQ".getBytes())
    4. // .build();
    5. // rabbitTemplate.convertAndSend(RabbitMQConfig.ROUTING_QUEUE_DEMO, message);
    6. // exchange 不存在
    7. rabbitTemplate.convertAndSend("RabbitConfig.EXCHANGE_NAME",RabbitMQConfig.ROUTING_QUEUE_DEMO,"Hello RabbitMQ!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
    8. }

    注意第一个参数是一个字符串,不是变量,这个交换器并不存在,此时控制台会报如下错误:

    1. 2022-08-02 15:33:26.051 ERROR 12100 --- [.168.43.10:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange 'RabbitConfig.EXCHANGE_NAME' in vhost '/', class-id=60, method-id=40)
    2. 29cd1572-2db9-4236-b985-632597ab210e:消息发送失败

    给定一个真实存在的交换器,但是给一个不存在的队列,像下面这样:

    1. @GetMapping("/message")
    2. public void hello() {
    3. // Message message = MessageBuilder.withBody("Hello RabbitMQ".getBytes())
    4. // .build();
    5. // rabbitTemplate.convertAndSend(RabbitMQConfig.ROUTING_QUEUE_DEMO, message);
    6. // exchange 不存在
    7. //rabbitTemplate.convertAndSend("RabbitConfig.EXCHANGE_NAME",RabbitMQConfig.ROUTING_QUEUE_DEMO,"Hello RabbitMQ!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
    8. // queue 不存在
    9. rabbitTemplate.convertAndSend(RabbitMQConfig.ROUTING_EXCHANGE_DEMO,"RabbitMQConfig.ROUTING_QUEUE_DEMO","Hello RabbitMQ!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
    10. }

    控制台输出错误信息:

    1. 2022-08-02 15:36:07.112 INFO 1536 --- [nio-8087-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 3 ms
    2. null, 消息未成功路由到队列
    3. a1906bf2-c4e9-44b5-adc6-d2e3f9d95728:消息成功到达交换器

    RabbitMQ 失败重试

    失败重试分两种情况,一种是压根没找到 MQ 导致的失败重试,另一种是找到 MQ 了,但是消息发送失败了。

    自带重试机制

            事务机制和发送方确认机制,都是发送方确认消息发送成功的办法。如果发送方一开始就连不上 MQ,那么 Spring Boot 中也有相应的重试机制,但是这个重试机制就和 MQ 本身没有关系了,这是利用 Spring 中的 retry 机制来完成的,具体配置如下:

    1. spring.rabbitmq.template.retry.enabled=true
    2. spring.rabbitmq.template.retry.initial-interval=1000ms
    3. spring.rabbitmq.template.retry.max-attempts=10
    4. spring.rabbitmq.template.retry.max-interval=10000ms
    5. spring.rabbitmq.template.retry.multiplier=2

    从上往下配置含义依次是:

    • 开启重试机制。

    • 重试起始间隔时间。

    • 最大重试次数。

    • 最大重试间隔时间。

    • 间隔时间乘数。(这里配置间隔时间乘数为 2,则第一次间隔时间 1 秒,第二次重试间隔时间 2 秒,第三次 4 秒,以此类推)

    配置完成后,再次启动 Spring Boot 项目,然后关掉 MQ,此时尝试发送消息,就会发送失败,进而导致自动重试。

    业务重试

    业务重试主要是针对消息没有到达交换器的情况。

    如果消息没有成功到达交换器,根据我们上面的讲解,此时就会触发消息发送失败回调,在这个回调中,我们就可以做文章了!

    整体思路是这样:

    1. 首先创建一张表,用来记录发送到中间件上的消息,像下面这样:

     每次发送消息的时候,就往数据库中添加一条记录。这里的字段都很好理解,有三个我额外说下:

    • status:表示消息的状态,有三个取值,0,1,2 分别表示消息发送中、消息发送成功以及消息发送失败。

    • tryTime:表示消息的第一次重试时间(消息发出去之后,在 tryTime 这个时间点还未显示发送成功,此时就可以开始重试了)。

    • count:表示消息重试次数。

    其他字段都很好理解,我就不一一啰嗦了。

    1. 在消息发送的时候,我们就往该表中保存一条消息发送记录,并设置状态 status 为 0,tryTime 为 1 分钟之后。

    2. 在 confirm 回调方法中,如果收到消息发送成功的回调,就将该条消息的 status 设置为1(在消息发送时为消息设置 msgId,在消息发送成功回调时,通过 msgId 来唯一锁定该条消息)。

    3. 另外开启一个定时任务,定时任务每隔 10s 就去数据库中捞一次消息,专门去捞那些 status 为 0 并且已经过了 tryTime 时间记录,把这些消息拎出来后,首先判断其重试次数是否已超过 3 次,如果超过 3 次,则修改该条消息的 status 为 2,表示这条消息发送失败,并且不再重试。对于重试次数没有超过 3 次的记录,则重新去发送消息,并且为其 count 的值+1。

    当然这种思路有两个弊端:

    1. 去数据库走一遭,可能拖慢 MQ 的 Qos,不过有的时候我们并不需要 MQ 有很高的 Qos,所以这个应用时要看具体情况。

    2. 按照上面的思路,可能会出现同一条消息重复发送的情况,不过这都不是事,我们在消息消费时,解决好幂等性问题就行了。

    当然,大家也要注意,消息是否要确保 100% 发送成功,也要看具体情况。

     八、RabbitMQ 消费可靠性

    本章节主要讨论:如何确保消息消费成功,并且确保幂等。

    两种消费思路

    RabbitMQ 的消息消费,整体上来说有两种不同的思路:

    • 推(push):MQ 主动将消息推送给消费者,这种方式需要消费者设置一个缓冲区去缓存消息,对于消费者而言,内存中总是有一堆需要处理的消息,所以这种方式的效率比较高,这也是目前大多数应用采用的消费方式。

    • 拉(pull):消费者主动从 MQ 拉取消息,这种方式效率并不是很高,不过有的时候如果服务端需要批量拉取消息,倒是可以采用这种方式。

    推(push)方式

    这种方式大家比较常见,就是通过 @RabbitListener 注解去标记消费者,如下:

    1. package com.zzg.components;
    2. import com.zzg.config.RabbitMQDelayedConfig;
    3. import org.springframework.amqp.rabbit.annotation.RabbitListener;
    4. import org.springframework.stereotype.Component;
    5. /**
    6. * 延迟队列处理组件
    7. */
    8. @Component
    9. public class DelayedComponent {
    10. @RabbitListener(queues = RabbitMQDelayedConfig.QUEUE_NAME)
    11. public void handleMsg(String message) {
    12. System.out.println("delayed msg = " + message);
    13. }
    14. }

    当监听的队列中有消息时,就会触发该方法。 

    拉(pull)方式

    1. @Test
    2. public void test01() throws UnsupportedEncodingException {
    3. Object o = rabbitTemplate.receiveAndConvert(RabbitConfig.QUEUE_NAME);
    4. System.out.println("o = " + new String(((byte[]) o),"UTF-8"));
    5. }

    调用 receiveAndConvert 方法,方法参数为队列名称,方法执行完成后,会从 MQ 上拉取一条消息下来,如果该方法返回值为 null,表示该队列上没有消息了。receiveAndConvert 方法有一个重载方法,可以在重载方法中传入一个等待超时时间,例如 3 秒。此时,假设队列中没有消息了,则 receiveAndConvert 方法会阻塞 3 秒,3 秒内如果队列中有了新消息就返回,3 秒后如果队列中还是没有新消息,就返回 null,这个等待超时时间要是不设置的话,默认为 0。

    如果需要从消息队列中持续获得消息,就推荐使用推模式;如果只是单纯的消费一条消息,则使用拉模式即可。切忌将拉模式放到一个死循环中,变相的订阅消息,这会严重影响 RabbitMQ 的性能。

    确保消费成功思路

    为了保证消息能够可靠的到达消息消费者,RabbitMQ 中提供了消息消费确认机制。当消费者去消费消息的时候,可以通过指定 autoAck 参数来表示消息消费的确认方式。

    • 当 autoAck 为 false 的时候,此时即使消费者已经收到消息了,RabbitMQ 也不会立马将消息移除,而是等待消费者显式的回复确认信号后,才会将消息打上删除标记,然后再删除。

    • 当 autoAck 为 true 的时候,此时消息消费者就会自动把发送出去的消息设置为确认,然后将消息移除(从内存或者磁盘中),即使这些消息并没有到达消费者。

    来看一张图:

    如上图所示,在 RabbitMQ 的 web 管理页面:

    • Ready 表示待消费的消息数量。

    • Unacked 表示已经发送给消费者但是还没收到消费者 ack 的消息数量。

    这是我们可以从 UI 层面观察消息的消费情况确认情况。

    当我们将 autoAck 设置为 false 的时候,对于 RabbitMQ 而言,消费分成了两个部分:

    • 待消费的消息

    • 已经投递给消费者,但是还没有被消费者确认的消息

    换句话说,当设置 autoAck 为 false 的时候,消费者就变得非常从容了,它将有足够的时间去处理这条消息,当消息正常处理完成后,再手动 ack,此时 RabbitMQ 才会认为这条消息消费成功了。如果 RabbitMQ 一直没有收到客户端的反馈,并且此时客户端也已经断开连接了,那么 RabbitMQ 就会将刚刚的消息重新放回队列中,等待下一次被消费。

    综上所述,确保消息被成功消费,无非就是手动 Ack 或者自动 Ack,无他。当然,无论这两种中的哪一种,最终都有可能导致消息被重复消费,所以一般来说我们还需要在处理消息时,解决幂等性问题。

    消息拒绝

    当客户端收到消息时,可以选择消费这条消息,也可以选择拒绝这条消息。我们来看下拒绝的方式:

    1. @Component
    2. public class ConsumerDemo {
    3. @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
    4. public void handle(Channel channel, Message message) {
    5. //获取消息编号
    6. long deliveryTag = message.getMessageProperties().getDeliveryTag();
    7. try {
    8. //拒绝消息
    9. channel.basicReject(deliveryTag, true);
    10. } catch (IOException e) {
    11. e.printStackTrace();
    12. }
    13. }
    14. }

    消费者收到消息之后,可以选择拒绝消费该条消息,拒绝的步骤分两步:

    1. 获取消息编号 deliveryTag。

    2. 调用 basicReject 方法拒绝消息。

    调用 basicReject 方法时,第二个参数是 requeue,即是否重新入队。如果第二个参数为 true,则这条被拒绝的消息会重新进入到消息队列中,等待下一次被消费;如果第二个参数为 false,则这条被拒绝的消息就会被丢掉,不会有新的消费者去消费它了。

    需要注意的是,basicReject 方法一次只能拒绝一条消息。

    消息确认

    消息确认分为自动确认和手动确认

    消息确认之自动确认

    在 Spring Boot 中,默认情况下,消息消费就是自动确认。

    1. @Component
    2. public class ConsumerDemo {
    3. @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
    4. public void handle2(String msg) {
    5. System.out.println("msg = " + msg);
    6. int i = 1 / 0;
    7. }
    8. }

            通过 @Componet 注解将当前类注入到 Spring 容器中,然后通过 @RabbitListener 注解来标记一个消息消费方法,默认情况下,消息消费方法自带事务,即如果该方法在执行过程中抛出异常,那么被消费的消息会重新回到队列中等待下一次被消费,如果该方法正常执行完没有抛出异常,则这条消息就算是被消费了。

    消息确认之手动确认

     手动确认又分为两种:推模式手动确认与拉模式手动确认

    推模式手动确认

    要开启手动确认,需要我们首先关闭自动确认,关闭方式如下:

    spring.rabbitmq.listener.simple.acknowledge-mode=manual

    这个配置表示将消息的确认模式改为手动确认。

    消费者端代码修改

    1. @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
    2. public void handle3(Message message,Channel channel) {
    3. long deliveryTag = message.getMessageProperties().getDeliveryTag();
    4. try {
    5. //消息消费的代码写到这里
    6. String s = new String(message.getBody());
    7. System.out.println("s = " + s);
    8. //消费完成后,手动 ack
    9. channel.basicAck(deliveryTag, false);
    10. } catch (Exception e) {
    11. //手动 nack
    12. try {
    13. channel.basicNack(deliveryTag, false, true);
    14. } catch (IOException ex) {
    15. ex.printStackTrace();
    16. }
    17. }
    18. }

    将消费者要做的事情放到一个 try..catch 代码块中。

    如果消息正常消费成功,则执行 basicAck 完成确认。

    如果消息消费失败,则执行 basicNack 方法,告诉 RabbitMQ 消息消费失败。

    这里涉及到两个方法:

    • basicAck:这个是手动确认消息已经成功消费,该方法有两个参数:第一个参数表示消息的 id;第二个参数 multiple 如果为 false,表示仅确认当前消息消费成功,如果为 true,则表示当前消息之前所有未被当前消费者确认的消息都消费成功。

    • basicNack:这个是告诉 RabbitMQ 当前消息未被成功消费,该方法有三个参数:第一个参数表示消息的 id;第二个参数 multiple 如果为 false,表示仅拒绝当前消息的消费,如果为 true,则表示拒绝当前消息之前所有未被当前消费者确认的消息;第三个参数 requeue 含义和前面所说的一样,被拒绝的消息是否重新入队。

    当 basicNack 中最后一个参数设置为 false 的时候,还涉及到一个死信队列的问题。

    拉模式手动确认

    拉模式手动 ack 比较麻烦一些,在 Spring 中封装的 RabbitTemplate 中并未找到对应的方法,所以我们得用原生的办法,如下:

    1. public void receive2() {
    2. Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);
    3. long deliveryTag = 0L;
    4. try {
    5. GetResponse getResponse = channel.basicGet(RabbitConfig.JAVABOY_QUEUE_NAME, false);
    6. deliveryTag = getResponse.getEnvelope().getDeliveryTag();
    7. System.out.println("o = " + new String((getResponse.getBody()), "UTF-8"));
    8. channel.basicAck(deliveryTag, false);
    9. } catch (IOException e) {
    10. try {
    11. channel.basicNack(deliveryTag, false, true);
    12. } catch (IOException ex) {
    13. ex.printStackTrace();
    14. }
    15. }
    16. }

    涉及到的 basicAck 和 basicNack 方法跟前面的一样,我就不再赘述。

     幂等性问题

    问题场景:消费者在消费完一条消息后,向 RabbitMQ 发送一个 ack 确认,此时由于网络断开或者其他原因导致 RabbitMQ 并没有收到这个 ack,那么此时 RabbitMQ 并不会将该条消息删除,当重新建立起连接后,消费者还是会再次收到该条消息,这就造成了消息的重复消费。同时,由于类似的原因,消息在发送的时候,同一条消息也可能会发送两次。种种原因导致我们在消费消息时,一定要处理好幂等性问题。

    幂等性问题的处理倒也不难,基本上都是从业务上来处理,我来大概说说思路。

    采用 Redis,在消费者消费消息之前,现将消息的 id 放到 Redis 中,存储方式如下:

    • id-0(正在执行业务)

    • id-1(执行业务成功)

    如果 ack 失败,在 RabbitMQ 将消息交给其他的消费者时,先执行 setnx,如果 key 已经存在(说明之前有人消费过该消息),获取他的值,如果是 0,当前消费者就什么都不做,如果是 1,直接 ack。

    极端情况:第一个消费者在执行业务时,出现了死锁,在 setnx 的基础上,再给 key 设置一个生存时间。生产者,发送消息时,指定 messageId。

  • 相关阅读:
    Cesium 源码解析 Model(四)
    QsciScintilla等编辑器实现不同区域鼠标右键处理方式不同的方法
    深入理解并发编程同步工具类
    论文《Controllable Multi-Interest Framework for Recommendation》
    python中的字符串也是可迭代对象吗?
    深入多线程锁
    【c#】关于web api发布
    15-bag的录制,回放与解析
    java ssm课题研究经费管理系统
    C++ Qt开发:如何使用信号与槽
  • 原文地址:https://blog.csdn.net/zhouzhiwengang/article/details/126097313