• RabbitMQ


    什么是消息中间件

    消息中间件基于队列模型实现异步/同步传输数据
    作用:可以实现支撑高并发、异步解耦、流量削峰、降低耦合度

    小项目用多线程也可以处理,但有些大型项目有可能会消耗服务器cpu资源资源

    Mq与多线程之间区别

    MQ可以实现异步/解耦/流量削峰问题;
    多线程也可以实现异步,但是消耗到cpu资源,没有实现解耦。

    Mq消息中间件名词

    Producer 生产者:投递消息到MQ服务器端;
    Consumer 消费者:从MQ服务器端获取消息处理业务逻辑;
    Broker MQ服务器端
    Topic 主题:分类业务逻辑发送短信主题、发送优惠券主题
    Queue 存放消息模型队列先进先出后进后出原则数组/链表
    Message 生产者投递消息报文:json

    主流mq区别对比

    特性ActiveMQRabbitMQRocketMQkafka
    开发语言javaerlangjavascala
    单机吞吐量万级万级10万级10万级
    时效性ms级us级ms级ms级以内
    可用性高(主从架构)高(主从架构)非常高(分布式架构)非常高(分布式架构)
    功能特性成熟的产品,在很多公司得到应用;有较多的文档;各种协议支持较好基于erlang开发,所以并发能力很强,性能极其好,延时很低管理界面较丰富MQ功能比较完备,扩展性佳只支持主要的MQ功能,像一些消息查询,消息回溯等功能没有提供,毕竟是为大数据准备的,在大数据领域应用广。

    RabbitMQ如何保证消息不丢失

    1. 生产者角色
      确保生产者投递消息到MQ服务器端成功。
      Ack 消息确认机制
      同步或者异步的形式
      方式1:Confirms
      方式2:事务消息

    2. 消费者角色
      在rabbitmq情况下:
      必须要将消息消费成功之后,才会将该消息从mq服务器端中移除。可以手动签收或自动签收
      在kafka中的情况下:
      不管是消费成功还是消费失败,该消息都不会立即从mq服务器端移除。

    3. Mq服务器端 在默认的情况下 都会对队列中的消息实现持久化
      **持久化硬盘。 **

    4. 使用消息确认机制+持久技术
      A.消费者确认收到消息机制
      channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
      注:第二个参数值为false代表关闭RabbitMQ的自动应答机制,改为手动应答。
      在处理完消息时,返回应答状态,true表示为自动应答模式。
      channel.basicAck(envelope.getDeliveryTag(), false);
      B.生产者确认投递消息成功 使用Confirm机制 或者事务消息
      image.png

    Confirm机制 同步或者是异步的形式

    RabbitMQ基本介绍

    RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件),RabbitMQ服务器是用Erlang语言编写的。

    工作模型

    1. 点对点(简单)的队列
    2. 工作(公平性)队列模式
    3. 发布订阅模式
    4. 路由模式Routing
    5. 通配符模式Topics
    6. RPC

    image.png

    image.png

    RabitMQ工作队列

    默认的传统队列是为均摊消费,存在不公平性;如果每个消费者速度不一样的情况下,均摊消费是不公平的,应该是能者多劳。
    image.png
    采用工作队列
    在通道中只需要设置basicQos为1即可,表示MQ服务器每次只会给消费者推送1条消息必须手动ack确认之后才会继续发送。
    channel.basicQos(1);

    RabbitMQ交换机类型

    • Direct exchange(直连交换机)
    • Fanout exchange(扇型交换机)
    • Topic exchange(主题交换机)
    • Headers exchange(头交换机)

    /Virtual Hosts—区分不同的团队
    ----队列存放消息
    ----交换机路由消息存放在那个队列中类似于nginx
    —路由key 分发规则

    RabbitMQ Fanout 发布订阅

    生产者发送一条消息,经过交换机转发到多个不同的队列,多个不同的队列就多个不同的消费者。
    image.png

    image.png原理:

    1. 需要创建两个队列 ,每个队列对应一个消费者;
    2. 队列需要绑定我们交换机
    3. 生产者投递消息到交换机中,交换机在将消息分配给两个队列中都存放起来;
    4. 消费者从队列中获取这个消息。

    Direct路由模式

    当交换机类型为direct类型时,根据队列绑定的路由建转发到具体的队列中存放消息
    image.png
    生产者

    Connection connection = RabbitMQConnection.getConnection();
    // 创建Channel
    Channel channel = connection.createChannel();
    // 通道关联交换机
    channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
    String msg = "我是发送的消息";
    channel.basicPublish(EXCHANGE_NAME, "sms", null, msg.getBytes());
    channel.close();
    connection.close();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    消费者

     // 创建我们的连接
    Connection connection = RabbitMQConnection.getConnection();
    // 创建我们通道
    final Channel channel = connection.createChannel();
    // 关联队列消费者关联队列,并且定义接收的关键词
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "sms");
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String msg = new String(body, "UTF-8");
            System.out.println("短信消费者获取消息:" + msg);
        }
    };
    // 开始监听消息 自动签收
    channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    Topic主题模式

    当交换机类型为topic类型时,根据队列绑定的路由建模糊转发到具体的队列中存放。
    #号表示支持匹配多个词;
    *号表示只能匹配一个词
    image.png
    生产者

     //  创建Connection
    Connection connection = RabbitMQConnection.getConnection();
    // 创建Channel
    Channel channel = connection.createChannel();
    // 通道关联交换机
    channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
    String msg = "这是我的消息";
    channel.basicPublish(EXCHANGE_NAME, "msg.sms", null, msg.getBytes());
    channel.close();
    connection.close();
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    消费者

    // 创建我们的连接
    Connection connection = RabbitMQConnection.getConnection();
    // 创建我们通道
    final Channel channel = connection.createChannel();
    // 关联队列消费者关联队列
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "meite.*");
    DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            String msg = new String(body, "UTF-8");
            System.out.println("短信消费者获取消息:" + msg);
        }
    };
    // 开始监听消息 自动签收
    channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    SpringBoot整合RabbitMQ

    简单案例

    配置类

    @Component
    public class RabbitMQConfig {
        /**
         * 定义交换机
         */
        private String EXCHANGE_SPRINGBOOT_NAME = "/msg_ex";
        /**
         * 短信队列
         */
        private String FANOUT_SMS_QUEUE = "fanout_sms_queue";
        /**
         * 邮件队列
         */
        private String FANOUT_EMAIL_QUEUE = "fanout_email_queue";
    
        /**
         * 配置smsQueue
         *
         * @return
         */
        @Bean
        public Queue smsQueue() {
            return new Queue(FANOUT_SMS_QUEUE);
        }
    
        /**
         * 配置emailQueue
         *
         * @return
         */
        @Bean
        public Queue emailQueue() {
            return new Queue(FANOUT_EMAIL_QUEUE);
        }
    
        /**
         * 配置fanoutExchange
         *
         * @return
         */
        @Bean
        public FanoutExchange fanoutExchange() {
            return new FanoutExchange(EXCHANGE_SPRINGBOOT_NAME);
        }
    
        // 绑定交换机 sms
        @Bean
        public Binding bindingSmsFanoutExchange(Queue smsQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(smsQueue).to(fanoutExchange);
        }
    
        // 绑定交换机 email
        @Bean
        public Binding bindingEmailFanoutExchange(Queue emailQueue, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(emailQueue).to(fanoutExchange);
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59

    配置文件

    spring:
      rabbitmq:
        ####连接地址
        host: 127.0.0.1
        ####端口号
        port: 5672
        ####账号
        username: guest
        ####密码
        password: guest
        ### 地址
        virtual-host: /myVirtualHosts
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    生产者

    @RestController
    public class FanoutProducer {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        /**
         * 发送消息
         *
         * @return
         */
        @RequestMapping("/sendMsg")
        public String sendMsg(String msg) {
            /**
             * 1.交换机名称
             * 2.路由key名称
             * 3.发送内容
             */
            amqpTemplate.convertAndSend("/msg_ex", "", msg);
            return "success";
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    消费者

    @Slf4j
    @Component
    @RabbitListener(queues = "fanout_email_queue")
    public class FanoutEmailConsumer {
    
        @RabbitHandler
        public void process(String msg) {
            log.info(">>邮件消费者消息msg:{}<<", msg);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    Maven依赖

    <dependencies>
    
        <!-- springboot-web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- 添加springboot对amqp的支持 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <!--fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.49</version>
        </dependency>
    
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    生产者如何获取消费结果

    根据业务来定
    消费者消费成功结果:

    1. 能够在数据库中插入一条数据

    2. Rocketmq 自带全局消息id,能够根据该全局消息获取消费结果
      原理:生产者投递消息到mq服务器,mq服务器端在这时候返回一个全局的消息id,
      当我们消费者消费该消息成功之后,消费者会给我们mq服务器端发送通知标记该消息
      消费成功。生产者获取到该消息全局id,每隔2s时间调用mq服务器端接口查询该消息是否
      有被消费成功。

    3. 异步返回一个全局id,前端使用ajax定时主动查询;

    4. 在rocketmq中,自带根据消息id查询是否消费成功

    RabbitMQ死信队列

    死信队列产生的背景

    RabbitMQ死信队列俗称,备胎队列;消息中间件因为某种原因拒收该消息后,可以转移到死信队列中存放,死信队列也可以有交换机和路由key等。

    产生死信队列的原因
    1. 消息投递到MQ中存放 消息已经过期 消费者没有及时的获取到我们消息,消息如果存放到mq服务器中过期之后,会转移到备胎死信队列存放。
    2. 队列达到最大的长度 (队列容器已经满了)
    3. 消费者消费多次消息失败,就会转移存放到死信队列中
    死信队列的架构原理

    死信队列和普通队列区别不是很大
    普通与死信队列都有自己独立的交换机和路由key、队列和消费者。
    区别:
    1.生产者投递消息先投递到我们普通交换机中,普通交换机在将该消息投到
    普通队列中缓存起来,普通队列对应有自己独立普通消费者。

    2.如果生产者投递消息到普通队列中,普通队列发现该消息一直没有被消费者消费
    的情况下,在这时候会将该消息转移到死信(备胎)交换机中,死信(备胎)交换机
    对应有自己独立的死信(备胎)队列对应独立死信(备胎)消费者。
    image.png

    死信队列应用场景

    30分钟订单超时设计
    A. Redis过期key .
    B. 死信延迟队列实现
    采用死信队列,创建一个普通队列没有对应的消费者消费消息,在30分钟过后就会将该消息转移到死信备胎消费者实现消费。备胎死信消费者会根据该订单号码查询是否已经支付过,如果没有支付的情况下则会开始回滚库存操作。

    RabbitMQ消息幂等问题

    RabbitMQ消息自动重试机制

    1. 当我们消费者处理执行我们业务代码的时候,如果抛出异常的情况下在这时候mq会自动触发重试机制,默认的情况下rabbitmq是无限次数的重试。需要人为指定重试次数限制问题

    2. 在什么情况下消费者需要实现重试策略?

    A. 消费者获取消息后,调用第三方接口,但是调用第三方接口失败呢?是否需要重试?该情况下需要实现重试策略,网络延迟只是暂时调用不通,重试多次有可能会调用通。

    B. 消费者获取消息后,因为代码问题抛出数据异常,是否需要重试?该情况下是不需要实现重试策略,就算重试多次,最终还是失败的。可以将日志存放起来,后期通过定时任务或者人工补偿形式。如果是重试多次还是失败消息,需要重新发布消费者版本实现消费可以使用死信队列

    Mq在重试的过程中,有可能会引发消费者重复消费的问题。
    Mq消费者需要解决幂等性问题,幂等性保证数据唯一

  • 相关阅读:
    用 Python 编写 Chrome 扩展赚美刀,通过使用 PyScript 非常轻松(教程含源码)
    Matlab | 找出数组/向量中的重复项的索引
    如何做专利挖掘,关键是寻找专利点,其实并不太难
    桥接设计模式
    VSD Viewer 6.16.1(Visio绘图文件阅读器)
    微服务项目:尚融宝(51)(核心业务流程:充值服务(1))
    Window下Mysql8.0 怎样恢复被删除的Root
    R语言使用lm函数构建多元回归模型(Multiple Linear Regression)、并根据模型系数写出回归方程、使用anova函数给出模型方差分析表
    数据转换工具DBT介绍及实操
    springmvc-页面跳转&表单标签&其他标签&tomcat控制台中文乱码问题
  • 原文地址:https://blog.csdn.net/weixin_40864434/article/details/133804079