• 04MQ消息队列


    一、同步异步通讯

    1.同步通讯和异步通讯

    同步通讯:当A服务调用B服务时,一直等待B服务执行完毕,A才继续往下走

    优点:时效性强,可以立即得到结果。

    缺点:

    耦合度高,加入新的需求就要修改原来代码

    ②性能下降,调用者需要等待服务提供者的响应,如果调用链过长则响应时间是每次调用时间之和。

    ③资源浪费, 每个调用者在等待服务响应的过程中,不能释放资源,高并发会浪费系统资源

    ④级联失败,如果提供者出现问题,所有的调用都会出现问题。

    异步通讯: 当A服务调用B服务时,不需要等待B服务执行完毕,A就可以往下走(事件驱动模式)

    优点:

    ①服务解耦

    ②性能提升,吞吐量提高。

    ③服务没有强依赖,不担心级联失败问题

    ④流量削峰

    缺点:

    ①依赖Broker的可靠性、安全性、吞吐能力

    ②架构复杂,业务没有明显的流程线,不好追踪管理

    2.什么是MQ(MessageQueue)

    存放消息的队列,事件驱动架构的Broker

    二、RabbitMQ快速入门

    1.RabbitMQ概述

    RabbitMQ是开源消息通信中间件

    生产者-》交换机-》信道-》队列-》消费者

    ①Publisher:消息的发送者,把消息发送到exchange交换机

    ②exchange:交换机,负责路由,把消息投递到queue队列

    ③queue:队列,负责缓存消息

    ②consumer:消息的消费者,从队列获取消息,处理消息。

    总结

    RabbitMQ中的几个概念:

    channel:操作MQ的工具

    exchange:路由消息到队列中

    queue:缓存消息

    virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组

    2.运行RabbitMQ命令

    1. docker run \
    2. -e RABBITMQ_DEFAULT_USER=itcast \
    3. -e RABBITMQ_DEFAULT_PASS=123321 \
    4. --name mq \
    5. --hostname mq1 \
    6. -p 15672:15672 \
    7. -p 5672:5672 \
    8. -d \
    9. rabbitmq:3-management

    ①端口15672是MQ管理平台的端口

    ②端口5672是消息通信的端口

    3.常见消息模型

    ①基本消息队列(BasicQueue)

    ②工作消息队列(WorkQueue)

    ③发布订阅(Publish,Subscribe),根据交换机分为三种类型

    Fanout Exchange:广播

    Direct Exchange:路由

    Topic Exchange:主题

    4.HelloWorld案例

    Publisher:消息发布者,将消息发送到队列queue

    Queue:消息队列,负责接受并缓存消息

    Consumer:订阅队列,处理队列中的消息

    发送流程

    • 建立connection
    • 创建channel
    • 利用channel声明队列
    • 利用channel向队列发送消息

    接收流程

    • 建立connection
    • 创建channel
    • 利用channel声明队列
    • 定义consumer的消费行为handleDelivery()
    • 利用channel将消费者与队列绑定

    三、SpringAMOP

    1.什么是SpringAMOP

    SpringAMOP是基于AMOP协议定义的一套API规范,提供模板接收发送消息。包括两部分,spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

    2.案例:利用SpringAMQP实现HelloWorld中的基础消息队列功能

    发送

    • 引入amqp的starter依赖
    • 配置RabbitMQ地址
    • 利用RabbitTemplate的convertAndSend方法

    ①引入AMQP依赖:子模块都需要amop,所以在把依赖写在父工程

    1. <!--AMQP依赖,包含RabbitMQ-->
    2. <dependency>
    3. <groupId>org.springframework.boot</groupId>
    4. <artifactId>spring-boot-starter-amqp</artifactId>
    5. </dependency>

    ②在publisher服务中编写配置文件application.yml,连接mq信息

    1. spring:
    2. rabbitmq:
    3. host: 192.168.150.101 # 主机名
    4. port: 5672 # 端口
    5. virtual-host: / # 虚拟主机
    6. username: itcast # 用户名
    7. password: 123321 # 密码

    ③测试类

    1. @RunWith(SpringRunner.class)
    2. @SpringBootTest
    3. public class SpringAmqpTest {
    4. @Autowired
    5. private RabbitTemplate rabbitTemplate;
    6. @Test
    7. public void testSimpleQueue() {
    8. String queueName = "simple.queue";
    9. String message = "hello, spring amqp!";
    10. rabbitTemplate.convertAndSend(queueName, message);
    11. }
    12. }

    接收

    • 引入amqp的starter依赖
    • 配置RabbitMQ地址
    • 定义类,添加@Component注解
    • 类中声明方法,添加@RabbitListener注解,方法参数就时消息

    ①②跟前面一样,编写③消费逻辑类

    1. @Component
    2. public class SpringRabbitListener {
    3. @RabbitListener(queues = "simple.queue")
    4. public void listenSimpleQueueMessage(String msg) throws InterruptedException {
    5. System.out.println("spring 消费者接收到消息 :【" + msg + "】");
    6. }
    7. }

    3.WorkQueue工作队列

    Work queue工作队列,可以提高消息处理速度,避免队列消息堆积

    案例:模拟WorkQueue,实现一个队列绑定多个消费者

    基本思路如下:

    1. 在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue
    2. 在consumer服务中定义两个消息监听者,都监听simple.queue队列
    3. 消费者1每秒处理50条消息,消费者2每秒处理10条消息

    ①修改publisher,1秒发送50条消息

    1. @Test
    2. public void testWorkQueue() throws InterruptedException {
    3. // 队列名称
    4. String queueName = "simple.queue";
    5. // 消息
    6. String message = "hello, message__";
    7. for (int i = 0; i < 50; i++) {
    8. // 发送消息
    9. rabbitTemplate.convertAndSend(queueName, message + i);
    10. // 避免发送太快
    11. Thread.sleep(20);
    12. }
    13. }

    ②两个消费者监听simple.queue。消费者1每秒处理50条,消费者2每秒处理10条。

    消费者1处理效率高。

    1. @RabbitListener(queues = "simple.queue")
    2. public void listenSimpleQueueMessage(String msg) throws InterruptedException {
    3. System.out.println("spring 消费者1接收到消息:【" + msg + "】");
    4. Thread.sleep(20);
    5. }
    6. @RabbitListener(queues = "simple.queue")
    7. public void listenSimpleQueueMessage2(String msg) throws InterruptedException {
    8. System.err.println("spring 消费者2接收到消息:【" + msg + "】");
    9. Thread.sleep(100);
    10. }

    此时的两个消费者的消息数是相同的【平均分配】,但两个消费者处理消息的效率不一样。

    prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息,能者多劳。

    总结

    Work模型的使用:

    • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
    • 通过设置prefetch来控制消费者预取的消息数量

    4.发布Publish,订阅Subscribe

    发布订阅模式跟之前案例区别是允许将同一个消息发送给多个消费者。实现方式是加入了exchange交换机。

    ①常见的交换机exchange类型

    Fanout广播

    Direct路由

    Topic话题

     注意exchange负责消息路由(转发),而不是存储,路由失败则消息丢失

    发布订阅-Fanout Exchange

    Fanout Exchange会将接收到的消息路由到每一个跟其绑定的queue

    案例:利用SpringAMQP演示FanoutExchange的使用

    实现思路:

    ①在consumer服务中,利用代码声明队列,交换机,并将两者绑定。

    ②在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

    ③在publisher中编写测试方法,向itcast.fanout发送消息

    步骤1:在consumer服务声明Exchange、Queue、Binding

    1. @Configuration
    2. public class FanoutConfig {
    3. // itcast.fanout
    4. @Bean
    5. public FanoutExchange fanoutExchange(){
    6. return new FanoutExchange("itcast.fanout");
    7. }
    8. // fanout.queue1
    9. @Bean
    10. public Queue fanoutQueue1(){
    11. return new Queue("fanout.queue1");
    12. }
    13. // 绑定队列1到交换机
    14. @Bean
    15. public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
    16. return BindingBuilder
    17. .bind(fanoutQueue1)
    18. .to(fanoutExchange);
    19. }
    20. // fanout.queue2
    21. @Bean
    22. public Queue fanoutQueue2(){
    23. return new Queue("fanout.queue2");
    24. }
    25. // 绑定队列2到交换机
    26. @Bean
    27. public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
    28. return BindingBuilder
    29. .bind(fanoutQueue2)
    30. .to(fanoutExchange);
    31. }
    32. }

     步骤2:在consumer服务声明两个消费者

    1. @Component
    2. public class SpringListener {
    3. @RabbitListener(queues = "fanout.queue1")
    4. public void listenFanoutQueue1(String msg) {
    5. System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】");
    6. }
    7. @RabbitListener(queues = "fanout.queue2")
    8. public void listenFanoutQueue2(String msg) {
    9. System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】");
    10. }
    11. }

    步骤3:在publisher服务发送消息到FanoutExchange

    1. @RunWith(SpringRunner.class) // 开启注解
    2. @SpringBootTest
    3. public class PublisherApplicationTests {
    4. @Autowired
    5. private RabbitTemplate rabbitTemplate;
    6. @Test
    7. public void testSendFanoutExchange() {
    8. // 交换机名称
    9. String exchangeName = "itcast.fanout";
    10. // 消息
    11. String message = "hello, every one!";
    12. // 发送消息
    13. rabbitTemplate.convertAndSend(exchangeName, "", message);
    14. }
    15. }

    总结:

    交换机的作用是什么?

    ①接收publisher发送的消息

    ②将消息按照规则路由到绑定的队列

    ③不能缓存消息,路由失败,消息丢失

    ④FanoutExchange将消息路由到每个绑定的队列

    声明队列,交换机,绑定关系的Bean是什么?

    Queue,FanoutExchange,Binding

    发布订阅-DirectExchange

    DirectExchange会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式

    ①每一个Queue都与Exchange设置一个BindingKey

    ②发布者发送消息时,指定消息的RoutingKey

    ③Exchange将消息路由到BindingKey与消息RoutingKey一致的队列

    案例:利用SpringAMQP演示DirectExchange的使用

    实现思路:

    ①利用@RabbitListener声明Exchange,Queue,RoutingKey

    ②在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

    ③在publisher中编写测试方法,向itcast. direct发送消息

    步骤1:在consumer服务声明Exchange、Queue

    1. @Component
    2. public class SpringListener {
    3. @RabbitListener(bindings=@QueueBinding(
    4. value = @Queue(name = "direct.queue1"),
    5. exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
    6. key = {"red","blue"}
    7. ))
    8. public void listenFanoutQueue1(String msg) {
    9. System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】");
    10. }
    11. @RabbitListener(bindings=@QueueBinding(
    12. value = @Queue(name = "direct.queue2"),
    13. exchange = @Exchange(name = "itcast.direct",type = ExchangeTypes.DIRECT),
    14. key = {"red","yellow"}
    15. ))
    16. public void listenFanoutQueue2(String msg) {
    17. System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】");
    18. }
    19. }

    步骤2:在publisher服务发送消息到DirectExchange

    1. @RunWith(SpringRunner.class) // 开启注解
    2. @SpringBootTest
    3. public class PublisherApplicationTests {
    4. @Autowired
    5. private RabbitTemplate rabbitTemplate;
    6. @Test
    7. public void testSendFanoutExchange() {
    8. // 交换机名称
    9. String exchangeName = "itcast.direct";
    10. // 消息
    11. String message = "hello, every one!!!";
    12. // 发送消息
    13. rabbitTemplate.convertAndSend(exchangeName, "yellow", message);
    14. }
    15. }

    发布订阅-TopicExchange

    TopicExchangeDirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以 . 分割。

    QueueExchange指定BindingKey时可以使用通配符:

    #:代指0个或多个单词

    *:代指一个单词

    利用SpringAMQP演示TopicExchange的使用,匹配使用

    1. @Component
    2. public class SpringListener {
    3. @RabbitListener(bindings=@QueueBinding(
    4. value = @Queue(name = "direct.queue1"),
    5. exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
    6. key = "china.#"
    7. ))
    8. public void listenFanoutQueue1(String msg) {
    9. System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】");
    10. }
    11. @RabbitListener(bindings=@QueueBinding(
    12. value = @Queue(name = "direct.queue2"),
    13. exchange = @Exchange(name = "itcast.topic",type = ExchangeTypes.TOPIC),
    14. key = "#.news"
    15. ))
    16. public void listenFanoutQueue2(String msg) {
    17. System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】");
    18. }
    19. }

    描述下Direct交换机与Topic交换机的差异?

    • Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割
    • Topic交换机与队列绑定时的bindingKey可以指定通配符
    • #:代表0个或多个词
    • *:代表1个词
  • 相关阅读:
    Helium自动化框架 元素定位
    【ICLR 2021】半监督目标检测:Unbiased Teacher For Semi-Supervised Object Detection
    自定义实现:头像上传View
    基于YOLOv8深度学习的路面坑洞检测与分割系统【python源码+Pyqt5界面+数据集+训练代码】深度学习实战、目标分割
    C++左值/右值、左值引用&/右值引用&&、移动语义move、完美转发forward
    2、SySeVR环境配置(下)
    63基于matlab的生物地理的优化器(BBO)被用作多层感知器(MLP)的训练器。
    JS操作数组方法学习系列(2)
    java毕业设计—— 基于java+JavaEE+jsp的售后服务管理系统设计与实现(毕业论文+程序源码)——售后服务管理系统
    【Node.js从基础到高级运用】六、创建第一个 Node.js 应用
  • 原文地址:https://blog.csdn.net/jbkjhji/article/details/134052020