在项目开发过程中,对于发短信、发邮件和数据同步功能,或许会使用消息队列完成此功能的开发。既可以实现业务解耦,还可以保证在面对突发流量时相关业务正常运行。但在实际使用过程中,只了解最基本的发送和消费过程是不够的,还应了解消息的可靠性、持久性、可用性和扩展性等问题。通过阅读《RabbitMQ实战指南》一书,对于RabbitMQ有了更全面的了解,通过理论和项目实践有了更深入的了解,先将此过程记录如下。
在了解 RabbitMQ 相关特性之前,还需要了解如何安装,这对于后续的参数配置和维护很有用。推荐使用 Docker 完成部署
# 下载带有管理页面的镜像
docker pull rabbitmq:3.9-management
# 启动MQ并设置应用端口为5672,管理页面端口为15672,用户名为root,密码为123456
docker run -d --hostname my-rabbit -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=123456 ${镜像ID}
从宏观角度看,RabbitMQ 所扮演的角色位于生产者和消费者之间,为两者提供了缓冲功能。在RabbitMQ内部通过交换(Exchange)和和队列(Queue)实现消息分发,最终由消费者完成消费。
在 RabbitMQ 中存储四种类型的交换机
在生产者发送消息前,需要配置交换机参数和队列参数,并完成交换机和队列的绑定。此过程既可以在管理页面进行设置,也可以通过代码配置
在 SpringBoot 项目中使用 RabbitMQ 发送消息是非常方便的,这里对发送消息的API进行记录
至于消息的参数,可以通过官方文档了解,其中比较常见的配置如下:
名称 | 说明 |
---|---|
timestamp | 消息发送时间 |
messageId | 消息id,一般使用 uuid |
deliveryMode | 消息持久化,取值有:NON_PERSISTENT、PERSISTENT |
contentEncoding | 内容编码,取值有:GZIP、UTF-8等 |
contentType | 内容类型:application/json、application/xml等 |
当调用 send() 方法发送消息后,如何保证消息准确进入 RabbitMQ 和指定的队列哪?RabbitMQ 提供了事务和**发送方确认机制(publisher confirm)**解决此问题,这里对后者做如下记录:
@Slf4j
@Component
public class DefaultRabbitCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
/**
* 消息成功进入 RabbitMQ 时,回调此方法且 ack 为 true
*
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("confirm");
}
/**
* 当交换机无法根据自身的特性或路由键找到一个符合条件的队列,就会回调此方法
*
* @param returned the returned message and metadata.
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("returnedMessage");
}
}
@Slf4j
@RestController
public class MqController {
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private DefaultRabbitCallback defaultRabbitCallback;
@GetMapping(value = "/send2")
public Long send2() {
// 配置 confirm 回调
rabbitTemplate.setConfirmCallback(defaultRabbitCallback);
// 配置 return 回调
rabbitTemplate.setReturnsCallback(defaultRabbitCallback);
// 数据体
Map<String, Object> map = new HashMap<>(2);
map.put("name", "王大海");
map.put("age", 25);
// 生成唯一 id
String msgId = IdUtil.fastSimpleUUID();
// 将唯一id 写入其中,后续当 confirm 回调失败时,可以完成一些业务逻辑
CorrelationData correlationData = new CorrelationData();
correlationData.setId(msgId);
// 配置消息参数并发送
rabbitTemplate.convertAndSend("example.exchange", "example.key1", JSONUtil.toJsonStr(map), message -> {
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.setTimestamp(new Date());
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setMessageId(msgId);
messageProperties.setContentEncoding(StandardCharsets.UTF_8.name());
messageProperties.setContentType(MediaType.APPLICATION_JSON_VALUE);
return message;
}, correlationData);
return System.currentTimeMillis();
}
}
RabbitMQ 消费模式有两种:推(Push)模式和拉(Get)模式。
拉模式
// 队列名称和超时时间
Message receive = rabbitTemplate.receive("example.queue", 3000);
String str = new String(receive.getBody());
MessageProperties messageProperties = receive.getMessageProperties();
String messageId = messageProperties.getMessageId();
推模式
当有多个消费者监听一个队列时,RabbitMQ 会以轮训的方式将消息分发给每个消费者
@Slf4j
@Component
public class MqListener {
/**
*
* @param channel channel
* @param message 消息体
* @throws IOException IOException
*/
@RabbitListener(queues = "example.queue")
public void listen(Channel channel, Message message) throws IOException {
String body = new String(message.getBody());
MessageProperties messageProperties = message.getMessageProperties();
Date timestamp = messageProperties.getTimestamp();
log.info("body:{}; messageProperties:{}; timestamp:{}", body, messageProperties, DateUtil.formatDateTime(timestamp));
}
}
当消费者接收到新消息,如果此时服务被强制关闭,如何保证这条新被正确消费吶?RabbitMQ 使用 ACK 机制保证了消息消费的可靠性。当消费过程中出现异常导致无法执行完整业务逻辑,此消息会重新回到队列中
@Slf4j
@Component
public class MqListener {
/**
* 手动ack
*
* @param channel channel
* @param message 消息体
* @throws IOException IOException
*/
@RabbitListener(queues = "example.queue", ackMode = "MANUAL", concurrency = "2")
public void listen(Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
String body = new String(message.getBody());
MessageProperties messageProperties = message.getMessageProperties();
Date timestamp = messageProperties.getTimestamp();
log.info("body:{}; messageProperties:{}; timestamp:{}", body, messageProperties, DateUtil.formatDateTime(timestamp));
// 成功消费,RabbitMQ 可以删除此消息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.warn("exception:{}", ExceptionUtil.stacktraceToString(e));
// 拒绝此标签的消息,并重新入队列
channel.basicNack(deliveryTag, false, true);
}
}
}
当消费者在消费信息的过程中出现异常,使用basicNack
和basicReject
命令并设置requeue
参数为false
时,此消息成为“死信”,存储死信的队列叫“死信队列”,转发消息的交换机叫“死信交换机”,同时,当消息过期和达到队列最大长度时,也会产生死信。
具体到实际的业务场景,死信队列也可以实现业务解耦和功能拆分。例如:30分钟未支付就删除订单信息和事件过期监听等场景。
实践出真知,使用SpringBoot
实现死信队列的搭建和消息监听。
以上是死信队列的数据流转过程,搭建代码如下所示:
@Configuration
public class MessageConfig {
public static final String NORMAL_QUEUE_NAME = "normal.queue";
public static final String NORMAL_EXCHANGE_NAME = "normal.exchange";
public static final String NORMAL_ROUTING_KEY = "normal.routing.key";
public static final String DLX_QUEUE_NAME = "dlx.queue";
public static final String DLX_QUEUE_EXCHANGE = "dlx.exchange";
public static final String DLX_QUEUE_ROUTING_KEY = "dlx.routing.key";
/**
* 什么普通交换机
*
* @return Exchange
*/
@Bean
public Exchange exchange() {
return ExchangeBuilder.directExchange(NORMAL_EXCHANGE_NAME).durable(true).build();
}
/**
* 申明普通队列,并指定死信交换机和routingKey
*
* @return Queue
*/
@Bean
public Queue queue() {
return QueueBuilder.durable(NORMAL_QUEUE_NAME).deadLetterExchange(DLX_QUEUE_EXCHANGE).deadLetterRoutingKey(DLX_QUEUE_ROUTING_KEY).build();
}
/**
* 申明死信交换机
*
* @return Exchange
*/
@Bean
public Exchange dlxExchange() {
return ExchangeBuilder.directExchange(DLX_QUEUE_EXCHANGE).durable(true).build();
}
/**
* 申明死信队列
*
* @return Queue
*/
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable(DLX_QUEUE_NAME).build();
}
/**
* 建立普通交换机和普通队列的绑定关系
*
* @return Binding
*/
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with(NORMAL_ROUTING_KEY).noargs();
}
/**
* 简历死信交换机和死信队列的绑定关系
*
* @return Binding
*/
@Bean
public Binding dlxBinding() {
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_QUEUE_ROUTING_KEY).noargs();
}
}
向普通队列中推送偶数,并完成消费,若未奇数则转发给死信队列消费
@RestController
public class MqController {
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private MqConfirmConfig mqConfirmConfig;
@GetMapping(value = "/send")
public Long send() {
MessageProperties messageProperties = new MessageProperties();
messageProperties.setMessageId(IdUtil.fastSimpleUUID());
messageProperties.setContentEncoding(StandardCharsets.UTF_8.name());
int i = RandomUtil.randomInt(1, 20);
Message message = new Message(String.valueOf(i).getBytes(StandardCharsets.UTF_8), messageProperties);
rabbitTemplate.setConfirmCallback(mqConfirmConfig);
rabbitTemplate.send(MessageConfig.NORMAL_EXCHANGE_NAME, MessageConfig.NORMAL_ROUTING_KEY, message, new CorrelationData(IdUtil.fastSimpleUUID()));
return System.currentTimeMillis();
}
}
普通队列消费偶数
@Slf4j
@Component
public class NormalMessageListener {
@RabbitListener(queues = MessageConfig.NORMAL_QUEUE_NAME)
public void exec(Message message, Channel channel) throws IOException {
String s = new String(message.getBody());
MessageProperties messageProperties = message.getMessageProperties();
int num = Integer.parseInt(s);
boolean fixed = num % 2 == 0;
if (fixed) {
log.info("偶数正常消费; num:{}", num);
channel.basicAck(messageProperties.getDeliveryTag(), false);
return;
}
log.info("奇数不消费; num:{}", num);
channel.basicReject(messageProperties.getDeliveryTag(), false);
}
}
死信队列消费奇数
@Slf4j
@Component
public class DlxMessageListener {
@RabbitListener(queues = MessageConfig.DLX_QUEUE_NAME)
public void dlxListener(Message message, Channel channel) throws IOException {
String s = new String(message.getBody());
MessageProperties messageProperties = message.getMessageProperties();
log.info("死信队列消费奇数; s:{}", s);
channel.basicAck(messageProperties.getDeliveryTag(), false);
}
}
延时队列是一种特殊的死信队列,通过给消息设置过期时间(TTL),当消息过期后转发至死信交换机再进行消费
@RestController
public class MqController {
@Resource
private RabbitTemplate rabbitTemplate;
@Resource
private MqConfirmConfig mqConfirmConfig;
@GetMapping(value = "/send")
public Long send() {
MessageProperties messageProperties = new MessageProperties();
// 设置消息的有效时间为10秒
messageProperties.setExpiration(String.valueOf(TimeUnit.SECONDS.toMillis(10)));
messageProperties.setMessageId(IdUtil.fastSimpleUUID());
messageProperties.setContentEncoding(StandardCharsets.UTF_8.name());
Message message = new Message(DateUtil.now().getBytes(StandardCharsets.UTF_8), messageProperties);
rabbitTemplate.setConfirmCallback(mqConfirmConfig);
rabbitTemplate.send(MessageConfig.NORMAL_EXCHANGE_NAME, MessageConfig.NORMAL_ROUTING_KEY, message, new CorrelationData(IdUtil.fastSimpleUUID()));
return System.currentTimeMillis();
}
}
普通队列不消费,10秒钟后由死信交换机对应的消费者进行消费
@Slf4j
@Component
public class DlxMessageListener {
@RabbitListener(queues = MessageConfig.DLX_QUEUE_NAME)
public void dlxListener(Message message, Channel channel) throws IOException {
String s = new String(message.getBody());
MessageProperties messageProperties = message.getMessageProperties();
log.info("死信队列消费; s:{}", s);
channel.basicAck(messageProperties.getDeliveryTag(), false);
}
}
候后续补充