• 初识RabbitMQ


    大家好我是苏麟今天带来rabbitmq.

    RabbitMQ

    RabbitMQ官网 : RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQ

    初识MQ

    同步和异步通讯

    微服务间通讯有同步和异步两种方式:

    同步通讯:就像打电话,需要实时响应。

    异步通讯:就像发邮件,不需要马上回复。

    两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与 多个人收发邮件,但是往往响应会有延迟。

    同步通讯

    我们之前学习的Feign调用就属于同步方式,虽然调用可以实时得到结果,但存在下面的问题

    总结:

    同步调用的优点:

    • 时效性较强,可以立即得到结果

    同步调用的问题:

    • 耦合度高
    • 性能和吞吐能力下降
    • 有额外的资源消耗
    • 有级联失败问题

    异步通讯

    异步调用则可以避免上述问题:

    我们以购买商品为例,用户支付后需要调用订单服务完成订单状态修改,调用物流服务,从仓库分配响 应的库存并准备发货。

    在事件模式中,支付服务是事件发布者(publisher),在支付完成后只需要发布一个支付成功的事件 (event),事件中带上订单id。 订单服务和物流服务是事件订阅者(Consumer),订阅支付成功的事件,监听到事件后完成自己业务 即可。

    为了解除事件发布者与订阅者之间的耦合,两者并不是直接通信,而是有一个中间人(Broker)。发布 者发布事件到Broker,不关心谁来订阅事件。订阅者从Broker订阅事件,不关心谁发来的消息。

    Broker 是一个像数据总线一样的东西,所有的服务要接收数据和发送数据都发到这个总线上,这个总线 就像协议一样,让服务间的通讯变得标准和可控。

    好处:

    • 吞吐量提升:无需等待订阅者处理完成,响应更快速
    • 故障隔离:服务没有直接调用,不存在级联失败问题
    • 调用间没有阻塞,不会造成无效的资源占用 耦合度极低,每个服务都可以灵活插拔,可替换
    • 流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件

    缺点:

    • 架构复杂了,业务没有明显的流程线,不好管理
    • 需要依赖于Broker的可靠、安全、性能

    好在现在开源软件或云平台上 Broker 的软件是非常成熟的,比较常见的一种就是我们今天要学习的MQ 技术。

    技术对比:

    MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的 Broker。

    比较常见的MQ实现:

    • ActiveMQ
    • RabbitMQ
    • RocketMQ
    • Kafka

     几种常见MQ的对比:

    追求可用性:Kafka、 RocketMQ 、RabbitMQ

    追求可靠性:RabbitMQ、RocketMQ

    追求吞吐能力:RocketMQ、Kafka

    追求消息低延迟:RabbitMQ、Kafka

    快速入门

    安装RabbitMQ

    安装 : RabbitMQ部署指南_踏遍三十六岸的博客-CSDN博客

    MQ的基本结构:

    RabbitMQ中的一些角色:

    • publisher:生产者
    • consumer:消费者
    • exchange个:交换机,负责消息路由
    • queue:队列,存储消息
    • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离

    RabbitMQ消息模型

    RabbitMQ官方提供了5个不同的Demo示例,对应了不同的消息模型:

    创建Demo工程

    结构如下:

    包括三部分:

    •  mq-demo:父工程,管理项目依赖 springboot工程
    • publisher:消息的发送者
    • consumer:消息的消费者

     引入依赖

    1. com.github.luues
    2. spring-boot-starter-rabbitmq
    3. 1.3.0.5.RELEASE

    入门案例

    简单队列模式的模型图:

    官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:

    • publisher:消息发布者,将消息发送到队列
    • queue queue:消息队列,负责接受并缓存消息
    • consumer:订阅队列,处理队列中的消息

    publisher实现

    思路:

    • 建立连接
    • 创建Channel
    • 声明队列
    • 发送消息
    • 关闭连接和channel
    1. package com.sl;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.ConnectionFactory;
    5. import org.junit.Test;
    6. import java.io.IOException;
    7. import java.util.concurrent.TimeoutException;
    8. public class PublisherTest {
    9. @Test
    10. public void testSendMessage() throws IOException, TimeoutException {
    11. // 1.建立连接
    12. ConnectionFactory factory = new ConnectionFactory();
    13. // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
    14. factory.setHost("192.168.75.128");
    15. factory.setPort(5672);
    16. factory.setVirtualHost("/");
    17. factory.setUsername("root");
    18. factory.setPassword("root");
    19. // 1.2.建立连接
    20. Connection connection = factory.newConnection();
    21. // 2.创建通道Channel
    22. Channel channel = connection.createChannel();
    23. // 3.创建队列
    24. String queueName = "simple.queue";
    25. channel.queueDeclare(queueName, false, false, false, null);
    26. // 4.发送消息
    27. String message = "hello, rabbitmq!";
    28. channel.basicPublish("", queueName, null, message.getBytes());
    29. System.out.println("发送消息成功:【" + message + "】");
    30. // 5.关闭通道和连接
    31. channel.close();
    32. connection.close();
    33. }
    34. }

    consumer实现

    代码思路:

    • 建立连接
    • 创建Channel
    • 声明队列
    • 订阅消息
    1. package com.sl;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. public class ConsumerTest {
    6. public static void main(String[] args) throws IOException, TimeoutException {
    7. // 1.建立连接
    8. ConnectionFactory factory = new ConnectionFactory();
    9. // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
    10. factory.setHost("192.168.75.128");
    11. factory.setPort(5672);
    12. factory.setVirtualHost("/");
    13. factory.setUsername("root");
    14. factory.setPassword("root");
    15. // 1.2.建立连接
    16. Connection connection = factory.newConnection();
    17. // 2.创建通道Channel
    18. Channel channel = connection.createChannel();
    19. // 3.创建队列
    20. String queueName = "simple.queue";
    21. channel.queueDeclare(queueName, false, false, false, null);
    22. // 4.订阅消息
    23. channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
    24. @Override
    25. public void handleDelivery(String consumerTag, Envelope envelope,
    26. AMQP.BasicProperties properties, byte[]
    27. body) throws IOException {
    28. // 5.处理消息
    29. String message = new String(body);
    30. System.out.println("接收到消息:【" + message + "】");
    31. }
    32. });
    33. System.out.println("等待接收消息。。。。");
    34. }
    35. }

    总结

    基本消息队列的消息发送流程:

    • 1. 建立connection
    • 2. 创建channel
    • 3. 利用channel声明队列
    • 4. 利用channel向队列发送消息

    基本消息队列的消息接收流程:

    • 1. 建立connection
    • 2. 创建channel
    • 3. 利用channel声明队列
    • 4. 定义consumer的消费行为handleDelivery()
    • 5. 利用channel将消费者与队列绑定

    下阶段 : Spring AMQP-CSDN博客

    这期就到这里下期见 !

  • 相关阅读:
    golang的垃圾回收算法之一基本介绍
    CH34X linux驱动安装,参考代码例程
    自然语言处理(NLP)—— 主题建模
    LeetCode240题:搜索二维矩阵II(python3)
    叶子数和深度
    Java学习——Java数组总结
    ClickHouse和Elasticsearch压测对比,谁才是yyds?
    postfix+dovecot+SSL 搭建邮件服务器
    基于ASP.NET的驾校管理系统设计与实现
    秋招还没Offer怎么办?
  • 原文地址:https://blog.csdn.net/sytdsqzr/article/details/133829686