• 【分布式】SpirngBoot 整合RabbitMQ、Exchagne模式、确认消费


    分布式


    SpringBoot 整合 RabbitMQ,消息模型,确认消费


    RabbitMQ在模块解耦,接口限流、异步通信等方面发挥重要作用,在成熟的RabbitMQ之前,项目一般采用Spring的事件驱动模型来进行异步通信

    缓存穿透 除了缓存null之外 还可以在之前加上布隆过滤器,也就是hash到一个大bitmap,查询先过滤看是否存在key,存在再查询缓存

    并且之前的Cache只是表面的知识,虽然加上分布式🔒可以抗住高并发,但是一旦缓存失效,需要准备高可用的解决方案, 并且必须使用mq限流,因为应用层容器Tomcagt的吞吐量有限,必须保证同时到达的流量不查过Tomcat集群的上限

    SpringBoot 整合RabbitMQ

    RabbitMQ是一款用于接收、存储和转发消息的开源中间件,其核心在于消息、消息模型、生产和消费者,在整合之前,先介绍生产者、消费者、消息、队列、交换机、路由等基本组件

    在这里插入图片描述

    RabbitMQ可以用邮局来类比【或者STOMP的聊天】, 邮局的核心包括邮件、邮寄箱、寄邮件用户、收邮件用户、邮递员

    投递用户A --投递---> 邮件 ---> 邮件箱
    							 |_取---> 邮递员M --> 邮件 ---> 收件人B
    							 
    STOMP聊天服务(私聊频道)
    
    用户A ---发送---> 消息 ----> chat消息频道
    						  |__ broker代理--> 用户频道 --> 用户B
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    投递用户A就是RabbitMQ产生消息的生产者,邮件就相当于消息,接收邮件用户B相当于RabbitMQ接收消息的消费者,邮件箱 就是RabbitMQ的交换机, 邮递员就是 模型中的队列

    • 生产者 Producer
      • 用于产生、发送消息的程序, 向消息队列发布消息
    • 消费者 Consumer
      • 用于监听、接收、消费和处理消息的程序, 从消息队列获取消息
    • 消息 Message
      • 消息由消息头和消息体组成。消息体是不透明的,消息头由一系列可选属性【routing-key,priority优先权、delivery-mode 是否持久性存储】
      • 可以是实际的数据,比如文字、图片等,在RabbitMQ的底层架构中,消息通过二进制的数据流进行传输
    • Channel 信道(频道)
      • 多路复用连接中的一条独立的双向数据流通道,信道建立在真实的TCP内的虚拟连接,复用TCP连接的通道
    • 队列 Queue
      • 存储消息的一种数据结构,保存消息,是消息的容器,也是消息的终点
      • 消息的暂存区或存储区,可以看作中转站,消息经过队列传输到消费者手里
      • 一个消息可以投入一个或者多个队列,一直在Queue中,等待Consumer连接Queue将消息取走
      • 多个消费者同时订阅一个Queue,Queue中的Message平均分摊给多个消费者,不是每个消费者都收到所有的消息并处理
      • 每一个消息只能被一个订阅者接收
    • 交换机 Exchange 路由器
      • 消息的中转站、路由器,提供Producer到Queue之间的路由匹配,接收Procuder的消息,将消息按照路由规则转发到消息队列,路由器只是转发消息,不会存储消息,如果没有Queue半岛到Exchange,会丢弃Producer的Message,用于接收和分发消息
      • 其有四种消息调度策略: Headers,Fanout(广播)、Direct、Topic
    • 路由键 Routing Key
      • 消息头的一个属性,标记消息的路由规则,,一般不单独使用,而是和交换机绑定在一起,决定消息路由到指定的队列,最大长度255字节
    • Binding 绑定
      • 用于建立Exchange和Queue之间的关联, 一个绑定就是基于Binding Key将Exchange和Queue连接起来的路由规则, 交换器理解就是一个由Binding构成的路由表
    • Binding Key 绑定键
      • Exchange和Queue的绑定关系,匹配Routing Key, 最大长度255字节

    Broker就是RabbitMQ的部署的服务器,实现消息的通信代理

    RabbitMQ的消息模型主要有队列、交换机、路由组成

    						——-->交换机-|
    生产者 --产生---> 消息 ---|			---消息-->队列 ---监听接收---> 消费者
    						——-> 路由 --|
    
    • 1
    • 2
    • 3

    整合过程

    1. 要在项目中使用RabbitMQ,首先就是要引入相关的Starter,在server模块加入, 因为RabbitMQ是基于AMQP协议的,所以springboot是直接抽象的amqp起步依赖
    <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-amqpartifactId>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    1. 配置RabbitMQ 【yml】,分布式架构下,RabbitMQ也是单独部署的,可以搭建集群,所以需要在项目中像redis一样指定RabbitMQ服务器所在的Host、端口号、用户名、密码
    #rabbit配置
      rabbitmq:
        virtual-host: /
        host: 192.168.204.100
        port: 5672
        username: cfeng
        password: a123456sgssdhhhs7890bfagds
        publisher-confirm-type: correlated  #发送消息之后进行确认  发送到交换机触发回调 publisher-confirm 废弃
        publisher-returns: true  #发送消息后返回确认信息
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    因为SpirngBoot的自动配置【可以参见之前的blog和封装minio-starter】,快速使用rabbitMQ【 springBoot、Docker】让使用一个服务变得非常简单

    1. 自定义配置Bean , 配置文件

    上面的简易配置只能够使用默认的简易功能,为了自定义使用Bean,创建RabbitmqConfig配置文件

    除了@Value(${})可以调取yml的配置项之外,还可以直接使用Environment对象的getProperty获取yml中的所有的配置项 非常方便,注入Environment对象即可

     * rabbitmq自定义配置文件,需要配置单一消费者工厂,多消费者工厂、消息发送Template对象
     */
    
    @Slf4j
    @Configuration
    @RequiredArgsConstructor
    @EnableRabbit  //开启rabbitmq的注解方式
    public class RabbitmqConfig {
    
        //自动装配的连接工厂,连接到rabbitMQ服务器
        private final CachingConnectionFactory connectionFactory;
    
        //自动装配的简单Rabbit消息监听器所在容器工厂的配置对象
        private final SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
        
        //方便快捷代替配置文件,Environment可以方便获取yml文件中的配置项
        private final Environment environment;
    
        /**
         * 单一消费者实例监听器容器工厂
         */
        @Bean(name = "singleListenerContainer")
        public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
            //消费者监听器所在容器工厂
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            //设置连接的实例
            factory.setConnectionFactory(connectionFactory);
            //设置消息的阐述格式,JSON
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            //设置并发消费者实例最大数量(Security的并发最大)
            factory.setMaxConcurrentConsumers(1);
            //设置并发消费者实例每个实例拉取的消息数量 1
            factory.setPrefetchCount(1);
    
            return factory;
        }
    
        /**
         * 多消费者实例工厂,针对高并发的业务场景
         */
        @Bean(name = "multiListenerContainer")
        public SimpleRabbitListenerContainerFactory multiListenerContainer() {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            //设置连接工厂,可以使用configure对象配置
            factoryConfigurer.configure(factory,connectionFactory);
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
            //消息的确认消费模式,先设置为NONE,表示不需要确认消费
            factory.setAcknowledgeMode(AcknowledgeMode.NONE);
    
            //高并发并发消费者实例最大数量  初始状态
            factory.setConcurrentConsumers(10);
            //并发的最大数量
            factory.setMaxConcurrentConsumers(15);
            //并发消费者实例每个实例拉取的消息数量
            factory.setPrefetchCount(10);
    
            return factory;
        }
    
        /**
         * 配置RabbitMQ发送消息组件RabbitTemplate
         */
        @Bean
        public RabbitTemplate rabbitTemplate() {
            //设置发送消息后进行确认, connectionFactory的配置都直接在配置文件配置
    //        connectionFactory.setPublisherConfirms(true);
    //        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
            RabbitTemplate rabbitTemplate = new RabbitTemplate();
            //生产者通过调用channel.addReturnListener()方法来添加ReturnListener监听器,实现获取没有被正确路由到合适队列的消息
            rabbitTemplate.setMandatory(true);
    
            //发消息如果成功,给出反馈信息
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean b, String s) {
                    log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,b,s);
                }
            });
    
            //发送消息失败,给出反馈
            rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
                @Override
                public void returnedMessage(ReturnedMessage returnedMessage) {
                    log.warn("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message({})",returnedMessage.getExchange(),returnedMessage.getRoutingKey(), returnedMessage.getReplyCode(),returnedMessage.getReplyText(),returnedMessage.getMessage());
                }
            });
    
            return rabbitTemplate;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89

    当rabbitMQ处理高并发业务场景,使用多消费者实例,正常情况即不需要并发监听消费处理时,只需要配置单一消费者实例的容器工厂

    配置完成就可以开始使用,使用rabbitMQ完成生产者发送一串简单的字符串信息到基本的消息模型,由消费者进行监听消费处理

    继续在RabbitmqConfig中配置创建队列、交换机、路由及其绑定,也就是创建Queue、基础的DirectExchange、Binding对象

    	//方便快捷代替配置文件,Environment可以方便获取yml文件中的配置项
        private final Environment environment;
    
    /**
         * 配置简单的消息模型 :队列、路由、交换机
         */
        @Bean(name = "basicQueue")
        public Queue basicQueue() {
            return new Queue(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.queue-name")),true);
        }
    
        @Bean
        public DirectExchange basicExchange() {
            return new DirectExchange(environment.getProperty("spring.rabbitmq.exchange-name"),true,false);
        }
    
        @Bean
        public Binding basicBinding() {
            return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(environment.getProperty("spring.rabbitmq.route-name"));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    之后编写消息的生产者BasicPulisher,生产者发送消息

     * 基础模型的生产者
     */
    
    
    @Slf4j
    @Component
    @RequiredArgsConstructor
    public class BasicPublisher {
    
        private final ObjectMapper objectMapper;
    
        private final  RabbitTemplate rabbitTemplate;
    
        //配置项获取
        private final Environment environment;
    
        /**
         * 发送消息
         */
        public void sendMsg(String message) {
            //首先要判断消息是否为空
            if(!Strings.isNullOrEmpty(message)) {
                try {
                    //传输数据格式JSON
                    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                    //指定消息模型的交换机
                    rabbitTemplate.setExchange(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.exchange-name")));
                    //指定消息模型的路由
                    rabbitTemplate.setRoutingKey(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.route-name")));
                    //消息以二进制字符流传输 , 可以使用writeValueAsBytes,不然转换失败
                    Message msg = MessageBuilder.withBody(message.getBytes("utf-8")).build();
                    //转化发送消息
                    rabbitTemplate.convertAndSend(msg);
                    log.info("基本模型: 生产者发送消息: {}",message);
                } catch (Exception e) {
                    log.error("基本消息模型 生产者 发送消息发生异常: {}", e.fillInStackTrace());
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    这里的消息发送依赖的就是RabbitTemplate, 之前STOMP服务消息发送依赖的是SimpleMessageOpreations, 也就是一个messageTemplate

    之后再简单创建一个基础的消费者实例BasicConsumer

     * rabbitMQ基础模型 消费者,监听接收消息
     */
    
    @Component
    @Slf4j
    @RequiredArgsConstructor
    public class BasicConsumer {
    
        private final ObjectMapper objectMapper;
    
    
        /**
         *  监听接收消费队列中的消息, 这里采用单一容器工厂
         * @param msg  传输的二进制消息,这里的@PayLoad和之前的STOMP的是类似的就是将消息转化为给定的类型
         *  这里使用@RabbitListener 类似之前STOMP的@MessageMapping类似
         */
        @RabbitListener(queues = "${spring.rabbitmq.queue-name}",containerFactory = "singleListenerContainer")
        public void consumeMsg(@Payload byte[] msg) {
            try {
                String message = new String(msg,"utf-8");
                log.info("基本消费模型 消费者:监听到消息: {}", message);
            } catch (Exception e) {
                log.error("基本消息模型-消费者 发生异常 : {}",e.fillInStackTrace());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    这样也就创建好了基本的生产消费模型,和之前的SpringBoot的事件驱动模型类似,这里编写测试类调用生产者发送消息【主要依靠的是RabbitmqTemplate的convertAndSend】,消费者位置监听器需要指定队列和监听器的容器工厂

    @Test
        public  void testProduceMessage() {
            basicPublisher.sendMsg("kiss");
        }
    
    • 1
    • 2
    • 3
    • 4

    之后测试发送,容易出现的问题就是convert失败,所以这里需要注意生产者写入消息的方式,可以直接采用ObjectMapper的writeValueAsBytes

    2022-09-16 09:53:04.392  INFO 4428 --- [           main] c.s.rabbitmq.producer.BasicPublisher     : 基本模型: 生产者发送消息: kiss
    2022-09-16 09:53:04.397  INFO 4428 --- [nectionFactory1] c.server.config.RabbitmqConfig           : 消息发送成功:correlationData(null),ack(true),cause(null)
    
    2022-09-16 09:53:04.421  INFO 4428 --- [ntContainer#0-1] c.s.rabbitmq.consumer.BasicConsumer      : 基本消费模型 消费者:监听到消息: �+,
    2022-09-16 09:53:04.427  INFO 4428 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
    2022-09-16 09:53:05.431  INFO 4428 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    可以生产者生产消息之后,消费者成功监听,监听线程也是单独的【main线程发送消息】

     BasicProducer ---Msg-->  交换机Exchange
     							|__ Binding -- 队列BasicQueue <---listen--BasicConsumer
    
    • 1
    • 2

    RabbitMQ除了可以简单发送字节型(通过getBytes方法或者序列化方法)的消息和采用@RabbitListener监听字节数组类型消息之外,还可以通过发送、接收 对象类型的方式实现消息的发送和接收,下面也简单演示一下

    建立一个测试的对象类型用于传输,因为要网络传输,所以需要序列化

     * 测试rabbitMQ以对象作为消息类型,不简单为String
     */
    
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    public class Student {
    
        private Integer id;
    
        private String realName;
    
        private String username;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    再单独配置对象类型的队列、交换机和路由

    ##自定义配置对象类型队列、交换机、路由名称
        queue-object-name:  middleware.mq.object.info.queue
        exchange-object-name: middleware.mq.object.info.exchange
        route-object-name: middleware.mq.object.info.route
        
        
       /**
         * 配置简单类型的消息模型: 对象类型   : 队列、路由、交换机
         */
        @Bean(name = "objectQueue")
        public Queue objectQueue() {
            return new Queue(environment.getProperty("spring.rabbitmq.queue-object-name"));
        }
    
        @Bean
        public  DirectExchange objectExchange() {
            return new DirectExchange(environment.getProperty("spring.rabbitmq.exchange-object-name"),true,false);
        }
    
        @Bean
        public Binding objectBinding() {
            return BindingBuilder.bind(objectQueue()).to(objectExchange()).with(environment.getProperty("route-object-name"));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    之后就是再同样建立对象之间的消息传播的基本模型: 消费者、生产者、消息

    public void sendObjectMsg(Student student) {
            //首先选哟判断是否不为null
            if(!Objects.isNull(student)) {
                try {
                    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                    //消息的交换机和binding
                    rabbitTemplate.setExchange(environment.getProperty("spring.rabbitmq.exchange-object-name"));
                    rabbitTemplate.setRoutingKey(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.route-object-name")));
                    //发送消息
                    rabbitTemplate.convertAndSend(student, message -> {
                        //获取消息属性
                        MessageProperties properties = message.getMessageProperties();
                        //持久化模式,PERSISTENT
                        properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        //设置消息的类型,消息头设置AbstractJavaTypeMapper的默认的类型
                        properties.setHeader(AbstractJavaTypeMapper.DEFAULT_KEY_CLASSID_FIELD_NAME,Student.class);
                        //返回配置后的消息
                        return message;
                    });
                    log.info("基本消息模型-生产者-发送对象类型消息: {}",student.toString());
                } catch (Exception e) {
                    log.error("基本消费模型-生产者, 发送消息失败: {}", student.toString(),e.fillInStackTrace());
                }
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25

    对比简单的消息发送,这里主要就是再发送消息时,通过Process指定消息的属性: 消息的持久化模式、消息头【类型】

    @RabbitListener(queues = "${spring.rabbitmq.queue-object-name}",containerFactory = "singleListenerContainer")
        public void consumeObjMsg(@Payload Student student) {
            try {
                log.info("基本消费模型 --- 消费者: 监听到消息: {}",student);
            } catch (Exception e) {
                log.error("基本消费模型 --- 消费者: 监听消息异常 {}", e.fillInStackTrace());
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    之后简单调用Producer的API发送消息即可

    2022-09-16 19:46:48.860  INFO 17068 --- [           main] c.s.rabbitmq.producer.BasicPublisher     : 基本消息模型-生产者-发送对象类型消息: Student(id=1, realName=zs, username=cfeng)
    2022-09-16 19:46:48.872  INFO 17068 --- [nectionFactory1] c.server.config.RabbitmqConfig           : 消息发送成功:correlationData(null),ack(true),cause(null)
    
    2022-09-16 19:46:48.890  INFO 17068 --- [ntContainer#0-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
    2022-09-16 19:46:48.892  INFO 17068 --- [ntContainer#1-2] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
    2022-09-16 19:46:48.905  INFO 17068 --- [ntContainer#0-1] c.s.rabbitmq.consumer.BasicConsumer      : 基本消费模型 --- 消费者: 监听到消息: Student(id=1, realName=zs, username=cfeng)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    这里演示采用的都是Direct交换机模型,一共四种交换机模型

    多种消息模型 — Exchange调度策略

    调度策略就是Exchange交换机收到生产者发送的消息后按照什么规则转发消息,调度策略的三个因素:Exchange Type、 Binding Key 和 消息的标记信息(Routing Key 和headers, Exchagne根据消息的Routing Key和Exchange 绑定Queue 的Binding Key分配消息, 生产者发送消息时,一般会指定Routing Key,指定路由规则, Exchange Type和 binding key确定时,

    Producer指定routing key即可决定消息的流向, Exhange路由有多种不同的模式, 也就决定了几种不同的消息模型

    Fanout 订阅 、广播模式 ---- 适用于 业务数据需要广播场景: 用户操作写日志

    Fanout Exhange,Fanout交换机具有广播消息的作用,当消息进入Exchange之后,交换机会检查 binding了哪些Queue,找到之后,将消息发送到相关的Queue中【每一个Queue】, 由队列对应的消费者监听使用

    在这里插入图片描述

    订阅模式 与 Binding Key和Routing Key无关, 同时,Fanout交换机转发消息最快,交换器会将消息分发给所有有绑定关系的消息Queue中, 就像子网广播一样,都获得一份复制的消息; 就算绑定了路由也是无用的; 队列数N >= 1

    其实该订阅模式就类似STOMP协议下的服务,订阅相关的频道,就可以介绍到发送到该频道的消息,所以进入的用户都订阅public频道就可以接收消息

    这里简单的演示消息的发送,封装消息实体ChatMessage,建立多个队列

    ###自定义配置广播、订阅模型的多个Queue、交换机,不需要路由,因为绑定了也无用,会子网官博,不是独享
        queue-one-name: middleware.mq.fanout.one.queue
        queue-two-name: middleware.mq.fanout.two.queue
        exchange-fanout-name: middleware.mq.fanout.info.route
        
    /**
         * 广播模式交换机、 不需要配置路由信息,因为路由无用,子网广播
         */
        @Bean(name = "fanoutQueueOne")
        public Queue fanoutQueueOne() {
            return new Queue(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.queue-one-name")));
        }
    
        @Bean(name = "fanoutQueueTwo")
        public Queue fanoutQueueTwo() {
            return new Queue(Objects.requireNonNull(environment.getProperty("queue-two-name")));
        }
    
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.exchange-fanout-name")),true,false);
        }
    
        /**
         * 只需要将交换机和队列绑定再一起即可,不需要指定Bindinging Key
         */
        @Bean
        public Binding fanoutBindingOne() {
            return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
        }
    
        @Bean
        public Binding fanoutBindingTwo() {
            return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35

    之后简单封装消息

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    @ToString
    public class MessageEntity implements Serializable {
    
        private static final long serialVersionUID = -6995434700637844150L;
    
        private Integer id;
    
        private String message;
    
        private Student sender;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    消息生产者ModelPubilsher

    @Component
    @RequiredArgsConstructor
    @Slf4j
    public class FanoutPublisher {
    
        private final RabbitTemplate rabbitTemplate;
    
        private final ObjectMapper objectMapper;
    
        private final Environment environment;
    
        public void sendMsg(MessageEntity messageEntity) throws Exception {
            if(!Objects.isNull(messageEntity)) {
                try {
                    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    
                    //这里只需要指定交换机,不需要给出Routing key
                    rabbitTemplate.setExchange(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.exchange-fanout-name")));
                    //创建消息Message,转换
                    Message message = MessageBuilder.withBody(objectMapper.writeValueAsBytes(messageEntity)).build();
                    log.info("消息模型Fanout 生产者发送消息: {}",messageEntity);
                } catch (Exception e) {
                    log.error("消息模型Fanout 生产者发送消息异常: {}", messageEntity,e.fillInStackTrace());
                }
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    监听队列的消费者

    @Component
    @Slf4j
    @RequiredArgsConstructor
    public class ModelConsumer {
    
        private final ObjectMapper objectMapper;
    
        @RabbitListener(queues = "${spring.rabbitmq.queue-one-name}",containerFactory = "singleListenerContainer")
        public void consumeFanoutMsgOne(@Payload byte[] msg) {
            try {
                MessageEntity messageEntity = objectMapper.readValue(msg,MessageEntity.class);
                log.info("消息模型fanoutOne 监听到消息:{}", messageEntity);
            } catch (Exception e) {
                log.info("消息模型——消费者one 发生异常: {}",e.fillInStackTrace());
            }
        }
    
        @RabbitListener(queues = "${spring.rabbitmq.queue-two-name}",containerFactory = "singleListenerContainer")
        public void consumeFanoutMsgTwo(@Payload byte[] msg) {
            try {
                MessageEntity messageEntity = objectMapper.readValue(msg,MessageEntity.class);
                log.info("消息模型fanoutTWO 监听到消息:{}", messageEntity);
            } catch (Exception e) {
                log.info("消息模型——消费者two 发生异常: {}",e.fillInStackTrace());
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    之后调用生产者发送消息即可

    2022-09-16 21:15:27.775  INFO 15684 --- [           main] c.s.rabbitmq.producer.FanoutPublisher    : 消息模型Fanout 生产者发送消息: MessageEntity(id=1, message=你好,春风, sender=Student(id=1, realName=zs, username=cfeng))
    2022-09-16 21:15:27.783  INFO 15684 --- [nectionFactory1] c.server.config.RabbitmqConfig           : 消息发送成功:correlationData(null),ack(true),cause(null)
    2022-09-16 21:15:27.795  INFO 15684 --- [ntContainer#2-1] c.s.rabbitmq.consumer.FanoutConsumer     : 消息模型fanoutTWO 监听到消息:(Body:'[B@2855a650(byte[87])' 
    2022-09-16 21:15:27.795  INFO 15684 --- [ntContainer#3-1] c.s.rabbitmq.consumer.FanoutConsumer     : 消息模型fanoutOne 监听到消息:
    
    • 1
    • 2
    • 3
    • 4

    使用Fanout交换机可以让一个交换机绑定多个队列,从而对应多个消费者, 不需要指定binding Key, 所以发送消息也不需要Routing Key

    广播式、订阅式消息模型适用于 业务数据需要广播式传播, 说白了,也就是多个地方需要使用该数据,比如用户操作日志 — 需要将操作信息发送给数据库,同时也需要将其发送给专门的日志系统进行存储, 将其操作日志封装为消息实体,进行传播即可

    Direct 路由模式 — 业务数据直接传输消费

    Direct交换器也就是直接交换器,就是最直接的意思,需要Routing key和Binding key, 识别消息头中的Routing Key,查找Exchange和Queue之间的Binding key,如果匹配,将消息分发到该队列,Direct 式Exchange的默认模式

    默认提供了一个Exchange,名称为空, 类型为Direct,绑定到所有的Queue, 每一个Queue和该Exchange的Binding Key为Queue的名称, 所以不交换器的情况下是可以借助默认交换器发送消息

    在这里插入图片描述

    该模式可以参见上面的整合过程的Basic Queue

    此模型需要严格意义的绑定,也就是在Binding配置时,在配置Exchange和Queue之间的关系时,同时需要指定Binding Key; 这样Producer发送数据时,会传递一个Routing Key检验

    rabbitTemplate.setExchange(environment.getProperty("spring.rabbitmq.exchange-object-name"));
                    rabbitTemplate.setRoutingKey(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.route-object-name")));
    
    
    这里设置的Routing-key和配置conifg的Binding Key相同
    @Bean
        public Binding basicBinding() {
            return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(environment.getProperty("spring.rabbitmq.route-name"));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    这个时候就不会像Fanout一样广播了,消息只会投递到绑定的且满足路由Routing Key的Queue上,不会投递到所有的绑定Queue上

    该模式适合业务数据直接传输消费,比如业务服务模块之间的信息交互,一般业务服务模块之间的消息通信时直接、实时的可以借助DirectExchange, 也是最常见的消息通信模式 == 路由模式

    Topic 通配符模式

    TopicExchange 是一种 发布-主题–订阅 的消息模型,Topic模式也要求交换机、路由、队列严格绑定构成, 和之前的Direct不同,其支持通配方式的路由,可以通过为路由的名称指定特定的 通配符 * 或者 #, 从而绑定到不同的队列中; Routing Key 是一个以 句号. 为分隔的字符串

    在这里插入图片描述

    通配符 * :表示一个特定的单词

    通配符 #: 表示任意的单词,可以0到多个 # > *

    如果direct是正规军,那么Topic就是王牌军,其可以统治direct, 当路由名称包含* 时,*代表一个单词,所以就会降级为基于DirectExchange的消息模型

    而路由名称包含# 时,由于匹配多个单词所以绑定的路由不再起作用,相当于绑定基于FanoutExchange的消息模型, 哪怕有Routing Key,也会匹配到所有的,不再起作用

    在这里插入图片描述

    可以看出Topic和Direct的最主要的不同就是绑定规则Binding Key 是通配的

    这里Cfeng 就只是大概阐述一下代码了,提一下和Direct不同的部分

    ###首先就是定义的route-key 是包含通配符的
    route-one-name: middleware.mq.object.*.route 【*匹配一个单词】
    route-two-name: middleware.mq.#.info.route  【这里#可以匹配任意长度route字符串】
    
    之后就是config中对上面两个路由的绑定,这里创建的是Topic交换机实例
        @Bean
        public  TopicExchange objectExchange() {
            return new DirectExchange(environment.getProperty("spring.rabbitmq.exchange-topic-name"),true,false);
        }
        
    @Bean
        public Binding fanoutBindingOne() {
            return BindingBuilder.bind(topicQueueOne()).to(topicExchange()).with(Objects.requireNonNull(environment.getProperties("spirng.rabbitmq.route-one-name")));
        }
    
        @Bean
        public Binding fanoutBindingTwo() {
            return BindingBuilder.bind(topicQueueTwo()).to(topicExchange()).with(Objects.requireNonNull(environment.getProperties("spirng.rabbitmq.route-two-name")));
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    建立交换机和Queue的Binding关系之后,就直接使用Producer向Exchagne发送数据,这里可以测试多个Routing Key验证通配

     ......
         public void sendTopicMsg(MessageEntity messageEntity, String route) throws Exception {
         try {
             rabbitTemplate.setMessageConverter(XXX);
             rabbitTemplatesetExchange(environment.getProperties("XXX"));
             //路由绑定
             rabbitTemplate.setRoutingKey(route);
             //创建消息
             Message msg = ....
             rabbitTemplate.sendAndConvert(msg);
         } catch(Exception e) {
             ....
         }
     }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    之后消费者就是@RabbitListener中指定queues即可,消费容器工厂还是单个消费者即可

    测试的时候,可以测试输入不同的Routing Key,可以发现只要符合匹配规则,对应的消费者都能够消费消息

    确认消费机制 RPC

    RabbitMQ 是高性能、高可用的消息分布式中间件,不只是因为消息异步通信和业务模块解耦、 接口限流、消息延迟处理等功能, 重要的是其在消息发送过程中,可以保证消息成功发送、不会丢失、确认消费

    • 保证消息成功发送: 生产者的生产确认机制( Publisher的Confirm机制)
    • 消息不丢失、确认消费: 面向消费者确认消费而言
    消息高可用 – 确认消费

    分布式消息中间件RabbitMq本身是基于异步的消息处理, 前面的实例就可以知道: 生产者P将消息发送到RabbitMQ后不会知道消费者处理成功或者失败, 甚至都不知道是否有消费者来处理消息

    • 消息是否真正发送成功,P自认为发送成功,可是采用template发送时,绑定模型如果不存在,实际上是发送失败的 【之前的一个demo Cfeng甚至没有bind,但是P端仍然认为投递成功】
    • 由于特殊原因,RabbitMQ服务挂掉了,导致需要重启,但是这时队列中还有大量消息没有消费,可能重启过程中丢失
    • 消费者监听消息时,可能出现监听失败的问题,导致消息所在队列找不到消费者而不断重新入队,重复消费

    在这里插入图片描述

    但是在实际场景中,可能需要同步处理,同步等待服务端将消息处理完成之后再进行下一步的操作,相当于RPC(remote procedure Call 远程过程调用)

    RabbitMQ实现RPC机制:

    • P 发送消息,在消息头属性MessageProperties中设置属性 ReplyTo — Queue名称 【消费者处理完成后的消息发送到该Queue】,和correlationId: 此次请求的标识,消费者处理完成之后将此属性返还,P由此确认消息是否成功执行
    • 消费者收到消息进行处理
    • 处理完成生成应答消息replyTo指定的Queue,同时带上correlationId
    • P 订阅了replyTo指定的Queue,从中收到应答消息,根据correlationId确定执行情况
    消息生产确认

    为了确认消息是否真的发送成功,RabbitMQ要求生产者在发送消息后进行发送确认,代表消息成功发出,配置的规则很easy,

    就是配置Template时设置PublisherConfirms,和其反馈信息Returns都为True,同时给出ConfirmCallBack作为发送成功的处理

    publisher-confirm-type: correlated  #发送消息之后进行确认  发送到交换机触发回调 publisher-confirm 废弃
        publisher-returns: true  #发送消息后返回确认信息
    
    rabbitTemplate.setConnectionFactory(connectionFactory);
            //发消息如果成功,给出反馈信息
            rabbitTemplate.setConfirmCallback((correlationData, b, s) -> log.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,b,s));
    
            //发送消息失败,给出反馈
            rabbitTemplate.setReturnsCallback(returnedMessage -> log.warn("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message({})",returnedMessage.getExchange(),returnedMessage.getRoutingKey(), returnedMessage.getReplyCode(),returnedMessage.getReplyText(),returnedMessage.getMessage()));
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    confirm为成功,returns为失败,这样P就可以知道消息是成功发送成功

    rabbitMQ宕机、消息积压 — 持久化

    rabbitMQ可能出现消息积压的情况 — Producer生产能力强,Consumer宕机、Consumer消费能力弱

    消息积压最简单的方法就是增加rabbitMQ机器的数量,如果不行,那就手工增加消费能力 — 并发消费【多个消费者消费,同时设置消费最大数量】 ,还可以再开一个消费者,将MQ的消息全部录入数据库,后续处理

    而RabbitMQ本身为保证消息不丢失,建议创建队列、交换机时设置持久化参数durable为true, 也就是durable参数取值为true

    @Bean(name = "basicQueue")
        public Queue basicQueue() {
            return new Queue(Objects.requireNonNull(environment.getProperty("spring.rabbitmq.queue-name")),true);
        }
    
        @Bean
        public DirectExchange basicExchange() {
            return new DirectExchange(environment.getProperty("spring.rabbitmq.exchange-name"),true,false);
        }
    
    
    properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    持久化之后,RabbitMQ服务器崩溃重启操作,队列、交换机依然存在并且消息不会丢失

    但是可能发生RabbitMQ收到消息还未持久化,就宕机了【和数据库一样】,那么这里也可以采用相似的方式 : 事务

    AMQP的重要特点 – 支持事务, 如果生产者将持久化消息发送给服务器,consume命令本身无Response返回,为了应对发出消息服务器崩溃,可以预先通过

    • txSelect()开启事务
    • txCommit() 提交事务
    • txRollBack() 回滚事务
    消息不重复消费 — ACK模式

    为了保证消息不重复消费、能够准备被消费,提供了消息确认机制,ACK模式

    Ack — acknowledge key确认字符,接收方在收到报文后,若未发现错误,就给方法放确认回答ACK,表明信息正确接收

    TCP报文的控制位就有一个ACK,当然AMQP协议中基于的信道Channel,不是TCP

    为了提高信息的可用性、防止消息重复消费, 一般都会使用消息确认机制 — 当消息确认消费后,消息才会从队列中移除

    RabbitMQ提供的取人模式: AcnowledgeMode : NONE、MANUAL手动,AUTO自动,电商平台支付的支付金额、游戏充值的消息提示是必须进行确认消费的,不然就不够严谨可靠

    NONE – 无需确认

    生产者发送消息到队列,消费者消费之后不给出任何反馈消息, 一股脑发送即可,不需要考虑丢失等异常,就像UDP的感觉, 这样处理流程更短,更快一点,但是实际生产中很少使用

    类似用户禁用APP更新提醒,P发送消息提示更新,用户消费之后并不会提示已消费

    AUTO — 自动确认

    生产者发送消息到队列,消费者监听消息之后,需要发送一个AUTO ACK的反馈给服务器, 之后消息出队

    RabbitMQ自动触发, 依靠的是RabbitMQ的内置组件

    Producer ---msg--> Exchange---bind-->Queue <--listen--- Consumer
    	|___RabbitMQ服务器 --- AUTO ACK <---- rabbitMQ 内置组件---|
    
    • 1
    • 2

    而确认消费的模式,是在容器工厂配置中配置setAcckownledgeMode即可, 之前为了流程简单,模式设置的为NONE

    @Bean(name = "multiListenerContainer")
        public SimpleRabbitListenerContainerFactory multiListenerContainer() {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            //设置连接工厂,可以使用configure对象配置
            factoryConfigurer.configure(factory,connectionFactory);
            factory.setMessageConverter(new Jackson2JsonMessageConverter());
    
            //消息的确认消费模式,先设置为NONE,表示不需要确认消费
            factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    而这里的交换机模式选择Direct,也就是最初是演示使用的,路由模式

    生产者还是BasicProducer, 确认消费模式,主要的变化是在消费者端, 但是这里是自动消费, 消费者的@RabbitListener中的container就会指定容器工厂,设置了Auto

    当然,也可以直接在配置文件中指定:

    spring:
    	rabbitmq:
    		listener:
          		direct:
           		 acknowledge-mode: auto
    
    • 1
    • 2
    • 3
    • 4
    • 5

    和之前的不同就是增加了ACK确认,重新执行代码

    [nectionFactory1] c.server.config.RabbitmqConfig           : 消息发送成功:correlationData(null),ack(true),cause(null)
    
    • 1

    可以看到ack为true,也就是确认消费

    MANUAL 手动确认

    人工手动确认消息消费,生产者发送消息到队列后,消费者监听到该消息时需要 手动以代码方式发送一个ACK 给服务器,之后消息出队,告知P 成功消费

    Producer ---msg--> Exchange---bind-->Queue <--listen--- Consumer
    	|___RabbitMQ服务器 --- ACK   信息 <---- 人为手动ACK  ---|
    
    • 1
    • 2

    手动确认 需要在消费者 消费 消息逻辑 之后 编写确认消费的逻辑, 消息出队,避免消息重复投入队列而重复消费, 还是使用Direct模式交换机

    和AUTO不同, MANUAL需要指定容器Container,并且当容器监听的队列和监听的Consumer

    /**
         * 配置监听容器
         * 消费者监听实例
         */
        @Resource
        private BasicConsumer basicConsumer;
    
        @Bean(name = "simpleContainerManual")
        public SimpleMessageListenerContainer simpleMessageListenerContainer(@Qualifier("basicQueue") Queue basicQueue) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            //消费实例配置
            container.setConcurrentConsumers(1);
            container.setMaxConcurrentConsumers(1);
            container.setPrefetchCount(1);
    
            //消息确认模式
            container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    
            container.setQueues(basicQueue);
    
            //手动确认消费 消费者需要实现ChannelAwareMessageListener接口
            container.setMessageListener(basicConsumer);
    
            return container;
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    这里的消费者需要实现ChannelAwareMessageListener接口

    @Component
    @Slf4j
    @RequiredArgsConstructor
    public class BasicConsumer implements ChannelAwareMessageListener {
    
        private final ObjectMapper objectMapper;
    
        /**
         * @param message
         * @param channel 通道实例
         * @throws Exception
         */
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            //这里就是rabbit的一个Message监听器,可以监听到消息
            MessageProperties properties = message.getMessageProperties();
            //获取消息分发的全局标识
            long deliveryTag = properties.getDeliveryTag();
            try {
                byte[] msg = message.getBody();
                //解析消息
                String entity = objectMapper.readValue(msg,String.class);
                //log
                log.info("确认消费模式 - 人为手动确认消息: {}", entity);
                //执行业务逻辑后,手动ACK, deliverTag 为全局分发标识(唯一)  是否允许批量消费: 这里设置为true
                channel.basicAck(deliveryTag,true);
            } catch (Exception e) {
                log.info("人为手动确认消费- 监听到消息: {}", e.fillInStackTrace());
                //发生异常,拒绝ACK
                channel.basicReject(deliveryTag,false);
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    基于MANUAL确认消费模式 需要在执行完实际的业务逻辑业务之后,手动调用相关方法比如BasicAck,Reject,当处理过程发生异常,需要执行确认消费, 避免消息一直留在队列中,导致重复消费

    INFO 2268 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [192.168.204.100:5672]
    2022-09-17 16:56:38.394  INFO 2268 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: rabbitConnectionFactory#28fc1132:0/SimpleConnection@3c82bac3 [delegate=amqp://cfeng@192.168.204.100:5672/, localPort= 54514]
    2022-09-17 16:56:38.477  INFO 2268 --- [           main] c.server.RabbitmqBasicMessageTest        : Started RabbitmqBasicMessageTest in 5.256 seconds (JVM running for 6.762)
    
    2022-09-17 16:56:39.102  INFO 2268 --- [           main] c.s.rabbitmq.producer.BasicPublisher     : 基本模型: 生产者发送消息: kiss
    2022-09-17 16:56:39.116  INFO 2268 --- [nectionFactory1] c.server.config.RabbitmqConfig           : 消息发送成功:correlationData(null),ack(true),cause(null)
    2022-09-17 16:56:39.121  INFO 2268 --- [ntainerManual-1] c.s.rabbitmq.consumer.BasicConsumer      : 确认消费模式 - 人为手动确认消息: kiss
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    用户登录成功 写入日志

    用户登录操作,作为常规系统的基础操作,一些应用需要跟踪用户的登录、操作轨迹,需要记录用户的登录的日志,将相关的记录录入数据库

    用户登录是主流程, 记录用户登录日志是辅助流程, 所以实际过程中,记录用户登录日志操作不应该影响用户登录,也就是不应该是同步的,因为再次数据库操作耗时; 这个时候就可以RabbitMQ进行解耦

    在这里插入图片描述

    常规操作也可以采用之前抢红包操作录入数据库的@Async; 整体业历程:包括 登录模块 和 日志记录模块, 这两个模块不应该相互影响,异步操作

    这里为了模拟整个流程,就大概先分析一下思路,首先网站的用户登录成功进入后台的判断逻辑,判断之后就发送一个消息,借助RabbitTemplate即可,发送之后,日志监听消费者监听到消息获取相关的登录信息录入数据库, 这里采用Direct交换机, 手动ACK

    这里也就不完整写代码了,没什么特别的地方,这里Cfeng就简单写一些代码说明整个流程【 结合之前的Cfeng.net 使用的Security】

    //首先就是用户信息, 就是之前的ChatUser -- 网站用户
    //之后封装一个登录日志实体类  SysLog   包括id、用户id、用户操作所属模块、操作的数据、备注、操作时间等
    
    之前Security登录成功后设置的SuccessfulURL, 现在因为需要登录成功后进行后处理操作,所以这里需要自定义SuccessfulHandler, 在其中调用  LogPublisher 将当前用户的登录信息发布到Exchange  ; 之后再跳转进入成功的欢迎页面
    
    LogPbulisher的操作就是封装对象类型的消息, MessageProcessor就可以完成,指定消息头的java type为SysLog
    
    在rabbitMQ中配置Direct交换机, 配置日志系统的消费的队列LogQueue, 并且指定日志系统的消费者LogConsumer,配置相关的监听容器container,指定队列和消费者, LogConsumer实现Listener接口 【确认消费】, 在message方法中进行数据库的写入操纵, 当然需要try catch, 如果出现异常手动Direct, 正常就ACK,消息出队避免重复消费,导致数据库消息异常
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    如果有相关friend好奇,我也可以详细说一下具体的代码🌳

  • 相关阅读:
    【uniapp】引入uni-ui组件库
    JavaEE——No.2 套接字编程(TCP)
    MySQL是怎样运行的:从根儿上理解MySQL | B+树索引以及索引优化
    区块链实训教程(6)--开发、编译、部署、调用HelloWorld合约
    I2C通信协议
    14、ffmpeg中进进行硬件编码和解码的片段程序_cuda进行rgb2yuv和yuv2rgb
    m序列生成器的Matlab实现
    基于 NNCF 和 Optimum 面向 Intel CPU 对 Stable Diffusion 优化
    Python open with as---文件处理
    南卡电容笔和益博思哪个更好用?平板电脑值得入手电容笔对比
  • 原文地址:https://blog.csdn.net/a23452/article/details/126908753