有道无术,术尚可求,有术无道,止于术。
使用RabbitMQ
原生API
,发送消息时,发送的是二进制byte[]
数据。
void basicPublish(String var1, String var2, byte[] var4) throws IOException;
使用RabbitTemplate.send
方法发送Message
对象,也是二进制byte[]
数据。
public Message(byte[] body) {
this(body, new MessageProperties());
}
在接收时,需要将二进制数据转为你想要的数据格式。在JAVA
编程中都是基于对象操作,一般消息都是对象,比如订单、日志。
所以RabbitTemplate
提供了convertAndSend
方法,可以直接发送对象,那么对象在网络传输,就涉及到了序列化机制。
首先我们看下RabbitTemplate.convertAndSend
是如何工作及序列化对象的。
发送一个用户User
对象,该对象需要实现Serializable
序列化接口。
User user = new User();
user.setName("张三");
rabbitTemplate.convertAndSend("bbdbdbdb","aaa.key",user);
convertAndSend
也是调用send
方法,只是多了一个convertMessageIfNecessary
,将对象转为二进制数组,并封装到Message
对象中。
public void convertAndSend(String exchange, String routingKey, Object object, @Nullable CorrelationData correlationData) throws AmqpException {
// this.convertMessageIfNecessary(object) 将JAVA 消息对象转为`Message`
this.send(exchange, routingKey, this.convertMessageIfNecessary(object), correlationData);
}
convertMessageIfNecessary
会判断当前消息是否是Message
类型,如果是直接返回,不是则调用消息转换器进行转换。
protected Message convertMessageIfNecessary(Object object) {
return object instanceof Message ? (Message)object : this.getRequiredMessageConverter().toMessage(object, new MessageProperties());
}
获取消息转换器,直接通过RabbitTemplate.getMessageConverter
获取其成员属性,也就是SimpleMessageConverter
,这是默认值。
private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
// private MessageConverter messageConverter = new SimpleMessageConverter();
MessageConverter converter = this.getMessageConverter();
if (converter == null) {
throw new AmqpIllegalStateException("No 'messageConverter' specified. Check configuration of RabbitTemplate.");
} else {
return converter;
}
}
接着调用消息转换器的toMessage
方法,
public final Message toMessage(Object object, @Nullable MessageProperties messagePropertiesArg, @Nullable Type genericType) throws MessageConversionException {
// 1. 创建消息属性对象
MessageProperties messageProperties = messagePropertiesArg;
if (messagePropertiesArg == null) {
messageProperties = new MessageProperties();
}
// 2. 创建Message对象
Message message = this.createMessage(object, messageProperties, genericType);
messageProperties = message.getMessageProperties();
if (this.createMessageIds && messageProperties.getMessageId() == null) {
messageProperties.setMessageId(UUID.randomUUID().toString());
}
return message;
}
createMessage
创建Message
对象并返回。如果不是 byte[]
、String
类型,最后会查看消息对象是否实现了Serializable
接口,如果是,则进行序列化,并设置ContentType:application/x-java-serialized-object
,以上都是不是则会抛出IllegalArgumentException
异常。
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
byte[] bytes = null;
// 1. byte[] 类型
if (object instanceof byte[]) {
bytes = (byte[])object;
// 设置消息属性 ContentType:application/octet-stream
messageProperties.setContentType("application/octet-stream");
} else if (object instanceof String) {
// 2. String 类型
try {
// 转为字节
bytes = ((String)object).getBytes(this.defaultCharset);
} catch (UnsupportedEncodingException var6) {
throw new MessageConversionException("failed to convert to Message content", var6);
}
// 设置消息属性 ContentType:text/plain
messageProperties.setContentType("text/plain");
// 设置消息属性 ContentEncoding:UTF-8
messageProperties.setContentEncoding(this.defaultCharset);
} else if (object instanceof Serializable) {
// 3. 实现了 Serializable接口
try {
// 转为byte[]
bytes = SerializationUtils.serialize(object);
} catch (IllegalArgumentException var5) {
throw new MessageConversionException("failed to convert to serialized Message content", var5);
}
// 设置消息属性 ContentType:application/x-java-serialized-object
messageProperties.setContentType("application/x-java-serialized-object");
}
if (bytes != null) {
// 4. 设置长度
messageProperties.setContentLength((long)bytes.length);
// 5. 返回`Message`对象
return new Message(bytes, messageProperties);
} else {
throw new IllegalArgumentException(this.getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName());
}
}
Message
创建成功后,调用原生的channel.basicPublish
方法,发送消息对象、属性。
protected void sendToRabbit(Channel channel, String exchange, String routingKey, boolean mandatory, Message message) throws IOException {
AMQP.BasicProperties convertedMessageProperties = this.messagePropertiesConverter.fromMessageProperties(message.getMessageProperties(), this.encoding);
channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
}
查看控制台,可以看到对象消息的相关信息:
在消费者接收消息时,可以直接接收业务对象。
@RabbitListener(queues = {"dsfsf"})
public void receive003(User user) {
System.out.println("收到消息" + user);
}
容器监听消息,调用消息转换器SimpleMessageConverter
将二进制数据转为相应的对象。
调用的是SimpleMessageConverter.fromMessage
方法。
public Object fromMessage(Message message) throws MessageConversionException {
Object content = null;
// 1. 处理消息属性
MessageProperties properties = message.getMessageProperties();
if (properties != null) {
// 获取contentType ,这里为:application/x-java-serialized-object
String contentType = properties.getContentType();
// 2. contentType 以text 开头(字符串),二进制转为字符串返回
if (contentType != null && contentType.startsWith("text")) {
String encoding = properties.getContentEncoding();
if (encoding == null) {
encoding = this.defaultCharset;
}
try {
content = new String(message.getBody(), encoding);
} catch (UnsupportedEncodingException var8) {
throw new MessageConversionException("failed to convert text-based Message content", var8);
}
// 3. contentType为 application/x-java-serialized-object(序列化对象),
} else if (contentType != null && contentType.equals("application/x-java-serialized-object")) {
try {
// 反序列化为对象
content = SerializationUtils.deserialize(this.createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl));
} catch (IllegalArgumentException | IllegalStateException | IOException var7) {
throw new MessageConversionException("failed to convert serialized Message content", var7);
}
}
}
// 4. 以上都不是,直接返回二进制
if (content == null) {
content = message.getBody();
}
return content;
}
可是使用其他序列化方式,比如Jackson
。
只需要在RabbitTemplate
、监听容器工厂RabbitListenerContainerFactory
中设置转换器即可。
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
@Bean
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate();
configurer.configure(template, connectionFactory);
template.setMessageConverter(new Jackson2JsonMessageConverter());
return template;
}