消息确认共分为两个方面,上一篇已经讲过了生产者推送消息确认,此篇文章就讲解一下 消费者接收消息的确认。
首先,我们要知道消费者从消息队列中拿到消息处理完成后,会反馈给RabbitMQ服务器,RabbitMQ在收到反馈后将此消息从队列中删除。
这就是RabbitMQ的ack消息确认机制,为了保证不会因为消费端异常导致正在处理中的消息未能成功处理,而造成数据丢失。
RabbitMQ消息确认模式有3种,分别是自动确认(NONE)、手动确认(MANUAL)以及根据情况确认(AUTO)。RabbitMQ默认开启自动确认,即视消费端的处理一律成功。
在特定业务场景下,我们可能让消费端处理程序更加灵活,例如:将处理失败的消息重新丢回队列中,用于后续继续被消费。将消息确认模式修改为手动确认(MANUAL)。
@Component
public class TopicConsumer {
@RabbitListener(queues = "TopicFirstQueue", ackMode = AckMode.MANUAL)
public void process(Map map, Message message, Channel channel) throws IOException {
System.out.println("[consumer-1]从队列[TopicFirstQueue]中收到消息:" + map.toString());
// 第一个参数deliveryTag表示消息ID;
// 第二个参数为false表示仅确认当前消息,否则一次性确认 delivery_tag 小于等于传入值的所有消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
/*
// 第一个参数deliveryTag表示消息ID;
// 第二个参数表示是否针对多条消息,如果是true则一次性把当前通道消息的delivery_tag小于等于当前这条消息的都拒绝确认,否则只针对当前这条消息;
// 第三个参数表示是否重新入列,如果是true则重新丢回队列里等待再次消费,否则数据只是被消费,不会丢回队列里。
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
*/
/*
// 第一个参数deliveryTag表示消息ID;
// 第二个参数为true表示是否重新入列,如果是true则重新丢回队列里等待再次消费,否则数据只是被消费,不会丢回队列里。
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
*/
}
}
public class AckMode {
// 自动确认,默认模式
public final static String NONE = "NONE";
// 根据情况确认
public final static String AUTO = "AUTO";
// 手动确认
public final static String MANUAL = "MANUAL";
}
手动确认中消费者通过调用basicAck、basicNack、basicReject几种方法进行确认。注意,其中的deliveryTag表示RabbitMQ向该Canncel投递的消息的唯一标识,仅作用于Canncel内,是单调递增的。
在手动确认模式下,若消费者无法做出ack。对应的消息会一直保留在unnacked,等待消费端的反馈确认。