RabbitMQ
是用 Erlang
实现的一个高并发高可靠 AMQP
消息队列服务器。支持消息的持久化、事务、拥塞控制、负载均衡等特性,使得RabbitMQ拥有更加广泛的应用场景。
Erlang
是一门动态类型的函数式编程语言,Erlang 是基于 Actor 模型的实现。每个 Actor 对应着一个 Erlang 进程,进程之间通过消息传递进行通信。相比共享内存,进程间通过消息传递来通信带来的直接好处就是消除了直接的锁开销。
系统的可用性降低
:在还未引进MQ之前,系统只需要关系生产端与消费端的接口一致性就可以了,现在引进后,系统需要关注生产端、MQ与消费端三者的稳定性,这增加系统的负担,系统运维成本增加。系统的复杂性提高
:引入了MQ,需要考虑的问题就增加了,如何保障消息的一致性,消费不被重复消费等问题。一致性问题
:A系统发送完消息直接返回成功,但是BCD系统之中若有系统写库失败,则会产生数据不一致的问题。1、业务异步解耦
最常见的场景就是用户注册之后,需要发送注册短信、邮件通知,以告知相关信息。使用MQ,只需要在处理完用户信息之后,给MQ发送两个消息即可,邮件服务、短信服务监听MQ的任务消息,根据消息进行发送即可。
直销银行中台处理支付操作,将支付报文发给支付中心MQ,支付中心监听到请求,会处理相关逻辑并发送第三方支付公司,将最终处理结果发给中台系统接收MQ。
中台系统也可以轮询查询终态。
2、流量削峰填谷
MQ 比较常用的一个场景,一般在秒杀、搞活动中使用广泛。使用MQ,可以将需要处理的消息全部放入其中,系统按照最大处理能力,去获取消息进行消费,这样就可以将一瞬间过来的请求,分散到一段时间内进行处理,避免了系统的崩溃。
3、消息分发
多个系统对同一个数据感兴趣,只需要监听同一类消息即可。
例如付款系统,付款成功后,正常做法是通知外围系统付款成功了,或者是外围系统定时来拉取付款结果。使用 MQ 后,付款系统可以在付款成功后,将消息放到 MQ 里面,想知道这个结果的系统订阅这个主题的消息即可,非常方便,也不需要定时去拉取数据了。
AMQP
(Advanced Message Queue Protocol)定义了一种消息系统规范。这个规范描述了在一个分布式的系统中各个子系统如何通过消息交互。而 RabbitMQ 则是 AMQP 的一种基于 erlang 的实现。AMQP 将分布式系统中各个子系统隔离开来,子系统之间不再有依赖。子系统仅依赖于消息。子系统不关心消息的发送者,也不关心消息的接受者。
与 ActiveMQ 拿到消息就直接放在队列等待消费者拿走不同, RabbitMQ 拿到消息之后,会先交给 交换机 (Exchange
), 然后交换机再根据预先设定的不同绑定(Bindings)策略,来确定要发给哪个队列。
ConnectionFactory
:与 RabbitMQ 服务器连接的管理器Connection
:与 RabbitMQ 服务器的 TCP 连接Channel
:与Exchange
的连接,一个 Connection 可以包含多个 Channel。之所以需要 Channel,是因为 TCP 连接的建立和释放都是十分昂贵的,为了多路复用。RabbitMQ 建议客户端线程之间不要共用 Channel,但是 建议尽量共用 Connection。Exchange
:接受消息生产者的消息,并根据消息的RoutingKey
和 Exchange 绑定的BindingKey
,以及Binding规则
将消息路由给队列。ExchangeType 决定了 Exchange 路由消息的行为,例如,在RabbitMQ中,ExchangeType有 direct、fanout、topic、header ,不同类型的 Exchange 路由的行为是不一样的。Queue
:消息队列,用于存储还未被消费者消费的消息。Message
:由 Header 和 Body 组成,Header是由生产者添加的各种属性的集合,包括 Message 是否被持久化、由哪个 Message Queue 接受、优先级是多少等。而 Body 是真正需要传输的 APP 数据。RoutingKey
:由 Producer 发送 Message 时指定,指定当前消息被谁接受。BindingKey
:由 Consumer 在绑定 Exchange 与 Queue 时指定,指定当前 Exchange 下,什么样的 RoutingKey 会被下派到当前绑定的 Queue 中。Binding
:用于 消息队列-Queue 和 交换器-Exchange 之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。安装教程较简单,此处不做阐述,下载 RabbitMQ 和 Erlang 的安装包时要注意版本是否对应,不然不兼容无法安装启动。
参考文档:
📖 RabbitMQ安装
📖 RabbitMQ和Erlang对应版本
📌1. 引入依赖
<dependency>
<groupId>com.rabbitmqgroupId>
<artifactId>amqp-clientartifactId>
<version>3.6.5version>
dependency>
📌2. RabbitMQ工具类
public class RabbitMQUtils {
public static Connection getConnection() {
String host = "localhost";
// 注意:连接端口是5672,不是15672,15672是管理页面访问端口
int port = 5672;
String username = "guest";
String password = "guest";
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(host);
factory.setPort(port);
factory.setUsername(username);
factory.setPassword(password);
// 2、创建一个新的连接,TCP连接的建立和释放都是十分昂贵,建议尽量共用Connection。
Connection conn = null;
try {
conn = factory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return conn;
}
public static void close(Connection conn, Channel channel) throws IOException, TimeoutException {
//关闭通道和连接
channel.close();
conn.close();
}
}
💦 配置 RabbitMQ 服务器连接工厂 ConnectionFactory ,并获取新的连接 Connection。
📌3. 生产者
public class TestProducer {
public final static String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个新的连接
Connection conn = RabbitMQUtils.getConnection();
//创建一个通道
Channel channel = conn.createChannel();
/* queue:队列的名称
* durable:是否持久化
* exclusive:设置是否排他
* autoDelete:设置是否自动删除。至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
* arguments:设置队列的其他一些参数
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
/* exchange:要将消息发送到的Exchange(交换器)
* routingKey: 路由KEY
* props: 消息的其它属性,如:路由头等
* body: 消息体
*/
channel.basicPublish("", QUEUE_NAME, null, ("hello").getBytes());
RabbitMQUtils.close(conn, channel);
}
}
💦 生产者声明定义队列,通过basicPublish()
方法往队列插入消息。
📌4. 消费者
public class TestConsumer {
public final static String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws Exception{
Connection conn = RabbitMQUtils.getConnection();
Channel channel = conn.createChannel();
/* queue:队列名
* autoAck:true 接收到传递消息自动应答,false 接收到消息后不自动应答服务器
* deliverCallback: 当一个消息发送过来后的回调接口
*/
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer-1: 收到的消息: " + new String(body));
}
});
}
}
💦 消费者通过basicConsume()
方法堵塞监听指定队列(test_queue)的消息,接收到消息后,通过回调函数handleDelivery()
处理消息。
📌5. 测试
🎈1、先启动生产者 TestProducer:
💦 管理台可见,新增队列 test_queue,而且队列中存储一条消息。
🎈2、再启动消费者 TestConsumer:
💦 消费者收到并且消费消息。
RabbitMQ管理页面小知识:
- Queued messages:
Ready:队列存储没有投递给消费者的数量。只有消费者消费速度赶不上生产者发送消息速度时,才产生堆积,Ready才显示不为0。
Unacked:投递给消费者没有收到ACK数量。只有消费者接收了消息未发送ACK,Unacked才显示不为0。
Total:Ready+Unacked
- Message rates:统计消费的速率
详细管理台操作手册参考:
📖 RabbitMQ 管理界面操作说明
⚡注:⚡ Queue队列的声明queueDeclare()
在生产者、消费者代码都可以声明。队列声明创建后,将保存到 RabbitMQ。
让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
📌1. 生产者
for (int i = 0; i < 20; i++) {
channel.basicPublish("", QUEUE_NAME, null, (i + "hello").getBytes());
}
📌2. 消费者2
public class TestConsumer2 {
public final static String QUEUE_NAME = "test_queue";
public static void main(String[] args) throws Exception{
Connection conn = RabbitMQUtils.getConnection();
Channel channel = conn.createChannel();
/* queue:队列名
* autoAck:true 接收到传递消息自动应答,false 接收到消息后不自动应答服务器
* deliverCallback: 当一个消息发送过来后的回调接口
*/
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer-2: 收到的消息: " + new String(body));
}
});
}
}
📌3. 测试
🎈1、先启动消费者TestConsumer、消费者TestConsumer2监听,再启动生产者:
💦 两个消费者共同消费队列中的消息,因为消费速度很快,所以,Queued messages 中 Ready 堆积为0。
fanout 广播, 把消费转发给所有绑定到该交换机的所有队列;
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
- 交换机把消息发送给绑定过的所有队列。
- 每个消费者有自己的队列,并且队列与交换机进行绑定。
- 消费者从自己的队列消费消息,实现一条消息被多个消费者消费。
📌1. 生产者
public class TestProducer {
public final static String EXCHANGE_NAME="fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
//创建一个新的连接
Connection conn = RabbitMQUtils.getConnection();
//创建一个通道
Channel channel = conn.createChannel();
/* 交换机声明
* var1:交换机名称
* var2:交换机类型
*/
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
/* exchange:要将消息发送到的Exchange(交换器)
* routingKey: 路由KEY
* props: 消息的其它属性,如:路由头等
* body: 消息体
*/
for (int i = 0; i < 8; i++) {
channel.basicPublish(EXCHANGE_NAME, "", null, (i + "hello").getBytes());
}
RabbitMQUtils.close(conn, channel);
}
}
💦 通过channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
声明定义交换机的名称、类型。
📌2. 消费者
public class TestConsumer {
public final static String EXCHANGE_NAME="fanout_exchange";
public static void main(String[] args) throws Exception{
Connection conn = RabbitMQUtils.getConnection();
Channel channel = conn.createChannel();
// 获取一个临时队列
String queueName = channel.queueDeclare().getQueue();
/* 队列与交换机绑定
* var1:队列名称
* var2:交换机名称
* var3:routingKey
*/
//交换机与队列进行绑定,如果没有队列与交换机进行绑定,那么消费者接受不到生产者的消息,消息会丢失。
channel.queueBind(queueName, EXCHANGE_NAME,"");
/* queue:队列名
* autoAck:true 接收到传递消息自动应答,false 接收到消息后不自动应答服务器
* deliverCallback: 当一个消息发送过来后的回调接口
*/
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer-1: 收到的消息: " + new String(body));
}
});
}
}
💦 消费者定义队列,并且通过queueBind()
方法将队列和交换机进行绑定。
📌3. 测试
🎈1、先启动消费者TestConsumer、消费者TestConsumer2监听,再启动生产者:
⚠️注意:⚠️ 如果先启动生产者,再启动消费者,后启动的消费者将接收不到消息。因为,如果将消息发送给交换机 Exchange,如果Exchange没有匹配到队列,会将消息丢弃。
⚡注:⚡ Exchange交换机, 只负责转发消息, 不具备存储消息的能力; 因此如果没有任何队列与 Exchange 绑定, 或者没有符合规则的队列, 那么消息会丢失;
⚡注:⚡ Exchange交换机的声明exchangeDeclare()
在生产者、消费者代码都可以声明。交换机声明创建后,将保存到 RabbitMQ。
direct 定向, 把消息转发给符合 指定 routing key 路由键的队列;
- 队列与交换机的绑定,要指定一个RoutingKey(路由key)
- 发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
- Exchange 根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息。
📌1. 生产者
// 声明一个交换机, 交换机的类型为 direct
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
/* exchange:要将消息发送到的Exchange(交换器)
* routingKey: 路由KEY
* props: 消息的其它属性,如:路由头等
* body: 消息体
*/
channel.basicPublish(EXCHANGE_NAME, "success", null, "成功信息".getBytes());
channel.basicPublish(EXCHANGE_NAME, "error", null, "错误信息".getBytes());
💦 生产者发送消息时,指定 routingKey
。
📌2. 消费者
// 获取一个临时队列
String queueName = channel.queueDeclare().getQueue();
/* 队列与交换机绑定
* var1:队列名称
* var2:交换机名称
* var3:routingKey
*/
channel.queueBind(queueName, EXCHANGE_NAME, "success");
💦 消费者绑定队列时,指定 routingKey
。
📌3. 消费者2
// 指定 routingKey = error
channel.queueBind(queueName, EXCHANGE_NAME, "error");
📌4. 测试
🎈1、先启动消费者TestConsumer、消费者TestConsumer2监听,再启动生产者:
topic 通配符, 把消息交给 routing pattern(通配符模式)的队列;
topic 类型与 direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 topic 类型 Exchange 可以让队列在绑定 Routing key 的时候使用通配符!!这种模型 Routingkey 一般都是由一个或多个单词组成,多个单词之间以
”.”
分割,例如:company.java
# 通配符
* :匹配不多不少恰好1个词
# :匹配一个或多个词
# 如:
company.* :只能匹配 company.java
company.# :匹配 company.java 或者 company.java.oracle 等
📌1. 生产者
// 声明一个交换机, 交换机的类型为 direct
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.basicPublish(EXCHANGE_NAME, "company.java", null, "通知信息".getBytes());
📌2. 消费者
// 获取一个临时队列
String queueName = channel.queueDeclare().getQueue();
// 指定 routingKey = company.#
channel.queueBind(queueName, EXCHANGE_NAME,"company.#");
📌3. 消费者2
// 指定 routingKey = company.java.#
channel.queueBind(queueName, EXCHANGE_NAME,"company.java.#");
📌4. 消费者3
// 指定 routingKey = company.html.*
channel.queueBind(queueName, EXCHANGE_NAME,"company.html.*");
📌5. 测试
🎈1、先启动消费者TestConsumer、消费者TestConsumer2、消费者TestConsumer3监听,再启动生产者:
💦 basicConsume()
方法第二个参数:true 接收到传递消息自动应答,false 接收到消息后不自动应答服务器。设置 false 后,需要手动应答。
📌1. TestConsumer:
//autoAck:false,接收到消息后不自动应答服务器
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer-1: 收到的消息: " + new String(body));
}
});
📌2. 测试
🎈1、先启动消费者TestConsumer监听,再启动生产者:
💦 Unacked:投递给消费者没有收到ACK数量。消费者没有返回ACK,消息堆积在队列中。
📌TestConsumer中加入手动返回ACK
channel.basicAck(envelope.getDeliveryTag(), false);
📌TestConsumer:
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer-1: 收到的消息: " + new String(body));
// 手动确认消息已经被消费了, 第一个参数是当前消费的消息的标签(递增的整数)
// 第二个参数是否确认多条消息,包括之前消费的消息
System.out.println(envelope.getDeliveryTag());
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
💦 启动TestConsumer后,会把队列中之前未返回ACK的消息依次接收并返回ACK,队列接收到ACK后将消息删除。
篇幅过大,下接 《消息队列-RabbitMQ(二)》