• RabbitMQ学习(1)


    1. 消息队列

    1. 什么是MQ

    本质上是个队列,队列中存放的是message,还是一种跨进程的通信机制,用于上下游传递信息。是一种“逻辑解耦+物理解耦”的消息通信服务。使用MQ之后,消息发送上游只需要依赖MQ,不用关注其他服务

    2. 为什么要是用MQ

    a. 流量消峰

    b. 应用解耦

    c. 异步处理

    3. MQ分类

    a. ActiveMQ

    单机吞吐量万级,时效性ms级,可用性高,基于主从架构实现,可靠性较高。但是高吞吐量场景较少使用

    b. Kafka

    百万级TPS的吞吐量,分布式的,一个数据多个副本,少数机器宕机不会丢失数据,有第三方web管理界面。主要特点是基于Pull模式来处理消息消费,追求高吞吐量,用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。

    c. RocketMQ

    出自alibaba,用Java实现,单机吞吐量十万级,可用性高,分布式架构。消息做到0丢失,MQ功能较为完善,扩展性较好。常用于金融互联网领域,尤其在电商里面的订单扣款,以及业务消峰,在大量交易涌入时,后端可能无法及时处理,使用RocketMQ在稳定性上更为可靠

    d. RabbitMQ

    MQ功能比较完善,健壮,稳定,易用,跨平台,支持多种语言;开源提供的管理界面,更新频率相当高

    4. 四大核心概念

    a. 生产者

    产生数据发送消息的程序

    b. 交换机

    接受来自生成者的消息,将消息推送到队列中。交换机必须要知道如何处理他接收到的消息,将这些消息推送到特定队列还是推送到多个队列;又或者是将消息丢弃,由交换机类型决定

    c. 队列

    本质上是一个大的消息缓冲区,队列仅受主机的内存和磁盘限制的约束,许多生产者可以将消息发送到一个队列,许多消费者可以从一个队列接受数据

    d. 消费者

    消费者大多是等待接受消息的程序,消费者和消费中间件在很多时候并不在同一机器上,同一个应用程序既可以是生产者又可以是消费者

    5. MQ工作原理

    请添加图片描述
    Broker:接受和分发消息的应用,RabbitMQ Server就是Message Broker

    Virtual host: 当多个用户使用同一个RabbitMQ Server提供服务时,可划分出多个vhost,每个用户在自己的vhost上创建exchange/queue等

    Connection: publisher/consumer和broker之间的TCP连接

    Channel:在connection内部建立的逻辑连接,如果支持多线程,每个thread创建单独的channel进行通讯,完全隔离,减少了建立TCP connection的开销

    Exchange:交换机,根据分发规则,匹配查询表中的routing key,分发消息到队列中。常用的类型有:direct,topic(publish-subscribe)以及fanout(multicast)

    Queue:消息最终被送到队列中等待被消费者取走

    Binding:exchange和queue之间的虚拟连接,可以包含routing key,被保存到exchange中的查询表中,用于message的分发

    6. Linux安装

    针对CentOS7,需要安装对应的elang和rebbitmq server安装包

    请添加图片描述

    • rpm -ivh erlang-23.2.3-1.el7.x86_64.rpm

    • yum install socat -y(yum命令安装socat依赖)

    • rpm -ivh rabbitmq-server-3.8.11-1.el7.noarch.rpm

    • 添加开机自启RabbitMQ服务(可不做):chkconfig rabbitmq-server on

    • 启动服务:/sbin/service rabbitmq-server start

    • 查看服务状态:/sbin/service rabbitmq-server status
      请添加图片描述

    • 停止RabbitMQ服务:/sbin/service rabbitmq-server stop

    • 停止MQ服务后再开启web管理插件 rabbitmq-plugins enable rabbitmq_management
      再次启动RabbitMQ服务,使用默认账号密码(guest)访问:http://主机IP:15672(如不能访问,需要先关闭防火墙)
      请添加图片描述

    • 需要添加一个新用户并且设置权限完成登录

    创建账号:rabbitmqctl add_user admin hz1234

    设置用户角色:rabbitmqctl set_user_tags admin administrator

    设置用户权限(在vhost/下设置配置,读写权限)
    rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

    列举当前用户和角色信息:rabbitmqctl list_users

    • 使用新账户完成登录
      请添加图片描述

    2. MQ简单模式Hello World

    1. 创建maven工程,导入依赖
     <dependencies>
            <!--rabbitmq的依赖客户端-->
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.8.0</version>
            </dependency>
            <!--操作IO流的依赖-->
            <dependency>
                <groupId>commons-io</groupId>
                <artifactId>commons-io</artifactId>
                <version>2.6</version>
            </dependency>
        </dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    1. 生产者代码
    /**
     * 生产者 发送消息
     */
    public class Producer {
    
        public static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) {
            // 创建RabbitMQ连接工厂 设置工厂IP,用户名以及密码
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("linux1");
            factory.setUsername("admin");
            factory.setPassword("hz1234");
            // 创建连接 获取信道channel 生成队列
            try {
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();
                // 参数列表: 队列name
                // 队列中的消息是否持久化(默认将message存储在内存中)
                // 该队列是否允许多个消费者消费(默认不允许)
                // 是否自动删除,最后一个消费者断开连接之后,queue是否自动删除(true表示自动删除)
                // 其他参数
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                String message = "Hello World";
                // 参数列表:
                // 发送到哪个交换机(交换机名String)
                // 路由的key值是哪一个(queue name)
                // 其他参数
                // 消息体的字节类型
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
            System.out.println("message transmit is over...");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    请添加图片描述

    1. 消费者代码
    public class Consumer {
    
        public static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 创建RabbitMQ连接工厂 设置工厂IP,用户名以及密码
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("linux1");
            factory.setUsername("admin");
            factory.setPassword("hz1234");
            // 创建连接 获取信道channel 生成队列
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // 成功接受消息的回调函数(lambda表达式)
            DeliverCallback callback = (tag, message) -> {
                System.out.println(new String(message.getBody()));
            };
            // 消息接受失败的回调函数
            CancelCallback cancelCallback = tag -> {
                System.out.println("消息被中断...");
            };
            // 接受消息basicConsume参数列表:
            // 消费哪个队列
            // 是否需要自动应答true(手动应答false)
            // 消息接受成功的反馈
            // 消息接受失败的反馈
            channel.basicConsume(QUEUE_NAME, true, callback, cancelCallback);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    3. MQ工作模式Work Queues

    Work Queues任务队列的主要思想是避免立即执行资源密集型任务,而不得不等待他完成。将任务封装成消息并将其发送到队列,在后台运行的工程进程将弹出任务并最终执行作业,有多个工作线程时,这些工作线程将一起处理这些任务

    1. 轮询分发消息

    启动三个工作线程(consumer),一个消息发送线程(producer),注意同一个消息只能被某一个工作线程处理一次,不能被处理多次

    1. 工作线程
    /**
     * 工作线程1:消费者
     */
    public class Worker1 {
    
        public static final String QUEUE_NAME = "Work Queues--轮询分发";
    
        public static void main(String[] args) throws Exception {
    
            Channel channel = RabbitMQUtils.getChannel();
            // 接受消息的反馈
            DeliverCallback receiveCallback = (tag, message) -> {
                System.out.println("receive message:" + new String(message.getBody()));
            };
            // 取消接受的回调
            CancelCallback cancelCallback = tag -> {
                System.out.println(tag + "message cancel receive");
            };
            System.out.println("W1等待接受消息...");
            channel.basicConsume(QUEUE_NAME, true, receiveCallback, cancelCallback);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    1. 抽取连接工厂的工具类
    /**
     * 连接工厂创建信道的工具类
     */
    public class RabbitMQUtils {
    
        public static Channel getChannel() throws Exception{
            // 创建RabbitMQ连接工厂 设置工厂IP,用户名以及密码
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("linux1");
            factory.setUsername("admin");
            factory.setPassword("hz1234");
            // 创建连接 获取信道channel 生成队列
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            return channel;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    1. 生产者
    public class Prod {
    
        public static final String QUEUE_NAME = "Work Queues--轮询分发";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            Scanner scanner = new Scanner(System. in);
            while(scanner.hasNext()){
                String message = scanner.nextLine();
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println("发送消息:" + message);
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    1. IDEA更改Worker1的配置,允许并行允许
      请添加图片描述
    2. 启动三个工作线程,再启动生产者,查看工作线程是否是轮询的处理不同的消息

    2. 消息应答

    消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个时间较长的任务只完成部分突然就宕机了。而RabbitMQ向消费者传递一条消息,立即将这条消息标记为删除,因此会丢失正在处理中的消息。

    为了保证消息在发送过程中不发生丢失,RabbitMQ引入消息应答机制,消息应答就是:消费者在接收到消息并且处理完该消息之后,告诉RabbitMQ他已经处理完成,此时RabbitMQ可以将这条消息进行删除

    a. 自动应答

    消息一旦发送立即被认为已经传送成功,由于消费者可能出现连接channel关闭,那么消息就丢失了,而生产者传递的消息数量并没有限制,可能会导致消费者这边还没来得及处理,消息的积压造成内存耗尽,最终这些消费者进程被系统杀死。该模式仅适用于消费者可以高效并以某种速率能够处理这些消息的情况下使用

    b. 手动应答

    手动应答的优点:可以批量应答并且减少网络拥堵

    • Channel.basicAck (tag, true)(用于肯定确认):第二个参数如果设置为true表示批量应答channel上未应答的消息,如当前tag=1,2,3,4,此时来到4,之前的都会被应答;反之只有当前时刻tag=4会被应答
    • Channel.basicNac(long var1, boolean var3, boolean var4)(用于否定确认)
    • Channel.basicReject(long var1, boolean var3)(用于否定确认)

    c. 消息重新入队

    如果消费者由于某些原因失去连接,导致消息未发送ACK确认,MQ知道该消息未被处理,将对其重新排队。如果此时其他消费者可以处理,将消息重新分发

    1. 生产者代码
    public class autoAskProd {
    
        public static final String TASK_QUEUE_NAME = "ack_queue1";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
    
            Scanner scanner = new Scanner(System.in);
            while(scanner.hasNext()){
                String msg = scanner.next();
                channel.basicPublish("", TASK_QUEUE_NAME, null, msg.getBytes("UTF-8"));
                System.out.println("producer send message: " + msg);
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    1. 消费者1代码
    public class autoAskConsumer1 {
        public static final String TASK_QUEUE_NAME = "ack_queue1";
    
        public static void main(String[] args) throws Exception {
    
            Channel channel = RabbitMQUtils.getChannel();
            System.out.println("W1等待1s之后接受消息...");
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String msg = new String(delivery.getBody(), "UTF-8");
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("接收到消息: " + msg);
                // 接收到之后进行手动应答 basicAck(long var1, boolean var3)
                // 第一个参数表示消息标记tag
                // 第二个参数设置为false,代表只应答接收到的当前消息,为true则批量应答
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            };
            // 手动应答设置为false
            boolean autoAsk = false;
            channel.basicConsume(TASK_QUEUE_NAME, autoAsk, deliverCallback, consumerTag -> {});
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    1. 消费者2代码
    public class autoAckConsumer2 {
    
        public static final String TASK_QUEUE_NAME = "ack_queue1";
    
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMQUtils.getChannel();
            System.out.println("W2等待10s之后接受消息...");
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String msg = new String(delivery.getBody(), "UTF-8");
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("接收到消息: " + msg);
                // 接收到之后进行手动应答 basicAck(long var1, boolean var3)
                // 第一个参数表示消息标记tag
                // 第二个参数设置为false,代表只应答接收到的当前消息,为true则批量应答
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            };
            // 手动应答设置为false
            boolean autoAsk = false;
            channel.basicConsume(TASK_QUEUE_NAME, autoAsk, deliverCallback, consumerTag -> {});
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    测试,当轮序接受消息时,由于消费者2等待时间较长,如果在等待期间消费者2宕机了,那么消费者2收到的消息不会丢失,MQ将其重新放入队列,交由消费者1进行处理。
    请添加图片描述
    请添加图片描述请添加图片描述

    3. MQ持久化

    a. 队列和消息持久化的实现

    之前创建的队列都是非持久化的(只存储在内存中),MQ一旦重启,队列就会被删除。

    如何保证MQ服务停止后消息生成者发送的消息不丢失?
    在声明队列时将durable参数设置为true和消息都标记为持久化

    注意:如果之前声明的队列是非持久化的,需要删除原队列,或者新建一个持久化队列,否则会报错

    // 队列持久化
    boolean durable = true;
    channel.queueDeclare(TASK_QUEUE_NAME, durable, false, false, null);
    // 消息持久化 MessageProperties.PERSISTENT_TEXT_PLAIN告诉MQ将消息存到磁盘
    channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("UTF-8"));
    
    • 1
    • 2
    • 3
    • 4
    • 5

    b. 不公平分发

    MQ分发消息采用的是轮询方式,如果某个消费者处理消息速度很快,某个消费者处理消息很慢,就会导致某个消费者大部分时间处于空闲,有的则一直在忙碌。为了避免这种情况,在消费者方设置参数 channel.basicQos(1);

    // 设置不公平分发(默认为0轮询分发)
    int prefetchCount=1;
    channel.basicQos(prefetchCount);
    // 手动应答设置为false
    boolean autoAsk = false;
    channel.basicConsume(TASK_QUEUE_NAME, autoAsk, deliverCallback, consumerTag -> {});
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    c. 预取值

    消息发送是异步的,那么channel中肯定不止一个消息来自其他消费者的手动确认,存在一个未确认的消息缓冲区需要限制缓冲区的大小,避免缓冲区中无限制的未确认消息问题。

    使用basicQos()方法设置预取计数值来完成,定义channel上允许的未确认消息的最大数量。通常,增加预取值会提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是已传递但未处理的消息也会增多,从而增加了消费者的RAM消耗。

    修改两个消费者中的prefetchCount预取值,较慢的为2,处理快的为10

    请添加图片描述

  • 相关阅读:
    Flink代码单词统计 ---批处理
    微信小程序多端应用 Donut 多端编译
    Linux-编译器
    SIMULIA-Simpack 2022x新功能介绍
    【ES6】阮一峰ES6学习(一) let、const、解构赋值
    我的创作纪念日,成为创作者的第1024天。| ⑰【知识图谱·跨模态·自监督学习SSL】知识增强型推荐系统,跨模态对比学习方案 | 附:分享两款好用的WOS文献阅读插件
    AC-PEG-NH2,Acrylate-PEG-Amine,丙烯酸酯PEG氨基含有PEG间隔基
    【JavaScript进阶之旅 ES6篇 第七章】箭头函数的实质、箭头函数this指向、箭头函数的使用场景
    豆瓣评分9.0!《Java核心技术与面试》神作,已助1374人拿到Offer
    [数据集][目标检测]航空发动机缺陷检测数据集VOC+YOLO格式291张4类别
  • 原文地址:https://blog.csdn.net/m0_45971439/article/details/125429000