开启消息确认机制:
在发布消息时,可以设置deliveryMode为2(持久化),以确保消息不会因为RabbitMQ的崩溃而丢失。
使队列持久化:
通过设置durable为true,可以确保队列在RabbitMQ重启后依然存在。
使消费者确认机制:
启用手动确认模式,并在消费完消息后手动确认。
以下是使用Java和Spring AMQP的示例代码:
- @Bean
- public Queue myQueue() {
- return QueueBuilder.durable("myQueue").build();
- }
-
- @Bean
- public DirectExchange myExchange() {
- return new DirectExchange("myExchange");
- }
-
- @Bean
- public Binding myBinding() {
- return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey");
- }
-
- @Bean
- public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
- SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
- container.setConnectionFactory(connectionFactory);
- container.setQueueNames("myQueue");
- container.setMessageListener(listenerAdapter);
- container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认模式
- return container;
- }
-
- @Bean
- public MessageListenerAdapter listenerAdapter(MyConsumer myConsumer) {
- return new MessageListenerAdapter(myConsumer, "handleMessage");
- }
-
- public class MyConsumer {
- public void handleMessage(Message message) {
- // 处理消息
- // ...
-
- // 确认消息
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- }
在发送消息时:
- rabbitTemplate.convertAndSend("myExchange", "myRoutingKey", message, message -> {
- message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 设置消息持久化
- return message;
- });
确保你的消费者在处理完消息后调用basicAck来确认消息,这样即使消费者崩溃,未确认的消息也会被重新传递给另一个消费者。如果你希望在消费者异常时自动重新将消息放回队列,可以在handleMessage方法中捕获异常,并在异常处理逻辑中调用basicNack或basicReject方法,并设置重回队列的参数。