• RabbitMQ(消息队列)


    RabbitMQ

    它是消息中间件,是在消息的传输过程中保存消息的容器,实现应用程序和应用程序之间通信的中间产品。目前主流消息队列通讯协议是AMQP(二进制传输,支持多种语言)、JMS(HTTP传输,只支持Java)。

    特点:每秒十万左右级别、消息延迟在微秒级、完整的消息确认机制、并发能力强、性能好。

    常见MQ

    • ActiveMQ基于JMS,每秒数万级别
    • RabbitMQ基于AMQP,每秒十万级别
    • RocketMQ是阿里的产品,基于JMS,每秒十万级别,经历过双十一
    • Kafka自定义协议,每秒百万级别

    体系结构

    分为:服务器、交换器、队列;

    服务器:负责管理所有的交换器和队列,一个RabbitMQ内有多个服务器,(为了避免每次发送消息都建立TCP连接,有了很多的服务器,每个线程建立单独的服务器进行通讯)每个服务器负责一部分交换器和队列,之间通过 HTTP 协议通信;

    交换机:负责接收、路由、传递消息,支持多种交换器类型,每个类型有不同的消息传递方式和使用场景;

    队列:负责存储消息,支持多种队列,都有不同的存储方式;

    安装:

    使用docker方式

    # 拉取镜像
    docker pull rabbitmq:3.8.12-management
    # 注意:在拉取镜像,遇到missing signature key问题,需要提升docker的版本
    
    # 运行容器
    # -d 参数:后台运行 Docker 容器
    # -e 参数:设置容器内的环境变量,这里我们设置了登录RabbitMQ管理后台的默认用户和密码
    docker run --name rabbitmq -d -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=123456 rabbitmq:3.8.12-management
    
    # 启动成功,MQ的客户端页面,输入所设置的用户名和密码
    # 访问:http://xxx:15672
    
    # 如果访问不通,需要开放端口
    firewall-cmd --zone=plublic -add-pord=5672/tcp --add-pord=15672/tcp --permanent
    success
    firewall-cmd --reload
    success
    

    发送消息

    需要引包

    <dependency>
         <groupId>com.rabbitmqgroupId>
         <artifactId>amqp-clientartifactId>
         <version>5.20.0version>
    dependency>
    

    生产者

    // 生产者 - 产生消息
    public static void main(String[] args) throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("IP地址");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("123456");
        // 创建连接
        Connection connection = connectionFactory.newConnection();
        // 创建频道
        Channel channel = connection.createChannel();
        // 创建队列,参数:队列名称、是否定义持久化队列、是否独占本次连接、是否在不使用的时候自动删除队列、其他参数
        channel.queueDeclare("new_queue", true, false, false, null);
        String message = "发送的消息的内容:123";
        // 参数:交换机名称,默认Default Exchange、队列名称、配置信息、消息内容
        channel.basicPublish("", "new_queue", null, message.getBytes());
        // 关闭资源  
        channel.close();
        connection.close();
    }
    

    创建一个队列,并有消息待查看,点击该队列的名称,在Get messages处,可以查看该消息的信息;

    查看队列:

    在这里插入图片描述
    在这里插入图片描述
    消费者

    // 消费者 - 要消费消息
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("IP地址");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("123456");
        // 创建连接 Connection        
        Connection connection = factory.newConnection();
        // 创建频道  
        Channel channel = connection.createChannel();
        // 接收消息  
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 回调方法,当收到消息后,会自动执行该方法  
            // 参数:标识、获取一些信息,交换机等、配置信息、数据(消息内容)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag:" + consumerTag);
                System.out.println("Exchange:" + envelope.getExchange());
                System.out.println("RoutingKey:" + envelope.getRoutingKey());
                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
            }
        };
        // 监听程序,用来监听消息,参数:队列、是否自动确认、回调对象  
        channel.basicConsume("new_queue", true, consumer);
    }
    

    如果有被消费者消费,会在管理页中,该队列的 Ready进行-1;

    工作模式

    简单模式

    生产者(只有一个)、消费者(只有一个)、消息储存在队列中;

    工作队列模式

    生产者(只有一个)、消费者(有多个)、消息储存在队列中;消费者谁抢到算谁的。

    发布订阅模式

    生产者(只有一个)、消费者(有多个)、交换机、多个队列;

    生产者把消息发送给交换机,交换机处理消息取决于交换机的类型,交换机根据类型把消费存在对应的队列中,消费者(多个)满足规则都可以得到消息;

    交换机有3种类型

    ➢ Fanout:广播,将消息交给所有绑定到交换机的队列

    ➢ Direct:定向,把消息交给符合指定routing key 的队列

    ➢ Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列

    路由模式

    队列与交换机的绑定,使用Direct,消费者监听的队列,该队列与交换机绑定的路由件匹配,该消费者可以收到消息。其他消费者也监听该队列,但路由件不匹配,不会收到消息。

    查看交换机

    在这里插入图片描述

    交换机与队列的绑定

    在这里插入图片描述
    在这里插入图片描述

    主题模式(通配符匹配)

    这个与路由模式类似,只是这个支持通配符绑定,# 匹配一个或多个词,* 匹配不多不少恰好1个词。

    创建

    创建交换机

    在这里插入图片描述

    创建队列

    在这里插入图片描述

    队列与交换机绑定

    在这里插入图片描述

    整合SpringBoot

    引包

    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    

    更改配置文件

    spring: 
      rabbitmq: 
        host: IP地址
        port: 5672 
        username: guest 
        password: 123456 
        virtual-host: /
        # 来保证消息的可靠性
        publisher-confirm-type: CORRELATED # 交换机的确认
        publisher-returns: true # 队列的确认
    

    代码

    // 生产者
    @Autowired  
    private RabbitTemplate rabbitTemplate;
      
    public void testSend() {  
        rabbitTemplate.convertAndSend("交换机","路由键","消息内容");  
    }  
    // 消费者 (durable 是否持久化)
    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "队列名字", durable = "true"),
                exchange = @Exchange(value = "交换机"),
                key = {"路由键"}
    ))
    public void process(String dateString,Message message,Channel channel) {
        log.info("消息内容:"+ dateString);
    }
    

    配置类

    生产者保障消息的可靠性

    生产者 - 保障消息是否发送到队列或者交换机

    @Component
    public class MQAckConfig 
        implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @PostConstruct
        public void init() {
            rabbitTemplate.setConfirmCallback(this);
            rabbitTemplate.setReturnsCallback(this);
        }
    
        /**
         * 确认消息是否发送到交换机
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            if (ack) {
                log.info("消息发送到交换机 - 成功!数据:" + correlationData);
            } else {
                log.info("消息发送到交换机 - 失败!数据:" + correlationData + " 原因:" + cause);
            }
        }
    
        /**
         * 确认消息是否发送到队列 - 只有发送失败的时候才会调用该方法
         */
        @Override
        public void returnedMessage(ReturnedMessage returned) {
            log.info("消息主体: " + new String(returned.getMessage().getBody()));
            log.info("应答码: " + returned.getReplyCode());
            log.info("描述:" + returned.getReplyText());
            log.info("消息使用的交换器 exchange : " + returned.getExchange());
            log.info("消息使用的路由键 routing : " + returned.getRoutingKey());
        }
    
    }
    

    消费者保障消息的可靠性

    消费者 - 保障消息真的收到,改为手动确认ACK

    spring:
      rabbitmq:
        ...
        listener:
          simple:
            acknowledge-mode: manual # 把消息确认模式改为手动确认
    

    消费者手动ACK

    @RabbitListener( 
        // 设置绑定关系
        bindings = @QueueBinding(
            // 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
            value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),
            // 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
            exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),
            // 配置路由键信息
            key = {ROUTING_KEY}
    ))
    public void processMessage2(String dataString, Message message, Channel channel) throws IOException {
        // 注意: 重置消息,需要考虑幂等性
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            log.info("消费者 - 接收消息:" + dataString);
            // System.out.println(10 / 0);  // 手动制造出异常,让消息回到队列中
            channel.basicAck(tag, false);
        } catch (Exception e) {
            // 获取信息,查看此消息是否曾经被投递过
            Boolean redelivered = message.getMessageProperties().getRedelivered();
            if (!redelivered) {
                // 没有被投递过,那就重新放回队列,重新投递,再试一次
                channel.basicNack(tag, false, true);
            } else {
                // 已经被投递过,且这一次仍然进入了 catch 块,那么返回拒绝此消息,不会回到MQ队列中
                channel.basicReject(tag, false);
            }
        }
    }
    

    消费者-限流

    大量消息进入队列中,消息队列中消息有1万,设置每次最多从队列取回1000个消息,并发能力只能处理1000个请求,消费端-最多只处理1000个请求;

    限流配置

    spring:
      rabbitmq:
        ...
        listener:
          simple:
            acknowledge-mode: manual # 把消息确认模式改为手动确认
            prefetch: 10  # 设置消费者,每次最多从消息队列服务器取回多少消息,并不是一下把队列中的消息全部取走
    

    从队列中Ready,变成所设置的值,从此起到了限流的作用,消息者处理ACK个数也下降。

    在这里插入图片描述

    消息超时

    可在创建队列的时候,设置参数:x-message-ttl = 3000 (毫秒值),当我们生产者发送消息到队列,队列里的消息没有被消费者消费时,可通过队列里的消息超时时间,进行丢弃消息。

    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    MessagePostProcessor processor = (Message message) -> {
        // 设定超时时间,单位 (毫秒)
        message.getMessageProperties().setExpiration("7000");
        return message;
    };
    rabbitTemplate.convertAndSend("交换机名", "路由键", "生产者发送消息 - 消息超时 - " + processor);
    

    死信

    一个消息无法被消费,会变成死信;产生的原因:消费者拒绝:basicNackbasicReject这两个方法不把消息重新放回队列、队列溢出:队列里的消息达到数量的限制、消息超时:超时时间未被消费;

    解决:丢弃(一般不重要数据)、入库(记录日志、后续处理)、监听(进入死信队列,监听死信队列进行处理)

    前提准备:

    创建死信交换机、死信队列、死信路由键,互相绑定;

    创建正常交换机、队列、路由键,互相绑定;
    在这里插入图片描述

    // 前提把 死信、正常都创建好,绑定好
    // 监听正常队列
    @RabbitListener(queues = {‘正常队列’})
    public void processMessageNormal(String dateString, Message message, Channel channel) throws IOException {
        // 监听正常队列,但是拒绝消息,进行消息拒绝
        log.info("【正常】接受到消息:" + dateString);;
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
    }
    // 监听死信队列
    @RabbitListener(queues = {‘死信队列’})
    public void processMessageDead(String dataString, Message message, Channel channel) throws IOException {
        // 监听死信队列
        log.info("【死信】接受到消息 = " + dataString);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
    

    延迟队列

    方案一:

    生产者发送消息,到队列中,该队列配置消息超时时间,并没有消费者进行监听该队列(进行监听消费),超时进入死信队列,(监听死信队列)就是延迟队列的一种配置。

    方案二:

    安装插件,默认消息存放最多2天

    地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

    下载插件的网址:https://www.rabbitmq.com/community-plugins.html

    事务

    该事务处理仅仅在java层面,生产者1发送消息,生产者2发送消息,保障其中有一个发送失败,都要失败,需要添加配置类:

    @Configuration
    @Data
    public class RabbitConfig {
    
        @Bean
        public RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {
            return new RabbitTransactionManager(connectionFactory);
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            rabbitTemplate.setChannelTransacted(true);
            return rabbitTemplate;
        }
    }
    

    惰性队列

    一般队列创建是默认,并不是惰性队列,惰性队列适用场景:在非常长的队列(百万条消息),生产者的速度超过消费者,消费者处理慢,使用惰性队列;它把消息放在队列中,并不是马上进行持久化操作,是在有空闲时,当队列达到百分之多少时,再进行数据持久化操作。

    // x-queue-mode 参数,可在插件安装配置
    @Queue(value = 队列名字, durable = "true", autoDelete = "false", arguments = {
    	@Argument(name = "x-queue-mode", value = "lazy")
    })
    

    队列消息优先级

    创建队列,如果这个值是0,代表优先级无效,设置的值优先级不能超过该值,优先级越高占用内存资源越多。
    在这里插入图片描述

    生产者配置

    @Resource
    private RabbitTemplate rabbitTemplate;
    
    // 生产者发送消息 1  - 优先级是1
    rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "生产者发送消息 - 演示优先级:1", message -> {
        message.getMessageProperties().setPriority(1);
        return message;
    });
    // 生产者发送消息 2  - 优先级是2
    rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "生产者发送消息 - 演示优先级:2", message -> {
        message.getMessageProperties().setPriority(2);
        return message;
    });
    // 生产者发送消息 3  - 优先级是3
    rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "生产者发送消息 - 演示优先级:3", message -> {
        message.getMessageProperties().setPriority(3);
        return message;
    });
    // 生产者发送消息1、消息2、消息3的顺序(先进先出)
    // 但消费者,消费优先消费:消息3、消息2、消息1 (改变了消费的顺序)
    
  • 相关阅读:
    共享虚拟主机可以处理多少流量?
    InternImage
    Jmeter 如何监控目标服务的系统资源
    [Cesium学习]
    ES7+知识点整理使用
    cmake交叉编译时链接到x86库的问题
    SpringBoot整合JWT、实现登录和拦截
    替代for循环,让Python代码更pythonic
    Linux+qt:获取.so自身的路径(利用dladdr)
    杭州市(个体劳动者)灵活就业人员参保与公积金缴纳操作手册
  • 原文地址:https://blog.csdn.net/yanshengren520/article/details/139971784