• RabbitMQ笔记


    一、名词解释

    • ConnectionFactory:生产RabbitMQ服务器连接的工厂
    • Connection:客户端与RabbitMQ服务器的连接
    • Channel:由Connection产生,负责与Exchange通信。一个连接可以控制多个Channel与多个交换机通信,减少客Connection个数
    • Exchange:接受消息提供者(生产者)的消息,并根据消息的RoutingKey,与交换机绑定队列的BindingKey进行匹配分配消息
    • Queue:存储消息接收者(消费者)的消息
    • RoutingKey:指定当前消息被哪些队列接受,RoutingKey由生产者发送给交换机
    • BindingKey:交换机与队列的绑定关系。交换机拿到RoutingKey之后,拿去跟队列的绑定关系BindingKey进行匹配

    在这里插入图片描述

    二、案例

    2.1 生产者代码

    先装配一个RabbitTemplate

    @Bean(name = "rabbitTemplate")
        @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
        //必须是prototype类型
        public RabbitTemplate rabbitTemplate(@Qualifier("rabbitConnectionFactory") ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    
            //使用jackson消息转换器
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter(objectMapper));
            rabbitTemplate.setEncoding("UTF-8");
    
            /*
             * 开启ReturnCallback
             * 为false时,匹配不到会直接被丢弃
             * 为true时,消息通过交换器无法匹配到队列会返回给生产者,并触发ReturnedMessage
             */
            rabbitTemplate.setMandatory(true);
            //当消息进入Exchange交换器,但是未进入队列时回调
            rabbitTemplate.setReturnsCallback((ReturnedMessage returned) -> {
                String correlationId = returned.getMessage().getMessageProperties().getCorrelationId();
                log.info("消息:{} 发送失败, 应答码:{} 原因:{} 交换机: {}  路由键: {}", correlationId, returned.getReplyCode(),
                        returned.getReplyText(), returned.getExchange(), returned.getRoutingKey());
            });
            /*
             * 消息确认yml需要配置publisher-returns: true
             * 当消息进入Exchange交换器时就进入回调,不管是否进入队列
             */
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
                if (!ack) {
                    log.info("消息发送到exchange失败,原因: {}", cause);
                }
            });
            return rabbitTemplate;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    发送消息

    /*
     1. CusRabbitMQConfig.MESSAGE_NOTICE 交换机名称,RabbitMQ中如果不存在该交换机就会报错,并回调ConfirmCallback
     2. 第二个参数是RoutingKey,设置为空字符串,那么只能批配到"#",匹配不到(mandatory为true)会回调ReturnsCallback
     3. 第三个参数是消息体
     4. 这里没有指定交换机的类型,因为交换机是在消费端创建的
    */
    rabbitTemplate.convertAndSend(CusRabbitMQConfig.MESSAGE_NOTICE, "", message);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    2.2 消费者代码

     @RabbitListener(bindings = @QueueBinding(
     			//指定交换机,容器启动时(后置增强器RabbitListenerAnnotationBeanPostProcessor)RabbitMQ中如果不存在就创建该交换机
                exchange = @Exchange(value = StreamInputInterface.RECEIVE_NOTICE_MESSAGE, type = ExchangeTypes.TOPIC),
                //指定队列,同样在后置增强器里面会与RabbitMq通信,自动创建一个队列与交换机绑定,队列名称自动生成
                value = @Queue(durable = "true"),
                //指定BindingKey交换机与队列的绑定关系,井号表示匹配所有,意味着只要发送到该交换机的消息都会进入这个队列,从而被消费
                key = "#"
        ), containerFactory = "rabbitListenerContainerFactory")
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    启动应用在这里插入图片描述看输出的日志就知道,自动创建了一个队列,该队列持久化并且自动删除。具体过程可以看这篇队列如何自动创建
    在RabbitMQ后台也可以看到该队列已经绑定到交换机。停止工程之后队列就消失了。在这里插入图片描述

    2.3 总结

    1. 生产者产生消息时,必须交换机和队列都找到才处理消息,否则就回调生产者自行处理。
    2. 产生消息时,会指定一个RoutingKey。
    3. 对于一个交换机而言,每个消费者只会消费一个队列,这个队列会和交换机绑定(BindingKey)。
    4. 一个队列可以被多个消费者消费,队列里面的每条条消息只会被这些消费者其中之一消费
    5. 交换机接收到生产者的消息,通过生产者提供的RoutingKey与所有队列的BindingKey匹配(类似于正则)。哪个队列匹配成功,就将消息投送过去。
    6. 假如有两个消费者,他们消费同一个交换机的两个不同的队列,这两个队列与交换机的绑定关系相同(BindingKey相同),当某条消息的RoutingKey与这个BindingKey匹配,就会将这条消息投送到两个队列,两个消费者就都能消费到。
    7. 某个服务在集群部署时要保证每个节点消费的队列名称相同,像spring自动生成名称并创建队列就不行,会导致消息被重复消费,可以手动指定队列名称,这样就只会创建一次。

    2.3.1 消费者组

    根据第6条,我们可以设计一个类似于Kafka消费者组的功能,如图:
    在这里插入图片描述

  • 相关阅读:
    AI Earth ——开发者模式案例3:典型植被指数计算及区域统计
    .net 6 api 修改URL为小写
    【Python笔记-设计模式】状态模式
    React 全栈体系(六)
    Lua速成(2)
    在哪里考华为认证更容易?
    Avalonia 使用EFCore调用SQLite实现Singleton全局注册
    postgres创建递归视图
    【CSS】全局滚动条样式设置
    win11系统前端IIS部署发布网站步骤
  • 原文地址:https://blog.csdn.net/yx444535180/article/details/125404949