• Springboot+Rabbitmq消费者注解详解、改序列化方式


    RabbitMQ–了解中间件、常用的中间件、分布式系统使用中间件、Docker安装rabbitmq及遇到的问题、RabbitMQ核心组成、消息模式
    Springboot整合RabbitMQ(Fanout、Direct、Topic模式)、设置队列信息TTL、死信队列、RabbitMQ磁盘监控,内存控制
    Springboot+Rabbitmq消费者注解详解、改序列化方式
    Docker简易部署RabbitMQ集群、分布式事务解决方案+案例(可靠生产、可靠消费)
    Springboot+RabbitMQ+ACK机制(生产方确认(全局、局部)、消费方确认)、知识盲区

    springboot+rabbitmq消费者

    最近在学习rabbitmq技术,在整理资料时发现整合springboot+rabbitmq的消费者方式,网上有不同的写法,为了满足我的好奇心,网上查阅资料然后整理一下,以作记录。

    一般需要使用到两个注解:

    • RabbitListener
    • RabbitHandler

    @RabbitListener

    看一下这个注解的源码

    在这里插入图片描述

    看到:这个注解可以声明在方法注解上,一般我们使用时只需要在类或者方法上即可。

    通常使用:

    • queues: 表示要监听的队列,可以监听多个队列
    @RabbitListener(queues = {"queue_work", "a_queue"})
    
    • 1

    其实,@RabbitListener注解中有很多的属性,一般只使用queues完全够用,不推荐在此@RabbitListener注解中去创建和绑定交换机和队列,维护起来非常麻烦,推荐使用配置类的方式创建和绑定交换机和队列

    @RabbitHandler

    如果@RabbitListener是声明在方法上的,这个注解可以不使用。如下:

    @RabbitListener(queues = {"queue_work", "a_queue"})
    public void receiveEmail2(String va) {
        System.out.println("--消费者2接收到了普通模式队列信息-->" + va);
    }
    
    • 1
    • 2
    • 3
    • 4

    但是当@RabbitListener注解声明在类上面时,这个注解是和@RabbitListener配合使用的。

    假设有这样的需求:需要根据队列中发送的数据格式,进行不同的处理。比如:公司内部决定,遇到byte[]这种的数据需要按照方法一处理,遇到字符串格式传输的数据需要按照方法二处理。这个时候就需要使用@RabbitHandler来配合@RabbitListener来处理。

    以下两个方法,只有接收到相同类型的数据的方法,才回去执行处理这个消息。

    @Component
    @RabbitListener(queues = "work_queue")
    public class EmailConsumer {
    
        @RabbitHandler
        public void handler1(String message) {
            System.out.println(message);
        }
    
        @RabbitHandler
        public void handler2(byte[] message) {
            System.out.println(new String(message));
        }
        
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    MessageConvert&序列化

    涉及到网络数据传输,那么序列化是不可避免的。生产者以某种规则将消息数据转为byte[]进行发送,消费者以约定的规则将byte[]进行解析。

    • Message类中的body属性,是我们的真正要传输的消息数据。Rabbitmq中有一个MessageConvert接口来处理消息的序列化,现有的序列化处理类SimpleMessageConverter(默认的)、Jackson2JsonMessageConverter 等。
    • 调用 convertAndSend方法后,会使用MessageConvert 进行消息的序列化
    • SimpleMessageConverter 对于要发送的消息体 body 为 byte[] 时不进行处理;若是 Java 对象,则使用 jdk 序列化将消息转成字节数组,转出来的结果较大,含class类名,类相应方法等信息。因此性能较差。
    • 如果数据量比较大,要考虑使用Jackson2JsonMessageConverter 等序列化形式提高性能。

    使用JSON序列化

    通常情况下我们如果不配置序列化的处理类,那么我们可以在传递参数时直接给传递一个json字符串或者字节数组,然后接收方再通过同样的方式解析即可。

    
    // 发送方
    public void sendMessage(Order order) {
    	rabbitTemplate.convertAndSend("user_exchange", "user_queue", JSON.toJSONString(user));
    }
    
    // 接收方
    @RabbitListener(queues = "user_queue")
    public void messageConsumer(String msg) throws Exception {
        System.out.println("消息:" + msg);
    	// 解析对象
        User order = JSON.parseObject(msg, User.class);
       // 后续处理
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    配置使用Jackson2JsonMessageConverter

    若传递数据时不想手动将对象序列化为其他格式,那么我们可以配置序列化处理类来进行处理。

    生产者需要配置RabbitTemplate

    @Configuration
    public class RabbitMqConfig {
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            return rabbitTemplate;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    消费者需要使用相同的序列化类

    @Configuration
    public class MyRabbitListenerConfigurer implements RabbitListenerConfigurer{
    
        //以下配置RabbitMQ消息服务
        @Autowired
        public ConnectionFactory connectionFactory;
    
        @Bean
        public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
            DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
            // 设置转换器
            factory.setMessageConverter(new MappingJackson2MessageConverter());
            return factory;
        }
    
        @Override
        public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
            rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    生产者发送消息

    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    public void send(User user) throws Exception {
        MessageConverter messageConverter = rabbitTemplate.getMessageConverter();
        System.out.println(messageConverter);
        rabbitTemplate.convertAndSend("user_exchange", "user_queue", user);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    看下控制台日志

    在这里插入图片描述

    接收者,直接接收相应 的数据类型

    @RabbitListener(queues = "user_queue")
    public void receiveEmail3(User user) {
        System.out.println("接收到消息==>" + user);
    }
    
    • 1
    • 2
    • 3
    • 4

    接收到的信息

    在这里插入图片描述

    注意:要传输的实体类必须要实现序列化接口,并且一定要提供一个无参构造函数!!!

    建议

    推荐使用手动将对象序列化的方式进行数据传输,否则再去配置序列化的处理类比较麻烦。如果手动序列化方式的地方过多,那么可以考虑配置序列化类

    参考

    SpringBoot 整合 RabbitMQ 修改序列化方式

    如果要改为Json序列化格式,可以看下这篇文章RabbitMQ:@RabbitListener 与 @RabbitHandler 及 消息序列化

  • 相关阅读:
    一款.NET下 WPF UI框架介绍
    Hybrid综合应用
    二、RabbitMQ的五种工作模式
    MATLAB算法实战应用案例精讲-【图像处理】Transformer
    ASP.NET Core 6框架揭秘实例演示[09]:将配置绑定为对象
    【单片机仿真项目】利用定时器0实现对LED灯的闪烁控制,LED灯的闪烁间隔为0.5秒
    无人机航拍技术基础入门,无人机拍摄的方法与技巧
    数学建模——确定性时间序列分析方法
    Github 2024-06-15Rust开源项目日报Top10
    北航第六次数据结构与程序设计作业(查找与排序)选填题
  • 原文地址:https://blog.csdn.net/weixin_45248492/article/details/126146499