• 微服务入门到入土(08)-消息队列RabbitMQ


    RabbitMQ基础

    1.1 安装RabbitMQ

    这里只介绍基于CentOS7的docker单机部署。

    在线拉取

    docker pull rabbitmq:3-management
    
    • 1

    执行下面的命令来运行MQ容器:

    docker run 
    
    • 1

    -e RABBITMQ_DEFAULT_USER=itcast
    -e RABBITMQ_DEFAULT_PASS=123321
    –name rabbitmq
    –hostname mq1
    -p 15672:15672
    -p 5672:5672
    -d
    rabbitmq:3-management

    1.2 常见消息模型

    1.2.1 基本消息模型(BasicQueue)

    在这里插入图片描述

    2. 高级特性

    2.1 消息的可靠性

    可能发送消息丢失的几种情况:

    • 生产者发送消息,会先发送到交换机。
      • 消息还未到交换机就丢失了。
      • 消息到达交换机了,但是交换机还未转发给队列就丢失了。
    • 消息已到达队列。
      • MQ服务器宕机了,由于消息是存在内存中,所有服务器宕机消息就丢失了。
    • 消息已经发送出去
      • 消息以及发走,但是消费者还未来得急消费就宕机了,消息也会丢失。

    在这里插入图片描述

    因此要确保消息的可靠性,就需要做到以下几点:

    • 生产者确认机制
    • 消息持久化
    • 消费者确认机制
    • 消费失败重试机制

    2.1.1 生产者确认机制

    RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ后,会返回一个结果给发送者,表示消息是否处理成功。

    • publisher-confirm,发送者确认
      • 消息成功投递到交换机,返回ack
      • 消息为投递到交换机,返回nack
    • publisher-return,发送者回执
      • 消息投递到交换机了,但是没有路由到队列。返回ack,以及失败原因

    在这里插入图片描述

    注意:确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同的消息,避免ack冲突。

    SpringAMQP实现生产者确认

    1. 发送者服务配置如下:

      spring:
      rabbitmq:
      publisher-confirm-type: correlated
      publisher-returns: true
      template:
      mandatory: true

    • publisher-confirm-type: 开启publisher-confirm,支持两种类型。
      • simple: 同步等待confirm结果,直到超时
      • correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback。
    • publisher-returns:开启publisher-return功能,同样时基于callback机制,不过是定义ReturnCallback。
    • template.mandatory:定义消息路由失败时的策略。true:调用ReturnCallback;false:直接丢弃消息。
    1. 每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置:

      @Sjf4j
      @Configuration
      public class CommonConfig implements ApplicationContextAware {
      @Override
      public void setApplicationContext(ApplicationContext applicationContext) throws BeansException{
      RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
      rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
      log.info(“消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}”,replyCode,replyText,exchange,routingKey,message.toString());
      });
      }
      }

    2.1.2 消息持久化

    消息默认实在内存中存储,MQ宕机的时候自定义的交换机和队列都会消失,消息自然而然也会丢失。交换机和队列在声明的时候可以指定持久化。

        @Bean
        public Queue orderSeckillOrderQueue() {
        	//四个参数分别为:队列名称、是否持久化、是否排他、是否自动删除
            return new Queue(MQConstant.ORDER_SECKILL_ORDER_QUEUE, true, false, false);
        }
    
        @Bean
        public Exchange orderEventExchange() {
        	//三个参数分别为:交换机名称、是否持久化、是否自动删除【true->当没有队列绑定的时候就会删除】
            return new TopicExchange(MQConstant.ORDER_EVENT_EXCHANGE, true, false);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    消息也可以进行持久化,

    Message message = MessageBuilder.withBody("message".getBytes(StandardCharsets.UTF_8))
                    .setDeliveryMode(MessageDeliveryMode.PERSISTENT) //持久化
                    .build();
    
    • 1
    • 2
    • 3

    注意:其实默认声明的交换机、队列和消息都是持久的,可以不用单独去配置。

    2.1.3 消费者确认机制

    在这里插入图片描述
    在这里插入图片描述
    手动确认消息代码

    @RabbitHandler
    public void listener(SeckillOrderTo seckillOrder, Channel channel, Message message) throws IOException {
        try {
            log.info("准备创建秒杀订单的详细信息。。。");
            orderService.createSeckillOrder(seckillOrder);
            //手动确认消息
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        } catch (IOException e) {
            //出现异常拒绝消息,重新放回队列,如果一直异常就会一直重试,需要优化
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    2.1.4 失败重试机制

    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述
    注意:此种失败重试机制只适合在消费者自动确认模式下使用,如果是手动确认的话,消息路由到error.queue后,在原队列里还会存在并且是Unacked状态。

    2.2 死信交换机

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    2.3 惰性队列

    2.3.1 消息堆积

    当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,就可能会成为死信,会被丢弃,这就是消息堆积的问题。

    解决消息堆积的三种思路:

    • 增加更多消费者,提高消费速度
    • 在消费者内开启线程池加快消息处理速度
    • 扩大队列容积,提高堆积上限

    2.3.2 惰性队列

    RabbitMQ从3.6.0版本开始就加入了Lazy Queues的概念,也就是惰性队列。
    惰性对了特性如下:

    • 接收到的消息直接存入磁盘,而不是内存
    • 消费者要消费消息时才会从磁盘中读取并加载到内存
    • 支持数百万条的消息存储

    注意
    普通队列,会先将消息存到内存,然后再刷到磁盘,存到磁盘的操作叫做Paged Out。
    而惰性队列,不会经过内存,会直接将消息存到磁盘,存到磁盘的操作叫做Paged Out。

    模仿普通队列高并发下接收消息:
    在这里插入图片描述

    模仿惰性队列高并发下接收消息:
    在这里插入图片描述

    如何设置惰性队列?
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    3. 集群

    在这里插入图片描述

    3.1 普通集群

    在这里插入图片描述

    3.2 镜像集群

    在这里插入图片描述
    设置镜像队列的方式:
    在随便一个节点上执行以下命令

    rabbitmqctl set_policy -p /test ha-all "^all." '{"ha-mode":"all"}'
    
    • 1

    -p /test 指定虚拟机
    ha-all 策略名称
    ^all. 正则表达式,代表所有以all开头的队列
    ha-mode:all 指定镜像队列的模式,这里是all,也就是集群中所有的节点都作为从节点。

    3.3 仲裁队列

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    docker exec -it rabbitmq cat /var/lib/rabbitmq/.erlang.cookie

    EZDJGLMTEWCVVAYLCLRD

    docker rm -f rabbitmq

    mkdir -p /mydata/rabbitmq
    cd rabbitmq
    touch rabbitmq.conf

    loopback_users.guest = false
    listeners.tcp.default = 5672
    cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config
    cluster_formation.classic_config.nodes.1 = rabbit@node1
    cluster_formation.classic_config.nodes.2 = rabbit@node2
    cluster_formation.classic_config.nodes.3 = rabbit@node3

    touch .erlang.cookie

    echo “EZDJGLMTEWCVVAYLCLRD” > .erlang.cookie

    chmod 600 .erlang.cookie

    docker run -e RABBITMQ_DEFAULT_USER=itcast -e RABBITMQ_DEFAULT_PASS=123321 --name rabbitmq --hostname mq1 -p 8081:15672 -p 8071:5672 -d rabbitmq:3.8-management

    docker run -d
    -v /mydata/rabbitmq/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie
    -e RABBITMQ_DEFAULT_USER=hupp
    -e RABBITMQ_DEFAULT_PASS=mq123_
    –name mq1
    –hostname mq1
    -p 8071:5672
    -p 8081:15672
    rabbitmq:3.8-management

    docker run -d
    -v /mydata/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
    -v /mydata/rabbitmq/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie
    -e RABBITMQ_DEFAULT_USER=hupp
    -e RABBITMQ_DEFAULT_PASS=mq123_
    –name mq2
    –hostname mq2
    -p 8072:5672
    -p 8082:15672
    rabbitmq:3.8-management

    docker run -d
    -v /mydata/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
    -v /mydata/rabbitmq/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie
    -e RABBITMQ_DEFAULT_USER=hupp
    -e RABBITMQ_DEFAULT_PASS=mq123_
    –name mq3
    –hostname mq3
    -p 8073:5672
    -p 8083:15672
    rabbitmq:3.8-management


    docker run -d --net mq-net 
    -v /mydata/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf 
    -v /mydata/mq1/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie 
    -e RABBITMQ_DEFAULT_USER=hupp 
    -e RABBITMQ_DEFAULT_PASS=mq123_ 
    --name mq1 
    --hostname mq1 
    -p 8071:5672 
    -p 8081:15672 
    rabbitmq:3.8-management
    
    
    docker run -d --net mq-net 
    -v /mydata/mq2/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf 
    -v /mydata/mq2/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie 
    -e RABBITMQ_DEFAULT_USER=hupp 
    -e RABBITMQ_DEFAULT_PASS=mq123_ 
    --name mq2 
    --hostname mq2 
    -p 8072:5672 
    -p 8082:15672 
    rabbitmq:3.8-management
    
    docker run -d --net mq-net 
    -v /mydata/mq3/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf 
    -v /mydata/mq3/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie 
    -e RABBITMQ_DEFAULT_USER=hupp 
    -e RABBITMQ_DEFAULT_PASS=mq123_ 
    --name mq3 
    --hostname mq3 
    -p 8073:5672 
    -p 8083:15672 
    rabbitmq:3.8-management
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
  • 相关阅读:
    计算机科学速成课
    什么是Java虚拟机(JVM),它的作用是什么?
    [附源码]计算机毕业设计springboot学分制环境下本科生学业预警帮扶系统
    Linux 权限
    【机器学习Python实战】线性回归
    机器学习入门(五)回归问题中的曲线过拟合问题及解决
    【Linux常见指令1】
    word修改公式默认字体并打出漂亮公式
    Linux系统中如何更改用户密码,以及验证用户密码是否更改成功
    初学者需掌握的12条基本 Linux 命令
  • 原文地址:https://blog.csdn.net/m0_59092234/article/details/126577968