• RabbitMQ消息队列快速入门


    RabbitMQ消息队列快速入门

    初始MQ

    MQ全称为Message Queue,即消息队列,是在消息的传输过程中保存消息的容器。它是典型的生产者-消费者模型
    生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。消息的生产和消费都是异步的,可以解耦发送者和接收者之间的通信,提高系统的可扩展性和可靠性

    技术选型

    目比较常见的MQ实现有:

    • ActiveMQ
    • RabbitMQ
    • RocketMQ
    • Kafka
    RabbitMQActiveMQRocketMQKafka
    公司/社区RabbitApache阿里Apache
    开发语言ErlangJavaJavaScala&Java
    协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
    可用性一般
    单机吞吐量一般非常高
    消息延迟微秒级毫秒级毫秒级毫秒以内
    消息可靠性一般一般

    RabbitMQ

    RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:
    Messaging that just works — RabbitMQ

    安装

    基于Docker来安装RabbitMQ

    docker pull rabbitmq

    运行

    docker run \
     -e RABBITMQ_DEFAULT_USER=daybreak \
     -e RABBITMQ_DEFAULT_PASS=123456 \
     --name mq \
     --hostname mq \
     -p 15672:15672 \
     -p 5672:5672 \
     -d \
     rabbitmq
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    在安装命令中有两个映射的端口:

    • 15672:RabbitMQ提供的管理控制台的端口
    • 5672:RabbitMQ的消息发送处理接口

    运行成功后,访问http://ip:15672,输入username和password即可进入管理控制台。

    RabbitMQ架构

    • publisher:生产者,也就是发送消息的一方
    • consumer:消费者,也就是消费消息的一方
    • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
    • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。
    • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

    Spring AMQP

    由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。
    但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:Spring AMQP

    SpringAmqp的官方地址:Spring AMQP

    SpringAMQP提供了三个功能:

    • 自动声明队列、交换机及其绑定关系
    • 基于注解的监听器模式,异步接收消息
    • 封装了RabbitTemplate工具,用于发送消息

    交换机

    Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

    交换机的类型有四种:

    • Fanout:广播,将消息交给所有绑定到交换机的队列。
    • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列。
    • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符。
    • Headers:头匹配,基于MQ的消息头匹配,用的较少。

    声明队列和交换机

    基于Bean方式声明
    package com.itheima.consumer.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class FanoutConfiguration {
    
        /**
         * 声明交换机
         * @return
         */
        @Bean
        public FanoutExchange fanoutExchange(){
            return new FanoutExchange("daybreak.fanout");
        }
    
        /**
         * 声明队列
         * @return
         */
        @Bean
        public Queue fanoutQueue(){
            return new Queue("fanout.queue");
        }
    
        /**
         * 绑定队列和交换机
         * @param fanoutQueue3
         * @param fanoutExchange
         * @return
         */
        @Bean
        public Binding FanoutBinding3(Queue fanoutQueue, FanoutExchange fanoutExchange){
            return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    基于注解声明

    声明Direct模式的交换机和队列:

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue", durable = "true"),
            exchange = @Exchange(name = "daybreak.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "yellow"}
    ))
    public void listenDirectQueue(String msg){
        System.out.println("消费者收到了direct.queue的消息:" + msg);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    声明Topic模式的交换机和队列:

    @RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "topic.queue"),
        exchange = @Exchange(name = "daybreak.topic", type = ExchangeTypes.TOPIC),
        key = "#.news"
    ))
    public void listenTopicQueue(String msg){
        System.out.println("消费者接收到topic.queue的消息:【" + msg + "】");
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    快速入门

    导入依赖

    <!--AMQP依赖,包含RabbitMQ-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    添加配置

    在application.yml中添加配置:

    spring:
      rabbitmq:
        host: 192.168.200.130 # 你的虚拟机IP
        port: 5672 # 端口
        virtual-host: / # 虚拟主机
        username: daybreak # 用户名
        password: 123456 # 密码
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    配置JSON转换器

    Spring的消息发送代码接收的消息体是一个Object:

    在数据传输时,它会把发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
    然而默认情况下Spring采用的序列化方式是JDK序列化。

    JDK序列化存在下列问题:

    • 数据体积过大
    • 有安全漏洞
    • 可读性差

    显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

    使用JSON方式序列化需要引入以下依赖:

    <dependency>
        <groupId>com.fasterxml.jackson.dataformat</groupId>
        <artifactId>jackson-dataformat-xml</artifactId>
        <version>2.9.10</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

    配置消息转换器,在服务的启动类中添加一个Bean即可:

    @Bean
    public MessageConverter messageConverter(){
        // 1.定义消息转换器
        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
        jackson2JsonMessageConverter.setCreateMessageIds(true);
        return jackson2JsonMessageConverter;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    接收端

    package com.itheima.consumer.listeners;
    
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MyListener {
    
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(name = "daybreak.queue", durable = "true"),
                exchange = @Exchange(name = "daybreak.direct", type = ExchangeTypes.DIRECT),
                key = "demo"
        ))
        public void listenDirectQueue(String msg){
            System.out.println("消费者收到了direct.queue的消息:" + msg);
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    发送端

    package com.itheima.publisher;
    
    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    public class MyPublisher {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void myTest(){
            String exchangeName = "daybreak.direct";
            String routingKey = "demo";
            String msg = "Hello,RabbitMQ";
            rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21

    启动发送端和接收端后,运行结果如下:

    消费者收到了direct.queue的消息:Hello,RabbitMQ

  • 相关阅读:
    CCF ChinaSoft 2023 论坛巡礼 | CCF-华为胡杨林基金-软件工程专项(海报)论坛
    java计算机毕业设计计算机组成原理教学网站源码+mysql数据库+系统+lw文档+部署
    CDN简介
    Android第三方库的使用
    如何将firebase应用转为supabase应用(之一)
    值类型和引用类型的区别 I 数据结构中的堆和栈和内存中的堆和栈的区别
    一、基本数据类型(数组)
    HummerRisk V0.5:新版云合规报告、资源风险联动、拓扑展示等内容
    为什么说CDN是网站速度优化大师
    初级程序员如何进阶
  • 原文地址:https://blog.csdn.net/weixin_53065229/article/details/134542441