• RabbitMQ(基于AMQP的开源消息代理软件)


    一、AMQP高级消息队列协议

    (1)介绍

    AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。

    (2)工作流程

    发布者(Publisher)发布消息(Message),经过交换机(Exchange),交换机根据路由规则将收到消息分发给交换机绑定的队列(Queue),最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

    二、RabbitMQ开源消息代理软件

     (1)介绍

    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的。所有主要的编程语言均有与代理接口通讯的客户端库。

    消息中间件作为分布式系统重要组件之一,可以解决应用耦合,异步消息,流量削峰等问题。

    (2)特点

    RabbitMQ两大核心特性:异步消息、队列。 异步消息:只要异步消息就不阻塞线程,减少了主线程执行时间。所有需要这种效果场景都可以使用MQ。 队列:进入队列的数据一定有先后之分。只要应用程序要对内容分先后的场景都可以使用MQ。 

    (3)适用场景

    1.应用解耦

    2. 排队算法

    使用队列特性。把数据发送给MQ,进入到队列就有了排队的效果

    3.秒杀活动

    使用队列特性。例如:抢红包、限时秒杀、直播卖货时抢商品。使用了MQ按照顺序一个一个操作,当商品库存操作到0个时,秒杀结束。

    4.消息分发

    在程序中同时向多个其他程序发送消息。应用了AMQP中交换机,实现消息分发

    5.异步处理

    利用MQ异步消息特性。大大提升主线程效率。

    6.数据同步

    利用异步特性。我们电商中使用RabbitMQ绝大多数的事情就是在实现数据同步。

    7.处理耗时任务

    利用异步特性。可以把程序中耗时任务(例如:发送邮件、发送验证码)交给MQ去处理,减少当前项目的耗时时间。

    8.流量削峰

    在互联网项目中,可能会出现某一段时间范围内,访问流量骤增的情况(双11、品牌促销,10点抢购),如果使用监控工具,会发现这段时间访问出现顶峰。使用MQ可以把这些访问分摊到多个项目中,把流量分摊,去除了顶峰效果,这就叫做流量削锋。是利用RabbitMQ中交换机实现的。

    三、RabbitMQ核心原理

     

     发送者Publisher向RabbitMQ发送消息Message,在Message会包含路由键Routing Key、交换器名称、消息内容。交换器Exchange接收到消息Message后会根据交换器类型Exchange Type判断把消息如何发送给绑定的队列Queue中,如果交换器类型是Direct这个消息只放入到路由键对应的队列中,如果是topic交换器消息放入到routing key匹配的多个队列中,如果是fanout交换器消息会放入到所有绑定到交换器的队列中。放入到队列成功后会返回给发送者Publisher一个ACK确认消息,表示消息发送成功了。剩下的事情是由Consumer进行完成,Consumer一直在监听队列,当队列里面有消息就会把消息取出,取出后根据程序的逻辑对消息进行处理,处理完成后会返回给RabbitMQ一个ACK,表示消息处理完成,RabbitMQ会删除这个消息。以上这些就是RabbitMQ的运行原理。  

    核心概念

     1. Message

    消息。它由消息头消息体组成。消息体是不透明的,而消息头则由一系列可选属性组成,这些属性包括:routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出消息可能持久性存储)等。

    2.Publisher

    消息的生产者。也是一个向交换器发布消息的客户端应用程序。 ​ 通俗说明:哪些项目向RabbitMQ发送消息,哪些项目就是Publisher

    3. Consumer

    消息的消费者。表示一个从消息队列中取得消息的客户端应用程序。Consumer会一直监听指定的队列,只要队列中有消息,就会按照顺序依次取出。使用MQ做耗时任务时,耗时任务就交给Consumer进行完成。

    4. Exchange

    交换器。用来接收生产者发送的消息并将这些消息路由给服务器中的队列。一共支持四种的交换器类型:

    1. direct(发布与订阅 完全匹配)

    2. fanout(广播)

    3. topic(主题,规则匹配)

    4. header(使用较少,相比direct就多了一些头信息)

    5. Binding

    绑定。用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 ​ 一个交换器里面可以绑定多个队列。一个队列一般都是只绑定到一个交换器上。消息发送给交换器,交换器会把效果按照特定规则发送给绑定的队列。

    6. Queue

    消息队列。用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

    7. Routing-key

    路由键。RabbitMQ决定消息该投递到哪个队列的规则。(也可以理解为队列的映射,路由键是key,队列是value)。队列通过路由键绑定到交换器。 ​ 消息发送到MQ服务器时,消息将拥有一个路由键,即便是空的,RabbitMQ也会将其和绑定使用的路由键进行匹配。 ​ 如果相匹配,消息将会投递到该队列。如果不匹配,消息将会进入黑洞(相当于丢弃)。 ​ 通俗理解:队列绑定到交换器时有路由键,这个路由键就相当于key-value中的key,value则是队列。当Publisher发送消息时一定会携带路由键(即使路由键是Null),有了路由键就让交换器知道了这个消息要发送给哪个队列。

    8. Connection

    链接。指Rabbit服务器和客户端建立的TCP链接。

    9. Channel

    Channel中文叫做信道,是TCP里面的虚拟链接。例如:电缆相当于TCP,信道是一个独立光纤束,一条TCP连接上创建多条信道是没有问题的。 ​ 在RabbitMQ中,TCP链接一旦打开,就会创建AMQP信道。无论是发布消息、接收消息、订阅队列,这些动作都是通过信道完成的。

    10. Virtual Host

    虚拟主机。表示一批交换器,消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。vhost是AMQP概念的基础,必须在链接时指定,RabbitMQ默认的vhost是 '/'。 ​ 通俗理解:一个RabbitMQ可以包含多个虚拟主机,每个虚拟主机都是一个RabbitMQ。平时我们没有去创建虚拟主机,都是使用RabbitMQ里面默认的'/'虚拟主机,单实际上一个RabbitMQ可以包含多个虚拟主机主机的,也就是说一个RabbitMQ可以包含多个实例。就像MySQL可以创建多个数据库一样。

    11. Borker

    表示消息队列服务器实体。就是RabbitMQ服务器进程。

    12. 交换器和队列的关系

    交换器是通过路由键和队列绑定在一起的,如果消息拥有的路由键跟交换器中绑定的路由键匹配,那么消息就会被路由到该绑定的队列中。 ​ 也就是说,消息到队列的过程中,消息首先会经过交换器,接下来交换器在通过路由键匹配分发消息到具体的队列中。 ​ 路由键可以理解为匹配的规则。

    13. RabbitMQ为什么需要信道?为什么不是TCP直接通信?

    1. TCP的创建和销毁开销特别大。创建需要3次握手,销毁需要4次分手。

    2. 如果不用信道,那应用程序就会以TCP链接RabbitMQ,高峰时每秒成千上万条链接会造成资源巨大的浪费,而且操作系统每秒处理TCP链接数也是有限制的,必定造成性能瓶颈。

    3. 信道的原理是一条线程一条通道,多条线程多条通道共用一条TCP链接。一条TCP链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。

    这也是为什么使用RabbitMQ去处理秒杀、流量削锋、海量请求时依然对RabbitMQ比较有信心的原因。

    四、安装RabbitMQ

     (1)拉取镜像

    docker pull rabbitmq:management

    (2)创建并启动容器

    docker run -d --name rabbitmq -p 15672:15672 -p 5672:5672 --restart=always -e DEFAULT_USER=bjsxt -e DEFAULT_PASS=bjsxt rabbitmq:management

     五、Spring AMQP介绍

    Spring AMQP是Spring的顶级项目。是基于AMQP协议的消息传递解决方案。此框架提供顶级抽象模板AmqpTemplate接口,用于抽象消息传递标准。提供基于容器的监听处理。暂时Spring AMQP只提供了基于RabbitMQ处理消息传递的解决方案。其具体接口是RabbitOperations、实现是RabbitTemplate

    六、使用

    (1)引入依赖

           
                org.springframework.boot
                spring-boot-starter-amqp
           

    (2)编写配置文件

    # 配置RabbitMQ相关信息
    # 当创建RabbitMQ容器的时候,不提供用户名和密码配置,自动创建用户guest,密码guest。
    # guest用户只能本地访问RabbitMQ。
    spring:
      rabbitmq:
        host: 192.168.91.128 # RabbitMQ服务器的IP。默认localhost
        port: 5672 # RabbitMQ服务器的端口。
        username: bjsxt # RabbitMQ的访问用户名。默认guest。
        password: bjsxt # RabbitMQ的访问密码。默认guest
        virtual-host: / # 连接RabbitMQ中的哪一个虚拟主机。默认 /

    (3)发送消息

    1. /**
    2. * 测试Spring AMQP框架中发送消息的方式。
    3. */
    4. @SpringBootTest
    5. public class TestPublisher {
    6. /**
    7. * 注入客户端对象。
    8. * 类型可以是: AmqpTemplate(顶级接口), RabbitOperations(专用子接口),RabbitTemplate(具体实现)
    9. * 建议使用接口: 优先级是 RabbitOperations > AmqpTemplate
    10. */
    11. @Autowired
    12. private RabbitOperations rabbitOperations;
    13. /**
    14. * 测试发送消息
    15. * 消息内容是字符串。
    16. * 注意:
    17. * Spring AMQP可以发送的消息类型必须是Message类型。
    18. * Spring AMQP可以帮助程序员自动封装消息类型Message对象。
    19. * 只要提供消息具体内容(消息体)即可实现默认封装。
    20. * Spring AMQP可以自动转换封装的消息体类型是Object。只要类型可序列化即可。
    21. */
    22. @Test
    23. public void testSendStringMessage(){
    24. String messageContent = "第一个字符串消息";
    25. String exchangeName = "direct.first.ex";
    26. String routingKey = "routing.key.1";
    27. // 发送消息的时候,只要指定要发送到的具体交换器名称,路由键,和消息内容即可。
    28. rabbitOperations.convertAndSend(exchangeName, routingKey, messageContent);
    29. System.out.println("消息发送完毕");
    30. }
    31. }

    (4)基于Configuration配置创建交换器、队列及完成绑定

    1. @Configuration
    2. public class RabbitMQConfig {
    3. // 发送消息时如果不存在这个队列,会自动创建这个队列。
    4. // 注意:是发送消息时,而不是启动项目时。
    5. // 相当于:可视化操作时创建一个队列
    6. // 如果队列创建完成后,没有绑定(没有另外两个方法),默认绑定到AMQP default交换器
    7. @Bean
    8. public Queue queue(){
    9. return new Queue("queue.second");
    10. }
    11. // 如果没有这个交换器,在发送消息创建这个交换器
    12. // 配置类中方法名就是这个类型的实例名。相当于的id属性,返回值相当于class
    13. @Bean
    14. public DirectExchange directExchange(){
    15. return new DirectExchange("direct.first.ex");
    16. }
    17. // 配置类中方法参数,会由Spring 容器自动注入
    18. @Bean
    19. public Binding directBingding(DirectExchange directExchange,Queue queue){
    20. // with(“自定义路由键名称”)
    21. return BindingBuilder.bind(queue).to(directExchange).with("routing.key.2");
    22. // withQueueName() 表示队列名就是路由键名称
    23. // return BindingBuilder.bind(queue).to(directExchange).withQueueName();
    24. }
    25. }

    (5)编辑消息消费者

    1. /**
    2. * 字符串类型消息体处理消费者
    3. * Spring AMQP和Spring Boot配合的启动器,可以自动注册监听。
    4. * 要求,当前类型的bean对象,必须被spring容器管理。
    5. */
    6. @Component
    7. public class StringMessageConsumer {
    8. /**
    9. * 常规开发中,都会先定义消息的消费者。后定义消息的发布者。
    10. * 典型的观察者设计模式。先有监听,后有事件。
    11. *
    12. * 注解RabbitListener
    13. * 可选属性: 常用属性
    14. * bindings - 定义绑定规则。属性类型是: QueueBinding[]
    15. * 注解QueueBinding,描述具体的绑定规则。就是交换器和队列的绑定规则。
    16. * 必要属性:
    17. * value - 监听的队列,属性类型是Queue
    18. * exchange - 队列绑定的交换器,属性是Exchange
    19. * 可选属性:
    20. * key - 绑定的路由键都是什么,类型是String[]
    21. * 注解Queue,描述一个具体的队列,如果队列在RabbitMQ中存在,直接使用并监听;如果不存在,
    22. * 创建队列并监听。
    23. * 可选属性:
    24. * name - 队列名称,String类型
    25. * autoDelete - 是否是自动删除的队列,String类型。可选值: "true" | "false"
    26. *
    27. * 注解Exchange,描述一个具体的交换器,如果交换器存在,直接使用;如果不存在,则创建,
    28. * 并基于key绑定队列
    29. * 可选属性:
    30. * name - 交换器名称,String类型
    31. * autoDelete - 是否是自动删除的交换器,String类型。可选值: "false" | "true"。默认false
    32. * type - 交换器的类型,String类型。默认是direct。可选: direct,fanout,topic,headers
    33. * 可以使用枚举类型中的常量赋值,具体是ExchangeTypes.XXX
    34. * @param messageBody
    35. */
    36. @RabbitListener(bindings = {
    37. @QueueBinding(
    38. value = @Queue(name = "queue.second", autoDelete = "false", durable = "true"),
    39. exchange = @Exchange(name = "direct.second.ex",
    40. autoDelete = "false", type = ExchangeTypes.DIRECT),
    41. key = {"routing.key.second.1", "routing.key.second.2"}
    42. )
    43. })
    44. public void onMessage(String messageBody){
    45. System.out.println("第二个消息消费者监听,处理消息:" + messageBody);
    46. }
    47. }

    七、传递自定义消息

    1. /**
    2. * 处理自定义类型消息体
    3. */
    4. @Component
    5. public class MyMessageConsumer {
    6. /**
    7. * Spring AMQP可以自动实现消息体类型转换。
    8. * 使用的方式是强制类型转换。只要包装传输的消息体数据类型和方法参数类型匹配即可。
    9. */
    10. @RabbitListener(bindings = {
    11. @QueueBinding(
    12. value = @Queue(name = "pojo.queue1", autoDelete = "false"),
    13. exchange = @Exchange(name = "topic.pojo.ex", type = "topic"),
    14. key = {"routing.key.1"}
    15. )
    16. })
    17. public void onMessage1(MyMessage myMessage){
    18. System.out.println("处理自定义消息:" + myMessage);
    19. }
    20. /**
    21. * 可以通过统一消息类型Message处理消息内容
    22. * @param message
    23. */
    24. @RabbitListener(bindings = {
    25. @QueueBinding(
    26. value = @Queue(name = "pojo.queue2", autoDelete = "false"),
    27. exchange = @Exchange(name = "topic.pojo.ex", type = "topic"),
    28. key = {"routing.key.2"}
    29. )
    30. })
    31. public void onMessage2(Message message) throws Exception{
    32. // 获取消息体,消息体是字节数组。根据具体类型进行处理。
    33. byte[] body = message.getBody();
    34. ByteArrayInputStream byteArrayInputStream =
    35. new ByteArrayInputStream(body);
    36. ObjectInputStream inputStream =
    37. new ObjectInputStream(byteArrayInputStream);
    38. Object obj = inputStream.readObject();
    39. if(obj.getClass() == MyMessage.class){
    40. MyMessage myMessage = (MyMessage) obj;
    41. System.out.println(myMessage);
    42. }
    43. System.out.println("消息体中的对象类型是:" + obj.getClass().getName());
    44. }
    45. }

    八、ACK确认机制

    发送消息确认

    (1)修改配置

    spring:
      rabbitmq:
        host: 192.168.91.128 # RabbitMQ服务器的IP。默认localhost
        port: 5672 # RabbitMQ服务器的端口。
        username: bjsxt # RabbitMQ的访问用户名。默认guest。
        password: bjsxt # RabbitMQ的访问密码。默认guest
        virtual-host: / # 连接RabbitMQ中的哪一个虚拟主机。默认 /
        publisher-confirm-type: correlated # 开启到达交换器确认机制。默认值:none,不开启确认机制。
        publisher-returns: true # 开启路由失败确认机制。默认值:false

    (2)实现RabbitTemplate.ConfirmCallback和RabbitTemplate.ReturnsCallback接口重写不能到达交换器或路由失败时的回调处理逻辑

    1. @Component
    2. public class PublisherHandler implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    3. @Autowired
    4. private RabbitTemplate rabbitTemplate;
    5. /**
    6. * 构造方法执行结束后立刻执行此方法。即初始化逻辑。
    7. */
    8. @PostConstruct
    9. public void init(){
    10. // 设置RabbitTemplate中的回调逻辑
    11. this.rabbitTemplate.setConfirmCallback(this);
    12. this.rabbitTemplate.setReturnsCallback(this);
    13. }
    14. /**
    15. * 消息路由失败回调逻辑
    16. * @param returned 路由失败的消息
    17. */
    18. @Override
    19. public void returnedMessage(ReturnedMessage returned) {
    20. System.out.println("交换器 : " + returned.getExchange());
    21. System.out.println("路由键 : " + returned.getRoutingKey());
    22. System.out.println("路由失败编码 : " + returned.getReplyCode());
    23. System.out.println("路由失败描述 : " + returned.getReplyText());
    24. System.out.println("消息 : " + returned.getMessage());
    25. }
    26. /**
    27. * 当交换器不能到达时,具体的处理方案。
    28. * @param correlationData 消息唯一标记
    29. * @param ack 是否确认
    30. * @param cause 不能到达交换器(即ack为false)的具体原因
    31. */
    32. @Override
    33. public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    34. System.out.println("消息唯一标记 : " + correlationData);
    35. System.out.println("是否确认到达交换器 : " + ack);
    36. System.out.println("不能到达交换器的原因 : " + cause);
    37. }
    38. }

    消费消息确认

    在Spring AMQP中,消费者(Consumer)默认的ACK机制是自动确认,即消费代码正常执行结束,立刻确认消息已消费;消费代码发送异常,相当于消息未消费。如果希望关闭自动ACK机制,可使用两种处理方案实现。

    (1)重试消费

    可以在消费者中基于配置开启重试机制,并设置重试消费次数。当消费消息发生错误,导致未确认(NACK)时,消费者尝试重复消费消息;当重复消费次数到达设置阈值后,强制确认(ACK),RabbitMQ会移除队列中的消息。

    修改配置文件

    spring:
      rabbitmq:
        host: 192.168.91.128
        port: 5672
        username: bjsxt
        password: bjsxt
        listener:
          simple:
            retry:
              enabled: true # 开启重试机制
              max-attempts: 1  # 重试消费1次

    (2)手工确认ACK

    1.修改配置文件

    spring:
      rabbitmq:
        host: 192.168.91.128
        port: 5672
        username: bjsxt
        password: bjsxt
        listener:
          simple:
            acknowledge-mode: manual # 手工确认。 默认AUTO,自动确认

    2.正常消费

    1. @Component
    2. public class StringMessageConsumer {
    3. /**
    4. * 消费方法。实现手工ACK确认
    5. * @param messageBody 消息内容
    6. * @param channel 信道对象,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,
    7. * RabbitMQ 会用 basic.deliver 方法向消费者推送消息。
    8. * @param deliveryTag RabbitMQ向该Channel投递的这条消息的唯一标识 ID,
    9. * 是一个递增的正整数,delivery tag 的范围仅限于 Channel
    10. */
    11. @RabbitListener(queues = {"queue.first"})
    12. public void onMessage(String messageBody, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag){
    13. System.out.println("第一个消息消费者监听,处理消息:" + messageBody);
    14. try {
    15. /*
    16. * 确认消息
    17. * 参数1 - 消息的唯一标识
    18. * 参数2 - 是否批量提交。为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,
    19. * 则可以一次性确认 deliveryTag 小于等于传入值的所有消息
    20. */
    21. channel.basicAck(deliveryTag, false);
    22. System.out.println("消息已确认");
    23. }catch (IOException e){
    24. e.printStackTrace();
    25. }
    26. }
    27. }

    3.错误消费

    1. @Component
    2. public class StringMessageConsumer {
    3. /**
    4. * 消费方法。实现手工NACK确认
    5. * @param messageBody 消息内容
    6. * @param channel 信道对象,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,
    7. * RabbitMQ 会用 basic.deliver 方法向消费者推送消息。
    8. * @param deliveryTag RabbitMQ向该Channel投递的这条消息的唯一标识 ID,
    9. * 是一个递增的正整数,delivery tag 的范围仅限于 Channel
    10. */
    11. @RabbitListener(queues = {"queue.first"})
    12. public void onMessage(String messageBody, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag){
    13. System.out.println("第一个消息消费者监听,处理消息:" + messageBody);
    14. try {
    15. /*
    16. * 参数1 - 消息的唯一标识
    17. * 参数2 - 是否批量提交
    18. * 参数3 - 是否重新发出消息。false则废弃此消息。
    19. */
    20. channel.basicNack(deliveryTag, false, true);
    21. System.out.println("消息未确认-重复消费-deliveryTag=" + deliveryTag);
    22. }catch (IOException e){
    23. e.printStackTrace();
    24. }
    25. }
    26. }

    4.Reject确认(丢弃)

    1. @Component
    2. public class StringMessageConsumer {
    3. /**
    4. * 消费方法。实现手工ACK确认
    5. * @param messageBody 消息内容
    6. * @param channel 信道对象,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,
    7. * RabbitMQ 会用 basic.deliver 方法向消费者推送消息。
    8. * @param deliveryTag RabbitMQ向该Channel投递的这条消息的唯一标识 ID,
    9. * 是一个递增的正整数,delivery tag 的范围仅限于 Channel
    10. */
    11. @RabbitListener(queues = {"queue.first"})
    12. public void onMessage(String messageBody, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag){
    13. System.out.println("第一个消息消费者监听,处理消息:" + messageBody);
    14. try {
    15. /*
    16. * 参数1 - 消息的唯一标识
    17. * 参数3 - 是否重新发出消息。false则废弃此消息。
    18. */
    19. channel.basicReject(deliveryTag, false);
    20. System.out.println("消息未确认-重复消费-deliveryTag=" + deliveryTag);
    21. }catch (IOException e){
    22. e.printStackTrace();
    23. }
    24. }
    25. }

    九、同步消息

    (1)修改配置文件

    spring:
      rabbitmq:
        host: 192.168.91.128 # RabbitMQ服务器的IP。默认localhost
        port: 5672 # RabbitMQ服务器的端口。
        username: bjsxt # RabbitMQ的访问用户名。默认guest。
        password: bjsxt # RabbitMQ的访问密码。默认guest
        virtual-host: / # 连接RabbitMQ中的哪一个虚拟主机。默认 /
        template:
          reply-timeout: 100000 # 配置同步消息超时时长,单位毫秒

    (2)消费者

    1. /**
    2. * 同步消息消费者。如:抢红包。
    3. */
    4. @Component
    5. public class SyncMessageConsumer {
    6. /**
    7. * 同步消息消费方法。和异步消息消费方法的唯一区别就是有返回值。类型不限。
    8. * @param message
    9. * @return
    10. */
    11. @RabbitListener(bindings = {
    12. @QueueBinding(
    13. value = @Queue(name = "queue.sync", autoDelete = "false"),
    14. exchange = @Exchange(name = "topic.sync", type = "topic"),
    15. key = {"routing.key"}
    16. )
    17. })
    18. public String onMessage(String message){
    19. System.out.println("处理同步消息:" + message);
    20. return "消息已处理";
    21. }
    22. }

    (3)发送者

    1. /**
    2. * 转换,处理,发送消息,并等待接收消费者反馈。方法返回值就是消费者端返回的确认消息,即消费方法返回结果
    3. * Object convertSendAndReceive(String exchange, String routingKey, Object messageBody)
    4. * 如果消费者超时未返回,代码自动结束,并向下继续运行。
    5. * 消费者反馈结果使用null填充。
    6. */
    7. @Test
    8. public void testSyncMessage(){
    9. String message = "测试同步消息";
    10. Object result = rabbitOperations.convertSendAndReceive(
    11. "topic.sync",
    12. "routing.key",
    13. message
    14. );
    15. if(result == null){
    16. System.out.println("超时未返回");
    17. }else {
    18. System.out.println(result.getClass().getName());
    19. System.out.println(result);
    20. }
    21. }
  • 相关阅读:
    Spring Cloud微服务治理框架深度解析
    香港硬防服务器的防御有什么优缺点?
    23 行为型模式-迭代器模式
    kafka3.X基本概念和使用
    一种基于局部适应度景观的进化规划的混合策略
    钟控D触发器
    面试题—JAVA基础①
    redis做缓存(cache)
    primAlgorithm普利姆算法
    分布式ID生成方案总结整理
  • 原文地址:https://blog.csdn.net/weixin_53455615/article/details/127993642