• rabbitmq入门、springboot集成rabbitmq


    前言

    本文主要参考了Understanding AMQP, the protocol used by RabbitMQ
    要学习rabbitmq,就要先学习amqp协议,amqp全称Advanced Message Queuing Protocol (AMQP),rabbitmq实现了amqp,当然ActiveMQ也实现了amqp。amqp中有几个重要的概念,producer(publisher)、exchange、queue、consumer
    在这里插入图片描述
    谈及MQ时,我们还会经常谈及RocketMQ,可参考Why choose RocketMQ

    消息从发送到接收的流程

    producer发送message到指定的exchange,exchange根据message header中的routing-key与队列的binding-key决定绑定到哪个队列;那么consumer怎么获取消息呢?queue push或者consumer pull,这取决于配置。

    消息的结构

    一个消息由header、properties、data3部分组成,header是amq规范规定的类似http规范中的header,比如后边会讲到的routing-key,properties是业务自定义的,data是二进制数据;
    在这里插入图片描述
    通常我们把message分配到queue的过程叫binding,那消息结构跟binding有什么关系呢?exchange拿着message header中的routing-key与queue中的binding-key(每个queue都有一个binding-key)做匹配。一般的consumer创建queue时,会同时创建exchange,并将queue关联到exchange,producer发送到指定exchange,从而实现消息流转;

    exchange类型

    exchange常见的有以下几种类型,

    • direct
      • binding key与routing key完全匹配,*#会当作普通文本处理;
    • topic
      • binding key的*#当作通配符处理,*只能匹配一个word,#匹配零个或多个点号分隔的word;
    • fanout,n. 输出(端数);展(散)开;扇出; 分列账户;
      • routing key和binding key会忽略,收到的消息转发给全部绑定的队列,就是通常所说的广播模式

    比如现有一个message,它的routing key是NYSE.TECH.MSFT,exchange是topic类型,那么匹配queue的binding key结果
    在这里插入图片描述
    以上包含了绝大多数情况。实际上,一个queue可以有多个consumer,一个exchange可以有多个queue,一个queue可以有多个exchange。

    应用场景

    RPC(远程过程调用)

    在这里插入图片描述
    这个过程是这样的

    1. 客户端发送一个message到queue,当然了message的routing key要匹配service。
    2. exchange将message传给service,service做了一些操作并返回响应message给exchange,响应message指定了routing_key以匹配响应queue。
    3. 客户端从响应queue中获取到message

    至于这个过程是blocking还是non-blockding,就取决于client library,无论是哪种都可以做到。AmqpTemplate#sendAndReceive就是对应的这种模式

    发布/订阅

    在这里插入图片描述

    一个消息会被多个消费者消费,也叫广播模式,在activemq中叫topic。

    任务分发

    一个message只会被多个consumer中的一个消费,在activemq中叫p2p(点对点)。
    在这里插入图片描述

    最后来一张自己总结的图在这里插入图片描述

    jms vs amqp

    参考消息队列(为什么要用消息队列,常见消息队列对比,JMS和AMQP谁更好用?),简单来说JMS是java的规范,跟语言是相关的,而amqp是协议,与语言无关,jms提供了点对点发布/订阅两种模式,amq也提供了这两种模式,并在此基础上,对路由提供了多种选择。

    springboot集成rabbitmq

    springboot可以很方便的集成rabbitmq,引入spring-boot-starter-amqp,然后在配置文件中添加配置

    spring:
      rabbitmq:
        host: "localhost"
        port: 5672
        username: "admin"
        password: "secret"
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    发送消息

    @Component
    public class MyBean {
    
        private final AmqpAdmin amqpAdmin;
    
        private final AmqpTemplate amqpTemplate;
    
        public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
            this.amqpAdmin = amqpAdmin;
            this.amqpTemplate = amqpTemplate;
        }
    
        // ...
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    发送重试

    默认的template禁用了重试,可以启用重试,注意这是发送(AmqpTemplate )重试;

      rabbitmq:
        template:
          retry:
            enabled: true
            initial-interval: "2s"
    
    • 1
    • 2
    • 3
    • 4
    • 5

    消费重试

    接收消息

    @Component
    public class MyBean {
    
        @RabbitListener(queues = "someQueue")
        public void processMessage(String content) {
            // ...
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    注意,消费者有重试重入队列两个概念;默认的rabbitmq.listener.simple.retry.enabled=false,重试被禁用;因为rabbitmq.listener.simple.default-requeue-rejected=true,如果listener抛出异常,那么会重入队列,从而会无限重试;如果不想要无限重试,有2种方法解决,

    • 设置defaultRequeueRejected 属性为false,这样的话,消费失败就不会再入队列或者抛出一个AmqpRejectAndDontRequeueException 来通知消息不用再重新入queue。
    • 打开重试并设置最大重试次数。比如设置最大重试次数
    rabbitmq:
        listener:
          simple:
            acknowledge-mode: auto # 超过重试次数后,自动确认,消息会从队列中删除
            retry:
            # 打开重试,设置最大重试次数
              enabled: true
              max-attempts: 3
              max-interval: 6000
              initial-interval: 2000
             # 如果消费失败,不再入队列
            default-requeue-rejected: false
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    发送可以重试,Listener也可以重试,注意区分;

    消息确认、消息丢失

    确认机制指的是,当消费者在处理消息时,如果出错或者网络出错,broker应不应该删除消息的问题。AMQP 0-9-1规范提供了机制供消费者控制,

    • 自动确认,消费者收到发送消息后删除不管消费成功或者失败
    • 手动确认,收到消费者明确返回确认后再删除

    消费者消费时可能会出现处理成功、处理失败、多条处理失败的情况,相应的就有ack、reject、nack(rabbitmq扩展了AMQP 0-9-1规范,自创的)三条命令,跟spring源码中可能不太一样,

    package org.springframework.amqp.core;
    public enum AcknowledgeMode {
    /**
    	 * No acks - {@code autoAck=true} in {@code Channel.basicConsume()}.
    	 */
    	 //自动确认,消费者收到消息后,不管消费成功与否,从broker中删除
    	NONE,
    
    	/**
    	 * Manual acks - user must ack/nack via a channel aware listener.
    	 */
    	 //手动确认,消费者明确返回确认后,才会从broker中删除
    	MANUAL,
    
    	/**
    	 * Auto - the container will issue the ack/nack based on whether
    	 * the listener returns normally, or throws an exception.
    	 * 

    Do not confuse with RabbitMQ {@code autoAck} which is * represented by {@link #NONE} here. */ //手动确认,java实现时自创的,container根据listener执行结果,成功时返回ack、失败时返回nack AUTO; }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    参考Message acknowledgmentAMQP 0-9-1 Message Acknowledgements

    publisher丢了消息

    因为网络的不确定性,可能在publisher发送到broker时,丢了消息,通常的解决方法是publisher发送前写入数据库,如果真的mq server丢失了,再手动补偿;还可能broker在处理message时出错了,导致了消息丢失;

    mq server丢了消息

    如果mq server重启了,那么队列及消息都丢失了,可以配置队列及消息是持久化的,这样重启也不会丢失队列和消息。但这不是绝对的不会丢失,因为mq server收到message会暂时放入cache在合适的时机再写入磁盘,如果真的需要绝对可靠可以考虑publisher confirm

    consumer丢了消息

    默认的,broker发送message给了consumer后,就会立刻删掉message。如果处理这个message要花费一段时间,还未处理完时,consumer终止了,那么这个message就丢失了,还有其他已经收到还未来得及处理的message;怎么解决这种问题呢?consumer 确认机制。处理完message后,发送ack,broker收到ack再删除message,如果一直没收到ack(默认是30分钟),则会放入messages_unacknowledged 队列。

    总结

    rabbitmq中,实现点对点模式还是广播模式的关键在于exchange的类型,

    • 点对点模式
      • exchange使用direct、topic都可以实现
    • 广播模式
      • exchange使用fanout实现
  • 相关阅读:
    js中获取json的值:使用 JSON.parse() 方法
    AMD EPYC(霄龙)Genoa服务器 | 综合评测
    如何将Vapor项目部署到Ubuntu?
    服务器安全怎么保障,主机安全软件提供一站式保护
    超好用的 Windows 效率工具推荐
    FT2004(D2000)开发实战之AMD R5 230显卡驱动适配
    Bean的作用域和生命周期
    设计模式-行为型模式
    python安装第三方包
    使用UEFI固件引导KVM虚拟机
  • 原文地址:https://blog.csdn.net/wangjun5159/article/details/122125149