先来看看我们的万年老图,从图上我们大概可以看出来一个消息会经历四个节点,只有保证这四个节点的可靠性才能保证整个系统的可靠性。
经历了这四个保证,我们才能保证消息的可靠性,从而保证消息不会丢失。
我们的生产者发送消息之后可能由于网络闪断等各种原因导致我们的消息并没有发送到MQ之中,但是这个时候我们生产端又不知道我们的消息没有发出去,这就会造成消息的丢失。
为了解决这个问题,RabbitMQ引入了事务机制和发送方确认机制(publisher confirm)
生产者的发送消息处理好了之后,我们就可以来看看MQ端的处理,MQ可能出现两个问题:
这两种情况都可以用RabbitMQ提供的mandatory参数来解决,它会设置消息投递失败的策略,有两种策略:自动删除或返回到客户端。
我们既然要做可靠性,当然是设置为返回到客户端。
配置:
- spring:
- rabbitmq:
- addresses: 127.0.0.1
- host: 5672
- username: guest
- password: guest
- virtual-host: /
- # 打开消息确认机制
- publisher-confirm-type: correlated
- # 打开消息返回
- publisher-returns: true
- template:
- mandatory: true
我们只需要在配置里面打开消息返回即可,template.mandatory: true这一步不要少~
生产者:
- public void sendAndReturn() {
- User user = new User();
-
- log.info("Message content : " + user);
-
- rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
- log.info("被退回的消息为:{}", message);
- log.info("replyCode:{}", replyCode);
- log.info("replyText:{}", replyText);
- log.info("exchange:{}", exchange);
- log.info("routingKey:{}", routingKey);
- });
-
- rabbitTemplate.convertAndSend("fail",user);
- log.info("消息发送完毕。");
- }
这里我们可以拿到被退回消息的所有信息,然后再进行处理,比如放到一个新的队列单独处理,路由失败一般都是配置问题了。
到这一步基本都是一些很小概率的问题了,比如MQ突然宕机了或者被关闭了,这种问题就必须要对消息做持久化,以便MQ重新启动之后消息还能重新恢复过来。
消息的持久化要做,但是不能只做消息的持久化,还要做队列的持久化和Exchange的持久化。
- @Bean
- public DirectExchange directExchange() {
- // 三个构造参数:name durable autoDelete
- return new DirectExchange("directExchange", false, false);
- }
-
- @Bean
- public Queue erduo() {
- // 其三个参数:durable exclusive autoDelete
- // 一般只设置一下持久化即可
- return new Queue("erduo",true);
- }
创建Exchange和队列时只要设置好持久化,发送的消息默认就是持久化消息。
设置持久化时一定要将Exchange和队列都设置上持久化:
单单只设置Exchange持久化,重启之后队列会丢失。单单只设置队列的持久化,重启之后Exchange会消失,既而消息也丢失,所以如果不两个一块设置持久化将毫无意义。
Tip: 这些都是MQ宕机引起的问题,如果出现服务器宕机或者磁盘损坏则上面的手段统统无效,必须引入镜像队列,做异地多活来抵御这种不可抗因素。
最后一步会出问题的地方就在消费者端了,不过这个解决问题的方法我们之前的文章已经说过了,就是消费者的消息确认。
- spring:
- rabbitmq:
- addresses: 127.0.0.1
- host: 5672
- username: guest
- password: guest
- virtual-host: /
- # 手动确认消息
- listener:
- simple:
- acknowledge-mode: manual
打开手动消息确认(手动ACK)之后,只要我们这条消息没有成功消费,无论中间是出现消费者宕机还是代码异常,只要连接断开之后这条信息还没有被消费那么这条消息就会被重新放入队列再次被消费。
当然这也可能会出现重复消费的情况,不过在分布式系统中幂等性是一定要做的,所以一般重复消费都会被接口的幂等给拦掉。
所谓幂等性就是:一个操作多次执行产生的结果与一次执行产生的结果一致。
幂等性相关内容不在本章讨论范围~所以我就不多做阐述了。
这个例子中的消息是先入库的,然后生产者从DB里面拿到数据包装成消息发给MQ,经过消费者消费之后对DB数据的状态进行更改,然后重新入库。
这中间有任何步骤失败,数据的状态都是没有更新的,这时通过一个定时任务不停的去刷库,找到有问题的数据将它重新扔到生产者那里进行重新投递。
这个方案其实和网上的很多方案大同小异,基础的可靠性保证之后,定时任务做一个兜底进行不断的扫描,力图100%可靠性。