• RabbitMQ快速上手及讲解


    前言:在介绍RabbitMQ之前,我们先来看下面一个场景:

    1.1.1.1 异步处理
    场景说明:
    用户注册后,需要发注册邮件和注册短信,传统的做法有两种

    1.串行的方式
    (1)串行方式:将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西。
    在这里插入图片描述

    (2)并行方式:将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。
    在这里插入图片描述

    假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并性已经提高的处理时间,但是,前面说过邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,应该是写入数据库后就返回。
    (3)消息队列
    引入消息队列后,把发送邮件,短信不是必须的业务逻辑异步处理
    在这里插入图片描述

    由此可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍。

    1.1.2 应用解耦
    场景:

    双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口。
    在这里插入图片描述

    这种做法有一个缺点:

    • 当库存系统出现故障时,订单就会失败。
    • 订单系统和库存系统高耦合。

    引入消息队列
    在这里插入图片描述

    订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

    库存系统:订阅下单的消息,获取下单消息,进行库操作。
    就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。

    1.1.3 流量削峰
    流量削峰一般在秒杀活动中应用广泛

    场景:

    秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。

    作用:
    1、可以控制活动人数,超过此一定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢^^)
    2、可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单)

    1、用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面。

    2、秒杀业务根据消息队列中的请求信息,再做后续处理。

    常见MQ产品
    ActiveMQ:基于JMS

    RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好

    RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会

    Kafka:分布式消息系统,高吞吐量

    一、RabbitMQ快速入门

    RabbitMQ是由erlang语言开发,基于AMQP(Advanced Message Queue 高级消息队列协议)协议实现的消息队列,它是一种应用程序之间的通信方法,消息队列在分布式系统开发中应用非常广泛。RabbitMQ官方地址:http://www.rabbitmq.com

    二、使用步骤

    1.pom.xml里导入相关的依赖:

        
        
            org.springframework.boot
            spring-boot-starter-amqp
        
        
            org.springframework.boot
            spring-boot-starter-web
        
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2.application.yml配置文件:

      #rabitMq配置
      rabbitmq:
        host: 120.25.228.68
        port: 5672
        virtual-host: /
        connection-timeout: 10000
        #开启Confirm机制
        publisher-confirm-type: correlated
        #开启Return机制
        publisher-returns: true
        #开启ACK
        listener:
          type: simple
          simple:
            #每次能接收10个消息
            prefetch: 10
            acknowledge-mode: manual
          direct:
            acknowledge-mode: manual
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    3.rabbitMq交换机讲解:

    1、交换机介绍:
    先附加下官网文档。RabbitMQ的交换机类型共有四种,是根据其路由过程的不同而划分成的:

    2、交换机模式
    一、Direct Exchange(直连交换机)
    在这里插入图片描述
    直连型交换机背后的路由算法很简单——消息会传送给绑定键与消息的路由键完全匹配的那个队列。 我们用直连交换机取代了只会无脑广播的扇形交换机,并且具备了选择性接收消息的能力。

    这种配置下,我们可以看到有两个队列Q1、Q2绑定到了直连交换机X上。第一个队列用的是橘色(orange)绑定键,第二个有两个绑定键,其中一个绑定键是黑色(black),另一个绑定键是绿色(green)。在此设置中,发布到交换机的带有橘色(orange)路由键的消息会被路由给队列Q1。带有黑色(black)或绿色(green)路由键的消息会被路由给Q2。其他的消息则会被丢弃。

    二、 Fanout Exchange(扇型交换机)

    在这里插入图片描述
    Fanout Exchange(扇型交换机):当一个Msg发送到扇形交换机X上时,则扇形交换机X会将消息分别发送给所有绑定到X上的消息队列。扇形交换机将消息路由给绑定到自身的所有消息队列,也就是说路由键在扇形交换机里没有作用,故消息队列绑定扇形交换机时,路由键可为空。这个模式类似于广播。

    三、Topic Exchange(主题交换机)
    在这里插入图片描述
    1)路由键和绑定键命名

    消息路由键—发送到主题交换机的消息所携带的路由键(routing_key)不能随意命名——它必须是一个用点号分隔的词列表。当中的词可以是任何单词,不过一般都会指定一些跟消息有关的特征作为这些单词。列举几个有效的路由键的例子:“stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。只要不超过255个字节,词的长度由你来定。
    绑定键(binding key)也得使用相同的格式。主题交换机背后的逻辑跟直连交换机比较相似——一条携带特定路由键(routing key)的消息会被投送给所有绑定键(binding key)与之相匹配的队列。尽管如此,仍然有两条与绑定键相关的特殊情况:

    • (星号) 能够替代一个单词。

      #(井号) 能够替代零个或多个单词。

    (2)示例解析,如上图:
    我们将会发送用来描述动物的多条消息。发送的消息包含带有三个单词(两个点号)的路由键(routing key)。路由键中第一个单词描述速度,第二个单词是颜色,第三个是品种: “<速度>.<颜色>.<品种>”。我们创建三个绑定:Q1通过".orange.“绑定键进行绑定,Q2使用”…rabbit" 和 “lazy.#”。

    队列绑定键解释:

    Q1针对所有的橘色orange动物。
    Q2针对每一个有关兔子rabbits和慵懒lazy的动物的消息。
    消息路由键解释:

    一个带有"quick.orange.rabbit"路由键的消息会给两个队列都进行投送。消息"lazy.orange.elephant"也会投送给这两个队列。
    另外一方面,“quick.orange.fox” 只会给第一个队列。"lazy.pink.rabbit"虽然与两个绑定键都匹配,但只会给第二个队列投送一遍。“quick.brown.fox” 没有匹配到任何绑定,因此会被丢弃掉。
    (3)异常情况
    如果我们破坏规则,发送的消息只带有一个或者四个单词,例如 “orange” 或者 "quick.orange.male.rabbit"会发生什么呢?结果是这些消息不会匹配到任何绑定,将会被丢弃。另一方面,“lazy.orange.male.rabbit”即使有四个单词,也会与最后一个绑定匹配,并 被投送到第二个队列。

    (4)注意事项
    主题交换机非常强大,并且可以表现的跟其他交换机相似。

    当一个队列使用"#"(井号)绑定键进行绑定。它会表现的像扇形交换机一样,不理会路由键,接收所有消息。
    当绑定当中不包含任何一个 “*” (星号) 和 “#” (井号)特殊字符的时候,主题交换机会表现的跟直连交换机一毛一样。

    4.在rabbitMq创建好交换机:

    这里创建了三种模式的交换机,分为直连模式(direct),广播模式(fanout),主题模式(topic)
    在这里插入图片描述

    5、队列绑定交换机配置类

    /**
     * @author :Mr.ZJW
     * @date :Created 2023/8/30 16:08
     * @description:直连模式交换机配置
     */
    @Configuration
    public class DirectRabbitConfig {
    
        /**
         * 队列
         */
        public static final String DIRECT_QUERY = "directQuery";
    
        /**
         * 交换机
         */
        public static final String DIRECT_EXCHANGE = "directExchange";
    
        /**
         * 配置队列
         */
        @Bean
        public Queue directQuery() {
            return new Queue(DIRECT_QUERY, true);
        }
    
        /**
         * 配置交换机
         */
        @Bean
        public DirectExchange directExchange() {
            return new DirectExchange(DIRECT_EXCHANGE);
        }
    
        /**
         * 绑定交换机
         */
        @Bean
        public Binding bindingPushPassengerInfoToDriverExchange(@Qualifier(DIRECT_QUERY) Queue queue,
                                                                @Qualifier(DIRECT_EXCHANGE) Exchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with(DIRECT_QUERY).noargs();
        }
    
        /**
         * 乱码配置
         */
        @Bean
        public MessageConverter getMessageConverter() {
            return new Jackson2JsonMessageConverter();
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    6、消息生产者配置类:

    /**
     * @Author: Mr.ZJW
     * @Date: 2022-07-12 9:30
     * @Description: MQ消息发送
     */
    @Configuration
    public class MqProduct {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 发送消息 (直连模式)
         *
         * @param msg        消息
         * @param routingKey 消息队列Key
         * @return true or false
         */
        public boolean mqDirectSend(String msg, String routingKey) {
            rabbitTemplate.convertAndSend(DirectRabbitConfig.DIRECT_EXCHANGE, DirectRabbitConfig.DIRECT_QUERY, msg, new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //消息持久化
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    return message;
                }
            });
            return true;
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    7、消息消费者配置类:

    /**
     * @Author: Mr.ZJW
     * @Date: 2022-07-08 16:29
     * @Description: MQ消费者
     */
    @Slf4j
    @Component
    public class ProductConsumer {
    
        /**
         * 监听司机信息给乘客
         *
         * @param msg
         * @param message
         * @param channel
         * @throws IOException
         */
        @RabbitListener(queues = "directQuery")
        public void product(String msg, Message message, Channel channel) throws IOException {
            try {
                log.info("receiver success  ProductConsumer:{}", msg);
                log.info("唯一标识:{}", message.getMessageProperties().getCorrelationId());
                //手动ack
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    这里配置类,不完整,可根据自行调整,交换机以及队列可动态配置,配置到配置类等。

  • 相关阅读:
    后端服务架构的不同与区别
    【蓝桥杯】蓝桥杯双周赛第二场ABCD题
    Safari 背景为绿色,设置 Safari 访问页面时的背景颜色 theme-color
    【树莓派 Pico 基于Arduino IDE编程开发】
    06-ServletRequest
    基于SpringBoot的旅游系统
    C++(一)
    Spring 面向切面编程 第4关:AOP实现原理-CgLib动态代理
    MySQL如何查询某个字段含有字母数字的值
    竟然还有人说ArrayList是2倍扩容,今天带你手撕ArrayList源码
  • 原文地址:https://blog.csdn.net/javaeEEse/article/details/132595176