RabbitMQ是使用Erlang语言开发并基于AMQP协议来实现的开源消息队列系统。其主要目的是为了解决传统的消息传输上管理困难,效率不高的问题。
docker pull rabbitmq
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:latest
docker exec -it 108435a3ea5f /bin/bash
rabbitmq-plugins enable rabbitmq_management

Broker:要使用 RabbitMQ 来收发消息,必须要安装一个 RabbitMQ 的服务,可以安装在 Windows 上面也可以安装在 Linux 上面,默认是 5672 的端口。这台安装 RabbitMQ的服务器的机器我们把它叫做 Broker。Vhost:在一个Broker上面划分出多个隔离的环境,这多个环境就可以理解成是Vhost。每个Vhost相当月一个相对独立的RabbitMQ服务器,每个Vhost之间是相互隔离的。一个Vhost里面可以有若干个Exchange和Queue,同一个Vhost里面不能有相同名称的Exchange或者Queue,每个Vhost中的exchange、queue、message不能互通。Connection:无论是生产者还是消费者,都需要和 Broker 建立连接,这个连接就是Connection,这个连接是一个 TCP 的长连接。一个生产者或一个消费者与 Broker 之间只有一个Connection,即只有一条TCP连接。Channel:消息推送使用的通道,如果每一次访问消息队列中间件都建立一个TCP连接的话,那么系统资源会被大量的占用,效率也会降低,所以AMQP提供了Channel机制,共享同一个TCP连接,而一个TCP连接里可以有大量的Channel,不同的Channel之间是完全隔离的。Exchange:交换机,用于接收消息,可根据路由键将消息转发到绑定的队列。Queue:也称作Message Queue,即消息队列,用于保存消息并将他们转发给消费者。Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来,在进行绑定的时候一般会指定一个binding key。Routing Key:一个路由规则,生产者将消息发送到交换机时,会在消息头上携带一个 key,这个 key就是routing key,来指定这个消息的路由规则。Producer:消息生产者,就是投递消息的程序。Consumer:消息消费者,就是接受消息的程序。生产者:1.建立链接–>2.开起信道–>3.发送消息–>4.释放资源
消费者:1.建立链接–>2.开启信道–>3.准备接收消息–>4.broker推送消息–>5.发送确认–>6.释放资源
Name:交换机名称Type:交换机类型,direct,topic,fanout,headersDurability:是否需要持久化,如果持久性,则RabbitMQ重启后,交换机还存在Auto Delete:当最后一个绑定到Exchange上的队列删除后,自动删除该ExchangeInternal:当前Exchange是否用于RabbitMQ内部使用,默认为false。Arguments:扩展参数,用于扩展AMQP协议定制使用。简单模式:是最简单的消息传输模式,它包含一个生产者、一个消费者和一个队列。生产者向队列里发送消息,消费者从队列中获取消息并消费。

工作模式:是指向多个互相竞争的消费者发送消息的模式,由一个生产者、一个队列和多个消费者组成。多个消费者订阅同一个队列,当有消息进入队列时,这多个消费者会竞争去获取队列的消息消费,对于任务过重或任务较多的情况使用工作队列可以提高任务处理速度。

发布/订阅模式:该模式会将消息发送到交换机上,这种模式下交换机的类型为fanout,再根据该交换机所绑定的队列,将要发送的消息存放到这些队列中,最终将被订阅了这些队列的消费者消费掉,这种方式也就导致了一个消息可能会被多个消费者同时消费。

如上图所示:
(1)生产者发送消息到交换机后,消息会同时存放到队列1和队列2中。
路由模式:这种模式也是将消息发送到交换机上,此模式下交换机的类型为direct,且生产者发送消息的时候会携带一个路由键Routing key。只有当这个Routing key与交换机绑定队列的binding key完全匹配时,这个消息就会进入到该binding key所对应的队列中。一个队列可以由不同的binding key来绑定,Routing key与其中任何一个binding key匹配都会进入到与之对应的队列中。 同一个binding key也可以绑定多个不同的队列,这时则跟fanout模式类似,当Routing key与这个binding key匹配时,则消息会进入到与这个binding key对应的多个队列中。

如上图所示:
(1)当Routing key为key1时,消息会存放到队列1和队列3中;
(2)当Routing key为key2或者key3时,消息会存放到队列2中。
主题模式:该模式与路由模式类似也是通过路由键去匹配要发送消息的队列,只不过该方式的Routing key必须具有固定格式:以.间隔的一串单词,比如:com.rabbit.message,且这种格式可以使用通配符*或者#来进行模糊匹配,*可以替代一个单词,#可以替代 0 或多个单词。该模式下的交换机类型为topic,且Routing key 最多不能超过255byte。

