• RabbitMQ之Exchange(交换机)属性及备用交换机解读


    目录

    基本介绍

    主要结论

    备用交换机

     springboot代码实战(备用交换机)

    实战架构

    工程概述

    RabbitConfigDeal 配置类:创建队列及交换机并进行绑定 

    MessageService业务类:发送消息及接收消息

    主启动类RabbitMq01Application:实现ApplicationRunner接口


    基本介绍

    RabbitMQ 中,交换机主要用来将生产者生产出来的消息,传送到对应的频道中,即交换机是一个消息传送的媒介,其英文被称为 exchange 。交换机在 RabbitMQ 中起着承上启下的作用。

    它主要由以下属性可供选择 :

    1. Name:交换机名称;就是一个字符串
    2. Type:交换机类型,direct, topic, fanout, headers四种
    3. Durability:持久化,声明交换机是否持久化,代表交换机在服务器重启后是否还存在;
    4. Auto delete:是否自动删除,曾经有队列绑定到该交换机,后来解绑了,那就会自动删除该交换机;
    5. Internal:内部使用的,如果是yes,客户端无法直接发消息到此交换机,它只能用于交换机与交换机的绑定。
    6. Arguments:只有一个取值alternate-exchange,表示备用交换机;

    主要结论

    结论1:没发消息之前不会创建交换机和对列

    结论2:发消息后,如果交换机不存在,才开始创建交换机,如果队列不存在,则创建新的对列

    结论3:创建交换机或者队列完成后再重新创建,如果修改交换机或队列参数则会报错

    406错误(inequivalent arg 'durable' for exchange 'exchange.durability' in vhost 'powernode': received 'false' but current is 'true', class-id=40, method-id=10))

    结论4:设置持久化为false ,重启rabbitmq-server,则交换机丢失,实验durable参数,看下控制台,然后重启rabbitmq-server

    结论5:自动删除为 true ,从控制台上手动解绑,会发现自动删除

    备用交换机

    备份 交换机可以理解为 RabbitMQ 中交换机的“备胎”,当我们为某一个交换机声明一个对应的备份交换机时, 就是为它创建一个备胎,当交换机接收到一条不可路由消息时,将会把这条消息转发到备份交换机中,由 备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑 定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都 进入这个队列了。到达这个队列以后消费者可以进行处理,通知开发人员进行查看。

    当消息经过交换器准备路由给队列的时候,发现没有对应的队列可以投递信息,在rabbitmq中会默认丢弃消息,如果我们想要监测哪些消息被投递到没有对应的队列,我们可以用备用交换机来实现,可以接收备用交换机的消息,然后记录日志或发送报警信息。 

    设置参考代码

    1. Map arguments = new HashMap<>();
    2. //指定当前正常的交换机的备用交换机是谁
    3. arguments.put("alternate-exchange", EXCHANGE_ALTERNATE);
    4. //DirectExchange(String name, boolean durable, boolean autoDelete, Map arguments)
    5. return new DirectExchange(EXCHANGE, true, false, arguments);
    6. //return ExchangeBuilder.directExchange(EXCHANGE).withArguments(args).build();

     springboot代码实战(备用交换机)

    实战架构

    如上图,消息到达正常的交换机normalExchange后,发现没有对应的路由队列,消息会转发到备用交换机alternateExchange中,备用交换机是一个扇形交换机,随后会转发到alternateQueue队列中去。

    工程概述

     工程采用springboot架构,主要用到的依赖为:

    1. <!-- rabbit的依赖-->
    2. <dependency>
    3. <groupId>org.springframework.boot</groupId>
    4. <artifactId>spring-boot-starter-amqp</artifactId>
    5. </dependency>
    6. <dependency>
    7. <groupId>org.projectlombok</groupId>
    8. <artifactId>lombok</artifactId>
    9. </dependency>

    application.yml配置文件如下:

    1. server:
    2. port: 8080
    3. spring:
    4. rabbitmq:
    5. host: 123.249.70.148
    6. port: 5673
    7. username: admin
    8. password: 123456
    9. virtual-host: /

    RabbitConfigDeal 配置类:创建队列及交换机并进行绑定 

    1. @Configuration
    2. public class RabbitConfigDeal {
    3. }

    创建正常交换机

    1. @Bean
    2. public DirectExchange normalExchange() {
    3. Map arguments = new HashMap<>();
    4. //设置备用交换机
    5. arguments.put("alternate-exchange", "alternateExchange");
    6. return ExchangeBuilder.directExchange("normalExchange")
    7. .withArguments(arguments)
    8. .build();
    9. }

    创建备用交换机

    1. @Bean
    2. public FanoutExchange alternateExchange() {
    3. return ExchangeBuilder.fanoutExchange("alternateExchange").build();
    4. }

     创建备用交换的队列

    1. @Bean
    2. public Queue alternateQueue(){
    3. return QueueBuilder.durable("alternateQueue").build();
    4. }

     绑定备用交换机和队列

    1. @Bean
    2. public Binding bindingAlternateExchange(Queue alternateQueue, FanoutExchange alternateExchange){
    3. return BindingBuilder.bind(alternateQueue).to(alternateExchange);
    4. }

    MessageService业务类:发送消息及接收消息

    1. @Component
    2. @Slf4j
    3. public class MessageService {
    4. @Resource
    5. private RabbitTemplate rabbitTemplate;
    6. }

     发送消息方法

    1. public void sendMsg(){
    2. MessageProperties messageProperties = new MessageProperties();
    3. Message message1 = new Message("hello word ".getBytes(), messageProperties);
    4. //发送消息
    5. rabbitTemplate.convertAndSend("alternateExchange", "info", message1);
    6. log.info("发送完毕:{}" , new Date());
    7. }

    这里用的路由key为info 。

    MessageConvert

    • 涉及网络传输的应用序列化不可避免,发送端以某种规则将消息转成 byte 数组进行发送,接收端则以约定的规则进行 byte[] 数组的解析
    • RabbitMQ 的序列化是指 Messagebody 属性,即我们真正需要传输的内容,RabbitMQ 抽象出一个 MessageConvert 接口处理消息的序列化,其实现有 SimpleMessageConverter默认)、Jackson2JsonMessageConverter

      接受消息

    1. @RabbitListener(queues = {"alternateQueue"})
    2. public void receiveMsg(Message message){
    3. byte[] body = message.getBody();
    4. String queue = message.getMessageProperties().getConsumerQueue();
    5. String msg=new String(body);
    6. log.info("{}接收到消息时间:{},消息为{}",queue,new Date(),msg);
    7. }

     Message

    在消息传递的过程中,实际上传递的对象为 org.springframework.amqp.core.Message ,它主要由两部分组成:

    1. MessageProperties // 消息属性

    2. byte[] body // 消息内容

    @RabbitListener

    使用 @RabbitListener 注解标记方法,当监听到队列 debug 中有消息时则会进行接收并处理

    • 消息处理方法参数是由 MessageConverter 转化,若使用自定义 MessageConverter 则需要在 RabbitListenerContainerFactory 实例中去设置(默认 Spring 使用的实现是 SimpleRabbitListenerContainerFactory)

    • 消息的 content_type 属性表示消息 body 数据以什么数据格式存储,接收消息除了使用 Message 对象接收消息(包含消息属性等信息)之外,还可直接使用对应类型接收消息 body 内容,但若方法参数类型不正确会抛异常:

      • application/octet-stream:二进制字节数组存储,使用 byte[]
      • application/x-java-serialized-object:java 对象序列化格式存储,使用 Object、相应类型(反序列化时类型应该同包同名,否者会抛出找不到类异常)
      • text/plain:文本数据类型存储,使用 String
      • application/json:JSON 格式,使用 Object、相应类型

    主启动类RabbitMq01Application:实现ApplicationRunner接口

    1. /**
    2. * @author 风轻云淡
    3. */
    4. @SpringBootApplication
    5. public class RabbitMq01Application implements ApplicationRunner {
    6. public static void main(String[] args) {
    7. SpringApplication.run(RabbitMq01Application.class, args);
    8. }
    9. @Resource
    10. private MessageService messageService;
    11. /**
    12. * 程序一启动就会调用该方法
    13. * @param args
    14. * @throws Exception
    15. */
    16. @Override
    17. public void run(ApplicationArguments args) throws Exception {
    18. messageService.sendMsg();
    19. }
    20. }

    在SpringBoot中,提供了一个接口:ApplicationRunner。 该接口中,只有一个run方法,他执行的时机是:spring容器启动完成之后,就会紧接着执行这个接口实现类的run方法。

    由于该方法是在容器启动完成之后,才执行的,所以,这里可以从spring容器中拿到其他已经注入的bean。

    启动主启动类后查看控制台:

    1. 2023-10-01 11:57:57.660 INFO 83984 --- [ main]
    2. c.e.rabbitmq01.service.MessageService :
    3. 发送完毕:Sun Oct 01 11:57:57 CST 2023
    4. 2023-10-01 11:57:57.718 INFO 83984 --- [ntContainer#0-1]
    5. c.e.rabbitmq01.service.MessageService :
    6. alternateQueue接收到消息时间:Sun Oct 01 11:57:57 CST 2023,消息为hello word

    我们在这里可以看见与备用交换机绑定的alternateQueue成功接收到了消息

  • 相关阅读:
    如何在MySQL中设置和使用EXPLAIN来分析查询性能?你知道哪些MySQL性能监控工具?
    AD9371 官方例程 NO-OS 主函数 headless 梳理(二)
    ACM 动态规划题 -找最多可能浏览的景点数
    GC标记清除算法
    Android 协程 异常捕获 异常的传播特性
    人生第一个java项目 学生管理系统
    AI语音机器人的重点功能配置之话术
    【RTOS训练营】I2C和UART知识和预习安排 + 晚课提问
    Eviews用向量自回归模型VAR实证分析公路交通通车里程与经济发展GDP协整关系时间序列数据和脉冲响应可视化...
    使用 Kitten 开发一款趣味成语接龙游戏
  • 原文地址:https://blog.csdn.net/m0_62436868/article/details/133459737