写在前面,由于Rabbit MQ涉及的内容较多,赶在春招我个人先按照我认为重要的内容进行一定总结,也算是个学习笔记吧。主要参考官方文档、其他优秀文章、大模型问答。自己边学习边总结。后面有时间我会慢慢把所有内容补全,分享出来也是希望可以给需要的同学提供一点儿帮助,想关注后续完整文章的可以点个关注丫,后续我会把现有文章全部重新补充和优化。
官网地址: RabbitMQ: One broker to queue them all | RabbitMQ
RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as a post office: when you put the mail that you want posting in a post box, you can be sure that the letter carrier will eventually deliver the mail to your recipient. In this analogy, RabbitMQ is a post box, a post office, and a letter carrier.
The major difference between RabbitMQ and the post office is that it doesn't deal with paper, instead it accepts, stores, and forwards binary blobs of data ‒ messages.
RabbitMQ, and messaging in general, uses some jargon.
简单翻译
RabbitMQ 是一个消息代理:它接受和转发消息。你可以把它想象成一个邮局:当你把你想投递的邮件放在一个邮箱里时,你可以确定信使最终会把邮件送到你的收件人手中。在这个类比中,RabbitMQ 是一个邮箱、一个邮局和一个信箱。
RabbitMQ 和邮局之间的主要区别在于它不处理纸张,而是接受、存储和转发二进制数据 blob 数据 ‒ 消息。
RabbitMQ 和一般的消息传递使用一些行话。
消息队列是一种中间件技术,主要用于处理应用之间的通信、解耦和异步处理等问题。
消息队列的主要作用是在分布式系统中实现不同服务之间的数据传输和通信。它允许不同的组件在不同的时间点处理消息,从而实现了系统的异步处理能力。
解耦:消息队列允许发送者和接收者独立工作,不需要直接交互,从而降低了系统各部分之间的耦合度。
异步处理:发送者可以将消息放入队列后立即继续执行其他任务,而不必等待接收者处理完毕。
负载均衡:通过智能地分配消息到不同的接收者,可以平衡系统的负载。
容错:如果接收者暂时不可用,消息可以保留在队列中,直到接收者准备好处理它们。
可扩展性:可以根据需要增加接收者的数量来处理更多的消息。
顺序保证:某些消息队列提供消息的顺序保证,确保消息的发送和接收顺序一致。
传输保障:消息队列通常提供消息持久化的功能,即使系统崩溃,消息也不会丢失。
事件驱动架构:消息队列是实现事件驱动架构的关键组件,允许系统响应各种事件和触发相
RabbitMQ、RocketMQ、ActiveMQ、Kafka
RabbitMQ: 由Pivotal Software开发,使用Erlang语言编写。它支持多种协议,提供了可靠的消息传递模式,并且具有灵活的路由和负载均衡功能。
RocketMQ: 由阿里巴巴开发的分布式消息中间件,能够处理大规模的消息传输,具有良好的扩展性和高吞吐量。
ActiveMQ: 是Apache的一个开源项目,基于Java实现。它支持JMS规范,提供了丰富的消息模型和可靠性级别。
Kafka: 最初由LinkedIn开发,是一个分布式流处理平台,适合处理高吞吐量的数据流。Kafka具有高可扩展性和持久性,常用于日志收集和实时数据管道。
- public class Send {
- //队列名
- private final static String QUEUE_NAME = "hello";
- public static void main(String[] argv) throws Exception {
- ...
- }
- }
- //创建到服务器的连接
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
-
- }
- //参数依次为:队列名、是否持久化、是否独占、是否自动删除、其他属性。
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- String message = "Hello World!";
- //交换机名称(这里为空字符串,表示使用默认的交换机)、队列名、路由键(这里为空,表示使用默认的路由键)、消息体(需要将字符串转换为字节数组)
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
- System.out.println(" [x] Sent '" + message + "'");
完整代码
- public class Send {
-
- private final static String QUEUE_NAME = "hello";
-
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- String message = "Hello World!";
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
- System.out.println(" [x] Sent '" + message + "'");
- }
- }
- }
- public class Recv {
-
- private final static String QUEUE_NAME = "hello";
-
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- //参数依次为:队列名、是否持久化、是否独占、是否自动删除、其他属性。
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
-
- }
- }
回调函数
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [x] Received '" + message + "'");
- };
-
- //用于消费指定队列中的消息。
- //消费的队列的名称。
- //自动确认消息。如果设置为true,则在消费者成功处理消息后会自动发送确认信号给RabbitMQ服务器;如果设置为false,则需要手动发送确认信号。
- //deliverCallback: 这是一个回调函数,用于处理接收到的消息。当有新的消息到达时,该回调函数将被调用。
- //consumerTag -> {}: 这是一个Lambda表达式,表示一个空的消费者标签(Consumer Tag)。消费者标签是一个唯一标识符,用于标识消费者实例。
- channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
完整代码
- public class Recv {
-
- private final static String QUEUE_NAME = "hello";
-
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
-
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
- System.out.println(" [x] Received '" + message + "'");
- };
- channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
- }
- }
扇形交换机(Fanout Exchange):采用广播模式,它会将消息路由到所有绑定的队列,不考虑路由键。这适用于需要将消息发送给多个消费者的情况。
直连交换机(Direct Exchange):它根据消息的路由键(Routing Key)进行匹配,将消息传递给与之绑定的队列。这种类型的交换机适用于一对一的消息传递场景,如日志处理或任务分发。
主题交换机(Topic Exchange):此类型允许使用模式匹配来路由消息。路由键可以包含特定的符号,如#
表示一个或多个词,*
表示一个词,这样可以实现更复杂的消息分发逻辑。
头交换机(Headers Exchange):这种类型的交换机不处理路由键,而是根据发送消息内容中的headers属性进行匹配。这为基于消息内容的路由提供了灵活性。
Fanout交换机在RabbitMQ中用于实现发布/订阅模式,它能够将消息广播到所有绑定的队列。
Fanout交换机的主要作用是将接收到的消息无条件地广播到所有与之绑定的队列中。这种类型的交换机不关心消息的内容,也不使用路由键(Routing Key)来决定消息的路由,而是简单地将消息复制到所有绑定的队列中。
以下是Fanout交换机的一些典型应用场景:
Fanout交换机的优点包括:
然而,Fanout交换机也有一些缺点:
总的来说,Fanout交换机适用于需要广泛分发消息的场景,但需要注意其资源使用效率和消息筛选的限制。
- public class FanoutProducer {
- //定义要使用的交换机名名称,这次的队列就叫fanout-exchange
- private static final String EXCHANGE_NAME = "fanout-exchange";
-
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- //声明fanout类型的交换机
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- Scanner sc = new Scanner(System.in);
- while (sc.hasNext()) {
- String message = sc.nextLine();
- //将消息发送到指定的交换机
- channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
- System.out.println(" [x] Sent '" + message + "'");
-
- }
-
-
- }
- }
- }
- public class FanoutConsumer {
- private static final String EXCHANGE_NAME = "fanout-exchange";
-
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- //创建两个通道
- Channel channel1 = connection.createChannel();
- Channel channel2 = connection.createChannel();
-
- //声明交换机
- channel1.exchangeDeclare(EXCHANGE_NAME,"fanout");
- //创建队列1
- String queueName = "fanoutTest1";
- channel1.queueDeclare(queueName,true,false,false,null);
- channel1.queueBind(queueName,EXCHANGE_NAME,"");
- //创建队列2
- String queueName2 = "fanoutTest2";
- channel2.queueDeclare(queueName2,true,false,false,null);
- channel2.queueBind(queueName2,EXCHANGE_NAME,"");
-
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
-
- //创建交付回调函数1
- DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [喜羊羊] Received '" + message + "'");
- };
- //创建交付回调函数2
- DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [美羊羊] Received '" + message + "'");
- };
-
- //开始消费消息队列1
- channel1.basicConsume(queueName, true, deliverCallback1, consumerTag -> { });
- channel2.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
- }
- }
Direct交换机在RabbitMQ中用于根据消息的路由键将消息发送到与之完全匹配的队列上。
Direct交换机是RabbitMQ中常用的一种交换机类型,它属于AMQP(高级消息队列协议)的一部分。这种类型的交换机会根据消息携带的路由键(Routing Key)来决定消息的路由。只有当队列绑定到Direct交换机时指定了与消息完全相同的路由键,该队列才会接收到消息。这种方式确保了只有特定的队列能够接收到特定的消息,实现了精确的消息传递。
Direct交换机的典型应用场景包括:
Direct交换机的优点包括:
然而,Direct交换机也存在一些缺点:
- public class DirectProducer {
- //定义要使用的交换机名名称
- private static final String EXCHANGE_NAME = "direct-exchange";
-
- public static void main(String[] argv) throws Exception {
- //创建连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- //主机地址为本机
- factory.setHost("localhost");
- //创建连接并创建通道
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- //声明交换机,类型为direct
- channel.exchangeDeclare(EXCHANGE_NAME, "direct");
-
- /* //拿路由键和消息内容
- String severity = getSeverity(argv);
- String message = getMessage(argv);*/
-
- Scanner sc = new Scanner(System.in);
- while (sc.hasNext()) {
- String str = sc.nextLine();
- String[] strings = str.split(" ");
-
- // 输入内容不符合要求,继续下一行
- if (strings.length < 1) {
- continue;
- }
-
- //获取消息内容和路由键
- String message = strings[0];
- String routingKey = strings[1];
-
- //发布消息到交换机
- //EXCHANGE_NAME: 这是一个字符串常量,表示要发布消息的交换机的名称。
- //severity: 路由键(Routing Key),用于确定消息应该被发送到哪个队列。
- //null: 表示没有使用任何消息属性。
- //message.getBytes("UTF-8"): 这是将要发送的消息内容,通过调用getBytes方法将其转换为字节数组,并指定字符编码为UTF-8。
- channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
- System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
- }
- }
- }
- }
- public class DirectConsumer {
- //定义正在监听的交换机名名称
- private static final String EXCHANGE_NAME = "direct-exchange";
-
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- //声明交换机,类型为direct
- channel.exchangeDeclare(EXCHANGE_NAME, "direct");
-
-
- // 创建队列,绑定到 xiaoming 路由键
- String queueName = "xiaoming_queue";
- channel.queueDeclare(queueName, true, false, false, null);
- channel.queueBind(queueName, EXCHANGE_NAME, "xiaoming");
- // 创建队列,绑定到 xiaohong 路由键
- String queueName2 = "xiaohong_queue";
- channel.queueDeclare(queueName, true, false, false, null);
- channel.queueBind(queueName, EXCHANGE_NAME, "xiaohong");
-
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
-
- //创建一个 DeliverCallback 实例来处理接收到的消息(xiaoming)
- DeliverCallback xiaomingdeliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [xiaoming] Received '" +
- delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
- };
-
- //创建一个 DeliverCallback 实例来处理接收到的消息(xiaohong)
- DeliverCallback xiaohongdeliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [xiaohong] Received '" +
- delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
- };
- //开始消息队列中的消息,设置自动确认消息已经被消费(xiaoming)
- channel.basicConsume(queueName, true, xiaomingdeliverCallback, consumerTag -> {
- });
- //开始消息队列中的消息,设置自动确认消息已经被消费(xiaohong)
- channel.basicConsume(queueName, true, xiaomingdeliverCallback, consumerTag -> {
- });
- }
- }
Topic交换机在RabbitMQ中用于实现模式匹配的消息路由。
Topic交换机是RabbitMQ中的一种交换机类型,它允许更复杂的路由规则。与Direct交换机不同,Topic交换机不是基于精确匹配的路由键,而是使用模式匹配的方式来决定消息应该被发送到哪个队列。这种类型的交换机非常灵活,因为它可以基于多个标准进行路由。
以下是Topic交换机的一些典型应用场景:
Topic交换机的优点包括:
然而,Topic交换机也存在一些缺点:
总的来说,Topic交换机适用于需要基于多个标准或模式匹配来路由消息的场景,它提供了比Direct交换机更高的灵活性,但同时也带来了配置上的复杂性和潜在的性能影响。
Headers交换机在RabbitMQ中用于基于消息的头信息而非路由键来进行消息路由。
Headers交换机的使用场景包括但不限于:
然而,Headers交换机也存在一些缺点:
总的来说,虽然Headers交换机提供了一种基于消息头信息的灵活路由机制,但由于其性能上的不足和实际使用场景的限制,它并不是RabbitMQ中最常用的交换机类型。
RabbitMQ的消息过期机制,也称为TTL(Time To Live),是指可以对消息或队列设置一个过期时间,一旦超过这个时间,消息将被视为dead letter(死信)并可被删除或转移到死信队列。
该机制的用途包括但不限于:
在应用场景中,消息过期机制特别适用于以下情况:
优点包括:
然而,它也存在一些缺点:
ack: 消费成功
nack: 消费失败
rejiect: 拒绝
支持配置autoack,自动执行 ack 命令, 接受到消息立刻就成功
channel.basicConsume(queueName, true, xiaomingdeliverCallback, consumerTag -> { });
指定确认某条消息
- //getEnvelope(): 是delivery对象的一个方法,用于获取消息的信封信息。
- //getDeliveryTag(): 是Envelope对象的一个方法,用于获取消息的唯一标识符(即消息ID)。
- //false: 表示是否批量确认消息。如果为true,则表示批量确认;如果为false,则表示只确认当前消息。
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
拒绝某条消息
- //getEnvelope(): 是delivery对象的一个方法,用于获取消息的信封信息。
- //getDeliveryTag(): 是Envelope对象的一个方法,用于获取消息的唯一标识符(即消息ID)。
- //false: 表示是否批量拒绝消息。如果为true,则表示批量拒绝;如果为false,则表示只拒绝当前消息。
- //false: 表示是否将消息重新放回队列。如果为true,则表示将消息重新放回队列;如果为false,则表示将消息丢弃。
- channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);
RabbitMQ的消息确认机制确保消息的可靠传递和处理,主要通过发布确认(Publish Confirm)和消费确认(Consumer Acknowledgement)来实现。
发布确认指的是生产者发送消息后,等待RabbitMQ服务器返回确认消息的过程,以确保消息已经被成功地发送到RabbitMQ服务器。而消费确认则是确认消费者是否成功消费了队列中的消息。
这种机制在以下场景下会用到:
优点包括:
然而,它也存在一些缺点:
总的来说,RabbitMQ的消息确认机制是一个强大的工具,可以帮助确保消息的可靠传递和处理,适用于对消息可靠性要求较高的场景。但是,它也需要合理设计和优化,以减少对系统性能的影响。
RabbitMQ中的死信队列(Dead-Letter-Exchange,简称DLX)是一种专门用于处理无法被正常消费的消息的队列。
死信队列的作用主要是接收那些因为各种原因无法被消费者正确处理的消息,这些原因可能包括消息过期、队列达到最大长度、消息被拒绝等。当这些消息成为所谓的“死信”时,它们可以被重新发布到另一个交换机,即死信队列中,从而可以进行后续的特殊处理或分析。
以下是一些典型的使用场景:
basic.reject
或basic.nack
),并且设置requeue=false
,那么这个消息会被发送到死信队列。优点包括:
然而,死信队列也有其缺点:
图源来自编程导航( 仅用于学习 侵删)