如上图所示:
(1)当消息的Routing key为三个单词,且中间的单词为 rabbit 时,消息会存放到队列1中;
(2)当Routing key以 com 开头时,消息会存放到队列2中;
(3)当Routing key为三个单词,且最后一个单词为message的时候,消息会存放到队列3中。
headers模式:该模式下交换机的类型为headers,不依赖于路由键的匹配规则来路由消息,是通过Headers头部来将消息映射到队列的,Headers头部携带一个Hash结构,Hash结构中要求携带一个键"x-match",这个键的Value可以是any或者all,这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了。当发送消息到交换器时,RabbitMQ会获取到该消息的headers,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对。如果完全匹配,则路由该消息到此队列中。相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串String类型。headers类型的交换器的性能很差,不建议使用。
轮询分发:是RabbitMQ 默认的分发机制,该机制主要是将队列中的消息依次分发给订阅了该队列的多个消费者。例如:一个队列中已经存放了很多的消息,这个对列被A、B、C三个消费者订阅了,第一个消息分发给A后,第二个消息只会在B和C之间选择一个消费者分发,假设第二个消息分发给B后,第三个消息就只能分发给C了,当把所有消费者都分发完一轮后,后面的消息再依次类推的分发。这种叫做轮询分发,也叫做公平分发。某种场景下这种策略并不是很好,不同的消费者处理消息的速度有快有慢的,就会导致这处理速度快的这个消费者很大一部分时间处于空闲状态。不公平分发:RabbitMQ 会根据消费者处理消息速度的快慢来进行消息的分发,消费快的就多分发点消息,消费慢点就少分发点消息,这样消费快的消费者就不用再等消费慢点消费者处理完后才能获取消息,达到能者多劳的效果。可以通过设置参数 channel.basicQos(1)实现不公平分发策略。通过RabbitMq的Web管理页面,可以看到Channels的Prefetch count属性显示为1则表示不公平分发成功。预取值分发:通过channel.basicQos(5)来设置预取值分发的大小,里面的参数5就是预取值,定义通道上允许的未确认消息的最大数量,且这个值要>1才行,=1的话是设置成不公平分发。当消息被消费者接收后,但是没有确认,此时这里就存在一个未确认的消息缓冲区,用于存储非被确认的消息,该缓存区的大小是没有限制的。一旦未确认消息数量达到配置的最大数量,RabbitMQ 将停止向该通道投递更多的消息,除非至少有一个未处理的消息被确认;例如:现在该通道上有A、B、C 三个被消费者消费但是未返回确认信息的消息,且预取值设置的也是3,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack;假设A这个消息刚刚被确认 ACK,此时RabbitMQ 将会感知这个情况到并再发送一条消息到这个通道。如果消费者消费了大量的消息但是没有确认的话,就会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范 围内的值通常可提供最佳的吞吐量。生产端:
消息确认机制:是指生产者投递消息后, 如果Broker收到消息, 则会给我们产生一个应答结果,生产者进行接收应答结果, 用来确定这条消息是否正常发送到Broker, 这种方式也是消息的可靠性投递的核心保障。
(1)常见的消息确认失败的场景:
1. broker在返回应答信息的时候,网络中断,生产端无法接受确认消息。
2. broker返回的应答信息是一些error类的信息,比如磁盘空间已满,无法写入等。
(2)confirm确认机制的实现:
1. 在生产者创建的channel上开启确认模式 : channel.confirmSelect();
2. 在channel上添加监听 : 通过回调channel.addConfirmListener()函数来创建一个ConfirmListener,监听成功和失败的broker应答结果, 根据具体的应答结果对消息进行重新发送, 或记录日志等后续处理。
(3)如果消息的routingKey是不可达,例如:消息在经过当前配置的 exchangeName 或 routingKey 没有找到指定的交换机,或没有匹配到对应的消息队列时,那么broker会返回成功接收的confirm确认消息,因为broker接收到了消息,只是没有符合的队列。如果只有消息确认机制的话,这种不可达的消息也会认为投递成功了,所以RabbitMQ中还有一个消息返回机制。
消息返回机制:主要用于处理一些不可路由的消息,如果消息不可达,则返回一个信号通知生产端,相反,如果消息可达,则不会返回任何信号。
(1)return消息返回机制的实现:通过回调channel.addReturnListener()函数来创建一个ReturnListener,用于监听不可达的消息,然后进行后续的处理。
消息投递失败的重发机制:如果rabbitmq返回ack失败,生产端也无法确认消息是否真的发送成功,也会造成数据丢失。最好的办法是使用rabbitmq的事务机制,但是rabbitmq的事务机制效率极低,每秒钟处理的消息仅几百条,不适合并发量大的场景。或者生产端每次发送消息的时候,将消息存放到数据库或者缓存中,当生产者接收到broker的成功应答之后就删除数据库存储到消息,如果接收到的broker失败的应答就拿出这条信息重新发送。
消费端:
为了保证消息能可靠到达消费端,RabbitMQ也提供了消费端的消息确认机制。消费者在声明队列时,可以指定autoAck参数,当autoAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。所以只要令autoAck=false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直持有消息直到消费者显式调用basicAck为止。
spring.rabbitmq.listener.simple.acknowledge-mode = manual;void basicAck(long deliveryTag, boolean multiple)方法通知broker该消息已经消费成功。void basicNack(long deliveryTag, boolean multiple, boolean requeue)方法或者void basicReject(long deliveryTag, boolean requeue)方法通知broker该消息消费失败。两种方法区别在于basicNack可以批量拒绝多条消息,而basicReject一次只能拒绝一条消息。消息幂等性,其实就是保证同一个消息不被消费者重复消费两次。当消费者消费完消息之后,在返回ack应答确认的时候失败了,会导致生产者接收不到确认消息,往往会将这个消息重新发送给消费端,但是消费端在之前一次就已经完成消费了,如果不能保证幂等性的话,那么消费者就会出现重复消费同一个消息。实现消息幂等性的方案:
生产者每次发送消息的时候会生成一个全局唯一的id放到信息中,每次消费消息之前根据这个全局id去查询db或者redis中是否存在该id的消息信息,如果有,则说明该消息已经消费过,直接返回不再做后续处理;如果没有,则说明该消息未被消费过,继续进行后续业务处理,处理成功之后再将该全局id插入到bd或者redis中。


