• 浅学 RabbitMQ


    RabbitMQ

    1.初识MQ

    1.1.同步和异步通讯

    微服务间通讯有同步和异步两种方式:

    同步通讯:就像打电话,需要实时响应。

    异步通讯:就像发邮件,不需要马上回复。
    在这里插入图片描述

    两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。

    1.1.1.同步通讯

    我们之前学习的Feign调用就属于同步方式,虽然调用可以实时得到结果,但存在下面的问题:

    在这里插入图片描述

    总结:

    同步调用的优点:

    • 时效性较强,可以立即得到结果

    同步调用的问题:

    • 耦合度高
    • 性能和吞吐能力下降
    • 有额外的资源消耗
    • 有级联失败问题

    1.1.2.异步通讯

    异步调用则可以避免上述问题:

    我们以购买商品为例,用户支付后需要调用订单服务完成订单状态修改,调用物流服务,从仓库分配响应的库存并准备发货。

    在事件模式中,支付服务是事件发布者(publisher),在支付完成后只需要发布一个支付成功的事件(event),事件中带上订单id。

    订单服务和物流服务是事件订阅者(Consumer),订阅支付成功的事件,监听到事件后完成自己业务即可。

    为了解除事件发布者与订阅者之间的耦合,两者并不是直接通信,而是有一个中间人(Broker)。发布者发布事件到Broker,不关心谁来订阅事件。订阅者从Broker订阅事件,不关心谁发来的消息。

    在这里插入图片描述

    Broker 是一个像数据总线一样的东西,所有的服务要接收数据和发送数据都发到这个总线上,这个总线就像协议一样,让服务间的通讯变得标准和可控。

    好处:

    • 吞吐量提升:无需等待订阅者处理完成,响应更快速

    • 故障隔离:服务没有直接调用,不存在级联失败问题

    • 调用间没有阻塞,不会造成无效的资源占用

    • 耦合度极低,每个服务都可以灵活插拔,可替换

    • 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件

    缺点:

    • 架构复杂了,业务没有明显的流程线,不好管理
    • 需要依赖于Broker的可靠、安全、性能

    好在现在开源软件或云平台上 Broker 的软件是非常成熟的,比较常见的一种就是我们今天要学习的MQ技术。

    1.2.技术对比:

    MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。

    比较常见的MQ实现:

    • ActiveMQ
    • RabbitMQ
    • RocketMQ
    • Kafka

    几种常见MQ的对比:

    RabbitMQActiveMQRocketMQKafka
    公司/社区RabbitApache阿里Apache
    开发语言ErlangJavaJavaScala&Java
    协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
    可用性一般
    单机吞吐量一般非常高
    消息延迟微秒级毫秒级毫秒级毫秒以内
    消息可靠性一般一般

    追求可用性:Kafka、 RocketMQ 、RabbitMQ

    追求可靠性:RabbitMQ、RocketMQ

    追求吞吐能力:RocketMQ、Kafka

    追求消息低延迟:RabbitMQ、Kafka

    2.快速入门

    2.1.安装RabbitMQ

    安装RabbitMQ,参考课前资料:

    在这里插入图片描述

    MQ的基本结构:

    在这里插入图片描述

    RabbitMQ中的一些角色:

    • publisher:生产者
    • consumer:消费者
    • exchange个:交换机,负责消息路由
    • queue:队列,存储消息
    • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离

    2.2.RabbitMQ消息模型

    RabbitMQ官方提供了5个不同的Demo示例,对应了不同的消息模型:

    在这里插入图片描述

    1、基本消息队列:

    ​ 一个生产者对应一个消费者

    2、工作消息队列:

    ​ 一个生产者对应多个消费者,但是一条消息只能有一个消费者获得消息!!!
    ​ 轮询分发就是将消息队列中的消息,依次发送给所有消费者。一个消息只能被一个消费者获取。

    3、发布/订阅模式(fanout 交换器):

    ​ 一个消费者将消息首先发送到交换器,交换器绑定到多个队列,然后被监听该队列的消费者所接收并消费。

    ps:X表示交换器,在RabbitMQ中,交换器主要有四种类型:direct、fanout、topic、headers,这里的交换器是 fanout。下面我们会详细介绍这几种交换器。
      两个消费者获得了同一条消息。即就是,一个消息从交换机同时发送给了两个队列中,监听这两个队列的消费者消费了这个消息;
    如果没有队列绑定交换机,则消息将丢失。因为交换机没有存储能力,消息只能存储在队列中。

    4、路由模式(精准匹配 direct交换器)

    ​ 生产者将消息发送到direct交换器,在绑定队列和交换器的时候有一个路由key,生产者发送的消息会指定一个路由key,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。

    也就是让消费者有选择性的接收消息。
    路由模式,是以路由规则为导向,引导消息存入符合规则的队列中。再由队列的消费者进行消费的。

    5、主题模式(模糊匹配 topic交换器)

    上面的路由模式是根据路由key进行完整的匹配(完全相等才发送消息),这里的通配符模式通俗的来讲就是模糊匹配。

    符号“#”表示匹配一个或多个词,符号“*”表示匹配一个词。
      与路由模式相似,但是,主题模式是一种模糊的匹配方式。

    这五种工作模式,可以归为三类:

    生产者,消息队列,一个消费者;
    生产者,消息队列,多个消费者;
    生产者,交换机,多个消息队列,多个消费者;

    4种交换器:

    1、direct 如果路由键完全匹配的话,消息才会被投放到相应的队列。

    2、fanout 当发送一条消息到fanout交换器上时,它会把消息投放到所有附加在此交换器上的队列。

    3、topic 设置模糊的绑定方式,“*”操作符将“.”视为分隔符,匹配单个字符;“#”操作符没有分块的概念,它将任意“.”均视为关键字的匹配部分,能够匹配多个字符。

    4、header headers 交换器允许匹配 AMQP 消息的 header 而非路由键,除此之外,header 交换器和 direct 交换器完全一致,但是性能却差很多,因此基本上不会用到该交换器

    1)网页登录 rabbitmq添加管理员账号

    在这里插入图片描述

    2)、设置 virtual hosts

    在这里插入图片描述

    设置权限

    在这里插入图片描述

    添加

    在这里插入图片描述

    2.3.导入Demo工程

    课前资料提供了一个Demo工程,mq-demo:

    在这里插入图片描述

    导入后可以看到结构如下:

    在这里插入图片描述

    包括三部分:

    • mq-demo:父工程,管理项目依赖
    • publisher:消息的发送者
    • consumer:消息的消费者

    2.4.入门案例

    简单队列模式的模型图:

    在这里插入图片描述

    官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:

    • publisher:消息发布者,将消息发送到队列queue
    • queue:消息队列,负责接收并缓存消息
    • consumer:订阅队列,处理队列中的消息

    2.4.1.publisher实现

    思路:

    • 建立连接
    • 创建Channel
    • 声明队列
    • 发送消息
    • 关闭连接和channel

    代码实现:

    package cn.itcast.mq.helloworld;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class PublisherTest {
        @Test
        public void testSendMessage() throws IOException, TimeoutException {
            // 1.建立连接
            ConnectionFactory factory = new ConnectionFactory();
            // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
    //        factory.setHost("192.168.1.18");
            factory.setHost("localhost");
            factory.setPort(5672);
            factory.setVirtualHost("/access");
            factory.setUsername("admin");
            factory.setPassword("admin");
    //        设置连接超时时间
            factory.setConnectionTimeout(5000000);
    //        设置握手超时时间
            factory.setHandshakeTimeout(5000000);
    
            factory.setShutdownTimeout(60000);
            factory.setChannelRpcTimeout(60000);
            factory.setWorkPoolTimeout(60000);
    
            Channel channel = null;
            Connection connection = null;
            try {
            // 1.2.建立连接
                 connection = factory.newConnection();
    
                // 2.创建通道Channel
                 channel = connection.createChannel();
                // 3.创建队列
                String queueName = "simple.queue";
                channel.queueDeclare(queueName, false, false, false, null);
    
                // 4.发送消息
                String message = "hello, rabbitmq!";
                channel.basicPublish("", queueName, null, message.getBytes());
                System.out.println("发送消息成功:【" + message + "】");
            }catch (Exception e){
              e.printStackTrace();
            }
            finally {
                // 5.关闭通道和连接
                channel.close();
                connection.close();
            }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    在这里插入图片描述

    打开rabbitmq网页,查看queue创建成功
    在这里插入图片描述

    2.4.2.consumer实现

    代码思路:

    • 建立连接
    • 创建Channel
    • 声明队列
    • 订阅消息

    代码实现:

    package cn.itcast.mq.helloworld;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ConsumerTest {
    
        public static void main(String[] args) throws IOException, TimeoutException {
    //        创建连接
            ConnectionFactory connectionFactory = new ConnectionFactory();
    //        设置基本信息
            connectionFactory.setHost("localhost");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("admin");
    //        虚拟主机
            connectionFactory.setVirtualHost("/access");
    //        设置连接时间等
            connectionFactory.setConnectionTimeout(500000);
            connectionFactory.setHandshakeTimeout(50000);
            connectionFactory.setShutdownTimeout(50000);
            Connection connection = null;
            Channel channel = null;
            try {
    //            创建连接
                connection = connectionFactory.newConnection();
    //            创建 通道
                channel = connection.createChannel();
                String queueName = "simple.queue";
    //            队列声明
                channel.queueDeclare(queueName,false,false,false,null);
    //            刚发布的是基础的模式,1个生产者,1个消费者,1个队列 ,消费者订阅队列
                 channel.basicConsume(queueName,true,new DefaultConsumer(channel){
                     @Override
                     public void handleDelivery(String consumerTag, Envelope envelope,
                                                AMQP.BasicProperties properties, byte[] body) throws IOException {
                         // 5.处理消息
                         String message = new String(body);
                         System.out.println("接收到消息:【" + message + "】");
                     }
    
                 });
    
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                connection.close();
                channel.close();
            }
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54

    2.5.总结

    基本消息队列的消息发送流程:

    1. 建立connection

    2. 创建channel通道

    3. 利用channel声明队列

    4. 利用channel向队列发送消息

    基本消息队列的消息接收流程:

    1. 建立connection

    2. 创建channel通道

    3. 利用channel声明队列

    4. 定义consumer的消费行为handleDelivery()

    5. 利用channel将消费者与队列绑定

  • 相关阅读:
    MyBatis ---- 搭建MyBatis
    如何创建自己的Spring Boot Starter并为其编写单元测试
    53.基于微信小程序与SpringBoot的戏曲文化系统设计与实现(项目 + 论文)
    每天分享几个面试题(七)
    Ubuntu16.04安装网卡驱动
    领悟《信号与系统》之 LTI 系统的卷积积分及性质
    目标检测YOLO实战应用案例100讲-基于无人机图像的房屋目标检测
    Android 框架
    vue、vuex状态管理
    codeshell安装配置
  • 原文地址:https://blog.csdn.net/weixin_43987718/article/details/126060552