
前言: 最近没事浏览Spring官网,简单写一些相关的笔记,这篇文章整理Spring AMQP相关内容。文章并不包含所有技术点,只是记录有收获
目录
Spring AMQP提供了一个扮演核心角色的“模板”,定义操作的接口是AmqpTemplate , 接口操作涵盖了发送和接收消息的一般行为 . 因此他包含了发送和接收消息的所有基本操作. 模板接口的每一个实现都依赖特定的客户端类库, 目前只有RabbitTemplate


参考源码:
- package org.springframework.amqp.core;
-
- public interface AmqpTemplate {
-
- /**
- * 使用默认路由密钥向默认交换机发送消息
- */
- void send(Message message) throws AmqpException;
-
- /**
- * 使用特定路由密钥向默认交换机发送消息
- */
- void send(String routingKey, Message message) throws AmqpException;
-
- /**
- * 使用特定路由密钥向特定交换机发送消息
- */
- void send(String exchange, String routingKey, Message message) throws AmqpException;
-
- //代码略
- }
AmqpTemplate指定AMQP操作的提供了发送消息的方法,其中最后一个方法有三个参数,它是显示发送消息的方法,他允许在运行时提供AMQP交换器 名称和路由关键字来发送消息,而最后的参数是实际常见的Message 。使用方法如下
- amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
- new Message("12.34".getBytes(), someProperties));
如果是使用同给一个交换器(exchange)可以通过设置(set)的方法设置交换器,然后再发送消息,例如
- amqpTemplate.setExchange("marketData.topic");
- amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
如果amqpTemplate 设置了交换器(exchange)和路由键(routingKey)属性,那么只需要接收消息参数即可
- amqpTemplate.setExchange("marketData.topic");
- amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
- amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
AmqpTemplate提供了将Java对象转换为消息并发送的方法

