• RabbitMQ系列【18】对象序列化机制


    有道无术,术尚可求,有术无道,止于术。

    前言

    使用RabbitMQ原生API,发送消息时,发送的是二进制byte[]数据。

        void basicPublish(String var1, String var2, byte[] var4) throws IOException;
    
    • 1

    使用RabbitTemplate.send方法发送Message对象,也是二进制byte[]数据。

        public Message(byte[] body) {
            this(body, new MessageProperties());
        }
    
    • 1
    • 2
    • 3

    在接收时,需要将二进制数据转为你想要的数据格式。在JAVA 编程中都是基于对象操作,一般消息都是对象,比如订单、日志。

    所以RabbitTemplate提供了convertAndSend方法,可以直接发送对象,那么对象在网络传输,就涉及到了序列化机制。

    发送对象

    首先我们看下RabbitTemplate.convertAndSend是如何工作及序列化对象的。

    发送一个用户User 对象,该对象需要实现Serializable序列化接口。

            User user = new User();
            user.setName("张三");
            rabbitTemplate.convertAndSend("bbdbdbdb","aaa.key",user);
    
    • 1
    • 2
    • 3

    convertAndSend也是调用send方法,只是多了一个convertMessageIfNecessary,将对象转为二进制数组,并封装到Message对象中。

       public void convertAndSend(String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData) throws AmqpException {
       		// this.convertMessageIfNecessary(object) 将JAVA 消息对象转为`Message`
            this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData);
        }
    
    • 1
    • 2
    • 3
    • 4

    convertMessageIfNecessary会判断当前消息是否是Message类型,如果是直接返回,不是则调用消息转换器进行转换。

       protected Message convertMessageIfNecessary(Object object) {
            return object instanceof Message ? (Message)object : this.getRequiredMessageConverter().toMessage(object, new MessageProperties());
        }
    
    • 1
    • 2
    • 3

    获取消息转换器,直接通过RabbitTemplate.getMessageConverter获取其成员属性,也就是SimpleMessageConverter,这是默认值。

        private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
        	//  private MessageConverter messageConverter = new SimpleMessageConverter();
            MessageConverter converter = this.getMessageConverter();
            if (converter == null) {
                throw new AmqpIllegalStateException("No 'messageConverter' specified. Check configuration of RabbitTemplate.");
            } else {
                return converter;
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    接着调用消息转换器的toMessage方法,

        public final Message toMessage(Object object, @Nullable MessageProperties messagePropertiesArg, @Nullable Type genericType) throws MessageConversionException {
        	// 1. 创建消息属性对象
            MessageProperties messageProperties = messagePropertiesArg;
            if (messagePropertiesArg == null) {
                messageProperties = new MessageProperties();
            }
    		// 2. 创建Message对象
            Message message = this.createMessage(object, messageProperties, genericType);
            messageProperties = message.getMessageProperties();
            if (this.createMessageIds && messageProperties.getMessageId() == null) {
                messageProperties.setMessageId(UUID.randomUUID().toString());
            }
    
            return message;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    createMessage创建Message 对象并返回。如果不是 byte[]String类型,最后会查看消息对象是否实现了Serializable接口,如果是,则进行序列化,并设置ContentType:application/x-java-serialized-object,以上都是不是则会抛出IllegalArgumentException异常。

        protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            byte[] bytes = null;
            // 1. byte[] 类型
            if (object instanceof byte[]) {
                bytes = (byte[])object;
                // 设置消息属性 ContentType:application/octet-stream
                messageProperties.setContentType("application/octet-stream");
            } else if (object instanceof String) {
            	// 2. String 类型
                try {
                	// 转为字节
                    bytes = ((String)object).getBytes(this.defaultCharset);
                } catch (UnsupportedEncodingException var6) {
                    throw new MessageConversionException("failed to convert to Message content", var6);
                }
    			// 设置消息属性 ContentType:text/plain
                messageProperties.setContentType("text/plain");
                // 设置消息属性 ContentEncoding:UTF-8
                messageProperties.setContentEncoding(this.defaultCharset);
            } else if (object instanceof Serializable) {
            	// 3. 实现了 Serializable接口
                try {
                	// 转为byte[] 
                    bytes = SerializationUtils.serialize(object);
                } catch (IllegalArgumentException var5) {
                    throw new MessageConversionException("failed to convert to serialized Message content", var5);
                }
    			// 设置消息属性 ContentType:application/x-java-serialized-object
                messageProperties.setContentType("application/x-java-serialized-object");
            }
    
            if (bytes != null) {
            	// 4. 设置长度
                messageProperties.setContentLength((long)bytes.length);
                // 5. 返回`Message`对象 
                return new Message(bytes, messageProperties);
            } else {
                throw new IllegalArgumentException(this.getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    Message 创建成功后,调用原生的channel.basicPublish方法,发送消息对象、属性。

        protected void sendToRabbit(Channel channel, String exchange, String routingKey, boolean mandatory, Message message) throws IOException {
            AMQP.BasicProperties convertedMessageProperties = this.messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(), this.encoding);
            channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
        }
    
    • 1
    • 2
    • 3
    • 4

    查看控制台,可以看到对象消息的相关信息:
    在这里插入图片描述

    接收对象

    在消费者接收消息时,可以直接接收业务对象。

        @RabbitListener(queues = {"dsfsf"})
        public void receive003(User user) {
            System.out.println("收到消息" + user);
        }
    
    • 1
    • 2
    • 3
    • 4

    容器监听消息,调用消息转换器SimpleMessageConverter将二进制数据转为相应的对象。

    调用的是SimpleMessageConverter.fromMessage方法。

        public Object fromMessage(Message message) throws MessageConversionException {
            Object content = null;
            // 1. 处理消息属性
            MessageProperties properties = message.getMessageProperties();
            if (properties != null) {
            	// 获取contentType ,这里为:application/x-java-serialized-object
                String contentType = properties.getContentType();
                // 2. contentType 以text 开头(字符串),二进制转为字符串返回
                if (contentType != null && contentType.startsWith("text")) {
                    String encoding = properties.getContentEncoding();
                    if (encoding == null) {
                        encoding = this.defaultCharset;
                    }
    
                    try {
                        content = new String(message.getBody(), encoding);
                    } catch (UnsupportedEncodingException var8) {
                        throw new MessageConversionException("failed to convert text-based Message content", var8);
                    }
                // 3. contentType为 application/x-java-serialized-object(序列化对象),
                } else if (contentType != null && contentType.equals("application/x-java-serialized-object")) {
                    try {
                    	// 反序列化为对象
                        content = SerializationUtils.deserialize(this.createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
                    } catch (IllegalArgumentException | IllegalStateException | IOException var7) {
                        throw new MessageConversionException("failed to convert serialized Message content", var7);
                    }
                }
            }
    		// 4. 以上都不是,直接返回二进制
            if (content == null) {
                content = message.getBody();
            }
            return content;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    使用Jackson 序列化

    可是使用其他序列化方式,比如Jackson

    只需要在RabbitTemplate 、监听容器工厂RabbitListenerContainerFactory中设置转换器即可。

        @Bean
        public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            return factory;
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate();
            configurer.configure(template, connectionFactory);
            template.setMessageConverter(new Jackson2JsonMessageConverter());
            return template;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
  • 相关阅读:
    数据结构笔记(王道考研) 第四章:串
    Matlab论文插图绘制模板第125期—特征渲染的三维气泡图
    已解决TypeError: __init__() missing 1 required positional argument: ‘scheme‘
    Java学习笔记——并发编程(一)
    如何设计性能优良的mysql索引?
    选择算法之冒泡排序【图文详解】
    产业互联网周报:CRM服务商玄武云7月港股上市;亚马逊云宣布成立“量子网络中心”;欧洲多国重启煤炭发电;邬贺铨:我国数据中心……...
    中科新生命峰会预告 | 大咖云集,共襄单细胞多组学及空间组学峰会
    结构体类型
    Gradient
  • 原文地址:https://blog.csdn.net/qq_43437874/article/details/128101868