• SpringBoot —— 整合RabbitMQ常见问题及解决方案


    前言

    企业中最常用的消息中间件既不是RocketMQ,也不是Kafka,而是RabbitMQ。

    RocketMQ很强大,但主要是阿里推广自己的云产品而开源出来的一款消息队列,其实中小企业用RocketMQ的没有想象中那么多。

    至于Kafka,主要还是用在大数据和日志采集方面,除了一些公司有特定的需求会使用外,对消息收发准确率要求较高的公司依然是以RabbitMQ作为企业级消息队列的首选


    一、使用步骤

    1.引入依赖

    
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.coregroupId>
        <artifactId>jackson-annotationsartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2.环境配置

    这里需要创建2个springboot项目,一个 provider (生产者),一个consumer(消费者)

    生产者application.yml
    生产者配置文件
    消费者application.yml
    消费者配置文件

    3.生产者处理消息队列

    创建消息队列

    package com.local.springboot.springbootcommon.config.amqp;
    
    import com.local.springboot.springbootcommon.constant.RabbitMQConstant;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.amqp.core.Queue;
    
    
    @Configuration
    public class RabbitMQConfig {
    
        /**
         * 创建队列说明
         * durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
         * exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
         * autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
         * 一般设置一下队列的持久化就好,其余两个就是默认false
         * @return
         */
        @Bean
        public Queue goodsEventQueue() {
            return new Queue(RabbitMQConstant.QUEUE_GOODS_EVENT, true, false, false, null);
        }
    
        /**
         * 创建交换机
         */
        @Bean
        public DirectExchange goodsEventExchange() {
            return new DirectExchange(RabbitMQConstant.EXCHANGE_GOODS_EXCHANGE, true, false);
        }
    
        /**
         * 将队列绑定到交换机上
         */
        @Bean
        public Binding goodsQueueToGoodsExchange() {
            return BindingBuilder
                    .bind(goodsEventQueue())
                    .to(goodsEventExchange())
                    .with(RabbitMQConstant.ROUTING_KEY_GOODS_EVENT);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47

    启动生产者服务,浏览器打开http://127.0.0.1:15672/,可以看见消息队列创建
    在这里插入图片描述
    发送消息
    在业务需要的地方,发生消息至消息队列

    @Resource
    private RabbitTemplate rabbitTemplate;
    
    @Override
    public ApiResponse saveItem(ItemEntity itemEntity) {
        if (itemEntity != null) {
            String id = itemEntity.getSkuId();
            if (StringUtils.isNotBlank(id)) {
                ItemEntity entity = getById(id);
                if (entity != null) {
                    BeanUtils.copyProperties(itemEntity, entity);
                    updateById(entity);
                }
            } else {
                EntityUtil.initEntity(itemEntity);
                itemEntity.setSkuId(IdWorker.get32UUID());
                save(itemEntity);
            }
        }
        // 同步商品信息
        rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_GOODS_EXCHANGE, RabbitMQConstant.ROUTING_KEY_GOODS_EVENT, itemEntity);
        return ApiResponse.ok();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    4.消费者监听队列

    package com.local.springboot.springbootservice.listener;
    
    import com.local.springboot.springbootdao.entity.ItemEntity;
    import com.local.springboot.springbootservice.service.search.ElasticSearchService;
    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import java.io.IOException;
    
    @Component
    @Slf4j
    public class GoodsEventQueueListener {
    
        @Resource
        private ElasticSearchService elasticSearchService;
    
        @RabbitListener(queues = "goodsEventQueue")
        public void onGoodsEvent(ItemEntity itemEntity, Channel channel
                , @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
            // 同步商品至es
            try {
                log.info("同步商品事件队列接收参数:{}", itemEntity);
                // 业务处理
                elasticSearchService.addGoods(itemEntity);
            } catch (Exception e) {
                log.error("同步商品事件异常:{}", e.getMessage());
                e.printStackTrace();
            } finally {
                // 手动签收消息
                channel.basicAck(tag, false);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    5.运行结果

    上述业务是在添加商品时,向消息队列发送消息,消费者接收消息之后对商品进行相应的处理,实现业务上的解耦。

    同时运行两个服务,生产者调用添加商品接口

    查看日志,消费者接收到消息之后做相应处理
    在这里插入图片描述
    至此,SpringBoot 简单整合RabbitMQ成功结束。

    二、问题及解决

    1.消息丢失

    消息丢失可能的原因

    ①消息发出后,中途网络故障,服务器没收到
    ②消息发出后,服务器收到了,还没持久化,服务器宕机
    ③消息发出后,服务器收到了,服务挂了,消息自动签收,消费方还未处理业务逻辑。

    在说解决方案之前,我们需要明白两个概念:消息确认机制消息签收机制

    1.消息确认机制

    主要是生产者使用的机制,用来确认消息是否被成功消费。

    添加配置如下:

    publisher-returns: true #确认消息已发送到队列(Queue)
    publisher-confirm-type: correlated #确认消息已发送到交换机(Exchange)
    
    • 1
    • 2
    @Component
    @Slf4j
    public class RabbitMQProducer implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
        
    
        /**
         * 发送消息
         *
         * @param exchange
         * @param routingKey
         * @param source
         */
        public void sendMessage(String exchange, String routingKey, ItemEntity source) {
            rabbitTemplate.setConfirmCallback(this);
            rabbitTemplate.setReturnsCallback(this);
            rabbitTemplate.convertAndSend(exchange, routingKey, source);
        }
    
        /**
         * 成功接收后的回调
         *
         * @param correlationData
         * @param b
         * @param s
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean b, String s) {
            // 成功后的处理
        }
    
        /**
         * 失败后的回调
         *
         * @param returnedMessage
         */
        @Override
        public void returnedMessage(ReturnedMessage returnedMessage) {
            // 失败后的处理
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    实现RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback这两个接口的方法后,可以对失败或者成功之后进行相应处理,之后进一步做消息补偿。

    但是这种方法并不推荐,因为这种机制十分降低MQ的性能,一般采用后台管理实现人工补偿,两种方法只是性能与运维成本之间的一种抉择

    2.消息签收机制

    一般RabbitMQ的消息是自动签收的,你可以理解为快递签收了,那么这个快递的状态就从发送变为已签收,唯一的区别是快递公司会对物流轨迹有记录,而MQ签收后就从队列中删除了。

    在开发中,我们一般都采用手动签收的方式,这样可以有效避免消息的丢失。

    3.解决方案

    上述两个概念搞清楚之后,再回过头来看消息丢失的原因

    ①和②是由于生产方未开启消息确认机制导致
    ③是由于消费方未开启手动签收机制导致。

    解决方案

    ①生产方发送消息时,要try…catch,在catch中捕获异常,并将MQ发送的关键内容记录到日志表中,日志表中要有消息发送状态,若发送失败,由定时任务定期扫描重发并更新状态;
    ②生产方publisher必须要加入确认回调机制,确认成功发送并签收的消息,如果进入失败回调方法,就修改数据库消息的状态,等待定时任务重发;
    ③消费方要开启手动签收ACK机制,消费成功才将消息移除,失败或因异常情况而尚未处理,就重新入队。

    2.消息积压

    1.出现原因

    消息积压出现的场景一般有两种:

    ①消费方的服务挂掉,导致一直无法消费消息;
    ②消费方的服务节点太少,导致消费能力不足,从而出现积压,这种情况极可能就是生产方的流量过大导致。

    2.解决方案

    ①既然消费能力不足,那就扩展更多消费节点,提升消费能力;
    ②建立专门的队列消费服务,将消息批量取出并持久化,之后再慢慢消费。

    ①就是最直接的方式,也是消息积压最常用的解决方案,但有些企业考虑到服务器成本压力,会选择第②种方案进行迂回,先通过一个独立服务把要消费的消息存起来,比如存到数据库,之后再慢慢处理这些消息即可。

    2.消息重复

    1.出现原因

    消息重复大体上有两种情况会出现:

    ①消息消费成功,事务已提交,签收时结果服务器宕机或网络原因导致签收失败,消息状态会由unack转变为ready,重新发送给其他消费方;
    ②消息消费失败,由于retry重试机制,重新入队又将消息发送出去。

    2.解决方案

    网上大体上能搜罗到的方法有三种:

    ①消费方业务接口做好幂等;
    ②消息日志表保存MQ发送时的唯一消息ID,消费方可以根据这个唯一ID进行判断避免消息重复;
    ③消费方的Message对象有个getRedelivered()方法返回Boolean,为TRUE就表示重复发送过来的。

    这里只推荐第一种,业务方法幂等这是最直接有效的方式,②还要和数据库产生交互,③有可能导致第一次消费失败但第二次消费成功的情况被砍掉。
    ps:
    幂等性:就是一条命令执行任意多次所产生的影响和执行一次的影响相同

    (这里分布式锁应该能解决这个问题)

    最简单的方案就是,在数据库中建一个消息日志表,这个表记录消息ID和消息执行状态。这个我们消费消息的逻辑变为:在消息日志中增加一个消息记录,再根据消息记录,执行业务。我们每次都会在插入之前检查该消息是否已存在。这样就不会出现一条消息被多次执行的情况。这里的数据库也可以使用redis/memcache来实现唯一约束方案。

    2.小结

    消息丢失、消息重复、消息积压三个问题中,实际上主要解决的还是消息丢失,而消息丢失的最常见企业级方案之一就是定时任务补偿。

    其实MQ只是一个做为辅助的中间件,使用MQ的目的就是解耦和转发,不用做多余的事情,保证MQ本身是流畅的、职责单一的即可

    总结

    本文主要简单讲述了SpringBoot整合RabbitMQ的过程,以及RabbitMQ的三种常见问题及解决方案

    其实RabbitMQ本身的性能还是很强大的,总结以下三点:

    ①消息100%投递会增加运维成本,中小企业视情况使用,非必要不使用;
    ②消息确认机制影响性能,非必要不使用;
    ③消费者先保证消息能签收,业务处理失败可以人工补偿。

    此外消息中间件的问题其实还有很多,比如

    • 序列化、传输协议,以及内存管理等问题?
    • 为什么消息队列能实现高吞吐?
    • 消息中间件中的队列模型与发布订阅模型的区别?
    • 如何选型消息中间件?

    参考文章 https://baijiahao.baidu.com/s?id=1737713844357727373&wfr=spider&for=pc

    « 上一章:SpringBoot —— 简单多模块构建

    创作不易,关注💖、点赞👍、收藏🎉就是对作者最大的鼓励👏,欢迎在下方评论留言🧐

  • 相关阅读:
    关于新增字段我们应该测试什么?
    Minecraft 服务器安装Forge 并添加Mod
    Ubuntu openKylin 安装open VMware tool 工具
    Dubbo3元数据参考手册
    阿里为何禁止在对象中使用基本数据类型
    Web框架开发-Django简介
    TypeScript 最细项目实践:四步走高效改造现有的 JavaScript 项目
    Git中Branch(分支)和Tag(标签)的区别
    LabVIEW中将枚举与条件结构一起使用
    R语言使用dplyr包计算dataframe分组聚合样本个数值(number of values)
  • 原文地址:https://blog.csdn.net/qq_34383510/article/details/128197768