• 初识 RabbitMQ


    初识 RabbitMQ


    1、概述

    ​  RabbitMQ 是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。



    2、架构

    在这里插入图片描述

    • Broker

      ​  代理人,消息队列的主体,负责管理 消息 的接收、发送和存储。

    • VHost

      ​  虚拟主机,发挥 消息队列 内部系统隔离的作用。

    • Exchange

      ​  交换机,完成 消息 的特定传递。

      ​  常用有以下三种:

      • DirectExchange

        ​  直接交换机,点对点传输。

      • FanoutExchange

        ​  扇出交换机,广播传输。

      • TopicExchange

        ​  主题交换机,可根据 routing key 完成分组传输。

    • Queue

      ​  队列,存储消息的地方。

    • Binding

      ​  绑定关系,交换机 和 交换机、交换机 和 队列 都可完成绑定。

    • Message(routing key)

      ​  消息,指定 路由键(routing key)可使 消息 传递到特定的队列。

    • Connect

      ​  连接,客户端 和 Broker 的连接是一种长连接,消息 在该连接中的一条 信道(channel)中完成传输。

    • Producer

      ​  生产者,生产消息。

    • Consumer

      ​  消费者,消费消息。



    3、SpringBoot 整合

    /**
     * 1、创建 Exchange、Queue、Binding
     *      1.1、AmqpAdmin
     * 2、收发消息
     *      2.1、RabbitTemplate
     *      2.2、RabbitListener 类、方法
     *      2.3、RabbitHandler 方法
     *          2.3.1 用于区分 同一队列中的不同消息,不同队列中的不同消息。
     * 3、消息确认机制
     *      3.1、发送方
     *          3.1.1、confirmCallback   确认模式
     *              producer -> Exchange
     *          3.1.2、returnCallback    回退模式
     *              Exchange -> Queue
     *      3.2、接收方
     *          3.2.1、ACK机制
     *              Queue -> Consumer
     */
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    3.1、依赖
    
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    3.2、创建交换机、队列、绑定关系
        @Test
        void createExchange() {
            /*
                DirectExchange(String name, boolean durable, boolean autoDelete, Map arguments)
                直接交换机 点对点
             */
            DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
            amqpAdmin.declareExchange(directExchange);
            log.info("DirectExchange 创建成功");
        }
    
        @Test
        void createQueue() {
            /*
                Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map arguments)
                队列
             */
            Queue queue = new Queue("hello-java-queue", true, false, false);
            amqpAdmin.declareQueue(queue);
            log.info("Queue 创建成功");
        }
    
        @Test
        void createBinding() {
            /*
                Binding(String destination, Binding.DestinationType destinationType, String exchange, String routingKey, @Nullable Map arguments)
                绑定关系
             */
            Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE,
                    "hello-java-exchange", "hello-java", null);
            amqpAdmin.declareBinding(binding);
            log.info("Binding 创建成功");
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    3.3、收发消息
    3.3.1、发送消息
        @Test
        void sendMessageString() {
            /*
                void convertAndSend(String exchange, String routingKey, Object object);
             */
            rabbitTemplate.convertAndSend("hello-java-exchange", "hello-java", "hello world");
            log.info("消息已发送");
        }
    
        @Test
        void sendMessageObj() {
            /*
                对象需要实现 Serializable 接口
             */
            Data data = new Data();
            data.setId("1");
            data.setName("小米");
            data.setTime(LocalDateTime.now());
            rabbitTemplate.convertAndSend("hello-java-exchange", "hello-java", data);
            log.info("消息已发送");
        }
    
        @Test
        void sendMessageJson() {
            /*
                配置 MessageConverter -> new Jackson2JsonMessageConverter()
             */
            Data data = new Data();
            data.setId("1");
            data.setName("小米");
            data.setTime(LocalDateTime.now());
            rabbitTemplate.convertAndSend("hello-java-exchange", "hello-java", data);
            log.info("消息已发送");
        }
        
        @lombok.Data
        static class Data implements Serializable {
            private String id;
            private String name;
            private LocalDateTime time;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    3.3.2、接收消息

     注:启动类需要添加 @EnableRabbit 注解。

        /*
            Message message, Data content, Channel channel
            1、客户端 接收到消息后,Queue 中该消息会自动删除
            2、客户端 有序接收消息
         */
        @RabbitListener(queues = "hello-java-queue")
        void receiveMessage(Message message, Data content, Channel channel) {
            System.out.println("接收到消息,内容为:" + message + ", 类型为:" + message.getClass());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    @Service
    @RabbitListener(queues = "hello-java-queue")
    class TestService {
       
        @RabbitHandler
        void manualAck(Message message, Data content, Channel channel) throws IOException {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            /*
                签收
                    void basicAck(long deliveryTag, boolean multiple);
                        deliveryTag 通道自增
                        multiple 是否为批量模式
                拒收
                    void basicNack(long deliveryTag, boolean multiple, boolean requeue);
                        requeue 是否重新入队
                    void channel.basicReject(long deliveryTag, boolean multiple, boolean requeue);
             */
            channel.basicAck(deliveryTag, false);
    //        channel.basicNack(deliveryTag, false, true);
    //        channel.basicReject(deliveryTag, true);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    3.4、配置
    3.4.1、配置文件
    spring:  
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        virtual-host: /
        publisher-confirm-type: correlated
        publisher-returns: true
        template:
          # 异步回调 returnCallback
          mandatory: true
        listener:
          simple:
            # 手动 ack
            acknowledge-mode: manual
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    3.4.2、配置类
    package com.hong.changfeng.information.config;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;
    
    
    @Slf4j
    @Configuration
    public class RabbitConfig {
    
        @Primary
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setMessageConverter(messageConverter());
            initRabbitTemplate(rabbitTemplate);
            return rabbitTemplate;
        }
    
        /**
         * 使用 JSON 序列化机制 进行消息转换
         */
        @Bean
        public MessageConverter messageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    
        /**
         * 定制 RabbitTemplate
         */
        public void initRabbitTemplate(RabbitTemplate rabbitTemplate) {
            // 设置 消息到达 Broker 的确认回调
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                /**
                 * 消息到达 Broker 时触发
                 *
                 * @param correlationData 当前消息的唯一关联数据(id)
                 * @param ack 消息是否到达 Broker(Exchange)
                 * @param cause 失败的原因
                 */
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    log.debug("RabbitConfig -> initRabbitTemplate " +
                            " correlationData: " + correlationData + " ack: " + ack + " cause: " + cause);
                }
            });
    
            // 设置 消息到达 Queue 的确认回调
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                /**
                 * 消息未到达 Queue 时触发
                 *
                 * @param message 消息
                 * @param replyCode 回复的状态码
                 * @param replyText 回复的文本内容
                 * @param exchange 交换机
                 * @param routingKey 路由键
                 */
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    log.debug("RabbitConfig -> initRabbitTemplate " +
                            " message: " + message + " replyCode: " + replyCode + " replyText: " + replyText +
                            " exchange: " + exchange + " routingKey: " + routingKey);
                }
            });
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
  • 相关阅读:
    Autosar架构介绍:总目录
    SQL(Structured Query Language)简介和常见 SQL 命令示例
    零代码编程:用ChatGPT自动合并多个Word文件
    Linux:进程概念的引入和理解
    offset新探索:双管齐下,加速大数据量查询
    Deformable Convolutional Networks
    【语义分割】2019-CCNet ICCV
    Day45 力扣动态规划 : 1143.最长公共子序列 |1035.不相交的线 | 53. 最大子序和
    你不知道的大像素全景,在行业应用中竟如此重要
    糖尿病患者饮食该注意些什么
  • 原文地址:https://blog.csdn.net/weixin_51123079/article/details/127136820