• 【Spring Cloud】初识RabbitMQ




    十、RabbitMQ 消息队列协议

    10.1 初识MQ

    1.1 同步和异步通讯

    微服务间通讯有同步和异步两种方式:

    • 同步通讯就像打电话,需要实时响应

    • 异步通讯就像发邮件,不需要马上回复(延时

    image-20210717161939695

    两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。

    发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。


    1.1.1 同步通讯

    image-20220819224353694

    我们之前学习的Feign调用就属于同步方式,虽然调用可以实时得到结果,但存在下面的问题:

    image-20210717162004285

    同步调用的优点:

    • 时效性较强,可以立即得到结果

    同步调用的问题:

    • 耦合度高
    • 性能和吞吐能力下降
    • 有额外的资源消耗
    • 有级联失败问题

    返回顶部


    1.1.2 异步通讯

    异步调用则可以避免上述问题:

    我们以购买商品为例,用户支付后需要调用订单服务完成订单状态修改,调用物流服务,从仓库分配响应的库存并准备发货。

    在事件模式中,支付服务是事件发布者(publisher,在支付完成后只需要发布一个支付成功的事件(event,事件中带上订单id

    订单服务物流服务事件订阅者(Consumer,订阅支付成功的事件,监听到事件后完成自己业务即可。

    image-20220819225112810

    为了解除事件发布者订阅者之间的耦合,两者并不是直接通信,而是有一个中间人(Broker

    • 发布者发布事件到Broker,不关心谁来订阅事件。

    • 订阅者从Broker订阅事件,不关心谁发来的消息。

    image-20210422095356088

    Broker 是一个像数据总线一样的东西,所有的服务要接收数据发送数据都发到这个总线上,这个总线就像协议一样,让服务间的通讯变得标准和可控。

    好处:

    • 吞吐量提升无需等待订阅者处理完成,响应更快速

    • 故障隔离服务没有直接调用,不存在级联失败问题

    • 调用间没有阻塞不会造成无效的资源占用

    • 耦合度极低每个服务都可以灵活插拔,可替换

    • 流量削峰不管发布事件的流量波动多大,都由Broker接**收,订阅者可以按照自己的速度去处理事件

    缺点:

    • 架构复杂了,业务没有明显的流程线,不好管理
    • 需要依赖于Broker的可靠、安全、性能

    返回顶部


    1.2 技术对比

    MQ(MessageQueue,中文是消息队列,字面来看就是存放消息的队列,也就是事件驱动架构中的Broker。

    比较常见的MQ实现:

    • ActiveMQ
    • RabbitMQ
    • RocketMQ
    • Kafka

    几种常见MQ的对比:

    RabbitMQActiveMQRocketMQKafka
    公司/社区RabbitApache阿里Apache
    开发语言ErlangJavaJavaScala&Java
    协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
    可用性一般
    单机吞吐量一般非常高
    消息延迟微秒级毫秒级毫秒级毫秒以内
    消息可靠性一般一般

    追求可用性:Kafka、 RocketMQ 、RabbitMQ

    追求可靠性:RabbitMQ、RocketMQ

    追求吞吐能力:RocketMQ、Kafka

    追求消息低延迟:RabbitMQ、Kafka

    返回顶部


    10.2 快速入门

    2.1 安装RabbitMQ

    MQ的基本结构:

    image-20210717162752376

    RabbitMQ中的一些角色:

    • publisher生产者
    • consumer消费者
    • exchange交换机,负责消息路由
    • queue队列,存储消息
    • virtualHost虚拟主机,隔离不同租户的exchange、queue、消息的隔离

    2.1.1 单机部署

    我们在 Centos7虚拟机中使用 Docker来安装。

    1.1 下载镜像

    方式一:在线拉取

    docker pull rabbitmq:3-management
    
    • 1

    方式二:从本地加载

    课前资料已经提供了镜像包:

    image-20210423191210349

    上传到虚拟机中后,使用命令加载镜像即可:

    image-20220820003242751

    docker load -i mq.tar
    
    • 1

    image-20220820003206553

    返回顶部


    1.2 安装MQ

    执行下面的命令来运行MQ容器:

    docker run \
     -e RABBITMQ_DEFAULT_USER=zyx \
     -e RABBITMQ_DEFAULT_PASS=123456 \
     --name mq \
     --hostname mq1 \
     -p 15672:15672 \
     -p 5672:5672 \
     -d \
     rabbitmq:3-management
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    image-20220820003620707

    浏览器访问http://192.168.64.178:15672/

    image-20220820004352320

    输入用户名、密码后,登陆到主界面:

    image-20220820004317548

    我们可以选择创建新的用户,并赋予身份信息:

    image-20220820005016062

    刚创建的用户是没有访问主机权限的,需要我们进行分配:

    image-20220820005204861

    我们先新建一个主机连接:

    image-20220820005255563

    接着对admin用户进行授权:

    image-20220820005421595

    通常来说,一个用户管理一个主机,所以我们将主机连接重新分配:

    image-20220820005514549

    image-20220820005545566

    image-20220820005605125

    返回顶部


    2.1.2 集群部署

    2.1 集群分类

    RabbitMQ的官方文档中,讲述了两种集群的配置方式:

    • 普通模式:普通模式集群不进行数据同步,每个MQ都有自己的队列、数据信息(其它元数据信息如交换机等会同步)。例如我们有2个MQ:mq1和mq2,如果你的消息在mq1,而你连接到了mq2,那么mq2会去mq1拉取消息,然后返回给你;如果mq1宕机,消息就会丢失。
    • 镜像模式:与普通模式不同,队列会在各个mq的镜像节点之间同步,因此你连接到任何一个镜像节点,均可获取到消息。而且如果一个节点宕机,并不会导致数据丢失。不过,这种方式增加了数据同步的带宽消耗。
    2.2 设置网络

    需要让3台MQ互相知道对方的存在

    分别在3台机器中,设置 /etc/hosts文件,添加如下内容:

    192.168.150.101 mq1
    192.168.150.102 mq2
    192.168.150.103 mq3
    
    • 1
    • 2
    • 3

    并在每台机器上测试,是否可以ping通对方。

    返回顶部


    2.2 RabbitMQ消息模型

    RabbitMQ官方提供了5个不同的Demo示例,对应了不同的消息模型:

    image-20210717163332646

    返回顶部


    2.3 导入Demo工程

    课前资料提供了一个Demo工程,mq-demo:

    image-20210717163253264

    导入后可以看到结构如下:

    image-20210717163604330

    包括三部分:

    • mq-demo父工程,管理项目依赖
    • publisher消息的发送者
    • consumer消息的消费者

    返回顶部


    2.4 入门案例

    简单队列模式的模型图:

    image-20210717163434647

    官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:

    • publisher消息发布者,将消息发送到队列queue
    • queue消息队列,负责接受并缓存消息
    • consumer订阅队列,处理队列中的消息

    返回顶部


    2.4.1 publisher实现

    思路:

    • 建立连接
    • 创建Channel
    • 声明队列
    • 发送消息
    • 关闭连接和channel

    代码实现:

    package cn.itcast.mq.helloworld;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class PublisherTest {
        @Test
        public void testSendMessage() throws IOException, TimeoutException {
            // 1.建立连接
            ConnectionFactory factory = new ConnectionFactory();
            // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
            factory.setHost("192.168.64.178");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("zyx");
            factory.setPassword("123456");
            // 1.2.建立连接
            Connection connection = factory.newConnection();
    
            // 2.创建通道Channel
            Channel channel = connection.createChannel();
    
            // 3.创建队列
            String queueName = "simple.queue";
            channel.queueDeclare(queueName, false, false, false, null);
    
            // 4.发送消息
            String message = "hello, rabbitmq!";
            channel.basicPublish("", queueName, null, message.getBytes());
            System.out.println("发送消息成功:【" + message + "】");
    
            // 5.关闭通道和连接
            channel.close();
            connection.close();
    
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    返回顶部


    2.4.2 consumer实现

    代码思路:

    • 建立连接
    • 创建Channel
    • 声明队列
    • 订阅消息

    代码实现:

    package cn.itcast.mq.helloworld;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ConsumerTest {
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1.建立连接
            ConnectionFactory factory = new ConnectionFactory();
            // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
            factory.setHost("192.168.64.178");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("zyx");
            factory.setPassword("123456");
            // 1.2.建立连接
            Connection connection = factory.newConnection();
    
            // 2.创建通道Channel
            Channel channel = connection.createChannel();
    
            // 3.创建队列
            String queueName = "simple.queue";
            channel.queueDeclare(queueName, false, false, false, null);
    
            // 4.订阅消息
            channel.basicConsume(queueName, true, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 5.处理消息
                    String message = new String(body);
                    System.out.println("接收到消息:【" + message + "】");
                }
            });
            System.out.println("等待接收消息。。。。");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41

    返回顶部


    2.4.3 断点测试

    生产者中首先创建连接工厂建立连接,然后设置连接参数,分别是:主机名、端口号、vhost、用户名、密码等信息:

    image-20220820101712915

    最后完成连接创建:

    image-20220820102236000

    可以看到在Connections下面我们新创建的连接信息:

    image-20220820102040504

    继续往下走,我们通过连接创建了新的通道:

    image-20220820102345444

    可以看到在Channels下面我们新创建的通道信息:

    image-20220820102550831

    然后我们声明一个队列信息,用于发送消息:

    image-20220820102724902

    可以看到在Queues下面我们新创建的消息队列信息:

    image-20220820102742871

    创建好消息队列后,我们通过channel进行消息的发送:

    image-20220820103225695

    可以看到在Channels下面我们的通道中已经准备好了一个信息:

    image-20220820103120040

    进入我们的simple-queue,可以看到我们的队列信息:

    image-20220820103147434

    Get messages选项下,便可以查看我们发送的消息信息:hello,rabbitmq!

    image-20220820103207390

    同样的我们的消费者,在获取消息之前也需要创建连接、通道、消息队列,消费者与生产者都声明消息队列是因为消息多了的时候不确定谁先执行,重复声明并不会创建新的,避免了队列的缺失:

    image-20220820104042464

    在订阅消息内部,声明绑定了内部类·DefaultConsumer·,并实现了函数handleDelivery的回调(异步执行),用于处理获取的消息:

    image-20220820104411860

    image-20220820104842071

    当我们一旦消费了信息,队列中的信息就会立刻被清空:

    image-20220820104717284

    返回顶部


    2.5 总结

    基本消息队列的消息发送流程:

    1. 建立connection

    2. 创建channel

    3. 利用channel声明队列

    4. 利用channel向队列发送消息

    基本消息队列的消息接收流程:

    1. 建立connection

    2. 创建channel

    3. 利用channel声明队列

    4. 定义consumer的消费行为handleDelivery()

    5. 利用channel将消费者与队列绑定

    返回顶部


  • 相关阅读:
    黑产反诈有方法,异常识别我在行—欺诈反洗钱等领域用得最多的经典算法
    一夜之间,3.0万 Star,全部清零。。
    AI 生成的唯美头像也太好看了吧!附好说 AI 一秒出图技巧
    Linux系统常用的工具
    27-搭建LVS-DR+Nginx高可用模式
    SpringMvc---编码过滤器(解决乱码问题)
    SAP BC TSV_TNEW_PAGE_ALLOC_FAILED
    Python Web3.0应用开发【2022】
    blender UV基础
    QT数据库,实现数据库增删改查
  • 原文地址:https://blog.csdn.net/qq_45797116/article/details/126437147