• RabbitMQ消息的可靠性


    RabbitMQ消息的可靠性

    一 生产者的可靠性

    1. 生产者重试
      有时候由于网络问题,会出现连接MQ失败的情况,可以配置重连机制
      注意:SpringAMQP的重试机制是阻塞式的,重试等待的时候,当前线程会等待。
    spring:
    	rabbitmq:
    		connection-timout: 1s #设置MQ的连接超时时间
    		templete:
    			retry:
    				enabled: true #开启超时重试机制
    				initial-interval: 100ms #失败后的初始等待时间
    				multipier: 1 #失败后下次的等待时长倍数, 下次等待时长=initial-interval*multipier
    				max-attempts: 3 #最大重试次数
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    1. 生产者确认

      (1)在生产者服务的yaml文件中配置一下内容
    spring:
    	rabbitmq:
    		publisher-confirm-type: correlated #开启publisher confirm机制,并设置为MQ异步回调方式返回回执信息
    		publisher-returns: true #开启publisher return机制
    
    • 1
    • 2
    • 3
    • 4

    (2)配置return-callback

    @Slf4j
    @Configuration
    public class CommonConfig implements ApplicationContexAware{
    	@Override
    	public void setApplicationContext(ApplicationContent applicationContext){
    		// 获取MQ
    		RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
    		// 设置returnCallback
    		rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey)->{
    	log.info("消息发送失败,应答码:{},原因:{},交换机:{},路由键:{},消息:{}",replyCode,replyText,exchange,routingKey,message.toString());
    });
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    (3)发送消息,指定消息ID,消息的ConfirmCallback
    相比于发布消息,多了消息的confirm

    @Test
    public void testPubliserConfir()throw InterupteDException{
    	// 创建correlationData
    	CorrelationData cd = new CorrelationData(UUID.randowUUID().toString());
    	// 给Future添加ConfirmCallback
    	cd.getFuture().addCallback(new ListenableFutureCllback<CorelationData.Confirm>(){
    	@Override
    	public void onFailure(Throwable ex){
    		// Future发生异常时的处理逻辑,一般不触发
    		log.error("handle message ack fail",ex);
    	}
    	@Override
    	public void onSuccess(CorrelationData.Confirm result){
    		// Future接收到回执的处理逻辑
    		if(result.isAck()){
    			log.debug("发送消息成功,收到ACK");
    		}else{
    			log.error("发送消息失败,收到NACK,reason:{}",result.getReson());
    		}
    	}
    });
    // 发送消息
    rabbitTemplate.coverAndSend("hmall.direct","red","hello",cd);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    二 MQ的可靠性

    MQ的持久化可以使用Lazy Queue
    (1)通过配置类

    @Bean
    public Queue lazyQueue(){
    	return QueueBuilder.durable("lazy.queue")// 队列名称
    	.lazy()//开启lazy
    	bulid();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    (1)基于注解

    @RabbitListener(queuesToDeclare = @Queue(
    	name="lazy.queue",
    	durable="ture",
    	arguments=@Argument(name="x-queue-mode",value="lazy")
    ))
    public void listenLazyQueue(String msg){
    	log.info("接收到 lazy.queue的消息:{}",msg);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    三 消费者确认

    1. 消费者确认机制
      在这里插入图片描述
      可以通过配置来进行确认
    spring:
    	rabbitmq:
    		listenner:
    			simple:
    				prefetch: 1
    				acknowledgs-mode: auto #确认机制 none-关闭ack,manual-手动ack,auto-自动
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    1. 消费失败处理
      重试机制
    spring:
    	rabbitmq:
    		listenner:
    			simple:
    				prefetch: 1
    				retry:
    					enabled: true #开启超时重试机制
    					initial-interval: 100ms #失败后的初始等待时间
    					multipier: 1 #失败后下次的等待时长倍数, 下次等待时长=initial-interval*multipier
    					max-attempts: 3 #最大重试次数
    					stateless: true #true为无状态,若业务包含事务,则使用false
    					
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    失败处理策略
    在这里插入图片描述
    在这里插入图片描述
    代码实现

    @Slf4j
    @Configureation
    @ConditionalOnProperty(prefix="spring.rabbitmq.listenner.simple.retry",name="enable",havingValue="true")// 只有重试机制是true才生效
    public class ErrorConfiguration{
    	@Bbean
    	public DirectExchange errorExchange(){
    		return new DirectExchange("error.direct");
    	}
    	@Bean
    	public Queue errorQueue(){
    		return new Queue("error.queue");
    	}
    	@Bean
    	public Binding errorBinding(DirectExchange errorExchange,Queue errorQueue){
    		return BindingBuilder.bind(errorQueue).to(errorExchange).with("eooro");
    	}
    	/**
    	* 重试失败处理策略
    	* RepublishMessageRecoverer:重试失败后,将消息发送到指定的队列中
    	*/
    	@Bean
    	public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
    		log.info("MessageRecoverer 重试失败处理策略配置");
    		return new RepublishMessageRecoverer(rabbitTemplate,"error.direct","error");
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    四 业务幂等性

    1. 消息唯一id
    2. 业务判断
      在这里插入图片描述
  • 相关阅读:
    Java面试题大全(整理版)1000+面试题附答案详解,最全面详细,看完稳了
    微服务绕不过的坎-服务雪崩
    音频编码格式介绍-AAC
    大数据技术基础实验四:HDFS实验——读写HDFS文件
    这个锂电池保护方案来自TIDA-010030
    深入理解合成复用原则(Composition /Aggregate Reuse Principle)
    基数排序的简单理解
    微信小程序中使用wx.showToast()进行界面交互
    konva系列教程3:自定义图形
    【大数据】Flink 详解(七):源码篇 Ⅱ
  • 原文地址:https://blog.csdn.net/hcyxsh/article/details/134275559