• 仿大众点评——秒杀系统部分03——RabbitMq措施


    RabbitMq保证消息不丢失

    RabbitMQ如何保证消息的可靠性:
    1.从生产者到消息队列,congfirm模式(与事务相比confirm模式最大的优势是异步)通过消息确认机制来保证,通过给每个指派唯一标志,完成消费后返回ack确认, 2 消息队列 消息队列的持久化,包括交换机持久化,消息队列持久化以及消息持久化 3.消费者 消费完返回完后手动ack确认,开启RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。 若长时间没有ack确认会像下一个消费者发送任务,同时需要做等幂性处理。

    RabbitMQ消息丢失的情况

    RabbitMQ 消息丢失的源头主要有以下三个:

    • 生产者丢失消息
    • RabbitMQ 丢失消息
    • 消费者丢失消息
    生产者丢失消息解决方案
    事务消息机制:在生产者发送消息之前,通过 channel.txSelect 开启一个事务,接着发送消息,如果消息没有成功被 RabbitMQ 接收到,生产者会收到异常,此时就可以进行事务回滚 channel.txRollback 然后重新发送;假如 RabbitMQ 收到了这个消息,就可以提交事务channel.txCommit;但是这样一来,生产者的吞吐量和性能都会降低很多,现在一般不这么干。
    // 开启事务
    channel.txSelect
    try {
          // 这里发送消息
    } catch (Exception e) {
          channel.txRollback
    // 这里再次重发这条消息
    }
    // 提交事务
    channel.txCommit
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    confirm 机制:就是生产端投递的消息一旦投递到RabbitMQ后,RabbitMQ就会发送一个确认消息给生产端,让生产端知道我已经收到消息了,否则这条消息就可能已经丢失了,需要生产端重新发送消息了。

    一般采用异步confirm,所以发送消息之前,需要把消息存起来。所以我们需要为消息分配全局唯一的Id,与消息内容一一对应。

    生产者也需要监听Broker发送的通知,根据ack / nack 进行确认。

    而具体实现时,需要注意:

    • 限制重发次数:超限应标志为任务异常,通过人工处理,避免一直重发浪费资源
    • 定时扫描未确认消息,因为broker的响应可能会丢失
    rabbitmq:
        # 开启发送确认
        publisher-confirm-type: correlated
    
    • 1
    • 2
    • 3
    @Service
    @Slf4j
    public class MqSender {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        //发送秒杀信息
        public void sendSeckillMessage(String msg){
            log.info("发送消息:" + msg);
            rabbitTemplate.convertAndSend("seckillExchange", "seckill.message", msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    //连接工厂
        @Resource
        private CachingConnectionFactory cachingConnectionFactory;
    //向spring容器中注入RabbitTemplate对象,并配置开启确认消息回调和消息失败回调
        @Bean
        public RabbitTemplate rabbitTemplate() {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
            /**
             * 消息确认回调,确认消息是否到达broker
             * correlationData:消息唯一标识
             * ack:确认结果,消息是否发送成功
             * cause:失败原因
             */
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                String msgId = correlationData.getId();
                if (ack) {
                    LOGGER.info("消息发送成功============>" + msgId);
                    //监听消息是否发送成功,如果成功将状态改为1
    //                mailLogService.update(new UpdateWrapper().set("status", 1).eq("msgId", msgId));
                } else {
                    LOGGER.error("消息发送到queue失败============>" + msgId);
                }
            });
            return rabbitTemplate;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    RabbitMQ 丢失消息解决方案
    消息持久化

    RabbitMQ收到消息后将这个消息暂时存在了内存中,如果RabbitMQ挂了,那重启后数据就丢失了,所以相关的数据应该持久化到硬盘中,需要给exchange、queue和message都进行持久化。

    设置持久化,两个步骤:必须都设置。

    • 创建 exchange、queue 时,设置为持久化 durable = true:(其实默认是持久化):会持久化 queue 的元数据,但不会持久化 queue 里的消息
    • 因此需要单独设置消息的持久化:发送消息时,将消息Properties的 deliveryMode=PERSISTENT:

    无法保障100%不丢失,因为可能持久化完成前就宕机。

    (若设置持久化,会在持久化完成后再发出ack)

    @Bean
        public Queue queue(){
            // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
            // exclusive:该队列是否只供一个消费者进行消费是否进行消息共享,true可以多个消费者消费,false:只能-一个消费者消费
            // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
            return new Queue(QUEUE);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    //绑定correlationId、以及消息持久化
            rabbitTemplate.setBeforePublishPostProcessors(new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message, Correlation correlation) {
                    MessageProperties messageProperties = message.getMessageProperties();
    
                    if (correlation instanceof CorrelationData) {
                        String correlationId = ((CorrelationData) correlation).getId();
                        messageProperties.setCorrelationId(correlationId);
                    }
                    // 持久化处理
                    messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    return message;
                }
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    MessageProperties messageProperties = message.getMessageProperties();
                    messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    return message;
                }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    消息入库

    消息入库,顾名思义就是将要发送的消息保存到数据库中。

    • 发送消息前先将消息保存到数据库中,有一个状态字段status=0,表示消息发送了但还没收到确认;收到确认后将status设为1,表示RabbitMQ已收到消息。
    • 生产端这边还需要开一个定时器,定时检索消息表,将status=0并且超过固定时间后(可能消息刚发出去还没来得及确认,这边定时器刚好检索到这条status=0的消息,所以要设置时间)
    • 还没收到确认的消息取出重发 (消息重发可能会造成幂等性问题,这里消费端要做幂等性处理),可能重发还会失败,所以还要添加一个最大重发次数字段retry_count,超过就做另外的处理。
      在这里插入图片描述
    消息延迟投递,做二次确认,回调检查

    在这里插入图片描述

    消费者消息丢失

    如何保证消费成功(或限流处理):对应消费端丢数据,即消费完成前,宕机

    ACK确认机制

    RabbitMQ默认是自动ack的,缺点:

    • 消息失败,则重新入队,但入队还是在队列首部,很容易造成死循环(可通过 default-requeue-rejected覆盖)
    • 只要有消息,就会源源不断地发送给客户端,而不管客户端能否消费的完(即无法限流)。注意:不要搞混了none和auto,none才是发送了就不管,auto只是自动回发ack。

    因此使用手动ack确认

    • application.yml配置
    spring:
    	rabbitmq:
    	    listener:
    	      simple:
    	        # 消费者最小数量
    	        concurrency: 10
    	        # 消费者最大数量
    	        max-concurrency: 10
    	        # 限制消费者每次处理消息的数量(预抓取:等价于缓冲区大小,限流关键)
    	        prefetch: 1
    	        # manual:手动确认
    	        acknowledge-mode: manual
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    @RabbitListener(queues = "seckillQueue")
        @RabbitHandler
        public void receiveDeadMsg(String msg, Channel channel, Message messages) throws Exception {
            try {
                log.info("接收消息:" + msg);
                VoucherOrder voucherOrder = JsonUtil.jsonStr2Object(msg, VoucherOrder.class);
                voucherOrderService.createVoucherOrder(voucherOrder);
                // 回发ack,第二个参数multiple,表示是否批量确认
                channel.basicAck(messages.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception e) {
                // 回发Nack,且重回队列为false,防死循环
                channel.basicNack(messages.getMessageProperties().getDeliveryTag(), false, false);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    消息幂等性处理

    如何防止重复消费

    重复消费的原因:正常情况,消费完毕,RabbitMQ会回复一个确认消息给消息队列,消息队列就把这个消息删除了,但是因为宕机或者网络导致确认消息没返回成功,消息队列不知道自己消费过该消息,会将消息再次分发。

    解决思路是:保证消息的唯一性,就算是多次传输,不要让消息的多次消费带来影响;保证消息等幂性;

    解决方法:

    • 消费者的业务逻辑接口设计成幂等性的。比如扣除库存的时候,带上唯一的订单号和状态标志,消费到这个消息的时候先去redis里查一下之前消费过没有,消费过就不处理。
    • 使用防重表,每个消息都有业务唯一标识,处理过就不处理了。
    • 或者使用唯一键,保证重复数据不会插入多条
  • 相关阅读:
    多线程获取官方汇率
    mybatis、mybatisPlus
    算法:(二)数组
    不敲一个代码,10分钟做出数据可视化大屏,这个教程太赞了
    测试面试回顾(1)
    R语言使用nnet包的multinom函数构建无序多分类logistic回归模型、使用AIC函数比较两个模型的AIC值的差异(简单模型和复杂模型)
    BMN:Boundary-matching network for temporal action proposal generation
    GStreamer 进阶
    Boost获取当前时间并格式化为字符串
    Flutter框架性泛学习系列之二、Flutter应用层(Application Layer)上-常用Widgets与简单动画
  • 原文地址:https://blog.csdn.net/mao____mao/article/details/127940966