参考源码如下:
- package org.springframework.amqp.core;
-
- public interface AmqpTemplate {
-
- //将Java对象转换为消息并发送给默认交换机
- void convertAndSend(Object message) throws AmqpException;
-
- //将Java对象转换为Amqp消息,并将其发送到具有特定路由的默认交换机
- void convertAndSend(String routingKey, Object message) throws AmqpException;
-
- //将Java对象转换为Amqp消息,并使用特定的路由密钥将其发送到特定的交换机。
- void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
-
- //将Java对象转换为Amqp消息,并使用默认路由密钥将其发送到默认交换机
- void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
-
- //将Java对象转换为Amqp消息,并使用特定的路由密钥将其发送到默认交换机
- void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor)
- throws AmqpException;
-
- //将Java对象转换为Amqp消息,并使用特定的路由密钥将其发送到特定的交换机。
- void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor)
- throws AmqpException;
- //代码略
- }
从接口AmqpTemplate 的实现类RabbitTemplate可以看到Object对象通过MessageConverter对象转换为成Message对象
- package org.springframework.amqp.rabbit.core;
-
- public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
- implements BeanFactoryAware, RabbitOperations, ChannelAwareMessageListener,
- ListenerContainerAware, PublisherCallbackChannel.Listener, BeanNameAware, DisposableBean {
-
- //默认转换对象为SimpleMessageConverter
- private MessageConverter messageConverter = new SimpleMessageConverter();
-
- @Override
- public void convertAndSend(String exchange, String routingKey, final Object message,
- final MessagePostProcessor messagePostProcessor,
- @Nullable CorrelationData correlationData) throws AmqpException {
- Message messageToSend = convertMessageIfNecessary(message);
- messageToSend = messagePostProcessor.postProcessMessage(messageToSend, correlationData,
- nullSafeExchange(exchange), nullSafeRoutingKey(routingKey));
- send(exchange, routingKey, messageToSend, correlationData);
- }
-
- protected Message convertMessageIfNecessary(final Object object) {
- if (object instanceof Message) {
- return (Message) object;
- }
- return getRequiredMessageConverter().toMessage(object, new MessageProperties());
- }eturn converter;
- }
-
- private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
- MessageConverter converter = getMessageConverter();
- if (converter == null) {
- throw new AmqpIllegalStateException(
- "No 'messageConverter' specified. Check configuration of RabbitTemplate.");
- }
- return converter;
- }
-
- //代码略
- }
显示设置交换器(exchange)和路由键(routingKey)属性是比较推荐的使用方法,应为代码更清晰易读。 当不显示设置两个属性的时候也会有默认值。
AmqpTemplate默认交换器和默认路由键都是空String类型。因为AMQP规范将默认交换器定义为没有名称。 所有队列都自动绑定到默认交换器(direct exchange).
- //参考RabbitTemplate 源码
- package org.springframework.amqp.rabbit.core
-
- public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
- implements BeanFactoryAware, RabbitOperations, ChannelAwareMessageListener,
- ListenerContainerAware, PublisherCallbackChannel.Listener, BeanNameAware, DisposableBean {
- private static final String DEFAULT_EXCHANGE = "";
-
- private static final String DEFAULT_ROUTING_KEY = "";
-
- @Override
- public void send(Message message) throws AmqpException {
- send(this.exchange, this.routingKey, message);
- }
-
- @Override
- public void send(String routingKey, Message message) throws AmqpException {
- send(this.exchange, routingKey, message);
- }
-
- //省略其他代码
- }
例如可以创建一个模板,用于将消息发送到某个队列
- // 没有设置交换器就是默认交换器
- RabbitTemplate template = new RabbitTemplate();
- // 消息将会发送到名称为queue.helloWorld队列中
- template.setRoutingKey("queue.helloWorld");
- // 执行发送消息
- template.send(new Message("Hello World".getBytes(), someProperties));
AmqpTemplate接口使用的Message参数,可以使用MessageBuilder和MessagePropertiesBuilder对象快速构建
MessageBuilder
- package org.springframework.amqp.core;
-
- public final class MessageBuilder extends MessageBuilderSupport
{ -
- //由构建器创建的消息正文将直接引用“body”
- public static MessageBuilder withBody(byte[] body) {}
-
- //由构建器创建的消息正文将是新数组中“body”的副本,通过Array.copyOf复制
- public static MessageBuilder withClonedBody(byte[] body) {}
-
- //由构建器创建的消息正文将是一个新数组,包含“body”的字节范围,通过Array.copyOf复制
- public static MessageBuilder withBody(byte[] body, int from, int to) {}
-
- //构建器创建的消息将具有一个包含参数的正文副本的新数组的主体。
- //参数的属性被复制到一个新的MessageProperties对象
- public static MessageBuilder fromClonedMessage(Message message) {}
-
- //建设者创建的消息将具有一个直接引用参数体的主体。
- //参数的属性被复制到一个新的MessageProperties对象
- public static MessageBuilder fromMessage(Message message) {}
-
- //构建返回Message 对象
- @Override
- public Message build() {
- return new Message(this.body, this.buildProperties());
- }
- }
通过MessageBuilder构建message
- Message message = MessageBuilder.withBody("foo".getBytes())
- .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
- .setMessageId("123")
- .setHeader("bar", "baz")
- .build();
MessagePropertiesBuilder
- package org.springframework.amqp.core;
-
- public final class MessagePropertiesBuilder extends MessageBuilderSupport
{ -
- public static MessagePropertiesBuilder newInstance() {}
-
- public static MessagePropertiesBuilder fromProperties(MessageProperties properties) {}
-
- public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) {}
-
- //构建返回MessageProperties对象
- @Override
- public MessageProperties build() {
- return this.buildProperties();
- }
-
- }
通过MessagePropertiesBuilder构建MessageProperties 然后在通过MessageBuilder构建message
- MessageProperties props = MessagePropertiesBuilder.newInstance()
- .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
- .setMessageId("123")
- .setHeader("bar", "baz")
- .build();
- Message message = MessageBuilder.withBody("foo".getBytes())
- .andProperties(props)
- .build();
接收消息有两种方式,一种是通过轮询的方法来轮询单个消息,另一种是是通过监听器来异步接收消息。异步消息使用的专用组件而不是AmqpTemplate, AmqpTemplate可以用于轮询接收消息,在使用AmqpTemplate接收消息时,默认情况下如果没有消息的时候不会阻塞,直接返回空, 也可设置接收的阻塞时间,

参考源码如下:
- package org.springframework.amqp.core;
-
- public interface AmqpTemplate {
-
- //如果存在来自默认队列的消息,则接收消息。立即返回
- Message receive() throws AmqpException;
-
- //如果有来自特定队列的消息,则接收消息 ,立刻返回
- Message receive(String queueName) throws AmqpException;
-
- //从默认队列接收消息,如果消息可用,则等待指定的等待时间
- Message receive(long timeoutMillis) throws AmqpException;
-
- //从特定队列接收消息,如果消息可用,则等待指定的等待时间
- Message receive(String queueName, long timeoutMillis) throws AmqpException;
-
- //代码略
- }
AmqpTemplate提供了四个重载方法来接收POJO,而不是上面的Message对象, 它们返回对象是Object,用以替换返回值 Message。

