RabbitMQ是一款开源的消息队列中间件,它实现了高级消息队列协议(AMQP)标准。作为一个消息代理,RabbitMQ可以在应用程序之间可靠地传递和存储消息,并支持多种消息传递模式。
消息:在RabbitMQ中,消息是传输的基本单位。它由消息体和可选的属性组成,消息体是要传递的实际数据,而属性则包含有关消息的元数据信息。
队列:队列是消息的容器,它类似于一个缓冲区,用于存储待处理的消息。生产者将消息发送到队列,消费者从队列中接收和处理消息。
交换机:交换机是消息路由的核心组件,它接收来自生产者的消息,并根据特定的路由规则将消息分发给一个或多个绑定到它上面的队列。
绑定:绑定定义了交换机和队列之间的关系,它指定了消息在被发送到交换机时如何被路由到与之绑定的队列。
路由模式:RabbitMQ支持多种路由模式,包括直连、主题、扇出和头部路由等。不同的路由模式提供了不同的消息分发机制,以满足不同的应用需求。
可靠性:RabbitMQ提供了持久化消息、手动确认和事务等机制来确保消息的可靠性传递和处理。
高可用性:通过设置集群和镜像队列,RabbitMQ可以实现高可用性,确保即使某个节点或队列发生故障,系统仍然可用。
插件生态系统:RabbitMQ具有丰富的插件生态系统,可以扩展其功能和集成其他系统。
<dependency>
<groupId>org.springframework.bootgroupId>
<artifactId>spring-boot-starter-amqpartifactId>
dependency>
spring:
# rabbitmq相关配置
rabbitmq:
host: 127.0.0.1
port: 5672
username: demo
password: demo
#虚拟host 可以不设置,使用server默认host
virtual-host: /demo
# 链接超时时间
connection-timeout: 1000
# 缓存配置
cache:
channel:
# 要保留在高速缓存中的通道数,注意此值不能超过 requested-channel-max
size: 2980
# 如果已达到高速缓存大小,则等待获取通道的持续时间。如果为0,则始终创建一个新通道。
checkout-timeout: 500
connection:
# 链接模式 channel 和 connection 两种,建议channel,channel使用ThreadLocal绑定,记得使用线程池
mode: channel
publisher-returns: true
publisher-confirm-type: simple
# 监听器相关配置(可以看作消费者链接工厂配置)
listener:
# 设置默认连接器(simple)
type: simple
# 简单的链接工厂(默认)
simple:
# 可选最大消费者数量 cpu * 2
max-concurrency: 20
# 手动确认消费
acknowledge-mode: manual
# 初始化消费者数量
concurrency: 10
# 消费者一次从MQ服务器拉取的数据量,
prefetch: 250
# 最大channel 数量,该数量和rabbit mq server中配置相关,二者取最小值
requested-channel-max: 3000
# 生产者配置
template:
retry:
# 是否开启重试
enabled: true
# 最大重试次数 5 次
max-attempts: 5
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MessageSender {
private final AmqpTemplate rabbitTemplate;
@Autowired
public MessageSender(AmqpTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
public void sendMessage(String message) {
rabbitTemplate.convertAndSend("<交换机名称>", "<路由键>", message);
}
}
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MessageReceiver {
@RabbitListener(queues = "<队列名称>")
public void receiveMessage(String message) {
// 处理接收到的消息
System.out.println("Received message: " + message);
}
}
org.springframework.amqp.AmqpTimeoutException: No available channels
,需要做好容错处理spring.rabbitmq.listener.type
进行设置
连接管理:确保在处理完消息后正确关闭连接,以避免资源泄漏。建议使用连接池来管理和复用连接。连接池推荐使用 CachingConnectionFactory
,默认的也是这种
消息确认:当发送消息到 RabbitMQ 时,可以选择等待确认。这样可以确保消息被成功处理,或者在发生错误时进行重试。确保在代码中实现消息确认机制,以保证消息的可靠性。
消息持久化:为了防止消息丢失,在发送消息时应将消息标记为持久化。这样即使 RabbitMQ 服务意外关闭,消息也会被保存在磁盘上,并在重新启动后恢复。
序列化与反序列化:在将对象转换为消息发送到 RabbitMQ 之前,需要进行序列化操作。同样,在接收到消息后,需要进行相应的反序列化操作。确保选择一种适合你的数据类型和语言的序列化方式。并且你的生产者和消费者的序列化方式必须相同
错误处理:当发送消息时,要考虑可能出现的异常情况。例如,RabbitMQ 服务器不可用或消息队列已满。在代码中实现适当的错误处理机制,可以记录日志、重试发送或采取其他措施
自己整理的 Spring boot 中RabbitMQ工作的简易流程图,可能有不对的地方,仅供参考