• RabbitMQ第一话 -- docker安装RabbitMQ以及Springboot集成RabbitMQ


    1.RabbitMQ介绍

    RabbitMQ是一个由erlang语言编写的、基于AMQP协议的、开源的消息中间件。支持多种语言使用。
    MQ是消息队列(Message Queue)的简称,是应用与程序的通信管道。

    AMQP:一种提供统一消息服务的应用层标准消息协议,专门为面向消息的中间件设计的。

    工作原理:
    生产者生产消息发送到队列,消费者从队列拿消息并处理。生产者在发送完消息后代表着生产者的业务流程已经完成了,消费者也只需要关心消息后的业务。在众多可异步的场景均可使用到。

    例如注册完成后需要给账户派送虚拟奖品,派送奖品这一业务就可丢给消费者处理

    2.RabbitMQ基于Docker的安装

    • liunx centos7,镜像rabbitmq:3.9-management,docker,docker-compose
    version: '3.7'
    services:
      rabbitmq01:
        image: rabbitmq:3.9-management
        container_name: rabbitmq01
        hostname: rabbithost01
    #    restart: always
        ports:
        #程序连接使用的端口
        - "5672:5672"
    	#管理控制台使用的端口
        - "15672:15672"
        environment:
        - TZ=Asia/Shanghai
        volumes:
        - /root/local/rabbitmq:/var/lib/rabbitmq
        networks:
        - my-net
    networks:
      #新增的网络 内部服务名调用
      my-net:
        external: true
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    启动命令就不贴了,看下启动完成的容器信息

    [root@m rabbitmq]# docker ps -a
    CONTAINER ID        IMAGE                     COMMAND                  CREATED             STATUS                        PORTS                                                                                                         NAMES
    d7875aef4384        rabbitmq:3.9-management   "docker-entrypoint.s…"   12 seconds ago      Up 10 seconds                 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp   rabbitmq01
    
    • 1
    • 2
    • 3

    3.RabbitMQ管理控制台介绍

    从上述内容可以得知,15672是管理后台,直接访问看下http://ip:15672,默认的账后密码是guest/guest,不能外网访问的情况需要去配置文件修改权限。
    这个版本不用去修改,可以在日志中看到

    rabbitmq01    | 2022-06-22 01:51:33.880125+00:00 [info] <0.222.0> Created user 'guest'
    rabbitmq01    | 2022-06-22 01:51:33.881562+00:00 [info] <0.222.0> Successfully set user tags for user 'guest' to [administrator]
    rabbitmq01    | 2022-06-22 01:51:33.883061+00:00 [info] <0.222.0> Successfully set permissions for 'guest' in virtual host '/' to '.*', '.*', '.*'
    
    • 1
    • 2
    • 3

    3.1 登录后的页面是这样的

    在这里插入图片描述
    overview:总览页面,如果是集群,nodes就会显示多个节点的信息
    connections:已连接的客户端
    channel :管道
    exchange :交换机
    queue:队列
    admin:管理信息,里面的virtual-host表示虚拟主机
    一条消息的走向:某一个的虚拟主机中的管道channel -> 交换机exchange -> 分发到queue队列

    3.2 自定义的exchange

    在这里插入图片描述
    可以看到当前绑定的队列,如果有设置key也会显示

    3.3 队列queue

    在这里插入图片描述
    在Bindings这里可以看到绑定的交换机exchange,如果有设置key也会在这里显示

    3.5 Virtual Hosts虚拟主机空间

    在这里插入图片描述
    可以对不同的角色设置不同的虚拟主机空间,在配置中也可以直接配置虚拟主机

    关于虚拟主机、自定义exchange、队列的详细内容会在下篇文章中分享

    4.RabbitMQ在springboot中的应用

    本次demo基于springboot2.5.6,mq的pom依赖如下

    4.1 pom.xml

    <dependency>
    	<groupId>org.springframework.boot</groupId>
    	<artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4

    4.2 yaml配置

    spring:
      rabbitmq:
        addresses: 192.168.0.221:5672
        username: guest
        password: guest
        virtual-host: /
        #如果想默认指定发送的exchange和key 则打开下面配置
    #    template:
    #      exchange: test_exchange
    #      routing-key: 123456
        listener:
          direct:
            prefetch: 1000000
          simple:
            #手动确认 当有自动确认机制 又手动ACK会报406错误
            acknowledge-mode: manual
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    4.3 生产者代码

    @Autowired
    RabbitTemplate rabbitTemplate;
    
    @GetMapping("/send")
    public String send() {
        JSONObject object = new JSONObject();
        object.put("12345", "22222");
    	//convertAndSend也支持直接传递消息 会走默认配置的交换机和key
        rabbitTemplate.convertAndSend("test_exchange","123456",object.toJSONString());
        return "success";
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    4.4 消费者代码

    使用默认交换机时,发送消息会报ACCESS_REFUSED - 默认交换不允许操作异常。所以这里手动创建交换机和队列并进行绑定。

     @Bean
      public boolean createQueues(@Autowired ConnectionFactory connectionFactory) {
          RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
          rabbitAdmin.declareExchange(new DirectExchange("test_exchange"));
          //如果有需要每次重启都重新创建队列的 可以设置Queue构造方法的autoDelete=true
          rabbitAdmin.declareQueue(new Queue("test_queue"));
          rabbitAdmin.declareBinding(new Binding("test_queue", Binding.DestinationType.QUEUE,
                  "test_exchange", "123456", null));
    	//这里是循环创建队列并关联的代码 有需要可以取用
    //        String exchangeName = "test1234";
    //        for (int i = 0; i < 5; i++) {
    //            String queueName = "test_queue_" + i;
    //            rabbitAdmin.declareExchange(new DirectExchange(exchangeName));
    //            rabbitAdmin.declareQueue(new org.springframework.amqp.core.Queue(queueName));
    //            rabbitAdmin.declareBinding(new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, "key_" + i, null));
    //        }
          return true;
      }
    
      //消息队列监听,可多个
      @RabbitHandler
      @RabbitListener(queues = {"test_queue"})
      public void onMessage(Message message, Channel channel) throws Exception {
          String msgBodyString = new String(message.getBody());
          JSONObject json = JSONObject.parseObject(msgBodyString);
          log.info("消费消息:{},{}", json, message.getMessageProperties().getConsumerQueue());
          //进行消费 只有确认消费了的才不会从新进入队列
          channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
      }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    4.5 测试消息消费

    调用接口发送消息,查看消费者日志

    c.e.rabbitmq.consum.RabbitmqConsume      : 消费消息:{"12345":"22222"},test_queue
    
    • 1

    5.RabbitMQ参数配置详解

    以下数据转=转载自@m0_61174282,仅为记录用。

    spring.rabbitmq.host: 服务Host
    spring.rabbitmq.port: 服务端口
    spring.rabbitmq.username: 登陆用户名
    spring.rabbitmq.password: 登陆密码
    spring.rabbitmq.virtual-host: 连接到rabbitMQ的vhost
    spring.rabbitmq.addresses: 指定client连接到的server的地址,多个以逗号分隔(优先取addresses,然后再取host)
    spring.rabbitmq.requested-heartbeat: 指定心跳超时,单位秒,0为不指定;默认60s
    spring.rabbitmq.publisher-confirms: 是否启用【发布确认】
    spring.rabbitmq.publisher-returns: 是否启用【发布返回】
    spring.rabbitmq.connection-timeout: 连接超时,单位毫秒,0表示无穷大,不超时
    spring.rabbitmq.parsed-addresses:
    
    
    # ssl
    spring.rabbitmq.ssl.enabled: 是否支持ssl
    spring.rabbitmq.ssl.key-store: 指定持有SSL certificate的key store的路径
    spring.rabbitmq.ssl.key-store-password: 指定访问key store的密码
    spring.rabbitmq.ssl.trust-store: 指定持有SSL certificates的Trust store
    spring.rabbitmq.ssl.trust-store-password: 指定访问trust store的密码
    spring.rabbitmq.ssl.algorithm: ssl使用的算法,例如,TLSv1.1
    
    
    # cache
    spring.rabbitmq.cache.channel.size: 缓存中保持的channel数量
    spring.rabbitmq.cache.channel.checkout-timeout: 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
    spring.rabbitmq.cache.connection.size: 缓存的连接数,只有是CONNECTION模式时生效
    spring.rabbitmq.cache.connection.mode: 连接工厂缓存模式:CHANNEL 和 CONNECTION
    
    
    # listener
    spring.rabbitmq.listener.simple.auto-startup: 是否启动时自动启动容器
    spring.rabbitmq.listener.simple.acknowledge-mode: 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
    spring.rabbitmq.listener.simple.concurrency: 最小的消费者数量
    spring.rabbitmq.listener.simple.max-concurrency: 最大的消费者数量
    spring.rabbitmq.listener.simple.prefetch: 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.
    spring.rabbitmq.listener.simple.transaction-size: 指定一个事务处理的消息数量,最好是小于等于prefetch的数量.
    spring.rabbitmq.listener.simple.default-requeue-rejected: 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
    spring.rabbitmq.listener.simple.idle-event-interval: 多少长时间发布空闲容器时间,单位毫秒
    
    spring.rabbitmq.listener.simple.retry.enabled: 监听重试是否可用
    spring.rabbitmq.listener.simple.retry.max-attempts: 最大重试次数
    spring.rabbitmq.listener.simple.retry.initial-interval: 第一次和第二次尝试发布或传递消息之间的间隔
    spring.rabbitmq.listener.simple.retry.multiplier: 应用于上一重试间隔的乘数
    spring.rabbitmq.listener.simple.retry.max-interval: 最大重试时间间隔
    spring.rabbitmq.listener.simple.retry.stateless: 重试是有状态or无状态
    
    
    # template
    spring.rabbitmq.template.mandatory: 启用强制信息;默认false
    spring.rabbitmq.template.receive-timeout: receive() 操作的超时时间
    spring.rabbitmq.template.reply-timeout: sendAndReceive() 操作的超时时间
    spring.rabbitmq.template.retry.enabled: 发送重试是否可用
    spring.rabbitmq.template.retry.max-attempts: 最大重试次数
    spring.rabbitmq.template.retry.initial-interval: 第一次和第二次尝试发布或传递消息之间的间隔
    spring.rabbitmq.template.retry.multiplier: 应用于上一重试间隔的乘数
    spring.rabbitmq.template.retry.max-interval: 最大重试时间间隔
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56

    以上就是本章的全部内容了。

    上一篇:Zookeeper第四话 – zookeeper高可用集群搭建
    下一篇:RabbitMQ第二话 – Springboot基于四种Exchange(Direct、Fanout、Topic、Heders、延时队列)的实现和多虚拟主机下的生产消费者实现

    书山有路勤为径,学海无涯苦作舟

  • 相关阅读:
    linux入门学习17
    数据库中的存储过程、游标、触发器与常用的内置函数
    GPT4到底是何方神圣?
    C++中double类型使用技巧
    dell 720 安装系统
    智能货柜:无人零售行业的新宠
    《PyTorch深度学习实践》第二讲 线性模型 课后练习
    使用Tensorboard碰到AttributeError: module ‘distutils‘ has no attribute ‘version‘
    助力OTT大屏营销,酷开科技引领产业变革与创新
    C语言之自定义类型_结构体篇(1)
  • 原文地址:https://blog.csdn.net/qq_35551875/article/details/125208066