• SpringBoot集成RabbitMQ使用建议



    前言

    SpringBoot的集成RabbitMQ很简单,引入starer,简单配置几个属性就能开始使用,本人在平时工作使用过程中也遇到过一些坑,所以这篇文章主要罗列一些个人的使用建议。如有不对的地方或者好的建议,欢迎指正


    一、SpringBoot集成RabbitMQ

    1.引入依赖:

    <dependency>
      <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4

    2.配置属性

    在application.properties添加配置

    spring.rabbitmq.addresses=127.0.0.1:5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    
    • 1
    • 2
    • 3

    3.代码示例

    @RestController
    public class RabbitController {
    
        @Autowired
        RabbitTemplate rabbitTemplate;
        AtomicInteger next = new AtomicInteger();
    
        @GetMapping("/send")
        public void send(@RequestParam(value = "routingkey", required = false, defaultValue = "test") String routingKey) {
            Rabbit rabbit = new Rabbit();
            rabbit.setId(next.getAndIncrement());
            rabbit.setName(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend("test", routingKey, JSONObject.toJSONString(rabbit));
        }
    
        @RabbitListener(queues = "test")
        public void handler(Message message) {
            System.out.println("收到消息:" + new String(message.getBody()));
        }
    
        public static class Rabbit implements Serializable {
    
            private static final long serialVersionUID = -1L;
            private Integer id;
            private String name;
    
            public Integer getId() {
                return id;
            }
            public void setId(Integer id) {
                this.id = id;
            }
            public String getName() {
                return name;
            }
            public void setName(String name) {
                this.name = name;
            }
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 项目运行起来后,请求/send接口就会往名为test的交换器发送一个Rabbit对象,routingkey为test的,
    • RabbitMQ服务收到后就会路由到相应绑定的队列
    • @RabbitListener注解会开启消费者线程,接收名为test的队列发送过来的消息

    二、使用建议

    1.发送方消息序列化器选择

    1.1 使用默认SimpleMessageConverter

    通常发送方通过rabbitTemplate.convertAndSend()来发送消息,默认的序列化器是SimpleMessageConverter,为了方便调试、格式统一,都用json格式,所以发送的时候要手动转换成json。

    rabbitTemplate.convertAndSend("test", routingKey, JSONObject.toJSONString(rabbit));
    
    • 1

    SimpleMessageConverter

    1.2 使用Jackson2JsonMessageConverter

    如果不想每次手动把对象转成json,可以使用Jackson2JsonMessageConverter,这时候需要配置一下rabbitTemplate

    @Configuration
    public class RabbitConfig {
    
        @Bean
        public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
            final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            return rabbitTemplate;
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    rabbitTemplate.convertAndSend("test", routingKey, rabbit);
    
    • 1

    Jackson2JsonMessageConverter
    使用Jackson2JsonMessageConverter后,不要把对象转成json字符串后发送,例如:rabbitTemplate.convertAndSend(exchange, routingKey, JSONObject.toJSONString(rabbit)),这样Jackson2JsonMessageConverter会对字符串再做一次json序列化,会得到带转义字符的字符串,如下图
    在这里插入图片描述
    所以这里就看到了使用Jackson2JsonMessageConverter的一个小坑,如果想发送一个字符串,就不能用convertAndSend方法了,要用send方法。

    1.3 小结

    从上面分析来看,使用SimpleMessageConverter会更通用一些,容错性更好,虽然每次发送对象的时候需要手动转json,但挺符合人的惯性思维。当然,已经在运行的线上程序就不要轻易改动了,又不是不能用!(手动狗头)

    2.消费方序列化器选择

    2.1 建议使用默认SimpleMessageConverter

    使用Message来接收,可以拿到消息所有的信息(如消息头、body等),然后再进一步转成字符串或者对象。

    	@RabbitListener(queues = "test")
        public void handler(Message message) {
            System.out.println("收到消息:" + new String(message.getBody()));
        }
    
    • 1
    • 2
    • 3
    • 4

    PS:消费方使用SimpleMessageConverter,就不要用对象来接受信息,因为不能确保发送方也是用SimpleMessageConverter

    2.2 为什么不建议使用Jackson2JsonMessageConverter

    	@RabbitListener(queues = "test")
        public void handler(Rabbit rabbit) {
            System.out.println("收到消息:" + rabbit);
        }
        
    	@RabbitListener(queues = "test")
        public void handler(Message message) {
            System.out.println("收到消息:" + new String(message.getBody()));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    如上述代码示例,

    • 使用第一种,首先这样子拿不到完整消息内容,不利于排查问题。 如果别人发的是字符串,消息头没有content-type或者不是application/json,就会报错;也可能别人发的消息头content-type是application/json,而消息内容并不是Rabbit类,虽然不报错,但字段可能会对应不上,出现些奇怪的问题。
    • 使用第二种,如果消息头没有content-type或者不是application/json,会打印一个告警信息Could not convert incoming message with content-type [xxx], ‘json’ keyword missing。还有个更坑的是,如果别人发的消息头中__type_id__ 字段,且是一个自己项目中不存在的类,那会报类找不到的错误。

    2.3 小结

    消费方使用默认SimpleMessageConverter就好,且以Message作为参数来接收。

    PS 这时候不要给spring容器注入一个Jackson2JsonMessageConverter实例,否则消费者就会从spring容器里拿它来用,而不是用默认的SimpleMessageConverter

    3.发送方的和消费方使用不同的connection

    rabbitmq

    如图所示,客户端和RabbitMQ是保持着一个长连接connection,然后通过channel通信,一个connection默认最大可以支持2047个channel。所以当发送者和消费者共用一个connection,消费者又占用了过多的channel,发送者可能会无法创建或获取到空闲的channel来发送消息。

    通过下面一句代码就可以实现

      rabbitTemplate.setUsePublisherConnection(true);
    
    • 1

    4.避免开过多线程同时发送消息

    前面说到一个connection的channel数量是有上限的,spring帮我们实现了一个channel缓存池,以达到channel复用的效果。假如一时间有3000个线程同时调rabbitTemplate.convertAndSend()方法,channel缓存池会瞬间被掏空,从缓冲池拿不到就会创建新的channel,3000已经超过2047了,就会报一个channelMax的错误,如果没有失败重试机制,消息就丢失了。
    如果线上出现channelMax错误,去改RabbitMQ的配置要重启是不太现实的,可以通过下面代码暂时缓解,后面再优化代码,避免这种不好的做法

    	@Bean
        public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate();
            rabbitTemplate.setConnectionFactory(connectionFactory);
            connectionFactory.setChannelCacheSize(100);
            connectionFactory.setChannelCheckoutTimeout(1000);
            return rabbitTemplate;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    通过setChannelCheckoutTimeout这个方法,可以激活spring的channel缓存池的限流功能(通过信号量实现),一般还要用setChannelCacheSize设置一个合理的channel数量(默认是25)

    5.自行实现发送失败重试机制

    5.1 spring的重试机制

    spring.rabbitmq.template.retry.enabled=true
    spring.rabbitmq.template.retry.max-attempts=3
    spring.rabbitmq.template.retry.multiplier=1
    spring.rabbitmq.template.retry.initial-interval=1000ms
    spring.rabbitmq.template.retry.max-interval=10000ms
    
    • 1
    • 2
    • 3
    • 4
    • 5

    虽然spring提供了RetryTemplate的重试机制,配置起来也简单,但它的重试会阻塞当前线程,我们使用MQ的地方一般都是流量大需要削峰的场景,如果出现发送失败的情况,这种重试的形式性能会很差,而且重试的数据就在内存中,如果服务宕机,消息就丢失了。

    5.2 自行实现

    可以通过setConfirmCallback()获取是否发送到exchange的回调结果,而setMandatory(true)、setReturnCallback()则是可以拿到exchange无法路由到任意队列的消息。

    rabbitTemplate.setConfirmCallback();
    
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setReturnCallback();
    
    • 1
    • 2
    • 3
    • 4

    大致思路是:

    • 发送消息前,先把消息暂存起来(如数据库、redis等),然后发消息到MQ;
    • 如果回调结果是成功,则把之前暂存的消息删掉,失败的话不用处理;
    • 另外再起一个重试线程,定时重试之前暂存起来的消息,因为回调成功消息会删掉,所以重试的消息是没收到回调或者回调失败的;

    6.多数据源时,指定具体数据源

    为了方便,我们常常会在代码里配置queue、exchange、binding,如果我们连了多个MQ,如果不指定在那个MQ上创建,就会在每个MQ服务器上都创建一份queue、exchange、binding。

    	@Bean("rabbitAdmin1")
        public RabbitAdmin rabbitAdmin1(@Qualifier("conn2")CachingConnectionFactory c){
            return new RabbitAdmin(c);
        }
        @Bean("rabbitAdmin2")
        public RabbitAdmin rabbitAdmin2(@Qualifier("conn2") CachingConnectionFactory c){
            return new RabbitAdmin(c);
        }
        @Bean
        public Exchange exchange(){
            return ExchangeBuilder.fanoutExchange("xx").admins("rabbitAdmin1").build();
        }
        @Bean
        public Queue queue(){
            Queue queue = QueueBuilder.durable("xxx").build();
            queue.setAdminsThatShouldDeclare("rabbitAdmin1");
            return queue;
        }
        @Bean
        public Binding binding(Exchange exchange, Queue queue){
            Binding binding = BindingBuilder.bind(queue).to(exchange).with("xxx").noargs();
            binding.setAdminsThatShouldDeclare("rabbitAdmin1");
            return binding;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    如上述代码,admins()、setAdminsThatShouldDeclare()方法指定了在哪个数据源上创建、绑定。

    我们可能经常要订阅别的系统的消息,别人提供exchange,我们自行创建queue与其绑定。之前遇到的一个坑是,代码里没有指定在对方的MQ服务器进行绑定,那所有的MQ服务器都会进行尝试绑定,有的MQ服务器不存在同名的exchange的话就会报错了。

    7.择机使用懒惰队列

    创建队列的时候,如果带上x-queue-mode=lazy的参数,那么RabbitMQ就会把收到的消息存入磁盘,需要用的时候再加载到内存。适用于消息堆积量大,但消费速度又很慢的的场景,这样做可以降低MQ服务器的内存使用量,缺点就是损失一些消费性能。
    代码示例如下

    	@Bean
        public Queue queue(){
            Queue queue = QueueBuilder.durable("xxx").withArgument("x-queue-mode","lazy").build();
            queue.setAdminsThatShouldDeclare("rabbitAdmin1");
            return queue;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    8.消费者配置建议

    8.1 线程数配置

    	@RabbitListener(queues = "test", concurrency = "3")
        public void handler(Message message) {
            System.out.println("收到消息:" + new String(message.getBody()));
        }
    
    • 1
    • 2
    • 3
    • 4

    spring会根据concurrency的值创建相应数量的线程,首先我们都知道线程数并不是越多越好,线程切换有消耗,另外其底层是每个线程会占用一个channel,做长轮询操作,如果设置过多线程就浪费资源了。总之,要根据实际业务场景考虑。

    8.2 重试配置

    8.2.1 使用spring的重试机制

    如果消费者收到消息后,都是在同一个线程里处理事情的话,对吞吐率要求不高的话,个人建议用spring提供的重试机制就可以了,配置也很简单,如下:

    spring.rabbitmq.listener.simple.acknowledge-mode=AUTO
    spring.rabbitmq.listener.simple.retry.enabled=true
    spring.rabbitmq.listener.simple.retry.max-attempts=3
    spring.rabbitmq.listener.simple.retry.initial-interval=1000ms
    
    • 1
    • 2
    • 3
    • 4

    acknowledge-mode=AUTO这个配置可以保证我们处理消息完成后,spring才会向RabbitMQ发送ack,RabbitMQ才会把这条消息删除;而如果重试多次后还是处理异常,spring也会向RabbitMQ发送nack,我们可以通过设置死信队列来保证消息不丢失。

    PS:看RabbitMQ的客户端SDK发现个spring不同的地方,就是消费者的autoAck参数,如果是true,是不用回ack的意思,即MQ服务器发送消息给客户端就删掉消息了;如果是false,则客户端要回复ack后,MQ服务器才会删掉消息。
    String basicConsume(String queue, boolean autoAck, Consumer callback)

    8.2.2 自行重试

    使用spring提供的重试机制,是会阻塞线程,直到成功或者超过重试次数,线程才能干其他活,吞吐率低,可以自行实现重试

    spring.rabbitmq.listener.simple.acknowledge-mode=AUTO
    spring.rabbitmq.listener.simple.retry.enabled=false
    
    • 1
    • 2

    配置上acknowledge-mode=AUTO同时禁用消费者的重试机制,大致思路是,先把处理失败的消息暂存起来,过一段时间后再进行重试,下面列举一些方案:
    1) 如果MQ服务器支持rabbitmq_delayed_message_exchange插件,可以用它来发送延时消息。

    	@Bean
        public Exchange exchange(){
            Map<String, Object> args = new HashMap<>();
            args.put("x-delayed-type", "direct");
            final CustomExchange customExchange = new CustomExchange("xxx", "x-delayed-message", true, false, args);
            return customExchange;
        }
    
    	rabbitTemplate.convertAndSend("exchange", "routingkey", msg, msg -> {
                        msg.getMessageProperties().setDelay((int) TimeUnit.MINUTES.toMillis(1)); //设置延时时间
                        msg.getMessageProperties().setPriority(10);//到队列后优先处理
                        return msg;
                    });
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    上面的代码首先是在创建Exchange的时候需要定义type为x-delayed-message,其次发送消息的时候要在属性上加上x-delay=xx。这样做,失败的消息会延时重新投递回我们的队列,达到重试的效果。当然我们也不能无限重试,所以要在消息上记录重试次数,超过次数就不再重试了。

    2)利用死信队列机制。可以把失败的消息投递到另一个队列,改队列的死信队列指定为我们原先的队列,同时消息设置一个过期时间,等时间到了,RabbitMQ就自动帮我们把消息重新投递回原先的队列。

    3)失败的消息记录在数据库、redis等,定时任务重试。

    个人比较喜欢用第一种,实现起来简单。

  • 相关阅读:
    C# 让程序代码在固定的线程里运行
    【深度学习】 图像识别实战 102鲜花分类(flower 102)实战案例
    如何使用 edu 邮箱注册 Azure
    新手小白前端学习艰辛之路
    深度解读GaussDB逻辑解码技术原理
    基于springCloud的分布式架构体系
    二叉搜索树迭代器
    LeetCode 452. 用最少数量的箭引爆气球
    Spring
    应用计量经济学问题~
  • 原文地址:https://blog.csdn.net/seasonLai/article/details/126755997