• SpringAMQP中AmqpTemplate发送接收消息



    前言: 最近没事浏览Spring官网,简单写一些相关的笔记,这篇文章整理Spring AMQP相关内容。文章并不包含所有技术点,只是记录有收获


     目录

    1.AmqpTemplate 介绍

    2.发送消息(Sending Message)

    2.1发送Message消息

    2.2发送POJO对象

    2.3默认交换器与默认路由

    2.5构建消息方法

    3.接收消息(Receiving Message)

    3.1接收Message消息

    3.2接收Java对象

    3.3接收消息并回复



    1.AmqpTemplate 介绍

    Spring AMQP提供了一个扮演核心角色的“模板”,定义操作的接口是AmqpTemplate ,  接口操作涵盖了发送和接收消息的一般行为 . 因此他包含了发送和接收消息的所有基本操作. 模板接口的每一个实现都依赖特定的客户端类库, 目前只有RabbitTemplate 

    2.发送消息(Sending Message)
     

    2.1发送Message消息

    参考源码: 

    1. package org.springframework.amqp.core;
    2. public interface AmqpTemplate {
    3. /**
    4. * 使用默认路由密钥向默认交换机发送消息
    5. */
    6. void send(Message message) throws AmqpException;
    7. /**
    8. * 使用特定路由密钥向默认交换机发送消息
    9. */
    10. void send(String routingKey, Message message) throws AmqpException;
    11. /**
    12. * 使用特定路由密钥向特定交换机发送消息
    13. */
    14. void send(String exchange, String routingKey, Message message) throws AmqpException;
    15. //代码略
    16. }

    AmqpTemplate指定AMQP操作的提供了发送消息的方法,其中最后一个方法有三个参数,它是显示发送消息的方法,他允许在运行时提供AMQP交换器 名称和路由关键字来发送消息,而最后的参数是实际常见的Message 。使用方法如下

    1. amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
    2. new Message("12.34".getBytes(), someProperties));

    如果是使用同给一个交换器(exchange)可以通过设置(set)的方法设置交换器,然后再发送消息,例如

    1. amqpTemplate.setExchange("marketData.topic");
    2. amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));

    如果amqpTemplate 设置了交换器(exchange)和路由键(routingKey)属性,那么只需要接收消息参数即可

    1. amqpTemplate.setExchange("marketData.topic");
    2. amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
    3. amqpTemplate.send(new Message("12.34".getBytes(), someProperties));

    2.2发送POJO对象

    AmqpTemplate提供了将Java对象转换为消息并发送的方法

     参考源码如下:

    1. package org.springframework.amqp.core;
    2. public interface AmqpTemplate {
    3. //将Java对象转换为消息并发送给默认交换机
    4. void convertAndSend(Object message) throws AmqpException;
    5. //将Java对象转换为Amqp消息,并将其发送到具有特定路由的默认交换机
    6. void convertAndSend(String routingKey, Object message) throws AmqpException;
    7. //将Java对象转换为Amqp消息,并使用特定的路由密钥将其发送到特定的交换机。
    8. void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
    9. //将Java对象转换为Amqp消息,并使用默认路由密钥将其发送到默认交换机
    10. void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
    11. //将Java对象转换为Amqp消息,并使用特定的路由密钥将其发送到默认交换机
    12. void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor)
    13. throws AmqpException;
    14. //将Java对象转换为Amqp消息,并使用特定的路由密钥将其发送到特定的交换机。
    15. void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor)
    16. throws AmqpException;
    17. //代码略
    18. }

    从接口AmqpTemplate 的实现类RabbitTemplate可以看到Object对象通过MessageConverter对象转换为成Message对象

    1. package org.springframework.amqp.rabbit.core;
    2. public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
    3. implements BeanFactoryAware, RabbitOperations, ChannelAwareMessageListener,
    4. ListenerContainerAware, PublisherCallbackChannel.Listener, BeanNameAware, DisposableBean {
    5. //默认转换对象为SimpleMessageConverter
    6. private MessageConverter messageConverter = new SimpleMessageConverter();
    7. @Override
    8. public void convertAndSend(String exchange, String routingKey, final Object message,
    9. final MessagePostProcessor messagePostProcessor,
    10. @Nullable CorrelationData correlationData) throws AmqpException {
    11. Message messageToSend = convertMessageIfNecessary(message);
    12. messageToSend = messagePostProcessor.postProcessMessage(messageToSend, correlationData,
    13. nullSafeExchange(exchange), nullSafeRoutingKey(routingKey));
    14. send(exchange, routingKey, messageToSend, correlationData);
    15. }
    16. protected Message convertMessageIfNecessary(final Object object) {
    17. if (object instanceof Message) {
    18. return (Message) object;
    19. }
    20. return getRequiredMessageConverter().toMessage(object, new MessageProperties());
    21. }eturn converter;
    22. }
    23. private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
    24. MessageConverter converter = getMessageConverter();
    25. if (converter == null) {
    26. throw new AmqpIllegalStateException(
    27. "No 'messageConverter' specified. Check configuration of RabbitTemplate.");
    28. }
    29. return converter;
    30. }
    31. //代码略
    32. }

    2.3默认交换器与默认路由

    显示设置交换器(exchange)和路由键(routingKey)属性是比较推荐的使用方法,应为代码更清晰易读。 当不显示设置两个属性的时候也会有默认值。

    AmqpTemplate默认交换器和默认路由键都是空String类型。因为AMQP规范将默认交换器定义为没有名称。 所有队列都自动绑定到默认交换器(direct exchange). 

    1. //参考RabbitTemplate 源码
    2. package org.springframework.amqp.rabbit.core
    3. public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
    4. implements BeanFactoryAware, RabbitOperations, ChannelAwareMessageListener,
    5. ListenerContainerAware, PublisherCallbackChannel.Listener, BeanNameAware, DisposableBean {
    6. private static final String DEFAULT_EXCHANGE = "";
    7. private static final String DEFAULT_ROUTING_KEY = "";
    8. @Override
    9. public void send(Message message) throws AmqpException {
    10. send(this.exchange, this.routingKey, message);
    11. }
    12. @Override
    13. public void send(String routingKey, Message message) throws AmqpException {
    14. send(this.exchange, routingKey, message);
    15. }
    16. //省略其他代码
    17. }

    例如可以创建一个模板,用于将消息发送到某个队列

    1. // 没有设置交换器就是默认交换器
    2. RabbitTemplate template = new RabbitTemplate();
    3. // 消息将会发送到名称为queue.helloWorld队列中
    4. template.setRoutingKey("queue.helloWorld");
    5. // 执行发送消息
    6. template.send(new Message("Hello World".getBytes(), someProperties));

    2.5构建消息方法

    AmqpTemplate接口使用的Message参数,可以使用MessageBuilder和MessagePropertiesBuilder对象快速构建

    MessageBuilder

    1. package org.springframework.amqp.core;
    2. public final class MessageBuilder extends MessageBuilderSupport {
    3. //由构建器创建的消息正文将直接引用“body”
    4. public static MessageBuilder withBody(byte[] body) {}
    5. //由构建器创建的消息正文将是新数组中“body”的副本,通过Array.copyOf复制
    6. public static MessageBuilder withClonedBody(byte[] body) {}
    7. //由构建器创建的消息正文将是一个新数组,包含“body”的字节范围,通过Array.copyOf复制
    8. public static MessageBuilder withBody(byte[] body, int from, int to) {}
    9. //构建器创建的消息将具有一个包含参数的正文副本的新数组的主体。
    10. //参数的属性被复制到一个新的MessageProperties对象
    11. public static MessageBuilder fromClonedMessage(Message message) {}
    12. //建设者创建的消息将具有一个直接引用参数体的主体。
    13. //参数的属性被复制到一个新的MessageProperties对象
    14. public static MessageBuilder fromMessage(Message message) {}
    15. //构建返回Message 对象
    16. @Override
    17. public Message build() {
    18. return new Message(this.body, this.buildProperties());
    19. }
    20. }

    通过MessageBuilder构建message

    1. Message message = MessageBuilder.withBody("foo".getBytes())
    2. .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    3. .setMessageId("123")
    4. .setHeader("bar", "baz")
    5. .build();

    MessagePropertiesBuilder

    1. package org.springframework.amqp.core;
    2. public final class MessagePropertiesBuilder extends MessageBuilderSupport {
    3. public static MessagePropertiesBuilder newInstance() {}
    4. public static MessagePropertiesBuilder fromProperties(MessageProperties properties) {}
    5. public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) {}
    6. //构建返回MessageProperties对象
    7. @Override
    8. public MessageProperties build() {
    9. return this.buildProperties();
    10. }
    11. }

    通过MessagePropertiesBuilder构建MessageProperties 然后在通过MessageBuilder构建message

    1. MessageProperties props = MessagePropertiesBuilder.newInstance()
    2. .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    3. .setMessageId("123")
    4. .setHeader("bar", "baz")
    5. .build();
    6. Message message = MessageBuilder.withBody("foo".getBytes())
    7. .andProperties(props)
    8. .build();

    3.接收消息(Receiving Message)

    接收消息有两种方式,一种是通过轮询的方法来轮询单个消息,另一种是是通过监听器来异步接收消息。异步消息使用的专用组件而不是AmqpTemplate, AmqpTemplate可以用于轮询接收消息,在使用AmqpTemplate接收消息时,默认情况下如果没有消息的时候不会阻塞,直接返回空, 也可设置接收的阻塞时间,

    3.1接收Message消息

    参考源码如下:

    1. package org.springframework.amqp.core;
    2. public interface AmqpTemplate {
    3. //如果存在来自默认队列的消息,则接收消息。立即返回
    4. Message receive() throws AmqpException;
    5. //如果有来自特定队列的消息,则接收消息 ,立刻返回
    6. Message receive(String queueName) throws AmqpException;
    7. //从默认队列接收消息,如果消息可用,则等待指定的等待时间
    8. Message receive(long timeoutMillis) throws AmqpException;
    9. //从特定队列接收消息,如果消息可用,则等待指定的等待时间
    10. Message receive(String queueName, long timeoutMillis) throws AmqpException;
    11. //代码略
    12. }

    3.2接收Java对象

     AmqpTemplate提供了四个重载方法来接收POJO,而不是上面的Message对象, 它们返回对象是Object,用以替换返回值 Message。

    如下源码所示

    1. package org.springframework.amqp.core;
    2. public interface AmqpTemplate {
    3. //接收默认队列的消息,如果消息可用,并转换为JAVA对象。立即返回
    4. Object receiveAndConvert() throws AmqpException;
    5. //接收特定队列的消息,如果消息可用,将其转换为Java对象。立即返回,可能返回空值
    6. Object receiveAndConvert(String queueName) throws AmqpException;
    7. //接收默认队列的消息,如果消息可用,则等待指定的等待时间
    8. Object receiveAndConvert(long timeoutMillis) throws AmqpException;
    9. //接收特定队列的消息,如果消息可用,则等待指定的等待时间
    10. Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
    11. //代码略
    12. }


    从接口AmqpTemplate 的实现类RabbitTemplate可以看到Message是通过MessageConverter对象转换为成Java对象

    1. package org.springframework.amqp.rabbit.core;
    2. public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
    3. implements BeanFactoryAware, RabbitOperations, ChannelAwareMessageListener,
    4. ListenerContainerAware, PublisherCallbackChannel.Listener, BeanNameAware, DisposableBean {
    5. //默认转换对象为SimpleMessageConverter
    6. private MessageConverter messageConverter = new SimpleMessageConverter();
    7. //接收特定队列的消息,如果消息可用,则等待指定的等待时间
    8. Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException{
    9. Message response = timeoutMillis == 0 ? doReceiveNoWait(queueName) : receive(queueName, timeoutMillis);
    10. if (response != null) {
    11. return getRequiredMessageConverter().fromMessage(response);
    12. }
    13. }
    14. private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
    15. MessageConverter converter = getMessageConverter();
    16. if (converter == null) {
    17. throw new AmqpIllegalStateException(
    18. "No 'messageConverter' specified. Check configuration of RabbitTemplate.");
    19. }
    20. return converter;
    21. }
    22. //代码略
    23. }

    3.3接收消息并回复

    除此之外AmqpTemplate还有几个receiveAndReply方法,这些方法用来实现同步接收,处理并回复消息 

    AmqpTemplate实现负责接收和回复阶段。在大多数情况下,提供ReceiveAndReplyCallback的实现来为接收到的消息执行一些业务逻辑,如果需要,可以构建回复对象或消息。需要注意的是ReceiveAndReplyCallback可能返回null。在这种情况下,没有发送回复消息

    1. package org.springframework.amqp.core;
    2. public interface AmqpTemplate {
    3. //接收默认队列的消息,调用提供的ReceiveAndReplyCallback,
    4. //如果回调返回一个消息,则将回复消息发送到MessageProperties中的replyTo地址,
    5. //或者发送到默认交换和默认routingKe
    6. boolean receiveAndReply(ReceiveAndReplyCallback callback) throws AmqpException;
    7. //接收指定队列的消息,调用提供的ReceiveAndReplyCallback,
    8. //如果回调返回一个消息,则将回复消息发送到MessageProperties中的replyTo地址,
    9. //或者发送到默认交换和默认routingKe
    10. boolean receiveAndReply(String queueName, ReceiveAndReplyCallback callback) throws AmqpException;
    11. //接收默认队列的消息,调用提供的ReceiveAndReplyCallback,
    12. //如果回调返回消息,则向提供的exchange和routingKey发送回复消息
    13. boolean receiveAndReply(ReceiveAndReplyCallback callback, String replyExchange, String replyRoutingKey)
    14. throws AmqpException;
    15. //接收指定队列的消息,调用提供的ReceiveAndReplyCallback,
    16. //如果回调返回消息,则向提供的exchange和routingKey发送回复消息
    17. boolean receiveAndReply(String queueName, ReceiveAndReplyCallback callback, String replyExchange,
    18. String replyRoutingKey) throws AmqpException;
    19. //接收默认队列的消息,调用提供的ReceiveAndReplyCallback,
    20. //如果回调返回一个消息,则向ReplyToAddressCallback的结果中的replyToAddress发送回复消息
    21. boolean receiveAndReply(ReceiveAndReplyCallback callback,
    22. ReplyToAddressCallback replyToAddressCallback) throws AmqpException;
    23. //接收指定队列的消息,调用提供的ReceiveAndReplyCallback,
    24. //如果回调返回一个消息,则向ReplyToAddressCallback的结果中的replyToAddress发送回复消息
    25. boolean receiveAndReply(String queueName, ReceiveAndReplyCallback callback,
    26. ReplyToAddressCallback replyToAddressCallback) throws AmqpException;
    27. //代码略
    28. }

    例如

    1. boolean b=template.receiveAndReply("myqueue",new ReceiveAndReplyCallback(){
    2. public String handle(String str) {
    3. System.out.println("getMessage is :" + str);
    4. //doSomething
    5. return "OK";
    6. }
    7. });


    上一篇:SpringAMQP和RabbitMQ入门

  • 相关阅读:
    postgres数据迁移
    不能说的秘密:加密信件
    协议-TCP协议-基础概念03-Keep live保活机制-TCP RST-TCP连接
    【JavaWeb】Servlet系列 --- HttpServletRequest接口详解(接口方法要记住!!!)
    嵌入式设备时间同步(校时)
    5.8 Device Self-test command
    未来属于 Firefly:通过最新的生成式 AI 创新解锁新的创造力水平
    订单30分钟自动关闭的五种解决方案
    设计模式学习(三):工厂模式
    Nginx入门到搭建
  • 原文地址:https://blog.csdn.net/Beijing_L/article/details/127639484