• 高并发高可用之RabbitMQ


    MQ应用场景

    1. 异步处理:替换多线程的方式实现异步操作
    2. 应用解耦:双方只需要写消息读消息,不需要关系对方的接口参数是否可能发生变更
    3. 流量削峰:高并发下让请求排队

    MQ的类型

    MQ的两种消息模型

    1. 点对点(point to point):一个消息只会有一个消费者通过争抢能拿到消息
    2. 发布订阅(pub/sub):一个消息所有订阅的消费者都能同时拿到消息

    MQ的类型

    1. JMS(Java Message Service):如ActiveMQ
    2. AMQP(Advanced Message Queuing Protocol):如RabbitMQ
      在这里插入图片描述

    RabbitMQ概念

    注意:发消息是发给交换机,收消息是监听队列。
    在这里插入图片描述
    一个连接有很多个信道,每个信道用于收发一个队列。
    虚拟主机可以用来环境隔离,如生产环境和开发环境隔离。

    Docker下安装RabbitMQ

    #启动容器(可以自动下载安装)
    docker run -p 1883:1883 -p 4369:4369 -p 5671:5671 \
    -p 5672:5672 -p 8883:8883 \
    -p 15672:15672 -p 25672:25672 \
    -d --name rabbitmq \
    -e RABBITMQ_DEFAULT_USER=admin \
    -e RABBITMQ_DEFAULT_PASS=admin \
    rabbitmq:management
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    在这里插入图片描述
    使用15672端口号访问MQ管理网页界面。

    Exchange交换机类型

    headers交换器和direct交换器完全一致但性能更差,所以几乎不用。常用交换机类型有:

    1. direct:点对点的消息模型,通过路由键把完全精准匹配的路由到指定队列。
    2. fanout:不匹配路由键,把消息广播到所有已绑定的队列。
    3. topic:通过路由键匹配进行广播到符合条件的一批队列。支持路由键通配符。

    (路由键通配符:#匹配0个或多个单词,*匹配一个单词)
    (交换机绑定队列时需要设置一个路由键且如果是topic类型支持设置带有通配符的路由键。客户端发送消息时也需要携带一个消息的路由键给交换机)

    SpringBoot整合RabbitMQ

    1. 引入maven依赖

      <dependency>
           <groupId>org.springframework.boot</groupId>
           <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
      
      • 1
      • 2
      • 3
      • 4
    2. SpringBoot配置文件中配置

      spring.rabbitmq.host=192.168.239.135
      spring.rabbitmq.port=5672
      spring.rabbitmq.virtual-host=/
      spring.rabbitmq.username=admin
      spring.rabbitmq.password=admin
      
      • 1
      • 2
      • 3
      • 4
      • 5
    3. 启动类上添加注解@EnableRabbit

    4. 发送消息

      @Service
      public class OrderServiceImpl implements OrderService {
          @Autowired
          private RabbitTemplate rabbitTemplate;
      
          public void sendMessage(){
              //发送一个对象消息,默认会把对象序列化后发送,对象类必须实现Serializable接口
              Person person = new Person();
              person.setName("张三");
              person.setAge(20);
              rabbitTemplate.convertAndSend("exchange.direct","ycy.news",person);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13

      默认会把对象序列化后发送,但可配置为Json转换器:

      @Configuration
      public class RabbitMQConfig {
      
          /**
           * 注入一个把对象转成Json的消息转换器,无则使用默认序列化转换器
           * @return
           */
          @Bean
          public MessageConverter messageConverter(){
              return new Jackson2JsonMessageConverter();
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
    5. 接收消息
      @RabbitListener(queues = {"队列名"})监听哪些队列
      @RabbitHandler重载区分不同的消息类型

      @RabbitListener(queues = {"ycy.news"})
      @Service
      public class ConponServiceImpl implements ConponService {
      
      
          //监听方法参数必须和发送时为同一类型,不同微服务之间收发消息注意类型统一,可以封装一个专门用来传输消息的DTO实体类放在commmon模块下
          @RabbitHandler
          public void receiveMessage(Object object){
              System.out.println("receiveMessage1");
              System.out.println("object:"+object.getClass());
          }
      
          @RabbitHandler
          public void receiveMessage(Message message, Person person, Channel channel){
              System.out.println("receiveMessage2");
              System.out.println("message:"+message);
              System.out.println("person:"+person);
              System.out.println("channel:"+channel);
          }
      
          @RabbitHandler
          public void receiveMessage(Animal animal){
              System.out.println("receiveMessage3");
              System.out.println("animal"+animal);
          }
      
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27

    注:微服务如果做了集群,这时各个微服务节点会争抢消息,但一个消息只会被一个微服务节点获取到。

    RabbitMQ消息确认机制-可靠抵达

    保证消息不丢失,可靠抵达,可以使用事务,但性能下降250倍,为此引入了确认机制。
    在这里插入图片描述

    1. p->b

    配置文件中添加:

    spring.rabbitmq.publisher-confirms=true
    
    • 1

    RabbitMQ配置类中编写回调方法:

    @Configuration
    public class RabbitMQConfig {
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        /**
         * 注入一个把对象转成Json的消息转换器,无则使用默认序列化转换器
         * @return
         */
        @Bean
        public MessageConverter messageConverter(){
            return new Jackson2JsonMessageConverter();
        }
    
        
    
        /**
         * 定制rabbitTemplate
         * 注解@PostConstruct :RabbitMQConfig在创建完成后调用此方法
         */
        @PostConstruct
        public void initRabbitTemplate() {
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                /**
                 * p->b
                 * 发送到broke时回调,确认消息是否到达
                 * @param correlationData 消息唯一关联数据
                 * @param ack 是否成功还是失败
                 * @param cause 失败原因
                 */
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    System.out.println("correlationData==" + correlationData);
                    System.out.println("ack==" + ack);
                    System.out.println("cause==" + cause);
                }
            });
    
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    发送消息的时候指定每个消息的ID:
    (后续可以记录到数据库中,定期扫描重发发送失败的消息)

     rabbitTemplate.convertAndSend("exchange.direct","ycy.news",person,new CorrelationData("12345678"));
    
    • 1
    1. e->q

    配置文件中添加:

    spring.rabbitmq.publisher-returns=true
    spring.rabbitmq.template.mandatory=true
    
    • 1
    • 2

    RabbitMQ配置类中编写回调方法:

    @Configuration
    public class RabbitMQConfig {
        @Autowired
        RabbitTemplate rabbitTemplate;
    
        /**
         * 注入一个把对象转成Json的消息转换器,无则使用默认序列化转换器
         * @return
         */
        @Bean
        public MessageConverter messageConverter(){
            return new Jackson2JsonMessageConverter();
        }
    
        
    
        /**
         * 定制rabbitTemplate
         * 注解@PostConstruct :RabbitMQConfig在创建完成后调用此方法
         */
        @PostConstruct
        public void initRabbitTemplate() {
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                /**
                 * p->b
                 * 发送到broke时回调,确认消息是否到达
                 * @param correlationData 消息唯一关联数据
                 * @param ack 是否成功还是失败
                 * @param cause 失败原因
                 */
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                    System.out.println("correlationData==" + correlationData);
                    System.out.println("ack==" + ack);
                    System.out.println("cause==" + cause);
                }
            });
    
    
    
    
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                /**
                 * e->q
                 * 只有消息没有到达队列才回调
                 * @param message 失败消息
                 * @param replyCode 失败码
                 * @param replyText 回复内容
                 * @param exchange 发送的交换机
                 * @param routingKey 交换机key
                 */
                @Override
                public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                    System.out.println("fail message==" + message);
                    System.out.println("replyCode" + replyCode);
                    System.out.println("replyText" + replyText);
                    System.out.println("exchange==" + exchange);
                    System.out.println("routingKey==" + routingKey);
                }
            });
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    此阶段常见的错误有:发送的消息找不到匹配的路由键

    1. q->c

    默认监听队列会自动ack签收,但是如果无法确定此消息是否被处理完成, 或者成功处理,我们可以切换成手动ack签收模式。
    确认签收成功消息才会从MQ队列中删除。
    配置文件中添加:

    #手动ack消息
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    
    • 1
    • 2

    在监听消息处理方法中手动签收:

    @RabbitHandler
        public void receiveMessage(Message message, Object object, Channel channel){
            System.out.println("receiveMessage2");
            System.out.println("message:"+message);
            System.out.println("object:"+object);
            System.out.println("channel:"+channel);
    
    
            //手动签收,非批量模式
            try {
                long deliveryTag = message.getMessageProperties().getDeliveryTag();
                channel.basicAck(deliveryTag,false);
            } catch (IOException e) {
                //网络中断
                e.printStackTrace();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    延时队列(更实时精准的定时任务)

    TTL(Time To Live)
    消息的TTL就是消息的存活时间。
    • RabbitMQ可以对队列和消息分别设置TTL。
    • 对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
    • 如果队列设置了,消息也设置了,那么会取小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。

    Dead Letter Exchanges(DLX)
    死信路由。一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列, 一个路由可以对应很多队列。
    • 被消费者拒收了
    • TTL过期了
    • 队列的长度限制满了

    延时队列实现方案:
    过期的消息会自动进入配置的死信路由,再由路由转到发真正的业务队列 即可实现延时队列的效果。

    ①队列过期的方式:
    在这里插入图片描述
    ②消息过期的方式:
    在这里插入图片描述

    推荐使用队列过期的方式:
    消息过期方式可能会有问题,如先进来一个5分钟过期的消息,又进来一个3秒过期的消息,那么第二个消息实际被仍出去是在5分零3秒的时候。而队列过期方式由于所有消息过期时间都相同就不会出现上述问题。

    延时队列代码实现:
    使用第一种方式的变种
    在这里插入图片描述

    1. 引入RabbitMQ依赖
      参照整合SpringBoot章节。

    2. 创建队列,交换机,绑定关系

      @Configuration
      public class MyMQConfig {
      
          /**
           * 创建过期队列
           * @return
           */
          @Bean
          public Queue orderDelayQueue(){
              //配置过期时间和死信交换机和死信路由键
              Map<String, Object> arguments = new HashMap<>();
              arguments.put("x-dead-letter-exchange","order-event-exchange");
              arguments.put("x-dead-letter-routing-key","order.release.order");
              arguments.put("x-message-ttl",60000);
      
              Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
              return queue;
          }
      
          /**
           * 创建普通队列
           * @return
           */
          @Bean
          public Queue orderReleaseOrderQueue(){
              Queue queue = new Queue("order.release.order.queue", true, false, false);
              return queue;
          }
      
          /**
           * 创建交换机
           * @return
           */
          @Bean
          public Exchange orderEventExchange(){
              return new TopicExchange("order-event-exchange",true,false);
          }
      
      
      
          /**
           * 交换机绑定过期队列
           * @return
           */
          @Bean
          public Binding orderCreateOrderBinding(){
              return new Binding("order.delay.queue",
                      Binding.DestinationType.QUEUE,
                      "order-event-exchange",
                      "order.create.order",
                      null);
          }
      
          /**
           * 交换机绑定普通队列
           * @return
           */
          @Bean
          public Binding orderReleaseOrderBinding(){
              return new Binding("order.release.order.queue",
                      Binding.DestinationType.QUEUE,
                      "order-event-exchange",
                      "order.release.order",
                      null);
          }
      
      
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19
      • 20
      • 21
      • 22
      • 23
      • 24
      • 25
      • 26
      • 27
      • 28
      • 29
      • 30
      • 31
      • 32
      • 33
      • 34
      • 35
      • 36
      • 37
      • 38
      • 39
      • 40
      • 41
      • 42
      • 43
      • 44
      • 45
      • 46
      • 47
      • 48
      • 49
      • 50
      • 51
      • 52
      • 53
      • 54
      • 55
      • 56
      • 57
      • 58
      • 59
      • 60
      • 61
      • 62
      • 63
      • 64
      • 65
      • 66
      • 67
      • 68

      注意:只有先编写监听队列方法后才会触发创建队列交换机等操作

    3. 收发消息,测试延时队列

      @Service
      public class OrderServiceImpl implements OrderService {
          @Autowired
          private RabbitTemplate rabbitTemplate;
      
          public void sendMessage(){
              //发送一个对象消息,默认会把对象序列化后发送,对象类必须实现Serializable接口
              Person person = new Person();
              person.setName("张三");
              person.setAge(20);
              rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",person,new CorrelationData("12345678"));
          }
      
      
          @RabbitListener(queues = "order.release.order.queue")
          public void receiveMessage(Person person){
              System.out.println("延时触发:"+person);
          }
      }
      
      • 1
      • 2
      • 3
      • 4
      • 5
      • 6
      • 7
      • 8
      • 9
      • 10
      • 11
      • 12
      • 13
      • 14
      • 15
      • 16
      • 17
      • 18
      • 19

    高并发高可用下的问题

    1. 消息丢失
      见可靠抵达章节。
    2. 消息重复
      消费者的消费方法设计编写为幂等性方法。或新建一个数据库防重表来判断是否被消费过。
    3. 消息积压
      增加消费者。或者先把消息批量取到数据库,离线慢慢消费。

    RabbitMQ集群

    RabbitMQ 中的节点类型:

    • 内存节点(RAM node):内存节点将所有的队列、交换机、绑定、用户、权限和 vhost 的元数据定义存储在内存中,好处是可以使得交换机和队列声明等操作速度更快。
    • 磁盘节点(Disk node):将元数据存储在磁盘中,单节点系统只允许磁盘类型的节点,防止重启 RabbitMQ 的时候,丢失系统的配置信息

    集群中至少要有一个Disk节点。

    RabbitMQ 集群的两种模式:

    1. 普通集群(默认)
      该模式还是会造成单点故障,无法保证高可用。消息只会存在集群中的一个节点,且不会同步到其它队列。对于消费者,如果消息进入A节点,当从B节点拉取时,MQ会将消息从A中取出,并经过B发送给消费者。
      该模式适用于消息无需持久化的场景,如日志队列。

    2. 镜像集群
      集群中的节点会自动同步消息数据,有一套选举算法,1个master,n个slaver,生产者消费者的请求都会转至master。
      缺点:若镜像节点过多且消息体量大,集群内部同步数据会消耗大量网络带宽

    镜像集群也是基于普通集群,即只有先搭建普通集群然后才能设置镜像集群。
    若消费过程中,master挂了,则选举新master,若没来得及确认,可能会重复消费

  • 相关阅读:
    Android—PMS: installPackagesLI
    为什么会出现,HR到处抱怨招不到测试员,测试员到处抱怨市场饱和,找不到工作?
    05_TCP并发服务器
    Unity——导航系统补充说明
    弹跳的小球
    开源任务调度框架
    插入排序/折半插入排序
    springboot高校学生健康档案管理系统java ssm
    分析各种表达式求值过程
    2023贵州财经大学计算机考研信息汇总
  • 原文地址:https://blog.csdn.net/m0_48268301/article/details/125460866