• 微服务学习计划——消息队列


    微服务学习计划——消息队列

    我们在微服务中一个命令会逐渐调用各个微服务,但如果一一调用不仅需要微服务实时同步交互还会浪费效率

    所以我们通常会采用MQ,也就是消息队列Message Queue来处理这个问题

    下面我们会通过几个方法介绍消息队列:

    • MQ
    • RabbitMQ
    • SpringAMQP

    MQ

    首先我们先来介绍消息队列的各个信息

    同步通信

    首先我们需要先去了解同步通信:

    • 概念:当一个微服务与另一个微服务建立连接时,双方必须同时接受并且处于空闲状态便于两者交互
    • 举例:比如我们使用手机打电话,我们这边发送打电话的请求,另一方必须也处于空闲状态并接收这个请求,两者才能打电话成功
    • 微服务举例:Feign调用就属于同步方式,虽然调用可以实时得到结果

    我们给出一个同步通信的简单例子:

    我们对上图进行简单解释:

    /*
    用户使用支付服务,支付服务需要经过一系列操作之后才能返回结果给用户
    
    具体服务:支付服务->订单服务->仓储服务->短信服务->...
    */
    
    // 那么就会存在以下问题:
    
    // 1.假设我们每个服务耗时1s,那么多个服务累计在一起,耗时逐渐增多用户得到结果的速度会变慢
    
    // 2.如果我们需要添加新的服务,那么我们需要在原函数中添加该服务的调用方法,会修改原有代码,导致修改困难
    
    // 3.并且当前面的操作进行过程中,后面的操作手中仍存有该流程的资源无法释放,导致资源损耗需要当当前服务结束后才可释放
    
    // 4.最可怕的是,当其中有一个服务出现错误,那么整条服务链就会出现错误,导致后面的服务无法执行,导致用户无法得到结果!!!
    

    我们可以很明显的感觉到同步通信的优点:

    • 时效性较强,可以立即得到结果

    但是缺点也非常的多:

    • 耦合度高:每次加入新的需求都需要修改原先的代码
    • 性能和吞吐能力下降:调用者需要等待服务者全部完成服务后才会得到响应,若服务者过多导致速度过慢
    • 有额外的资源消耗:调用链中每个服务在等待过程中无法释放自己已保留的资源,必须等当前服务结束后才可释放
    • 有级联失败问题:当其中一个服务出现错误,整条调用链出现错误

    异步通信

    我们同样给出异步通信的概念:

    • 异步通信整体分为三部分:发布者,Broker,订阅者
    • 其中发布者就相当于我们的用户,发布者只需要发布一条信息,这条信息会携带一定的信息
    • 其中订阅者就相当于我们的微服务,订阅者会去依次处理自己所接收到的信息,然后做出对应的操作
    • 其中Broker就是消息队列,Broker会去接收信息,并将信息传递给订阅者,它并不会去记录信息来自哪也不去记录信息去往哪

    那么异步通信的优点其实很明显:

    • 吞吐量提升:无需等待订阅者处理完成,发送方会直接获得一个响应,吞吐量提升
    • 故障隔离:服务没有直接调用,不存在级联失败问题,当一个微服务故障时只有该微服务失效
    • 流量削峰:不管发布事件的流量波动多大,都由Broker接收,消费者可以按照自己的速度去处理事件
    • 耦合度低:每个服务都单独存在,当需要使用到某个服务时,该服务只需要去订阅该Broker,不需要做额外源代码修改
    • 无额外资源消费:由于每个微服务单独存在,所以不存在链表关系,不会去提前占用资源,只有自己使用时才会占用资源

    但是缺点同样明显:

    • Broker核心工具:需要依赖于Broker的可靠、安全、性能
    • 业务复杂性:业务之间没有链表连接,而是信息直接传递,没有线性关系,难以追踪判断

    技术对比

    我们来认识一下市面上常见的消息队列:

    RabbitMQ ActiveMQ RocketMQ Kafka
    公司/社区 Rabbit Apache 阿里 Apache
    开发语言 Erlang Java Java Scala&Java
    协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议
    可用性 一般
    单机吞吐量 一般 非常高
    消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
    消息可靠性 一般 一般

    我们给出一些消息队列选择的建议:

    • 追求吞吐能力:RocketMQ、Kafka
    • 追求消息低延迟:RabbitMQ、Kafka
    • 追求可靠性:RabbitMQ、RocketMQ
    • 追求可用性:Kafka、 RocketMQ 、RabbitMQ

    RabbitMQ

    我们主要去学习RabbitMQ的基本使用

    基本安装

    我们如果要去使用RabbitMQ,首先需要先进行插件安装:

    1. 线上拉取镜像
    # docker拉取镜像(docker在之前的文章中已经介绍过了~)
    docker pull rabbitmq:3-management
    
    1. 在线生成容器
    docker run \	# docker启动容器
     -e RABBITMQ_DEFAULT_USER=root \	# 配置环境:mq用户名
     -e RABBITMQ_DEFAULT_PASS=123321 \	# 配置环境:mq密码
     --name mq \	# mq名称
     --hostname mq1 \	# mq主机名(单机部署可以省略,集群部署需要)
     -p 15672:15672 \	# 开放端口号:管理平台端口,ui界面
     -p 5672:5672 \		# 开放端口号:消息队列端口,作为Broker的核心端口
     -d \
     rabbitmq:3-management
    

    基本入门

    首先我们需要知道最基本的消息队列模型:

    • 最基本的消息队列模型只包括三个元素,分别是publisher(发布者),queue(消息队列),consumer(订阅者)

    他们的用途分别是:

    • publisher:消息发布者,将消息发送到队列queue
    • queue:消息队列,负责接受并缓存消息
    • consumer:订阅队列,处理队列中的消息

    其基本流程图为:

    那么下面我们就来完成一个基本的RabbitMQ的小项目(只需了解):

    1. 首先我们需要一个父工程,在父工程下有两个子工程

    1. 我们首先去书写发布者的发送代码
    /*
    发布者
    具体逻辑为:建立连接->创建Channel->声明队列->发送消息->关闭连接和channel
    */
    
    package cn.itcast.mq.helloworld;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class PublisherTest {
        @Test
        public void testSendMessage() throws IOException, TimeoutException {
            // 1.建立连接
            ConnectionFactory factory = new ConnectionFactory();
            // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
            factory.setHost("192.168.150.101");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("itcast");
            factory.setPassword("123321");
            // 1.2.建立连接
            Connection connection = factory.newConnection();
    
            // 2.创建通道Channel
            Channel channel = connection.createChannel();
    
            // 3.创建队列
            String queueName = "simple.queue";
            channel.queueDeclare(queueName, false, false, false, null);
    
            // 4.发送消息
            String message = "hello, rabbitmq!";
            channel.basicPublish("", queueName, null, message.getBytes());
            System.out.println("发送消息成功:【" + message + "】");
    
            // 5.关闭通道和连接
            channel.close();
            connection.close();
    
        }
    }
    
    1. 然后我们去书写订阅者接收代码
    /*
    订阅者
    具体逻辑为:建立连接->创建Channel->声明队列->订阅消息
    */
    
    package cn.itcast.mq.helloworld;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ConsumerTest {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1.建立连接
            ConnectionFactory factory = new ConnectionFactory();
            // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
            factory.setHost("192.168.150.101");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("itcast");
            factory.setPassword("123321");
            // 1.2.建立连接
            Connection connection = factory.newConnection();
    
            // 2.创建通道Channel
            Channel channel = connection.createChannel();
    
            // 3.创建队列
            String queueName = "simple.queue";
            channel.queueDeclare(queueName, false, false, false, null);
    
            // 4.订阅消息
            channel.basicConsume(queueName, true, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 5.处理消息
                    String message = new String(body);
                    System.out.println("接收到消息:【" + message + "】");
                }
            });
            System.out.println("等待接收消息。。。。");
        }
    }
    

    到这里我们已经基本了解了RabbitMQ的使用,让我们进入下一章节!

    SpringAMQP

    SpringAMQP是针对MQ的API更新,也就是使用简单的API去完成上述复杂的RabbitMQ使用过程

    RabbitMQ消息模型

    在正式接收SpringAMQP之前,我们需要先去了解一下RabbitMQ的五种常见消息模型:

    1. 基本消息队列
    • 存在一条链关系,发布者发布信息交给消息队列,订阅者从消息队列订阅消息

    1. 工作消息队列
    • 存在一个消息队列连接多个订阅者
    • 正常情况下订阅者均等保存所获取的消息,但可以通过设置来改变订阅者当前可保存信息个数

    1. 发布订阅广播版
    • 除消息队列外,存在一个交换器Exchange,交换器在广播状态下会将消息发送给所有相连接的消息队列

    1. 发布订阅路由版
    • 交换器选择性地将信息交给不同的消息队列
    • 交换器传递的信息会附带一个key值,而不同的消息队列存在一个或多个key值,如果相符合就将其信息传递给该消息队列

    1. 发布订阅主题版
    • 一种功能类似于路由版的发布订阅方式
    • 将传统的key值转化为多个字段的拼接值,采用"."进行拼接,其中可以采用"*"代替一个字段,采用"#"代替一个或多个字段

    SpringAMQP简单介绍

    首先我们需要去了解AMQP:

    • 用于应用程序之间的传递业务信息的开放标准
    • 该协议与平台与编程语言无关,更加符合微服务的独立性要求

    那么我们再去了解SpringAMQP:

    • SpringAMQP是基于AMQP协议定义的一套API规范,提供了模板来发布消息和接收消息,利用SpringBoot对其实现了自动装配

    其实简单来说SpringAMQP为我们提供了三个功能:

    • 自动声明队列、交换机及其绑定关系
    • 基于注解的监听器模式,异步接收消息
    • 封装了RabbitTemplate工具,用于发送消息

    SpringAQMP简单消息队列

    我们利用SpringAMQP来实现简单消息队列:

    1. 在父工程中导入依赖
    
    <dependency>
        <groupId>org.springframework.bootgroupId>
        <artifactId>spring-boot-starter-amqpartifactId>
    dependency>
    
    1. 配置RabbitMQ地址
    # 应当在Publisher发布者和Consumer订阅者两个子工程下均配置地址
    
    spring:
      rabbitmq:
        host: 192.168.150.101 # 主机名
        port: 5672 # 端口
        virtual-host: / # 虚拟主机
        username: itcast # 用户名
        password: 123321 # 密码
    
    1. 编写Publisher测试类发送消息
    // 注意:在Publisher工程下的test模块下书写该发送消息的test代码
    
    package cn.itcast.mq.spring;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringAmqpTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testSimpleQueue() {
            // 队列名称
            String queueName = "simple.queue";
            // 消息
            String message = "hello, spring amqp!";
            // 发送消息
            rabbitTemplate.convertAndSend(queueName, message);
        }
    }
    
    1. 编写Listener监听者类监听消息
    // 注意:在Consumer订阅者下的Listener文件(自己创建)下创建该监听类,需设置为Bean
    
    package cn.itcast.mq.listener;
    
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    // 能够被Spring扫描到
    @Component
    public class SpringRabbitListener {
    
        // 核心点:监听simple.queue队列
        @RabbitListener(queues = "simple.queue")
        // 发布者发布什么类型,监听者就接收什么类型并做出对应处理
        public void listenSimpleQueueMessage(String msg) throws InterruptedException {
            System.out.println("spring 消费者接收到消息:【" + msg + "】");
        }
    }
    

    SpringAMQP工作消息队列

    我们先来简单介绍一下工作消息队列:

    • 工作消息队列无非就是将一个消息队列与多个订阅者签订在一起
    • 这多个订阅者的功能大部分情况下是一样的,只是为了做一个简单的负载均衡处理
    • 每个订阅者都会去不断获取消息队列中的消息直到订阅者自身阈值或者消息已经被获取完毕

    我们来使用SpringAMQP来实现工作消息队列:

    1. 我们采用发布者发布多条消息
    /**
    * workQueue
    * 向队列中不停发送消息,模拟消息堆积。
    */
    
    @Test
    public void testWorkQueue() throws InterruptedException {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, message_";
        
        // 我们这里模拟发送了五十条消息,平均每20ms发送一条
        for (int i = 0; i < 50; i++) {
            // 发送消息
            rabbitTemplate.convertAndSend(queueName, message + i);
            Thread.sleep(20);
        }
    }
    
    1. 我们采用两个订阅者来订阅消息
    // 第一个订阅者平均每20ms获得一个消息
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue1(String msg) throws InterruptedException {
        System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(20);
    }
    
    // 第二个订阅者平均每200ms获得一个消息
    @RabbitListener(queues = "simple.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
        Thread.sleep(200);
    }
    
    /*
    但是由于两个订阅者均未设置阈值
    所以他们并不会在结束后才去拿去消息
    而是依次去获取消息,也就意味着不管他们何时结束自己的消息,他们都平分获取25条消息
    20ms订阅者1拿消息并处理,40ms订阅者2拿消息并处理,60ms订阅者1拿消息并处理,80ms订阅者2拿到消息但并不能处理,依次循环
    
    结论:
    - 两者均拿到25条消息
    - 订阅者1在980ms时结束所有的消息获取,并结束所有消息处理
    - 订阅者2在1000ms时结束所有的消息获取,但是还需要在5000ms(大概哈)才能完全处理消息
    */
    
    1. 修改最大阈值来加快效率
    spring:
      rabbitmq:
        listener:
          simple:	# 队列名称
            prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
    

    SpringAMQP发布订阅广播

    我们首先来详细介绍一下发布订阅(广播)的结构:

    • 发布订阅广播主要是在消息队列的划分上加上了一层交换机系统
    • 在发布订阅广播中交换机会将从发布者获得信息传递给全部所有与之相连的消息队列以供处理
    • 需要注意Exchange(交换机)只负责转发消息,不具备存储消息的能力,如果没有与之相连的消息队列就会导致信息丢失

    我们同样采用SpringAQMP来实现发布订阅广播:

    1. 发布者Publisher发布消息
    @Test
    public void testFanoutExchange() {
        // 队列名称
        String exchangeName = "qiuluo.fanout";
        // 消息
        String message = "hello, everyone!";
        // 第一个参数是交换机名称,因为目前的publisher只能发送信息给交换机,由交换机来决定传递给哪个消息队列
        // 第二个参数是key值选择,我们会在后面用到
        // 第三个参数是所传递的信息
        rabbitTemplate.convertAndSend(exchangeName, "", message);
    }
    
    1. 消费者Consumer获取信息
    // 和之前一样,Consumer从消息队列那里获取信息
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1(String msg) {
        System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
    }
    
    // 和之前一样,Consumer从消息队列那里获取信息
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2(String msg) {
        System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
    }
    
    1. 设置交换机和消息队列并进行绑定
    // 在consumer中创建一个类,声明队列和交换机
    
    package cn.itcast.mq.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class FanoutConfig {
        /**
         * 声明交换机
         * @return Fanout类型交换机
         */
        @Bean
        public FanoutExchange fanoutExchange(){
            // 采用@Bean的形式将其设置为Bean
            return new FanoutExchange("itcast.fanout");
        }
    
        /**
         * 第1个队列
         */
        @Bean
        public Queue fanoutQueue1(){
            // 采用@Bean的形式将其设置为Bean
            return new Queue("fanout.queue1");
        }
    
        /**
         * 绑定队列和交换机
         */
        @Bean
        public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
            // 采用BindingBuilder的bind,to方法进行交换机与队列的绑定即可(固定形式)
            return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
        }
    
        /**
         * 第2个队列
         */
        @Bean
        public Queue fanoutQueue2(){
            return new Queue("fanout.queue2");
        }
    
        /**
         * 绑定队列和交换机
         */
        @Bean
        public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
            return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
        }
    }
    

    SpringAMQP发布订阅路由

    我们同样来简单介绍一下发布订阅路由:

    • 发布订阅系列都是需要交换机与消息队列进行绑定,由交换机决定消息应当发往哪个消息队列
    • 在该模式下需要进行路由选择,在发送消息时会传递一个key值,这个值在publisher发送时所携带的
    • 每一个队列也会有一个或多个对应的key值,当交换机获得信息后,会对key进行比对,若相同就传递给对应的消息队列

    我们下面采用SpringAMQP的注解声明方式来实现发布订阅路由:

    1. 发布者发布消息
    @Test
    public void testSendDirectExchange() {
        // 交换机名称
        String exchangeName = "qiuluo.direct";
        // 消息
        String message = "红色警报!";
        // 发送消息
        // 这里就用到了第二个参数,就是key值
        rabbitTemplate.convertAndSend(exchangeName, "red", message);
    }
    
    1. 订阅者处理消息(采用注解方式来绑定交换机和消息队列)
    // 采用@RabbitListener注解的bindings参数,在里面需要表明value(队列名称),exchange(交换机名称),key(队列的key值)
    // 其内部的数据都需要采用@注解来给出
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue1"),
        exchange = @Exchange(name = "qiuluo.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "blue"}
    ))
    public void listenDirectQueue1(String msg){
        System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
    }
    
    
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "direct.queue2"),
        exchange = @Exchange(name = "qiuluo.direct", type = ExchangeTypes.DIRECT),
        key = {"red", "yellow"}
    ))
    public void listenDirectQueue2(String msg){
        System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
    }
    

    SpringAMQP发布订阅主题

    我们同样来简单介绍一下发布订阅路由:

    • 发布订阅主题实际上和发布订阅路由一样是进行队列选择的
    • 但是主题的key值是由多个部分组成的,其中采用"."来进行分割,例如:China.weather
    • 我们可以采用"*"来代替一个key值,同时我们可以采用"#"来代替一个或多个key值,更具有灵活性

    我们同样采用SpringAMQP来给出一个发布订阅主题的案例:

    1. 发布者发布消息
    /**
    * topicExchange
    */
    
    @Test
    public void testSendTopicExchange() {
        // 交换机名称
        String exchangeName = "qiuluo.topic";
        // 消息
        String message = "喜报!胜!";
        // 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
    }
    
    1. 订阅者获得消息
    // 这里仅仅对exchange的type类型进行更改,并且更改了key值
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue1"),
        exchange = @Exchange(name = "qiuluo.topic", type = ExchangeTypes.TOPIC),
        key = "china.#"
    ))
    public void listenTopicQueue1(String msg){
        System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
    }
    
    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue2"),
        exchange = @Exchange(name = "qiuluo.topic", type = ExchangeTypes.TOPIC),
        key = "#.news"
    ))
    public void listenTopicQueue2(String msg){
        System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
    }
    

    消息转换器

    最后我们介绍一个简单的知识点:

    • 由于我们的RabbitMQ在存储信息时会进行序列化处理,而默认的Spring序列化处理是JDK序列化处理

    • 而JDK序列化处理存在有多种缺点:数据体积大,存在安全漏洞,可读性差等

    所以我们在正常使用时通常会去更换默认消息转换器,采用JSON消息转换器:

    1. 导入jackson依赖
    
    <dependency>
        <groupId>com.fasterxml.jackson.dataformatgroupId>
        <artifactId>jackson-dataformat-xmlartifactId>
        <version>2.9.10version>
    dependency>
    
    1. 在启动类中添加一个Bean即可
    @Bean
    public MessageConverter jsonMessageConverter(){
        return new Jackson2JsonMessageConverter();
    }
    

    结束语

    这篇文章中介绍了消息队列的内容并详细介绍了RabbitMQ以及SpringAMQP,希望能为你带来帮助

    附录

    该文章属于学习内容,具体参考B站黑马程序员的微服务课程

    这里附上视频链接:01-今日课程介绍4_哔哩哔哩_bilibili

  • 相关阅读:
    手把手建立Roofline模型(CPU)
    springboot单独在指定地方输出sql
    【JUC系列-15】深入理解CompletableFuture的基本使用
    【一生一芯】Chap.1 “一生一芯”实验环境配置| VMware安装Ubuntu20.04 | PA工程配置 | 解决llvm版本问题
    java垃圾回收基础
    如何在数据库中存储小数:FLOAT、DECIMAL还是BIGINT?
    Multimodal Unsupervised Image-to-Image Translation多通道无监督图像翻译
    iOS16新特性:实时活动-在锁屏界面实时更新APP消息
    Flask 项目创建:
    湖南衡阳3D扫描在生物仿真研究的应用高精度三维扫描螃蟹-CASAIM中科广电
  • 原文地址:https://www.cnblogs.com/qiuluoyuweiliang/p/17201877.html