如下源码所示
- package org.springframework.amqp.core;
-
- public interface AmqpTemplate {
-
- //接收默认队列的消息,如果消息可用,并转换为JAVA对象。立即返回
- Object receiveAndConvert() throws AmqpException;
-
- //接收特定队列的消息,如果消息可用,将其转换为Java对象。立即返回,可能返回空值
- Object receiveAndConvert(String queueName) throws AmqpException;
-
- //接收默认队列的消息,如果消息可用,则等待指定的等待时间
- Object receiveAndConvert(long timeoutMillis) throws AmqpException;
-
- //接收特定队列的消息,如果消息可用,则等待指定的等待时间
- Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
-
- //代码略
- }
从接口AmqpTemplate 的实现类RabbitTemplate可以看到Message是通过MessageConverter对象转换为成Java对象
- package org.springframework.amqp.rabbit.core;
-
- public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
- implements BeanFactoryAware, RabbitOperations, ChannelAwareMessageListener,
- ListenerContainerAware, PublisherCallbackChannel.Listener, BeanNameAware, DisposableBean {
-
- //默认转换对象为SimpleMessageConverter
- private MessageConverter messageConverter = new SimpleMessageConverter();
-
- //接收特定队列的消息,如果消息可用,则等待指定的等待时间
- Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException{
- Message response = timeoutMillis == 0 ? doReceiveNoWait(queueName) : receive(queueName, timeoutMillis);
- if (response != null) {
- return getRequiredMessageConverter().fromMessage(response);
- }
- }
-
- private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
- MessageConverter converter = getMessageConverter();
- if (converter == null) {
- throw new AmqpIllegalStateException(
- "No 'messageConverter' specified. Check configuration of RabbitTemplate.");
- }
- return converter;
- }
-
- //代码略
- }
除此之外AmqpTemplate还有几个receiveAndReply方法,这些方法用来实现同步接收,处理并回复消息

AmqpTemplate实现负责接收和回复阶段。在大多数情况下,提供ReceiveAndReplyCallback的实现来为接收到的消息执行一些业务逻辑,如果需要,可以构建回复对象或消息。需要注意的是ReceiveAndReplyCallback可能返回null。在这种情况下,没有发送回复消息
- package org.springframework.amqp.core;
-
- public interface AmqpTemplate {
-
- //接收默认队列的消息,调用提供的ReceiveAndReplyCallback,
- //如果回调返回一个消息,则将回复消息发送到MessageProperties中的replyTo地址,
- //或者发送到默认交换和默认routingKe
-
boolean receiveAndReply(ReceiveAndReplyCallback callback) throws AmqpException; -
- //接收指定队列的消息,调用提供的ReceiveAndReplyCallback,
- //如果回调返回一个消息,则将回复消息发送到MessageProperties中的replyTo地址,
- //或者发送到默认交换和默认routingKe
-
boolean receiveAndReply(String queueName, ReceiveAndReplyCallback callback) throws AmqpException; -
- //接收默认队列的消息,调用提供的ReceiveAndReplyCallback,
- //如果回调返回消息,则向提供的exchange和routingKey发送回复消息
-
boolean receiveAndReply(ReceiveAndReplyCallback callback, String replyExchange, String replyRoutingKey) - throws AmqpException;
-
- //接收指定队列的消息,调用提供的ReceiveAndReplyCallback,
- //如果回调返回消息,则向提供的exchange和routingKey发送回复消息
-
boolean receiveAndReply(String queueName, ReceiveAndReplyCallback callback, String replyExchange, - String replyRoutingKey) throws AmqpException;
-
- //接收默认队列的消息,调用提供的ReceiveAndReplyCallback,
- //如果回调返回一个消息,则向ReplyToAddressCallback的结果中的replyToAddress发送回复消息
-
boolean receiveAndReply(ReceiveAndReplyCallback callback, - ReplyToAddressCallback
replyToAddressCallback) throws AmqpException; -
- //接收指定队列的消息,调用提供的ReceiveAndReplyCallback,
- //如果回调返回一个消息,则向ReplyToAddressCallback的结果中的replyToAddress发送回复消息
-
boolean receiveAndReply(String queueName, ReceiveAndReplyCallback callback, - ReplyToAddressCallback
replyToAddressCallback) throws AmqpException; -
- //代码略
- }
例如
- boolean b=template.receiveAndReply("myqueue",new ReceiveAndReplyCallback
(){ - public String handle(String str) {
- System.out.println("getMessage is :" + str);
- //doSomething
- return "OK";
- }
- });