缺点:
(1) 性能开销大,消息需要同步到所有机器上,导致网络带宽压力和消耗很大。
(2)非分布式,没有扩展性,如果 queue 的数据量大到这个机器上的容量无法容纳了,这时候集群也无法在接收消息了。
RabbitMQ的持久化队列分为:队列持久化、消息持久化、交换机持久化。
队列持久化:队列持久化后重启rabbit-server服务后该队列依然存在,而非持久化的队列则会丢失。// 参数1:名字
// 参数2:是否持久化,
// 参数3:独du占的queue,
// 参数4:不使用时是否自动删除,
// 参数5:其他参数
channel.queueDeclare(queueName,true,false,false,null);
(2)持久化的队列在web控制台中有一个D的标记。

2. 消息持久化:RabbitMQ默认把消息放在内存中是为了加快传输和消费的速度,消息持久化后将会存入磁盘中。当消息持久化后,重启rabbit-server服务,消息依然存在,而非持久化的消息则会丢失。消息持久化必须保证所在的队列也是持久化的,且将消息标记为持久化并不能完全保证不会丢失消息。因为当消息刚准备存储在磁盘的时候,但是还没有存储完,消息还在缓存的一个间隔点,此时宕机会导致消息丢失。
// 参数1:交换机的名字
// 参数2:队列名
// 参数3:是否进行消息持久化,设置为MessageProperties.PERSISTENT_TEXT_PLAIN代表消息持久化
// 参数4:发送消息的内容
channel.basicPublish(exchangeName, queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
交换机持久化:和队列一样,交换机也需要在定义的时候设置持久化的标识,否则在rabbit-server服务重启以后将丢失。// 参数1:交换机的名字
// 参数2:交换机的类型,topic/direct/fanout/headers
// 参数3:是否持久化
channel.exchangeDeclare(exchangeName,direct,true);
Rabbitmq事务与db中的事务类似,db中的事务是为了保证一系列的数据库操作要么全部执行成功,要么都不执行,而 Rabbitmq事务则是为了保证生产者发送的消息要么都发送到Rabbitmq服务上,要么都不发送。Rabbitmq事务相关的三个方法txSelect(),txCommit()以及txRollback()。
(1)txSelect():用于将当前channel设置成transaction模式,通过调用tx.select方法开启事务模式。
(2)txCommit():用于提交事务。
(3)txRollback():发生异常时,回滚事务。
在生产者发送消息的时候开启事务相关代码:
try {
channel.txSelect();//开始事务
channel.basicPublish(EXCHANGE_NAME,QUEUE_NAME,null,message.getBytes());
channel.txCommit();//提交事务
}catch (Exception e){
channel.txRollback();//回滚事务
}
如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。
死信队列:主要用于存放那些无法被消费的消息队列。当queue消息队列中的消息由于一些原因一直没办法被消费者消费掉的话,这个消息就会从这个正常的消息队列转移到死信消息队列中。
//声明普通、死信交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.Direct);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.Direct);
//声明普通队列,通过参数设置死信交换机,死信RoutingKey
Map paramMap = new HashMap<>();
paramMap.put("x-dead-letter-exchange", DEAD_EXCHANGE);
paramMap.put("x-dead-letter-routing-key", "dead_routingkey");
channel.qunueDeclare(NORMAL_QUEUE, false,false,false,paramMap);
//声明死信队列
channel.qunueDeclare(DEAD_QUEUE, false,false,false,null);
//绑定普通交换机与普通队列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE,"normal_routingkey");
//绑定死信交换机与死信队列
channel.qunueBind(DEAD_QUEUE, DEAD_EXCHANGE,"dead_routingkey");