目录
第一步:创建Maven项目,添加RabbitMQ 消息队列依赖。
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
1 、生产者和 Broker 建立 TCP 连接。2 、生产者和 Broker 建立通道。3 、生产者通过通道消息发送给 Broker ,由 Exchange 将消息进行转发。4 、 Exchange 将消息转发到指定的 Queue (队列)
1 、消费者和 Broker 建立 TCP 连接2 、消费者和 Broker 建立通道3 、消费者监听指定的 Queue (队列)4 、当有消息到达 Queue 时 Broker 默认将消息推送给消费者。5 、消费者接收到消息。
-
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-amqpartifactId>
- dependency>
- package com.zzg.rabbitmq;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class ProduceMS {
- //队列名称
- private static final String QUEUE = "helloworld";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- Connection connection = null;
- Channel channel = null;
-
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.43.10");
- factory.setPort(5672);
- factory.setUsername("guest");
- factory.setPassword("guest");
- factory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
- //创建与RabbitMQ服务的TCP连接
- connection = factory.newConnection();
- //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
- channel = connection.createChannel();
- /*** 声明队列,如果Rabbit中没有此队列将自动创建
- * param1:队列名称
- * param2:是否持久化
- * param3:队列是否独占此连接
- * param4:队列不再使用时是否自动删除此队列
- * param5:队列参数
- * */
- channel.queueDeclare(QUEUE, true, false, false, null);
- String message = "Hello World RabbitMQ" + System.currentTimeMillis();
- /*** 消息发布方法
- * param1:Exchange的名称,如果没有指定,则使用Default Exchange
- * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
- * param3:消息包含的属性
- * param4:消息体
- * */
- /*** 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显 示绑定或解除绑定
- * 默认的交换机,routingKey等于队列名称
- * */
- channel.basicPublish("", QUEUE, null, message.getBytes());
- System.out.println("Send Message is:'" + message + "'");
-
- if (channel != null) {
- channel.close();
- }
- if (connection != null) {
- connection.close();
- }
- }
- }
- package com.zzg.rabbitmq;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class ConsumerMS {
- private static final String QUEUE = "helloworld";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- ConnectionFactory factory = new ConnectionFactory();
- //设置RabbitMQ所在服务器的ip和端口
- factory.setHost("192.168.43.10");
- factory.setPort(5672);
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- //声明队列
- channel.queueDeclare(QUEUE, true, false, false, null);
- //定义消费方法
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- /*** 消费者接收消息调用此方法
- * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
- * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志 (收到消息失败后是否需要重新发送)
- * @param properties
- * @param body
- * @throws IOException
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //交换机
- String exchange = envelope.getExchange();
- //路由key
- String routingKey = envelope.getRoutingKey();
- //消息id
- long deliveryTag = envelope.getDeliveryTag();
- //消息内容
- String msg = new String(body, "utf-8");
- System.out.println("receive message.." + msg);
-
- }
- };
- /*** 监听队列String queue, boolean autoAck,Consumer callback
- * 参数明细
- * 1、队列名称
- * 2、是否自动回复,设置为true为表示消息接收到自动向mq回复接收到了,mq接收到回复会删除消息,设置 为false则需要手动回复
- * 3、消费消息的方法,消费者接收到消息后调用此方法
- */
- channel.basicConsume(QUEUE, true, consumer);
-
- }
- }
多个消费端消费同一个队列中的消息,队列采用轮询的方式将消息是平均发送给消费者;
缺少图片
特点:
1、一条消息只会被一个消费端接收;
2、队列采用轮询的方式将消息是平均发送给消费者的;
3、消费者在处理完某条消息后,才会收到下一条消息
Work Queues模式之生产者
- package com.zzg.rabbitmq.work.queues;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /*
- 生产者核心步骤
- 1、声明队列
- 2、创建连接
- 3、创建通道
- 4、通道声明队列
- 5、制定消息
- 6、发送消息,使用默认交换机
- */
- public class WorkQueueProduceMS {
- //声明队列
- private static final String QUEUE = "queue";
-
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.43.10");//mq服务ip地址
- connectionFactory.setPort(5672);//mq client连接端口
- connectionFactory.setUsername("guest");//mq登录用户名
- connectionFactory.setPassword("guest");//mq登录密码
- connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
- //创建与RabbitMQ服务的TCP连接
- connection = connectionFactory.newConnection();
- //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
- channel = connection.createChannel();
-
- //通道绑定队列
- /**
- * 声明队列,如果Rabbit中没有此队列将自动创建
- * param1:队列名称
- * param2:是否持久化
- * param3:队列是否独占此连接
- * param4:队列不再使用时是否自动删除此队列
- * param5:队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map
arguments - *
- */
- channel.queueDeclare(QUEUE, true, false, false, null);//通道绑定邮件队列
-
- for (int i = 0; i < 10; i++) {
- String message = new String("mq 发送消息。。。");
- /**
- * 消息发布方法
- * param1:Exchange的名称,如果没有指定,则使用Default Exchange
- * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
- * param3:消息包含的属性
- * param4:消息体
- * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
- * 默认的交换机,routingKey等于队列名称
- */
- //String exchange, String routingKey, BasicProperties props, byte[] body
- channel.basicPublish("", QUEUE, null, message.getBytes("utf-8"));
- System.out.println("mq消息发送成功!");
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- channel.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- try {
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- }
- }
Work Queues模式之消费者
- package com.zzg.rabbitmq.work.queues;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /*
- 消费者核心步骤
- 1、声明队列
- 2、创建连接
- 3、创建通道
- 4、通道声明队列
- 5、重写消息消费方法
- 6、执行消息方法
- */
- public class WorkQueueConsumerMS {
- private static final String QUEUE = "queue";
-
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.43.10");
- connectionFactory.setPort(5672);
- connection = connectionFactory.newConnection();
- channel = connection.createChannel();
- //通道绑定队列
- /**
- * 声明队列,如果Rabbit中没有此队列将自动创建
- * param1:队列名称
- * param2:是否持久化
- * param3:队列是否独占此连接
- * param4:队列不再使用时是否自动删除此队列
- * param5:队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map
arguments - *
- */
- channel.queueDeclare(QUEUE, true, false, false, null);//通道绑定邮件队列
-
- //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- /**
- * 消费者接收消息调用此方法
- * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
- * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
- (收到消息失败后是否需要重新发送)
- * @param properties
- * @param body
- * @throws IOException
- * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //交换机
- String exchange = envelope.getExchange();
- //路由key
- String routingKey = envelope.getRoutingKey();
- envelope.getDeliveryTag();
- String msg = new String(body, "utf-8");
- System.out.println("mq收到的消息是:" + msg);
- }
-
- };
- System.out.println("消费者启动成功!");
- channel.basicConsume(QUEUE, true, consumer);
-
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- }
生产端启动后,控制台打印信息如下:
- mq消息发送成功!
- mq消息发送成功!
- mq消息发送成功!
- mq消息发送成功!
- mq消息发送成功!
- mq消息发送成功!
- mq消息发送成功!
- mq消息发送成功!
- mq消息发送成功!
RabbitMQ中的已有消息:
缺失图片
消费端启动后,控制台打印信息如下:
- 消费者启动成功!
- mq收到的消息是:mq 发送消息。。。
- mq收到的消息是:mq 发送消息。。。
- mq收到的消息是:mq 发送消息。。。
- mq收到的消息是:mq 发送消息。。。
- mq收到的消息是:mq 发送消息。。。
- mq收到的消息是:mq 发送消息。。。
- mq收到的消息是:mq 发送消息。。。
- mq收到的消息是:mq 发送消息。。。
- mq收到的消息是:mq 发送消息。。。
- mq收到的消息是:mq 发送消息。。。
这种模式又称为发布订阅模式,相对于Work queues模式,该模式多了一个交换机,生产端先把消息发送到交换机,再由交换机把消息发送到绑定的队列中,每个绑定的队列都能收到由生产端发送的消息。
缺少图片
特点:
1、每个消费者监听自己的队列;
2、生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收
到消息
应用场景:用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法;
publish/subscribe 模式之生产者
- package com.zzg.rabbitmq.publish.subscribe;
-
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /**
- * 生产端核心步骤:
- *
- * 1、声明队列,声明交换机
- *
- * 2、创建连接
- *
- * 3、创建通道
- *
- * 4、通道声明交换机
- *
- * 5、通道声明队列
- *
- * 6、通过通道使队列绑定到交换机
- *
- * 7、制定消息
- *
- * 8、发送消息
- */
- public class PublicSubProduceMS {
- //声明两个队列和一个交换机
- //Publish/subscribe发布订阅模式
- private static final String QUEUE_EMAIL = "queueEmail";
- private static final String QUEUE_SMS = "queueSms";
- private static final String EXCHANGE = "messageChange";
-
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.43.10");//mq服务ip地址
- connectionFactory.setPort(5672);//mq client连接端口
- connectionFactory.setUsername("guest");//mq登录用户名
- connectionFactory.setPassword("guest");//mq登录密码
- connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
- //创建与RabbitMQ服务的TCP连接
- connection = connectionFactory.newConnection();
- //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
- channel = connection.createChannel();
- //通道绑定交换机
- /**
- * 参数明细
- * 1、交换机名称
- * 2、交换机类型,fanout、topic、direct、headers
- */
- //Publish/subscribe发布订阅模式
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
- //通道绑定队列
- /**
- * 声明队列,如果Rabbit中没有此队列将自动创建
- * param1:队列名称
- * param2:是否持久化
- * param3:队列是否独占此连接
- * param4:队列不再使用时是否自动删除此队列
- * param5:队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map
arguments - *
- */
- channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列
- channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定短信队列
- //交换机和队列绑定
- /**
- * 参数明细
- * 1、队列名称
- * 2、交换机名称
- * 3、路由key
- */
- //Publish/subscribe发布订阅模式
- channel.queueBind(QUEUE_EMAIL, EXCHANGE, "");
- channel.queueBind(QUEUE_SMS, EXCHANGE, "");
- for (int i = 0; i < 10; i++) {
- String message = new String("mq 发送消息。。。");
- /**
- * 消息发布方法
- * param1:Exchange的名称,如果没有指定,则使用Default Exchange
- * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
- * param3:消息包含的属性
- * param4:消息体
- * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
- * 默认的交换机,routingKey等于队列名称
- */
- //String exchange, String routingKey, BasicProperties props, byte[] body
- //Publish/subscribe发布订阅模式
- channel.basicPublish(EXCHANGE, "", null, message.getBytes());
- System.out.println("mq消息发送成功!");
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- channel.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- try {
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
publish/subscribe 模式之消费者(邮件)
- package com.zzg.rabbitmq.publish.subscribe;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /**
- * 消费者核心步骤
- * 1、声明队列,声明交换机
- *
- * 2、创建连接
- *
- * 3、创建通道
- *
- * 4、通道声明交换机
- *
- * 5、通道声明队列
- *
- * 6、通过通道使队列绑定到交换机
- *
- * 7、重写消息消费方法
- *
- * 8、执行消息方法
- */
- public class PublicSubEMailComsumerMS {
- //Publish/subscribe发布订阅模式
- private static final String QUEUE_EMAIL = "queueEmail";
- private static final String EXCHANGE = "messageChange";
-
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.43.10");
- connectionFactory.setPort(5672);
- connection = connectionFactory.newConnection();
- channel = connection.createChannel();
- //通道绑定交换机
- /**
- * 参数明细
- * 1、交换机名称
- * 2、交换机类型,fanout、topic、direct、headers
- */
- //Publish/subscribe发布订阅模式
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
- //通道绑定队列
- /**
- * 声明队列,如果Rabbit中没有此队列将自动创建
- * param1:队列名称
- * param2:是否持久化
- * param3:队列是否独占此连接
- * param4:队列不再使用时是否自动删除此队列
- * param5:队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map
arguments - *
- */
- channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列
- //交换机和队列绑定
- /**
- * 参数明细
- * 1、队列名称
- * 2、交换机名称
- * 3、路由key
- */
- //Publish/subscribe发布订阅模式
- channel.queueBind(QUEUE_EMAIL, EXCHANGE, "");
- //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- /**
- * 消费者接收消息调用此方法
- * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
- * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
- (收到消息失败后是否需要重新发送)
- * @param properties
- * @param body
- * @throws IOException
- * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //交换机
- String exchange = envelope.getExchange();
- //路由key
- String routingKey = envelope.getRoutingKey();
- envelope.getDeliveryTag();
- String msg = new String(body, "utf-8");
- System.out.println("mq收到的消息是:" + msg);
- }
- };
- System.out.println("消费者启动成功!");
- channel.basicConsume(QUEUE_EMAIL, true, consumer);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- }
publish/subscribe 模式之消费者(短信)
- package com.zzg.rabbitmq.publish.subscribe;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /**
- * 消费者核心步骤
- * 1、声明队列,声明交换机
- *
- * 2、创建连接
- *
- * 3、创建通道
- *
- * 4、通道声明交换机
- *
- * 5、通道声明队列
- *
- * 6、通过通道使队列绑定到交换机
- *
- * 7、重写消息消费方法
- *
- * 8、执行消息方法
- */
- public class PubSubSMSComsumerMS {
- //Publish/subscribe发布订阅模式
- private static final String QUEUE_SMS = "queueSms";
- private static final String EXCHANGE = "messageChange";
-
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.43.10");
- connectionFactory.setPort(5672);
- connection = connectionFactory.newConnection();
- channel = connection.createChannel();
- //通道绑定交换机
- /**
- * 参数明细
- * 1、交换机名称
- * 2、交换机类型,fanout、topic、direct、headers
- */
- //Publish/subscribe发布订阅模式
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
- //通道绑定队列
- /**
- * 声明队列,如果Rabbit中没有此队列将自动创建
- * param1:队列名称
- * param2:是否持久化
- * param3:队列是否独占此连接
- * param4:队列不再使用时是否自动删除此队列
- * param5:队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map
arguments - *
- */
- channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定短信队列
- //交换机和队列绑定
- /**
- * 参数明细
- * 1、队列名称
- * 2、交换机名称
- * 3、路由key
- */
- //Publish/subscribe发布订阅模式
- channel.queueBind(QUEUE_SMS, EXCHANGE, "");
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- /**
- * 消费者接收消息调用此方法
- * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
- * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
- (收到消息失败后是否需要重新发送)
- * @param properties
- * @param body
- * @throws IOException
- * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //交换机
- String exchange = envelope.getExchange();
- //路由key
- String routingKey = envelope.getRoutingKey();
- envelope.getDeliveryTag();
- String msg = new String(body, "utf-8");
- System.out.println("mq收到的消息是:" + msg);
- }
-
- };
- System.out.println("消费者启动成功!");
- channel.basicConsume(QUEUE_SMS, true, consumer);
-
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
-
- }
- }
Routing 模式又称路由模式,该种模式除了要绑定交换机外,发消息的时候还要制定routing key,即路由key,队列通过通道绑定交换机的时候,需要指定自己的routing key,这样,生产端发送消息的时候也会指定routing key,通过routing key就可以把相应的消息发送到绑定相应routing key的队列中去。
缺失图片
特点:
1、每个消费者监听自己的队列,并且设置routingkey;
2、生产者将消息发给交换机,由交换机根据routingkey来转发消息到指定的队列;
应用场景:用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法;
routing路由模式之生产者
- package com.zzg.rabbitmq.routing;
-
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /**
- * 生产者核心步骤
- * 1、声明队列,声明交换机
- *
- * 2、创建连接
- *
- * 3、创建通道
- *
- * 4、通道声明交换机
- *
- * 5、通道声明队列
- *
- * 6、通过通道使队列绑定到交换机并指定该队列的routingkey
- *
- * 7、制定消息
- *
- * 8、发送消息并指定routingkey
- */
- public class RoutingProduceMS {
- //声明两个队列和一个交换机
- //Routing 路由模式
- private static final String QUEUE_EMAIL = "queueEmail";
- private static final String QUEUE_SMS = "queueSms";
- private static final String EXCHANGE = "routingMessageChange";
-
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.43.10");//mq服务ip地址
- connectionFactory.setPort(5672);//mq client连接端口
- connectionFactory.setUsername("guest");//mq登录用户名
- connectionFactory.setPassword("guest");//mq登录密码
- connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
- //创建与RabbitMQ服务的TCP连接
- connection = connectionFactory.newConnection();
- //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
- channel = connection.createChannel();
- //通道绑定交换机
- /**
- * 参数明细
- * 1、交换机名称
- * 2、交换机类型,fanout、topic、direct、headers
- */
- //Routing 路由模式
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
- //通道绑定队列
- /**
- * 声明队列,如果Rabbit中没有此队列将自动创建
- * param1:队列名称
- * param2:是否持久化
- * param3:队列是否独占此连接
- * param4:队列不再使用时是否自动删除此队列
- * param5:队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map
arguments - *
- */
- channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列
- channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定短信队列
- //交换机和队列绑定
- /**
- * 参数明细
- * 1、队列名称
- * 2、交换机名称
- * 3、路由key
- */
- //Routing 路由模式
- channel.queueBind(QUEUE_EMAIL, EXCHANGE, QUEUE_EMAIL);
- channel.queueBind(QUEUE_SMS, EXCHANGE, QUEUE_SMS);
- //给email队列发消息
- for (int i = 0; i < 10; i++) {
- String message = new String("mq 发送email消息。。。");
- /**
- * 消息发布方法
- * param1:Exchange的名称,如果没有指定,则使用Default Exchange
- * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
- * param3:消息包含的属性
- * param4:消息体
- * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
- * 默认的交换机,routingKey等于队列名称
- */
- //String exchange, String routingKey, BasicProperties props, byte[] body
- //Routing 路由模式
- channel.basicPublish(EXCHANGE, QUEUE_EMAIL, null, message.getBytes());
- System.out.println("mq消息发送成功!");
- }
- //给sms队列发消息
- for (int i = 0; i < 10; i++) {
- String message = new String("mq 发送sms消息。。。");
- /**
- * 消息发布方法
- * param1:Exchange的名称,如果没有指定,则使用Default Exchange
- * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
- * param3:消息包含的属性
- * param4:消息体
- * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
- * 默认的交换机,routingKey等于队列名称
- */
- //String exchange, String routingKey, BasicProperties props, byte[] body
- //Routing 路由模式
- channel.basicPublish(EXCHANGE, QUEUE_SMS, null, message.getBytes());
- System.out.println("mq消息发送成功!");
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- channel.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- try {
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
routing路由模式之消费者(邮件)
- package com.zzg.rabbitmq.routing;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /**
- * 消费者核心步骤
- * 1、声明队列,声明交换机
- *
- * 2、创建连接
- *
- * 3、创建通道
- *
- * 4、通道声明交换机
- *
- * 5、通道声明队列
- *
- * 6、通过通道使队列绑定到交换机并指定routingkey
- *
- * 7、重写消息消费方法
- *
- * 8、执行消息方法
- */
- public class RoutingEMailConsumerMS {
- //Routing 路由模式
- private static final String QUEUE_EMAIL = "queueEmail";
- private static final String EXCHANGE = "routingMessageChange";
-
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.43.10");
- connectionFactory.setPort(5672);
- connection = connectionFactory.newConnection();
- channel = connection.createChannel();
- //通道绑定交换机
- /**
- * 参数明细
- * 1、交换机名称
- * 2、交换机类型,fanout、topic、direct、headers
- */
- //Routing 路由模式
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
- //通道绑定队列
- /**
- * 声明队列,如果Rabbit中没有此队列将自动创建
- * param1:队列名称
- * param2:是否持久化
- * param3:队列是否独占此连接
- * param4:队列不再使用时是否自动删除此队列
- * param5:队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map
arguments - *
- */
- channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列
- //交换机和队列绑定
- /**
- * 参数明细
- * 1、队列名称
- * 2、交换机名称
- * 3、路由key
- */
- //Routing 路由模式
- channel.queueBind(QUEUE_EMAIL, EXCHANGE, QUEUE_EMAIL);
- //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- /**
- * 消费者接收消息调用此方法
- * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
- * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
- (收到消息失败后是否需要重新发送)
- * @param properties
- * @param body
- * @throws IOException
- * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //交换机
- String exchange = envelope.getExchange();
- //路由key
- String routingKey = envelope.getRoutingKey();
- envelope.getDeliveryTag();
- String msg = new String(body, "utf-8");
- System.out.println("mq收到的消息是:" + msg);
- }
-
- };
- System.out.println("消费者启动成功!");
- channel.basicConsume(QUEUE_EMAIL, true, consumer);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- }
routing路由模式之消费者(短信)
- package com.zzg.rabbitmq.routing;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /**
- * 消费者核心步骤
- * 1、声明队列,声明交换机
- *
- * 2、创建连接
- *
- * 3、创建通道
- *
- * 4、通道声明交换机
- *
- * 5、通道声明队列
- *
- * 6、通过通道使队列绑定到交换机并指定routingkey
- *
- * 7、重写消息消费方法
- *
- * 8、执行消息方法
- */
- public class RoutingSMSConsumerMS {
- //Routing 路由模式
- private static final String QUEUE_SMS = "queueSms";
- private static final String EXCHANGE = "routingMessageChange";
-
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.43.10");
- connectionFactory.setPort(5672);
- connection = connectionFactory.newConnection();
- channel = connection.createChannel();
- //通道绑定交换机
- /**
- * 参数明细
- * 1、交换机名称
- * 2、交换机类型,fanout、topic、direct、headers
- */
- //Routing 路由模式
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT);
- //通道绑定队列
- /**
- * 声明队列,如果Rabbit中没有此队列将自动创建
- * param1:队列名称
- * param2:是否持久化
- * param3:队列是否独占此连接
- * param4:队列不再使用时是否自动删除此队列
- * param5:队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map
arguments - *
- */
- channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定短信队列
- //交换机和队列绑定
- /**
- * 参数明细
- * 1、队列名称
- * 2、交换机名称
- * 3、路由key
- */
- //Routing 路由模式
- channel.queueBind(QUEUE_SMS, EXCHANGE, QUEUE_SMS);
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- /**
- * 消费者接收消息调用此方法
- * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
- * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
- (收到消息失败后是否需要重新发送)
- * @param properties
- * @param body
- * @throws IOException
- * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //交换机
- String exchange = envelope.getExchange();
- //路由key
- String routingKey = envelope.getRoutingKey();
- envelope.getDeliveryTag();
- String msg = new String(body, "utf-8");
- System.out.println("mq收到的消息是:" + msg);
- }
- };
- System.out.println("消费者启动成功!");
- channel.basicConsume(QUEUE_SMS, true, consumer);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- }
Topics 模式和Routing 路由模式最大的区别就是,Topics 模式发送消息和消费消息的时候是通过通配符去进行匹配的。
缺失图片
特点:
1、每个消费者监听自己的队列,并且设置带统配符的routingkey
2、生产者将消息发给broker,由交换机根据routingkey来转发消息到指定的队列
应用场景:用户通知,当用户充值成功或转账完成系统通知用户,通知方式有短信、邮件多种方法;
topics模式之生产者
- package com.zzg.rabbitmq.topic;
-
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /**
- * 生产者核心步骤
- * 1、声明队列,声明交换机
- *
- * 2、创建连接
- *
- * 3、创建通道
- *
- * 4、通道声明交换机
- *
- * 5、通道声明队列
- *
- * 6、通过通道使队列绑定到交换机并指定该队列的routingkey(通配符)
- *
- * 7、制定消息
- *
- * 8、发送消息并指定routingkey(通配符)
- */
- public class TopicProduceMS {
- //声明两个队列和一个交换机
- //Topics 模式
- private static final String QUEUE_EMAIL = "queueEmail";
- private static final String QUEUE_SMS = "queueSms";
- private static final String EXCHANGE = "topicMessageChange";
-
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.43.10");//mq服务ip地址
- connectionFactory.setPort(5672);//mq client连接端口
- connectionFactory.setUsername("guest");//mq登录用户名
- connectionFactory.setPassword("guest");//mq登录密码
- connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
- //创建与RabbitMQ服务的TCP连接
- connection = connectionFactory.newConnection();
- //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
- channel = connection.createChannel();
- //通道绑定交换机
- /**
- * 参数明细
- * 1、交换机名称
- * 2、交换机类型,fanout、topic、direct、headers
- */
- //Topics 模式
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
- //通道绑定队列
- /**
- * 声明队列,如果Rabbit中没有此队列将自动创建
- * param1:队列名称
- * param2:是否持久化
- * param3:队列是否独占此连接
- * param4:队列不再使用时是否自动删除此队列
- * param5:队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map
arguments - *
- */
- channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列
- channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定短信队列
- //交换机和队列绑定
- /**
- * 参数明细
- * 1、队列名称
- * 2、交换机名称
- * 3、路由key
- */
- channel.queueBind(QUEUE_EMAIL, EXCHANGE, "inform.#.email.#");
- channel.queueBind(QUEUE_SMS, EXCHANGE, "inform.#.sms.#");
- //给email队列发消息
- for (int i = 0; i < 10; i++) {
- String message = new String("mq 发送email消息。。。");
- /**
- * 消息发布方法
- * param1:Exchange的名称,如果没有指定,则使用Default Exchange
- * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
- * param3:消息包含的属性
- * param4:消息体
- * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
- * 默认的交换机,routingKey等于队列名称
- */
- //String exchange, String routingKey, BasicProperties props, byte[] body
- channel.basicPublish(EXCHANGE, "inform.email", null, message.getBytes());
- System.out.println("mq email 消息发送成功!");
- }
- //给sms队列发消息
- for (int i = 0; i < 10; i++) {
- String message = new String("mq 发送sms消息。。。");
- /**
- * 消息发布方法
- * param1:Exchange的名称,如果没有指定,则使用Default Exchange
- * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
- * param3:消息包含的属性
- * param4:消息体
- * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
- * 默认的交换机,routingKey等于队列名称
- */
- //String exchange, String routingKey, BasicProperties props, byte[] body
- channel.basicPublish(EXCHANGE, "inform.sms", null, message.getBytes());
- System.out.println("mq sms 消息发送成功!");
- }
- //给email和sms队列发消息
- for (int i = 0; i < 10; i++) {
- String message = new String("mq 发送email sms消息。。。");
- /**
- * 消息发布方法
- * param1:Exchange的名称,如果没有指定,则使用Default Exchange
- * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
- * param3:消息包含的属性
- * param4:消息体
- * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
- * 默认的交换机,routingKey等于队列名称
- */
- //String exchange, String routingKey, BasicProperties props, byte[] body
- channel.basicPublish(EXCHANGE, "inform.email.sms", null, message.getBytes());
- System.out.println("mq email sms 消息发送成功!");
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- channel.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- try {
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- }
- }
topics模式之消费者(邮件)
- package com.zzg.rabbitmq.topic;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- /**
- * 消费者核心步骤
- */
- public class TopicEMailConsumerMS {
- private static final String QUEUE_EMAIL = "queueEmail";
- private static final String EXCHANGE = "topicMessageChange";
-
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.43.10");
- connectionFactory.setPort(5672);
- connection = connectionFactory.newConnection();
- channel = connection.createChannel();
- //通道绑定交换机
- /**
- * 参数明细
- * 1、交换机名称
- * 2、交换机类型,fanout、topic、direct、headers
- */
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
- //通道绑定队列
- /**
- * 声明队列,如果Rabbit中没有此队列将自动创建
- * param1:队列名称
- * param2:是否持久化
- * param3:队列是否独占此连接
- * param4:队列不再使用时是否自动删除此队列
- * param5:队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map
arguments - *
- */
- channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列
- //交换机和队列绑定
- /**
- * 参数明细
- * 1、队列名称
- * 2、交换机名称
- * 3、路由key
- */
- channel.queueBind(QUEUE_EMAIL, EXCHANGE, "inform.#.email.#");
- //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- /**
- * 消费者接收消息调用此方法
- * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
- * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
- (收到消息失败后是否需要重新发送)
- * @param properties
- * @param body
- * @throws IOException
- * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //交换机
- String exchange = envelope.getExchange();
- //路由key
- String routingKey = envelope.getRoutingKey();
- envelope.getDeliveryTag();
- String msg = new String(body, "utf-8");
- System.out.println("mq收到的消息是:" + msg);
- }
-
- };
- System.out.println("消费者启动成功!");
- channel.basicConsume(QUEUE_EMAIL, true, consumer);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- }
topics模式之消费者(短信)
- package com.zzg.rabbitmq.topic;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class TopicSMSConsumerMS {
- private static final String QUEUE_SMS = "queueSms";
- private static final String EXCHANGE = "topicMessageChange";
-
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.43.10");
- connectionFactory.setPort(5672);
- connection = connectionFactory.newConnection();
- channel = connection.createChannel();
- //通道绑定交换机
- /**
- * 参数明细
- * 1、交换机名称
- * 2、交换机类型,fanout、topic、direct、headers
- */
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.TOPIC);
- //通道绑定队列
- /**
- * 声明队列,如果Rabbit中没有此队列将自动创建
- * param1:队列名称
- * param2:是否持久化
- * param3:队列是否独占此连接
- * param4:队列不再使用时是否自动删除此队列
- * param5:队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map
arguments - *
- */
- channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定邮件队列
- //交换机和队列绑定
- /**
- * 参数明细
- * 1、队列名称
- * 2、交换机名称
- * 3、路由key
- */
- channel.queueBind(QUEUE_SMS, EXCHANGE, "inform.#.sms.#");
- //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- /**
- * 消费者接收消息调用此方法
- * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
- * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
- (收到消息失败后是否需要重新发送)
- * @param properties
- * @param body
- * @throws IOException
- * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //交换机
- String exchange = envelope.getExchange();
- //路由key
- String routingKey = envelope.getRoutingKey();
- envelope.getDeliveryTag();
- String msg = new String(body, "utf-8");
- System.out.println("mq收到的消息是:" + msg);
- }
-
- };
- System.out.println("消费者启动成功!");
- channel.basicConsume(QUEUE_SMS, true, consumer);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- }
header模式与routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(键值对)匹配队列。
案例:
根据用户的通知设置去通知用户,设置接收Email的用户只接收Email,设置接收sms的用户只接收sms,设置两种通知类型都接收的则两种通知都有效。
根据假设使用场景,需要一个生产端,两个消费端,不同的是,生产端声明交换机时,交换机的类型不同,是headers类型,生产端队列绑定交换机时,不使用routingkey,而是使用header中的 key/value(键值对)匹配队列,发送消息时也是使用header中的 key/value(键值对)匹配队列。
消费端同样是声明交换机时,交换机的类型不同,是headers类型,消费端队列绑定交换机时,不使用routingkey,而是使用header中的 key/value(键值对)匹配队列,消费消息时也是使用header中的 key/value(键值对)匹配队列。
Header模式之生产者
- package com.zzg.rabbitmq.header;
-
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.Hashtable;
- import java.util.Map;
- import java.util.concurrent.TimeoutException;
-
- public class HeaderProduceMS {
- //声明两个队列和一个交换机
- //Header 模式
- private static final String QUEUE_EMAIL = "queueEmail";
- private static final String QUEUE_SMS = "queueSms";
- private static final String EXCHANGE = "HeaderMessageChange";
-
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.43.10");//mq服务ip地址
- connectionFactory.setPort(5672);//mq client连接端口
- connectionFactory.setUsername("guest");//mq登录用户名
- connectionFactory.setPassword("guest");//mq登录密码
- connectionFactory.setVirtualHost("/");//rabbitmq默认虚拟机名称为“/”,虚拟机相当于一个独立的mq服务器
- //创建与RabbitMQ服务的TCP连接
- connection = connectionFactory.newConnection();
- //创建与Exchange的通道,每个连接可以创建多个通道,每个通道代表一个会话任务
- channel = connection.createChannel();
- //通道绑定交换机
- /**
- * 参数明细
- * 1、交换机名称
- * 2、交换机类型,fanout、topic、direct、headers
- */
- //Header 模式
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);
- //通道绑定队列
- /**
- * 声明队列,如果Rabbit中没有此队列将自动创建
- * param1:队列名称
- * param2:是否持久化
- * param3:队列是否独占此连接
- * param4:队列不再使用时是否自动删除此队列
- * param5:队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map
arguments - *
- */
- channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列
- channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定短信队列
- //交换机和队列绑定
- /**
- * 参数明细
- * 1、队列名称
- * 2、交换机名称
- * 3、路由key
- * 4、
- * String queue, String exchange, String routingKey, Map
arguments - */
- Map
headers_email = new Hashtable(); - headers_email.put("inform_type", "email");
- Map
headers_sms = new Hashtable(); - headers_sms.put("inform_type", "sms");
- channel.queueBind(QUEUE_EMAIL, EXCHANGE, "", headers_email);
- channel.queueBind(QUEUE_SMS, EXCHANGE, "", headers_sms);
- //给email队列发消息
- for (int i = 0; i < 10; i++) {
- String message = new String("mq 发送email消息。。。");
- Map
headers = new Hashtable(); - headers.put("inform_type", "email");//匹配email通知消费者绑定的header
- /**
- * 消息发布方法
- * param1:Exchange的名称,如果没有指定,则使用Default Exchange
- * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
- * param3:消息包含的属性
- * param4:消息体
- * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
- * 默认的交换机,routingKey等于队列名称
- */
- //String exchange, String routingKey, BasicProperties props, byte[] body
- AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
- properties.headers(headers);
- //Email通知
- channel.basicPublish(EXCHANGE, "", properties.build(), message.getBytes());
- System.out.println("mq email 消息发送成功!");
- }
- //给sms队列发消息
- for (int i = 0; i < 10; i++) {
- String message = new String("mq 发送sms消息。。。");
- Map
headers = new Hashtable(); - headers.put("inform_type", "sms");//匹配sms通知消费者绑定的header
- /**
- * 消息发布方法
- * param1:Exchange的名称,如果没有指定,则使用Default Exchange
- * param2:routingKey,消息的路由Key,是用于Exchange(交换机)将消息转发到指定的消息队列
- * param3:消息包含的属性
- * param4:消息体
- * 这里没有指定交换机,消息将发送给默认交换机,每个队列也会绑定那个默认的交换机,但是不能显示绑定或解除绑定
- * 默认的交换机,routingKey等于队列名称
- */
- //String exchange, String routingKey, BasicProperties props, byte[] body
- AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();
- properties.headers(headers);
- //sms通知
- channel.basicPublish(EXCHANGE, "", properties.build(), message.getBytes());
- System.out.println("mq sms 消息发送成功!");
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- try {
- channel.close();
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- try {
- connection.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }
Header模式之消费者(邮件)
- package com.zzg.rabbitmq.header;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.Hashtable;
- import java.util.Map;
- import java.util.concurrent.TimeoutException;
-
- public class HeaderEMailConsumerMS {
- private static final String QUEUE_EMAIL = "queueEmail";
- private static final String EXCHANGE = "HeaderMessageChange";
-
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.43.10");
- connectionFactory.setPort(5672);
- connection = connectionFactory.newConnection();
- channel = connection.createChannel();
- //通道绑定交换机
- /**
- * 参数明细
- * 1、交换机名称
- * 2、交换机类型,fanout、topic、direct、headers
- */
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);
- //通道绑定队列
- /**
- * 声明队列,如果Rabbit中没有此队列将自动创建
- * param1:队列名称
- * param2:是否持久化
- * param3:队列是否独占此连接
- * param4:队列不再使用时是否自动删除此队列
- * param5:队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map
arguments - *
- */
- channel.queueDeclare(QUEUE_EMAIL, true, false, false, null);//通道绑定邮件队列
- //交换机和队列绑定
- /**
- * 参数明细
- * 1、队列名称
- * 2、交换机名称
- * 3、路由key
- * 4、
- * String queue, String exchange, String routingKey, Map
arguments - */
- Map
headers_email = new Hashtable(); - headers_email.put("inform_email", "email");
- channel.queueBind(QUEUE_EMAIL, EXCHANGE, "", headers_email);
- //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- /**
- * 消费者接收消息调用此方法
- * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
- * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
- (收到消息失败后是否需要重新发送)
- * @param properties
- * @param body
- * @throws IOException
- * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //交换机
- String exchange = envelope.getExchange();
- //路由key
- String routingKey = envelope.getRoutingKey();
- envelope.getDeliveryTag();
- String msg = new String(body, "utf-8");
- System.out.println("mq收到的消息是:" + msg);
- }
-
- };
- System.out.println("消费者启动成功!");
- channel.basicConsume(QUEUE_EMAIL, true, consumer);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- }
Header模式之消费者(短信)
- package com.zzg.rabbitmq.header;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.Hashtable;
- import java.util.Map;
- import java.util.concurrent.TimeoutException;
-
- public class HeaderSMSConsumerMS {
- private static final String QUEUE_SMS = "queueSms";
- private static final String EXCHANGE = "HeaderMessageChange";
-
- public static void main(String[] args) {
- Connection connection = null;
- Channel channel = null;
- try {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setHost("192.168.43.10");
- connectionFactory.setPort(5672);
- connection = connectionFactory.newConnection();
- channel = connection.createChannel();
- //通道绑定交换机
- /**
- * 参数明细
- * 1、交换机名称
- * 2、交换机类型,fanout、topic、direct、headers
- */
- channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.HEADERS);
- //通道绑定队列
- /**
- * 声明队列,如果Rabbit中没有此队列将自动创建
- * param1:队列名称
- * param2:是否持久化
- * param3:队列是否独占此连接
- * param4:队列不再使用时是否自动删除此队列
- * param5:队列参数
- * String queue, boolean durable, boolean exclusive, boolean autoDelete, Map
arguments - *
- */
- channel.queueDeclare(QUEUE_SMS, true, false, false, null);//通道绑定邮件队列
- //交换机和队列绑定
- /**
- * 参数明细
- * 1、队列名称
- * 2、交换机名称
- * 3、路由key
- * 4、
- * String queue, String exchange, String routingKey, Map
arguments - */
- Map
headers_email = new Hashtable(); - headers_email.put("inform_email", "sms");
- channel.queueBind(QUEUE_SMS, EXCHANGE, "", headers_email);
- //String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- /**
- * 消费者接收消息调用此方法
- * @param consumerTag 消费者的标签,在channel.basicConsume()去指定
- * @param envelope 消息包的内容,可从中获取消息id,消息routingkey,交换机,消息和重传标志
- (收到消息失败后是否需要重新发送)
- * @param properties
- * @param body
- * @throws IOException
- * String consumerTag, Envelope envelope, BasicProperties properties, byte[] body
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- //交换机
- String exchange = envelope.getExchange();
- //路由key
- String routingKey = envelope.getRoutingKey();
- envelope.getDeliveryTag();
- String msg = new String(body, "utf-8");
- System.out.println("mq收到的消息是:" + msg);
- }
-
- };
- System.out.println("消费者启动成功!");
- channel.basicConsume(QUEUE_SMS, true, consumer);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- }
- }
缺失图片
RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:
1、客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。
2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。
3、服务端将RPC方法 的结果发送到RPC响应队列。
4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。
大家有没有设想过RabbitMQ 消息长时间没有处理,消息是否会过期?,带着上述疑问开始本章节的学习。
默认情况下,消息是不会过期的,也就是我们平日里在消息发送时,如果不设置任何消息过期的相关参数,那么消息是不会过期的,即使消息没被消费掉,也会一直存储在队列中。
TTL(Time-To-Live),消息存活的时间,即消息的有效期。如果我们希望消息能够有一个存活时间,那么我们可以通过设置 TTL 来实现这一需求。如果消息的存活时间超过了 TTL 并且还没有被消息,此时消息就会变成死信,关于死信以及死信队列,我们在下一个章节会说到。
TTL 设置方式:
在声明队列的时候,我们可以在队列属性中设置消息的有效期,这样所有进入该队列的消息都会有一个相同的有效期。
在发送消息的时候设置消息的有效期,这样不同的消息就具有不同的有效期。
思考:如果队列声明了过期时间且消息也设置过期时间,以谁的为准?
以时间短为准。
第一步:创建Maven项目,添加Web 和RabbitMQ jar包依赖
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-webartifactId>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-amqpartifactId>
- dependency>
第二步:添加RabbitMQ 配置对象
- package com.zzg.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- public class QueueConfig {
- public static final String TTL_QUEUE_DEMO = "ttl_queue_demo";
- public static final String TTL_EXCHANGE_DEMO = "ttl_exchange_demo";
- public static final String TTL_ROUTING_KEY = "ttl_routing_key";
-
- @Bean
- Queue queue() {
- return new Queue(TTL_QUEUE_DEMO, true, false, false);
- }
-
- @Bean
- DirectExchange directExchange() {
- return new DirectExchange(TTL_EXCHANGE_DEMO, true, false);
- }
-
- @Bean
- Binding binding() {
- return BindingBuilder.bind(queue())
- .to(directExchange())
- .with(TTL_ROUTING_KEY);
- }
- }
此配置对象主要解决:
首先配置一个消息队列,new 一个 Queue:第一个参数是消息队列的名字;第二个参数表示消息是否持久化;第三个参数表示消息队列是否排他,一般我们都是设置为 false,即不排他;第四个参数表示如果该队列没有任何订阅的消费者的话,该队列会被自动删除,一般适用于临时队列。
配置一个 DirectExchange 交换机。
将交换机和队列绑定到一起。
第三步:添加消息过期Controller ,通过PostMan 工具模拟请求
- package com.zzg.controller;
-
- import com.zzg.config.QueueConfig;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageBuilder;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- @RestController
- @RequestMapping("/ttl")
- public class MessageTTLController {
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- @GetMapping("/message")
- public void hello() {
- Message message = MessageBuilder.withBody("Hello RabbitMQ".getBytes())
- // 设置消息过期时间
- .setExpiration("10000")
- .build();
- rabbitTemplate.convertAndSend(QueueConfig.TTL_QUEUE_DEMO, message);
- }
-
-
- }
仅仅需要修改RabbitMQ 配置对象和消息过期Controller
第一:修改RabbitMQ 配置对象,设置队列过期 时间
- @Bean
- Queue queue() {
- Map<String, Object> args = new HashMap<>();
- // 消息队列设置过期时间
- args.put("x-message-ttl", 10000);
- return new Queue(TTL_QUEUE_DEMO, true, false, false, args);
- }
其他都不需要修改,直接复用。
第二步:修改消息过期Controller,移除单条消息过期时间
- package com.zzg.controller;
-
- import com.zzg.config.QueueConfig;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageBuilder;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- @RestController
- @RequestMapping("/ttl")
- public class MessageTTLController {
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- @GetMapping("/message")
- public void hello() {
- // Message message = MessageBuilder.withBody("Hello RabbitMQ".getBytes())
- // // 设置消息过期时间
- // .setExpiration("10000")
- // .build();
-
- Message message = MessageBuilder.withBody("Hello RabbitMQ".getBytes())
- .build();
- rabbitTemplate.convertAndSend(QueueConfig.TTL_QUEUE_DEMO, message);
- }
-
-
- }
思考:如果将消息过期时间设置为0,这表示消息不能立马消费则立即被丢掉。
死信交换机,Dead-Letter-Exchange 即 DLX。
死信交换机用来接收死信消息(Dead Message)的,那什么是死信消息呢?一般消息变成死信消息有如下几种情况:
消息被拒绝(Basic.Reject/Basic.Nack) ,井且设置requeue 参数为false
消息过期
队列达到最大长度
当消息在一个队列中变成了死信消息后,此时就会被发送到 DLX,绑定 DLX 的消息队列则称为死信队列。
DLX 本质上也是一个普普通通的交换机,我们可以为任意队列指定 DLX,当该队列中存在死信时,RabbitMQ 就会自动的将这个死信发布到 DLX 上去,进而被路由到另一个绑定了 DLX 的队列上(即死信队列)。
死信队列实践
第一步:创建Maven项目,添加Web 和RabbitMQ jar包依赖
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-webartifactId>
- dependency>
- <dependency>
- <groupId>org.springframework.bootgroupId>
- <artifactId>spring-boot-starter-amqpartifactId>
- dependency>
第二步:添加RabbitMQ 配置对象 (普通队列和死信队列)
- package com.zzg.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * 普通消息队列配置
- */
- @Configuration
- public class RabbitMQConfig {
- public static final String ROUTING_QUEUE_DEMO = "routing_queue_demos";
- public static final String ROUTING_EXCHANGE_DEMO = "routing_exchange_demos";
- public static final String ROUTING_KEY = "routing_keys";
-
- @Bean
- Queue queue() {
- Map
args = new HashMap<>(); - //设置消息过期时间
- args.put("x-message-ttl", 0);
- //设置死信交换机
- args.put("x-dead-letter-exchange", RabbitMQDSLConfig.DLX_EXCHANGE_NAME);
- //设置死信 routing_key
- args.put("x-dead-letter-routing-key", RabbitMQDSLConfig.DLX_ROUTING_KEY);
- return new Queue(ROUTING_QUEUE_DEMO, true, false, false, args);
- }
-
- @Bean
- DirectExchange directExchange() {
- return new DirectExchange(ROUTING_EXCHANGE_DEMO, true, false);
- }
-
- @Bean
- Binding binding() {
- return BindingBuilder.bind(queue())
- .to(directExchange())
- .with(ROUTING_KEY);
- }
- }
温馨提示:为普通消息队列配置死信 队列
@Bean
Queue queue() {
Map args = new HashMap<>();
//设置消息过期时间
args.put("x-message-ttl", 0);
//设置死信交换机
args.put("x-dead-letter-exchange", RabbitMQDSLConfig.DLX_EXCHANGE_NAME);
//设置死信 routing_key
args.put("x-dead-letter-routing-key", RabbitMQDSLConfig.DLX_ROUTING_KEY);
return new Queue(ROUTING_QUEUE_DEMO, true, false, false, args);
}
两个参数:
x-dead-letter-exchange:配置死信交换机。
x-dead-letter-routing-key:配置死信 routing_key。
发送到这个消息队列上的消息,如果发生了 nack、reject 或者过期等问题,就会被发送到 DLX 上,进而进入到与 DLX 绑定的消息队列上。
- package com.zzg.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * RabbitMQ 死信队列配置
- */
- @Configuration
- public class RabbitMQDSLConfig {
- public static final String DLX_EXCHANGE_NAME = "dlx_exchange_names";
- public static final String DLX_QUEUE_NAME = "dlx_queue_names";
- public static final String DLX_ROUTING_KEY = "dlx_routing_keys";
-
- /**
- * 配置死信交换机
- *
- * @return
- */
- @Bean
- DirectExchange dlxDirectExchange() {
- return new DirectExchange(DLX_EXCHANGE_NAME, true, false);
- }
- /**
- * 配置死信队列
- * @return
- */
- @Bean
- Queue dlxQueue() {
- return new Queue(DLX_QUEUE_NAME);
- }
- /**
- * 绑定死信队列和死信交换机
- * @return
- */
- @Bean
- Binding dlxBinding() {
- return BindingBuilder.bind(dlxQueue())
- .to(dlxDirectExchange())
- .with(DLX_ROUTING_KEY);
- }
- }
第三步:配置死信队列监听器
- package com.zzg.components;
-
- import com.zzg.config.RabbitMQDSLConfig;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- /**
- * 死信队列处理组件
- */
- @Component
- public class DSLComponent {
- @RabbitListener(queues = RabbitMQDSLConfig.DLX_QUEUE_NAME)
- public void handler(String message){
- System.out.println("dlx msg = " + message);
- }
- }
第四步:编写Controller,实现死信队列功能
- package com.zzg.controller;
-
- import com.zzg.config.RabbitMQConfig;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.core.MessageBuilder;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.web.bind.annotation.RequestMapping;
- import org.springframework.web.bind.annotation.RestController;
-
- @RestController
- @RequestMapping("/ttl")
- public class MessageTTLController {
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- @GetMapping("/message")
- public void hello() {
- Message message = MessageBuilder.withBody("Hello RabbitMQ".getBytes())
- .build();
- rabbitTemplate.convertAndSend(RabbitMQConfig.ROUTING_QUEUE_DEMO, message);
- }
- }
延迟队列场景:
利用 RabbitMQ 自带的消息过期和私信队列机制,实现定时任务。
使用 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件来实现定时任务,这种方案较简单。
我们需要下载 rabbitmq_delayed_message_exchange 插件,这是一个 GitHub 上的开源项目,我们直接下载即可。
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
选择适合自己的版本,我这里选择3.9.0 版。
下载完成后,通过SFTP 上传CentOS服务器,再将此文件拷贝至RabbitMQ 容器中。
docker cp /home/rabbitmq_delayed_message_exchange-3.9.0.ez 4b0032:/plugins
第一个参数是宿主机上的文件地址,第二个参数是拷贝到容器的位置
再执行如下命令进入到 RabbitMQ 容器中:
docker exec -it 4b0032 /bin/bash
进入到容器之后,执行如下命令启用插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
启用成功之后通过如下命令查看所有安装的插件:
rabbitmq-plugins list
MobeXterm 操作详细截图:
- [root@localhost home]# docker cp /home/rabbitmq_delayed_message_exchange-3.9.0.ez 4b0032:/plugins
- [root@localhost home]# docker exec -it 4b0032 /bin/bash
- root@4b0032878886:/# rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- Enabling plugins on node rabbit@4b0032878886:
- rabbitmq_delayed_message_exchange
- The following plugins have been configured:
- rabbitmq_delayed_message_exchange
- rabbitmq_management
- rabbitmq_management_agent
- rabbitmq_prometheus
- rabbitmq_web_dispatch
- Applying plugin configuration to rabbit@4b0032878886...
- The following plugins have been enabled:
- rabbitmq_delayed_message_exchange
-
- started 1 plugins.
- root@4b0032878886:/# rabbitmq-plugins list
- Listing plugins with pattern ".*" ...
- Configured: E = explicitly enabled; e = implicitly enabled
- | Status: * = running on rabbit@4b0032878886
- |/
- [ ] rabbitmq_amqp1_0 3.9.11
- [ ] rabbitmq_auth_backend_cache 3.9.11
- [ ] rabbitmq_auth_backend_http 3.9.11
- [ ] rabbitmq_auth_backend_ldap 3.9.11
- [ ] rabbitmq_auth_backend_oauth2 3.9.11
- [ ] rabbitmq_auth_mechanism_ssl 3.9.11
- [ ] rabbitmq_consistent_hash_exchange 3.9.11
- [E*] rabbitmq_delayed_message_exchange 3.9.0
- [ ] rabbitmq_event_exchange 3.9.11
- [ ] rabbitmq_federation 3.9.11
- [ ] rabbitmq_federation_management 3.9.11
- [ ] rabbitmq_jms_topic_exchange 3.9.11
- [E*] rabbitmq_management 3.9.11
- [e*] rabbitmq_management_agent 3.9.11
- [ ] rabbitmq_mqtt 3.9.11
- [ ] rabbitmq_peer_discovery_aws 3.9.11
- [ ] rabbitmq_peer_discovery_common 3.9.11
- [ ] rabbitmq_peer_discovery_consul 3.9.11
- [ ] rabbitmq_peer_discovery_etcd 3.9.11
- [ ] rabbitmq_peer_discovery_k8s 3.9.11
- [E*] rabbitmq_prometheus 3.9.11
- [ ] rabbitmq_random_exchange 3.9.11
- [ ] rabbitmq_recent_history_exchange 3.9.11
- [ ] rabbitmq_sharding 3.9.11
- [ ] rabbitmq_shovel 3.9.11
- [ ] rabbitmq_shovel_management 3.9.11
- [ ] rabbitmq_stomp 3.9.11
- [ ] rabbitmq_stream 3.9.11
- [ ] rabbitmq_stream_management 3.9.11
- [ ] rabbitmq_top 3.9.11
- [ ] rabbitmq_tracing 3.9.11
- [ ] rabbitmq_trust_store 3.9.11
- [e*] rabbitmq_web_dispatch 3.9.11
- [ ] rabbitmq_web_mqtt 3.9.11
- [ ] rabbitmq_web_mqtt_examples 3.9.11
- [ ] rabbitmq_web_stomp 3.9.11
- [ ] rabbitmq_web_stomp_examples 3.9.11
- root@4b0032878886:/# exit
第一:创建Maven项目,添加Web和RabbitMQ 依赖。
省略...
第二:添加RabbitMQ 配置对象 (延迟队列)
- package com.zzg.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.CustomExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * 延迟队列配置对象
- */
- @Configuration
- public class RabbitMQDelayedConfig {
- public static final String QUEUE_NAME = "delay_queue";
- public static final String EXCHANGE_NAME = "delay_exchange";
- public static final String EXCHANGE_TYPE = "x-delayed-message";
-
- @Bean
- Queue queue() {
- return new Queue(QUEUE_NAME, true, false, false);
- }
-
- @Bean
- CustomExchange customExchange() {
- Map
args = new HashMap<>(); - args.put("x-delayed-type", "direct");
- return new CustomExchange(EXCHANGE_NAME, EXCHANGE_TYPE, true, false,args);
- }
-
- @Bean
- Binding binding() {
- return BindingBuilder.bind(queue())
- .to(customExchange()).with(QUEUE_NAME).noargs();
- }
- }
延迟队列使用交换机为CustomExchange,这是一个 Spring 中提供的交换机,创建 CustomExchange 时有五个参数,含义分别如下:
交换机名称。
交换机类型,这个地方是固定的。
交换机是否持久化。
如果没有队列绑定到交换机,交换机是否删除。
其他参数。
最后一个 args 参数中,指定了交换机消息分发的类型,这个类型就是大家熟知的 direct、fanout、topic 以及 header 几种,用了哪种类型,将来交换机分发消息就按哪种方式来。
第三步:创建延迟队列的消费者
- package com.zzg.components;
-
- import com.zzg.config.RabbitMQDelayedConfig;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- /**
- * 延迟队列处理组件
- */
- @Component
- public class DelayedComponent {
-
- @RabbitListener(queues = RabbitMQDelayedConfig.QUEUE_NAME)
- public void handleMsg(String message) {
- System.out.println("delayed msg = " + message);
- }
- }
第四步:创建Controller,并且在头部设置消息延迟时间
- @GetMapping("/delayed")
- public void delayed() {
- Message message = MessageBuilder.withBody("Delayed RabbitMQ".getBytes())
- // 消息头中设置消息的延迟时间。
- .setHeader("x-delay", 3000)
- .build();
- rabbitTemplate.convertAndSend(RabbitMQDelayedConfig.EXCHANGE_NAME, RabbitMQDelayedConfig.QUEUE_NAME, message);
- }
延迟队列实现的思路也很简单,就是我们所说的 DLX(死信交换机)+TTL(消息超时时间)。
功能需求说明:
假如一条消息需要延迟 30 分钟执行,我们就设置这条消息的有效期为 30 分钟,同时为这条消息配置死信交换机和死信
routing_key,并且不为这个消息队列设置消费者,那么 30 分钟后,这条消息由于没有被消费者消费而进入死信队列,此时我们有一个消费者就在“蹲点”这个死信队列,消息一进入死信队列,就立马被消费了。
功能代码
第一步:创建Maven项目,添加Web 和RabbitMQ依赖
省略...
第二步: 添加RabbitMQ配置对象(普通队列和死信队列)
- package com.zzg.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * 普通消息队列配置
- */
- @Configuration
- public class RabbitMQConfig {
- public static final String ROUTING_QUEUE_DEMO = "routing_queue_demos";
- public static final String ROUTING_EXCHANGE_DEMO = "routing_exchange_demos";
- public static final String ROUTING_KEY = "routing_keys";
-
- @Bean
- Queue queue() {
- Map
args = new HashMap<>(); - //设置消息过期时间
- args.put("x-message-ttl", 1000 * 3);
- //设置死信交换机
- args.put("x-dead-letter-exchange", RabbitMQDSLConfig.DLX_EXCHANGE_NAME);
- //设置死信 routing_key
- args.put("x-dead-letter-routing-key", RabbitMQDSLConfig.DLX_ROUTING_KEY);
- return new Queue(ROUTING_QUEUE_DEMO, true, false, false, args);
- }
-
- @Bean
- DirectExchange directExchange() {
- return new DirectExchange(ROUTING_EXCHANGE_DEMO, true, false);
- }
-
- @Bean
- Binding binding() {
- return BindingBuilder.bind(queue())
- .to(directExchange())
- .with(ROUTING_KEY);
- }
- }
- package com.zzg.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- /**
- * RabbitMQ 死信队列配置
- */
- @Configuration
- public class RabbitMQDSLConfig {
- public static final String DLX_EXCHANGE_NAME = "dlx_exchange_names";
- public static final String DLX_QUEUE_NAME = "dlx_queue_names";
- public static final String DLX_ROUTING_KEY = "dlx_routing_keys";
-
- /**
- * 配置死信交换机
- *
- * @return
- */
- @Bean
- DirectExchange dlxDirectExchange() {
- return new DirectExchange(DLX_EXCHANGE_NAME, true, false);
- }
- /**
- * 配置死信队列
- * @return
- */
- @Bean
- Queue dlxQueue() {
- return new Queue(DLX_QUEUE_NAME);
- }
- /**
- * 绑定死信队列和死信交换机
- * @return
- */
- @Bean
- Binding dlxBinding() {
- return BindingBuilder.bind(dlxQueue())
- .to(dlxDirectExchange())
- .with(DLX_ROUTING_KEY);
- }
- }
第三步:为死信队列配置消息监听器
- package com.zzg.components;
-
- import com.zzg.config.RabbitMQDSLConfig;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- /**
- * 死信队列处理组件
- */
- @Component
- public class DSLComponent {
- @RabbitListener(queues = RabbitMQDSLConfig.DLX_QUEUE_NAME)
- public void handler(String message){
- System.out.println("dlx msg = " + message);
- }
- }
第四步:添加Controller, 触发消息发送
- @GetMapping("/message")
- public void hello() {
- Message message = MessageBuilder.withBody("Hello RabbitMQ".getBytes())
- .build();
- rabbitTemplate.convertAndSend(RabbitMQConfig.ROUTING_QUEUE_DEMO, message);
- }
控制台输出:
- 2022-08-02 14:30:25.100 INFO 6960 --- [nio-8087-exec-3] o.s.web.servlet.DispatcherServlet : Completed initialization in 4 ms
- dlx msg = Hello RabbitMQ
温馨提示:本章节内容主要讨论如何确保消息生产者将消息发送成功,并不涉及消息消费的问题。
RabbitMQ 中的消息发送引入了 Exchange(交换机)的概念,消息的发送首先到达交换机上,然后再根据既定的路由规则,由交换机将消息路由到不同的 Queue(队列)中,再由不同的消费者去消费。
要确保消息发送的可靠性,主要从两方面去确认:
消息成功到达 Exchange
消息成功到达 Queue
如果能确认这两步,那么我们就可以认为消息发送成功了。
如果这两步中任一步骤出现问题,那么消息就没有成功送达,此时我们可能要通过重试等方式去重新发送消息,多次重试之后,如果消息还是不能到达,则可能就需要人工介入了。
经过上面的分析,我们可以确认,要确保消息成功发送,我们只需要做好三件事就可以了:
确认消息到达 Exchange。
确认消息到达 Queue。
开启定时任务,定时投递那些发送失败的消息。
保证消息的成功发送的三个步骤,前两个RabbitQ 提供了对应的解决办法,第三个步骤需要我们自己实现。
如何确保消息成功到达 RabbitMQ?RabbitMQ 给出了两种方案:
开启事务机制
发送方确认机制
温馨提示:
两种不同的方案,不可以同时开启,只能选择其中之一,如果两者同时开启,则会报如下错误:
第一步:创建Maven项目,添加Web 和RabbitMQ依赖
省略...
第二步: 添加RabbitMQ配置对象(普通队列+ 事务)
- package com.zzg.config;
-
- import org.springframework.amqp.core.Binding;
- import org.springframework.amqp.core.BindingBuilder;
- import org.springframework.amqp.core.DirectExchange;
- import org.springframework.amqp.core.Queue;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * 普通消息队列配置
- */
- @Configuration
- public class RabbitMQConfig {
- public static final String ROUTING_QUEUE_DEMO = "routing_queue";
- public static final String ROUTING_EXCHANGE_DEMO = "routing_exchange";
- public static final String ROUTING_KEY = "routing_key_transactional";
-
- @Bean
- Queue queue() {
- return new Queue(ROUTING_QUEUE_DEMO, true, false, false);
- }
-
- @Bean
- DirectExchange directExchange() {
- return new DirectExchange(ROUTING_EXCHANGE_DEMO, true, false);
- }
-
- @Bean
- Binding binding() {
- return BindingBuilder.bind(queue())
- .to(directExchange())
- .with(ROUTING_KEY);
- }
-
- @Bean
- RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
- return new RabbitTransactionManager(connectionFactory);
- }
- }
第三步:在消息生产者上面做两件事:添加事务注解并设置通信信道为事务模式:
- package com.zzg.service;
-
- import com.zzg.config.RabbitMQConfig;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
- import org.springframework.transaction.annotation.Transactional;
-
- @Service
- public class MessageServiceImpl {
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- @Transactional
- public void send(){
- rabbitTemplate.setChannelTransacted(true);
- rabbitTemplate.convertAndSend(RabbitMQConfig.ROUTING_EXCHANGE_DEMO,RabbitMQConfig.ROUTING_QUEUE_DEMO,"transactional Rabbitmq!".getBytes());
- int i = 1 / 0;
- }
-
-
- }
这里注意两点:
发送消息的方法上添加 @Transactional 注解标记事务。
调用 setChannelTransacted 方法设置为 true 开启事务模式。
这就 OK 了。
在上面的案例中,我们在结尾来了个 1/0 ,这在运行时必然抛出异常,我们可以尝试运行该方法,发现消息并未发送成功。
当我们开启事务模式之后,RabbitMQ 生产者发送消息会多出四个步骤:
客户端发出请求,将信道设置为事务模式。
服务端给出回复,同意将信道设置为事务模式。
客户端发送消息。
客户端提交事务。
服务端给出响应,确认事务提交。
上面的步骤,除了第三步是本来就有的,其他几个步骤都是平白无故多出来的。所以大家看到,事务模式其实效率有点低,这并非一个最佳解决方案。我们可以想想,什么项目会用到消息中间件?一般来说都是一些高并发的项目,这个时候并发性能尤为重要。
所以,RabbitMQ 还提供了发送方确认机制(publisher confirm)来确保消息发送成功,这种方式,性能要远远高于事务模式,一起来看下。
移除刚刚关于事务的代码,然后在 application.yml中配置开启消息发送方确认机制。
- spring:
- rabbitmq:
- host: 192.168.43.10
- port: 5672
- username: guest
- password: guest
- virtual-host: /
- publisher-confirm-type: correlated
- publisher-returns: true
publisher-confirm-type:配置消息到达交换器的确认回调,publisher-returns:配置消息到达队列的回调。
publisher-confirm-type属性的配置有三个取值:
none:表示禁用发布确认模式,默认即此。
correlated:表示成功发布消息到交换器后会触发的回调方法。
simple:类似 correlated,并且支持 waitForConfirms() 和 waitForConfirmsOrDie() 方法的调用。
RabbitMQ配置对象,开启两个监听
- package com.zzg.config;
-
- import org.springframework.amqp.core.*;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- import javax.annotation.PostConstruct;
- import java.util.HashMap;
- import java.util.Map;
-
- /**
- * 普通消息队列配置
- */
- @Configuration
- public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
- public static final String ROUTING_QUEUE_DEMO = "routing_queue";
- public static final String ROUTING_EXCHANGE_DEMO = "routing_exchange";
- public static final String ROUTING_KEY = "routing_key_transactional";
-
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- @Bean
- Queue queue() {
- return new Queue(ROUTING_QUEUE_DEMO, true, false, false);
- }
-
- @Bean
- DirectExchange directExchange() {
- return new DirectExchange(ROUTING_EXCHANGE_DEMO, true, false);
- }
-
- @Bean
- Binding binding() {
- return BindingBuilder.bind(queue())
- .to(directExchange())
- .with(ROUTING_KEY);
- }
-
- @PostConstruct
- public void initRabbitTemplate() {
- rabbitTemplate.setConfirmCallback(this);
- rabbitTemplate.setReturnCallback(this);
- }
-
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String s) {
- if (ack) {
- System.out.println(correlationData.getId() +":消息成功到达交换器");
- }else{
- System.out.println(correlationData.getId() +":消息发送失败");
- }
- }
-
- @Override
- public void returnedMessage(Message message, int i, String s, String s1, String s2) {
- System.out.println(message.getMessageProperties().getMessageId() +", 消息未成功路由到队列");
- }
- }
关于这个配置类,我说如下几点:
定义配置类,实现 RabbitTemplate.ConfirmCallback 和 RabbitTemplate.ReturnsCallback 两个接口,这两个接口,前者的回调用来确定消息到达交换器,后者则会在消息路由到队列失败时被调用。
定义 initRabbitTemplate 方法并添加 @PostConstruct 注解,在该方法中为 rabbitTemplate 分别配置这两个 Callback。
这就可以了。
尝试将消息发送到一个不存在的交换机中,代码如下:
- @GetMapping("/message")
- public void hello() {
- // Message message = MessageBuilder.withBody("Hello RabbitMQ".getBytes())
- // .build();
- // rabbitTemplate.convertAndSend(RabbitMQConfig.ROUTING_QUEUE_DEMO, message);
- // exchange 不存在
- rabbitTemplate.convertAndSend("RabbitConfig.EXCHANGE_NAME",RabbitMQConfig.ROUTING_QUEUE_DEMO,"Hello RabbitMQ!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
- }
注意第一个参数是一个字符串,不是变量,这个交换器并不存在,此时控制台会报如下错误:
- 2022-08-02 15:33:26.051 ERROR 12100 --- [.168.43.10:5672] o.s.a.r.c.CachingConnectionFactory : Channel shutdown: channel error; protocol method: #method
(reply-code=404, reply-text=NOT_FOUND - no exchange 'RabbitConfig.EXCHANGE_NAME' in vhost '/', class-id=60, method-id=40) - 29cd1572-2db9-4236-b985-632597ab210e:消息发送失败
给定一个真实存在的交换器,但是给一个不存在的队列,像下面这样:
- @GetMapping("/message")
- public void hello() {
- // Message message = MessageBuilder.withBody("Hello RabbitMQ".getBytes())
- // .build();
- // rabbitTemplate.convertAndSend(RabbitMQConfig.ROUTING_QUEUE_DEMO, message);
- // exchange 不存在
- //rabbitTemplate.convertAndSend("RabbitConfig.EXCHANGE_NAME",RabbitMQConfig.ROUTING_QUEUE_DEMO,"Hello RabbitMQ!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
- // queue 不存在
- rabbitTemplate.convertAndSend(RabbitMQConfig.ROUTING_EXCHANGE_DEMO,"RabbitMQConfig.ROUTING_QUEUE_DEMO","Hello RabbitMQ!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
- }
控制台输出错误信息:
- 2022-08-02 15:36:07.112 INFO 1536 --- [nio-8087-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 3 ms
- null, 消息未成功路由到队列
- a1906bf2-c4e9-44b5-adc6-d2e3f9d95728:消息成功到达交换器
失败重试分两种情况,一种是压根没找到 MQ 导致的失败重试,另一种是找到 MQ 了,但是消息发送失败了。
事务机制和发送方确认机制,都是发送方确认消息发送成功的办法。如果发送方一开始就连不上 MQ,那么 Spring Boot 中也有相应的重试机制,但是这个重试机制就和 MQ 本身没有关系了,这是利用 Spring 中的 retry 机制来完成的,具体配置如下:
- spring.rabbitmq.template.retry.enabled=true
- spring.rabbitmq.template.retry.initial-interval=1000ms
- spring.rabbitmq.template.retry.max-attempts=10
- spring.rabbitmq.template.retry.max-interval=10000ms
- spring.rabbitmq.template.retry.multiplier=2
从上往下配置含义依次是:
开启重试机制。
重试起始间隔时间。
最大重试次数。
最大重试间隔时间。
间隔时间乘数。(这里配置间隔时间乘数为 2,则第一次间隔时间 1 秒,第二次重试间隔时间 2 秒,第三次 4 秒,以此类推)
配置完成后,再次启动 Spring Boot 项目,然后关掉 MQ,此时尝试发送消息,就会发送失败,进而导致自动重试。
业务重试主要是针对消息没有到达交换器的情况。
如果消息没有成功到达交换器,根据我们上面的讲解,此时就会触发消息发送失败回调,在这个回调中,我们就可以做文章了!
整体思路是这样:
首先创建一张表,用来记录发送到中间件上的消息,像下面这样:

每次发送消息的时候,就往数据库中添加一条记录。这里的字段都很好理解,有三个我额外说下:
status:表示消息的状态,有三个取值,0,1,2 分别表示消息发送中、消息发送成功以及消息发送失败。
tryTime:表示消息的第一次重试时间(消息发出去之后,在 tryTime 这个时间点还未显示发送成功,此时就可以开始重试了)。
count:表示消息重试次数。
其他字段都很好理解,我就不一一啰嗦了。
在消息发送的时候,我们就往该表中保存一条消息发送记录,并设置状态 status 为 0,tryTime 为 1 分钟之后。
在 confirm 回调方法中,如果收到消息发送成功的回调,就将该条消息的 status 设置为1(在消息发送时为消息设置 msgId,在消息发送成功回调时,通过 msgId 来唯一锁定该条消息)。
另外开启一个定时任务,定时任务每隔 10s 就去数据库中捞一次消息,专门去捞那些 status 为 0 并且已经过了 tryTime 时间记录,把这些消息拎出来后,首先判断其重试次数是否已超过 3 次,如果超过 3 次,则修改该条消息的 status 为 2,表示这条消息发送失败,并且不再重试。对于重试次数没有超过 3 次的记录,则重新去发送消息,并且为其 count 的值+1。
当然这种思路有两个弊端:
去数据库走一遭,可能拖慢 MQ 的 Qos,不过有的时候我们并不需要 MQ 有很高的 Qos,所以这个应用时要看具体情况。
按照上面的思路,可能会出现同一条消息重复发送的情况,不过这都不是事,我们在消息消费时,解决好幂等性问题就行了。
当然,大家也要注意,消息是否要确保 100% 发送成功,也要看具体情况。
本章节主要讨论:如何确保消息消费成功,并且确保幂等。
RabbitMQ 的消息消费,整体上来说有两种不同的思路:
推(push):MQ 主动将消息推送给消费者,这种方式需要消费者设置一个缓冲区去缓存消息,对于消费者而言,内存中总是有一堆需要处理的消息,所以这种方式的效率比较高,这也是目前大多数应用采用的消费方式。
拉(pull):消费者主动从 MQ 拉取消息,这种方式效率并不是很高,不过有的时候如果服务端需要批量拉取消息,倒是可以采用这种方式。
这种方式大家比较常见,就是通过 @RabbitListener 注解去标记消费者,如下:
- package com.zzg.components;
-
- import com.zzg.config.RabbitMQDelayedConfig;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- /**
- * 延迟队列处理组件
- */
- @Component
- public class DelayedComponent {
-
- @RabbitListener(queues = RabbitMQDelayedConfig.QUEUE_NAME)
- public void handleMsg(String message) {
- System.out.println("delayed msg = " + message);
- }
- }
当监听的队列中有消息时,就会触发该方法。
- @Test
- public void test01() throws UnsupportedEncodingException {
- Object o = rabbitTemplate.receiveAndConvert(RabbitConfig.QUEUE_NAME);
- System.out.println("o = " + new String(((byte[]) o),"UTF-8"));
- }
调用 receiveAndConvert 方法,方法参数为队列名称,方法执行完成后,会从 MQ 上拉取一条消息下来,如果该方法返回值为 null,表示该队列上没有消息了。receiveAndConvert 方法有一个重载方法,可以在重载方法中传入一个等待超时时间,例如 3 秒。此时,假设队列中没有消息了,则 receiveAndConvert 方法会阻塞 3 秒,3 秒内如果队列中有了新消息就返回,3 秒后如果队列中还是没有新消息,就返回 null,这个等待超时时间要是不设置的话,默认为 0。
如果需要从消息队列中持续获得消息,就推荐使用推模式;如果只是单纯的消费一条消息,则使用拉模式即可。切忌将拉模式放到一个死循环中,变相的订阅消息,这会严重影响 RabbitMQ 的性能。
为了保证消息能够可靠的到达消息消费者,RabbitMQ 中提供了消息消费确认机制。当消费者去消费消息的时候,可以通过指定 autoAck 参数来表示消息消费的确认方式。
当 autoAck 为 false 的时候,此时即使消费者已经收到消息了,RabbitMQ 也不会立马将消息移除,而是等待消费者显式的回复确认信号后,才会将消息打上删除标记,然后再删除。
当 autoAck 为 true 的时候,此时消息消费者就会自动把发送出去的消息设置为确认,然后将消息移除(从内存或者磁盘中),即使这些消息并没有到达消费者。
来看一张图:

如上图所示,在 RabbitMQ 的 web 管理页面:
Ready 表示待消费的消息数量。
Unacked 表示已经发送给消费者但是还没收到消费者 ack 的消息数量。
这是我们可以从 UI 层面观察消息的消费情况确认情况。
当我们将 autoAck 设置为 false 的时候,对于 RabbitMQ 而言,消费分成了两个部分:
待消费的消息
已经投递给消费者,但是还没有被消费者确认的消息
换句话说,当设置 autoAck 为 false 的时候,消费者就变得非常从容了,它将有足够的时间去处理这条消息,当消息正常处理完成后,再手动 ack,此时 RabbitMQ 才会认为这条消息消费成功了。如果 RabbitMQ 一直没有收到客户端的反馈,并且此时客户端也已经断开连接了,那么 RabbitMQ 就会将刚刚的消息重新放回队列中,等待下一次被消费。
综上所述,确保消息被成功消费,无非就是手动 Ack 或者自动 Ack,无他。当然,无论这两种中的哪一种,最终都有可能导致消息被重复消费,所以一般来说我们还需要在处理消息时,解决幂等性问题。
当客户端收到消息时,可以选择消费这条消息,也可以选择拒绝这条消息。我们来看下拒绝的方式:
- @Component
- public class ConsumerDemo {
- @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
- public void handle(Channel channel, Message message) {
- //获取消息编号
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- try {
- //拒绝消息
- channel.basicReject(deliveryTag, true);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
消费者收到消息之后,可以选择拒绝消费该条消息,拒绝的步骤分两步:
获取消息编号 deliveryTag。
调用 basicReject 方法拒绝消息。
调用 basicReject 方法时,第二个参数是 requeue,即是否重新入队。如果第二个参数为 true,则这条被拒绝的消息会重新进入到消息队列中,等待下一次被消费;如果第二个参数为 false,则这条被拒绝的消息就会被丢掉,不会有新的消费者去消费它了。
需要注意的是,basicReject 方法一次只能拒绝一条消息。
消息确认分为自动确认和手动确认
在 Spring Boot 中,默认情况下,消息消费就是自动确认。
- @Component
- public class ConsumerDemo {
- @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
- public void handle2(String msg) {
- System.out.println("msg = " + msg);
- int i = 1 / 0;
- }
- }
通过 @Componet 注解将当前类注入到 Spring 容器中,然后通过 @RabbitListener 注解来标记一个消息消费方法,默认情况下,消息消费方法自带事务,即如果该方法在执行过程中抛出异常,那么被消费的消息会重新回到队列中等待下一次被消费,如果该方法正常执行完没有抛出异常,则这条消息就算是被消费了。
手动确认又分为两种:推模式手动确认与拉模式手动确认
推模式手动确认
要开启手动确认,需要我们首先关闭自动确认,关闭方式如下:
spring.rabbitmq.listener.simple.acknowledge-mode=manual
这个配置表示将消息的确认模式改为手动确认。
消费者端代码修改
- @RabbitListener(queues = RabbitConfig.JAVABOY_QUEUE_NAME)
- public void handle3(Message message,Channel channel) {
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- try {
- //消息消费的代码写到这里
- String s = new String(message.getBody());
- System.out.println("s = " + s);
- //消费完成后,手动 ack
- channel.basicAck(deliveryTag, false);
- } catch (Exception e) {
- //手动 nack
- try {
- channel.basicNack(deliveryTag, false, true);
- } catch (IOException ex) {
- ex.printStackTrace();
- }
- }
- }
将消费者要做的事情放到一个 try..catch 代码块中。
如果消息正常消费成功,则执行 basicAck 完成确认。
如果消息消费失败,则执行 basicNack 方法,告诉 RabbitMQ 消息消费失败。
这里涉及到两个方法:
basicAck:这个是手动确认消息已经成功消费,该方法有两个参数:第一个参数表示消息的 id;第二个参数 multiple 如果为 false,表示仅确认当前消息消费成功,如果为 true,则表示当前消息之前所有未被当前消费者确认的消息都消费成功。
basicNack:这个是告诉 RabbitMQ 当前消息未被成功消费,该方法有三个参数:第一个参数表示消息的 id;第二个参数 multiple 如果为 false,表示仅拒绝当前消息的消费,如果为 true,则表示拒绝当前消息之前所有未被当前消费者确认的消息;第三个参数 requeue 含义和前面所说的一样,被拒绝的消息是否重新入队。
当 basicNack 中最后一个参数设置为 false 的时候,还涉及到一个死信队列的问题。
拉模式手动确认
拉模式手动 ack 比较麻烦一些,在 Spring 中封装的 RabbitTemplate 中并未找到对应的方法,所以我们得用原生的办法,如下:
- public void receive2() {
- Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);
- long deliveryTag = 0L;
- try {
- GetResponse getResponse = channel.basicGet(RabbitConfig.JAVABOY_QUEUE_NAME, false);
- deliveryTag = getResponse.getEnvelope().getDeliveryTag();
- System.out.println("o = " + new String((getResponse.getBody()), "UTF-8"));
- channel.basicAck(deliveryTag, false);
- } catch (IOException e) {
- try {
- channel.basicNack(deliveryTag, false, true);
- } catch (IOException ex) {
- ex.printStackTrace();
- }
- }
- }
涉及到的 basicAck 和 basicNack 方法跟前面的一样,我就不再赘述。
问题场景:消费者在消费完一条消息后,向 RabbitMQ 发送一个 ack 确认,此时由于网络断开或者其他原因导致 RabbitMQ 并没有收到这个 ack,那么此时 RabbitMQ 并不会将该条消息删除,当重新建立起连接后,消费者还是会再次收到该条消息,这就造成了消息的重复消费。同时,由于类似的原因,消息在发送的时候,同一条消息也可能会发送两次。种种原因导致我们在消费消息时,一定要处理好幂等性问题。
幂等性问题的处理倒也不难,基本上都是从业务上来处理,我来大概说说思路。
采用 Redis,在消费者消费消息之前,现将消息的 id 放到 Redis 中,存储方式如下:
id-0(正在执行业务)
id-1(执行业务成功)
如果 ack 失败,在 RabbitMQ 将消息交给其他的消费者时,先执行 setnx,如果 key 已经存在(说明之前有人消费过该消息),获取他的值,如果是 0,当前消费者就什么都不做,如果是 1,直接 ack。
极端情况:第一个消费者在执行业务时,出现了死锁,在 setnx 的基础上,再给 key 设置一个生存时间。生产者,发送消息时,指定 messageId。