Brocker:消息队列服务器实体
Exchange(消息交换机):它指定消息按什么规则,路由到哪个队列。
Queue(消息队列载体):每个消息都会被投入到一个或多个队列。
Binding(绑定):它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key(路由关键字):exchange根据这个关键字进行消息投递;
vhost:权限数据隔离。
Producer(消息生产者):就是投递消息的程序。
Consumer(消息消费者):就是接受消息的程序;
简单收发模式,其中一个生产者一个消费者,一个队列。也称为点对点模式
一个消息生产者,一个交换机,一个消息队列,多个消费者。
生产者P发送消息到队列,多个消费者C消费队列的数据。
工作流队列也被称为公平性队列模式,RabbitMQ将按顺序将每条消息发送给笑一个消费者,每个消费者将获得相同数量的消息。
无选择接收消息,一个生产者,一个Fanout交换机,多个队列,多个消费者。
在应用中,需要将队列绑定到交换机上,一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。
生产者P只需吧消息发送到交换机X上,绑定这个交换机的队列都会获得一份一样的数据。
在发布订阅模式的基础上,有选择的接收消息,也是通过routing理由进行匹配条件是否满足接收消息,Direct交换机。
生产者P发送数据是要指定交换机(X)和routing发送消息 ,指定的routingKey=error,则队列Q1和队列Q2都会有一份数据,如果指定routingKey=into,或=warning,交换机(X)只会把消息发到Q2队列。
topics(主题)模式跟routing路由模式类似,只不过路由模式是指定固定的路由键 routingKey,而主题模式是可以模糊匹配路由键 routingKey,类似于SQL中 = 和 like 的关系。
没匹配routingKey的消息将会被丢弃。
* 代表一个词,# 代表零个或多个
与上面其他5种所不同之处,该模式是拥有请求/回复的。也就是有响应的,上面5种都没有。
RPC是指远程过程调用,也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的处理业务,处理完后然后在A服务器继续执行下去,把异步的消息以同步的方式执行。
一般都不会选用 RabbitMQ 的 RPC 模式,因为有专门进行远程调用的框架如 Dubbo,用起来会更加方便。
集群方面:
启动
RabbitMQ 集群对延迟非常敏感,应当只在本地局域网内使用。在广域网中不应该使用集群。
普通集群,又称为标准集群,具备以下特征:
使用docker搭建rabbitmq集群
docker pull rabbitmq:3.9.15-management
# 运行5672
docker run -d --name rabbitmq5672 \
-p 5672:5672 -p 15672:15672 \
-v /Users/lzq/docker/rabbitmq5672/data:/var/lib/rabbitmq \
-v /Users/lzq/docker/rabbitmq5672/log:/var/log/rabbitmq \
-v /Users/lzq/docker/rabbitmq5672/rabbitmq_delayed_message_exchange-3.9.0.ez:/opt/rabbitmq/plugins/rabbitmq_delayed_message_exchange-3.9.0.ez \
--hostname rabbitmq5672 \
-e RABBITMQ_DEFAULT_VHOST=localhost \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=root \
-e RABBITMQ_LOGS=/var/log/rabbitmq/rabbitmq.log \
--net=my_net \
-v /etc/localtime:/etc/localtime \
rabbitmq:3.9.29-management
# 运行5673
docker run -d --name rabbitmq5673 \
-p 5673:5672 -p 15673:15672 \
-v /Users/lzq/docker/rabbitmq5673/data:/var/lib/rabbitmq \
-v /Users/lzq/docker/rabbitmq5673/log:/var/log/rabbitmq \
-v /Users/lzq/docker/rabbitmq5673/rabbitmq_delayed_message_exchange-3.9.0.ez:/opt/rabbitmq/plugins/rabbitmq_delayed_message_exchange-3.9.0.ez \
--hostname rabbitmq5673 \
-e RABBITMQ_DEFAULT_VHOST=localhost \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=root \
-e RABBITMQ_LOGS=/var/log/rabbitmq/rabbitmq.log \
--net=my_net \
-v /etc/localtime:/etc/localtime \
rabbitmq:3.9.29-management
# 运行5674
docker run -d --name rabbitmq5674 \
-p 5674:5672 -p 15674:15672 \
-v /Users/lzq/docker/rabbitmq5674/data:/var/lib/rabbitmq \
-v /Users/lzq/docker/rabbitmq5674/log:/var/log/rabbitmq \
-v /Users/lzq/docker/rabbitmq5674/rabbitmq_delayed_message_exchange-3.9.0.ez:/opt/rabbitmq/plugins/rabbitmq_delayed_message_exchange-3.9.0.ez \
--hostname rabbitmq5674 \
-e RABBITMQ_DEFAULT_VHOST=localhost \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=root \
-e RABBITMQ_LOGS=/var/log/rabbitmq/rabbitmq.log \
--net=my_net \
-v /etc/localtime:/etc/localtime \
rabbitmq:3.9.29-management
进入第一个节点
docker exec -it rabbitmq5672 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit
进入第二个节点
docker exec -it rabbitmq5673 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@rabbitmq5672
rabbitmqctl start_app
exit
进入第三个节点
docker exec -it rabbitmq5674 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@rabbitmq5672
rabbitmqctl start_app
exit
**rabbitmqctl join_cluster {cluster_node} [–ram]**表示将节点加入指定集群中。在这个命令执行前需要停止RabbitMQ应用并重置节点。参数“–ram”表示同步 rabbit@rabbitmq01的内存节点,忽略此参数默认为磁盘节点。
rabbitmqctl join_cluster rabbit@rabbitmq5672
移除节点
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
rabbitmqctl forget_cluster_node rabbit@rabbitmq5674
rabbitmqctl join_cluster rabbit@rabbitmq5672
rabbitmqctl forget_cluster_node --offline rabbit@rabbitmq5672
docker pull haproxy
docker run -d \
-p 5671:5671 \
-p 15671:15671 \
-p 5670:5670 \
-p 15670:15670 \
--net=my_net \
--hostname haproxy \
--name haproxy \
-v /Users/lzq/docker/haproxy/haproxy.cfg:/usr/local/etc/haproxy/haproxy.cfg:ro haproxy
在docker的宿主机上编写haproxy.cfg
文件如下
# 全局配置
global
#定义全局的syslog服务器
log 127.0.0.1 local0 info
#每个haproxy进程可以接受的最多并发数
maxconn 4096
#让haproxy以守护进程的方式工作于后台
daemon
#默认参数的配置部分
defaults
log global
#工作模式 http ,tcp 是 4 层,http是 7 层
mode tcp
option tcplog
option dontlognull
#健康检查。3次连接失败就认为服务器不可用,主要通过后面的check检查
retries 3
maxconn 2000
#ha服务器与后端服务器连接超时时间
timeout connect 5s
#客户端超时
timeout client 120s
#服务端超时
timeout server 120s
listen rabbitmq_cluster
#监听地址
bind :5671
#工作模式
mode tcp
#负载均衡方法轮询
balance roundrobin
server rabbit-node01 10.60.57.62:5672 check inter 5000 rise 2 fall 3 weight 1
server rabbit-node02 10.60.57.62:5673 check inter 5000 rise 2 fall 3 weight 1
server rabbit-node03 10.60.57.62:5674 check inter 5000 rise 2 fall 3 weight 1
listen rabbitmq_manager
#监听地址
bind :15671
#工作模式
mode tcp
#负载均衡方法轮询
balance roundrobin
server rabbit-node01 10.60.57.62:15672 check inter 5000 rise 2 fall 3 weight 1
server rabbit-node02 10.60.57.62:15673 check inter 5000 rise 2 fall 3 weight 1
server rabbit-node03 10.60.57.62:15674 check inter 5000 rise 2 fall 3 weight 1
跟普通集群模式相比,该模式加入镜像队列 ,镜像模式有以下特征:
镜像模式通过数据同步和主节点切换提供了更高的可用性和数据冗余,适合对数据可用性有要求较高的应用场景
有三种模式,使用不同的参数来定义镜像策略
可以精确控制队列在集群中的副本数量。例如:如果将ha-params
设置为2,表示每个队列将有2个副本,启动一个是主节点,另一个是镜像节点。如果集群中的节点数不足以维护所需的副本数,队列将被镜像到所有节点。如果有足够多的节点,但其中某些节点出现故障,将在其他节点上创建新的镜像。
配置命令如下:
#进入任意一个rabbitmq节点,执行
rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
队列将在集群中的所有节点之间进行镜像,队列将镜像到任何新加入的节点。
将队列镜像到所有的节点会增加额外的压力,包括网络I/O、磁盘I/O和磁盘空间的使用。因此,不建议使用all模式
配置命令如下:
#进入任意一个rabbitmq节点,执行
rabbitmqctl set_policy ha-all "^all\." '{"ha-mode":"all"}'
可以明确指定队列应该创建在哪些节点上。如果执行的节点全部存在,队列将在这些节点说上创建。如果指定的节点在集群中存在,但是某些节点不可用,队列将在当前客户端连接到的节点上创建。如果指定的节点在集群中不存在,可能会引发异常。
配置命令如下:
#进入任意一个rabbitmq节点,执行
rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@rabbitmq5672", "rabbit@rabbitmq5673"]}'
进入任意一个rabbitmq节点,执行
rabbitmqctl set_policy -p localhost ha "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
rabbitmqctl set_policy -p /study ha "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'
可以设置镜像队列,"^"表示匹配所有队列,-p表示针对当前vhost,即所有队列在各个节点上都会有备份。在集群中,只需要在一个节点上设置镜像队列,设置操作会同步到其他节点。
注意:例如:队列A存在主节点rabbit5672,镜像节点rabbit5673、rabbit5674。如果rabbit5672下线,rabbit5673和rabbit5674会升成主节点,当rabbit5672节点重新上线后,会变为镜像节点。
尽管镜像模式能够做到主从复制,但是并不是强一致的,因此可能还是会导致数据的丢失。
仲裁队列是 3.8 版本以后才有的新功能,用来替代镜像队列,属于主从模式,支持基于 Raft 协议强一致的主从数据同步。
具有以下特点:
添加仲裁队列的方式非常简单,只需要在创建队列的使用指定队列的类型为 Quorum
即可
此模式是作用在生产者端的,开启了这个模式就可以知道消息有没有发送到交换机上,不管有没有发送到都会触发回调方法。
publisher-confirm-type: correlated
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
System.out.println("confirm,correlationData:" + correlationData);
System.out.println("confirm,ack:" + ack);
System.out.println("confirm,cause:" + cause);
});
作用于生产者端的,这个模式iuu是知道消息有没有发送到对应的队列上,如果没有发送到了对接的队列才会触发回调方法
#设置交换机确认发布模式,默认为禁用
publisher-confirm-type: correlated
#退回消息
publisher-returns: true
rabbitTemplate.setReturnsCallback(returned -> {
System.out.println("returnedMessage,getMessage:" + returned.getMessage());
System.out.println("returnedMessage,getExchange:" + returned.getExchange());
System.out.println("returnedMessage,getRoutingKey:" + returned.getRoutingKey());
System.out.println("returnedMessage,getReplyCode:" + returned.getReplyCode());
System.out.println("returnedMessage,getReplyText:" + returned.getReplyText());
});
死信队列,英文缩写:DLX 。DeadLetter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
消息成为死信的三种情况
死信消息的三种处理方式
队列绑定死信交换机:
给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", exchange_dead);
arguments.put("x-dead-letter-routing-key", routing_dead_routing_key);
return new Queue(queue_dead, true, false, false, arguments);
延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息”是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。
在RabbitMQ中并未提供延迟队列功能,但是可以使用:TTL+死信队列 组合实现延迟队列的效果。
没有安装延迟插件报错如下:
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; protocol method: #method(reply-code=503, reply-text=COMMAND_INVALID - invalid exchange type 'x-delayed-message', class-id=40, method-id=10)
at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:66)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:505)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:296)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:144)
... 133 common frames omitted
先确定docker中rabbitmq的版本号,如果镜像中没有版本号,使用命令docker inspect rabbitmq:management
查看RABBITMQ_VERSION
字段。根据当前版本下载延迟队列插件。
下载RabbitMQ延迟插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
rabbitmq-delayed-message-exchange插件为交换机提供了新的类型:x-delayed-message
开启延迟插件命令:
#进入容器中执行
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
在管理页面上Exchanges新增页面能够看到x-delayed-message
类型后即可
插件的禁用要慎重,以下方式可以实现将插件禁用,但是注意如果此时还有延迟消息未消费,那么禁掉此插件后所有的未消费的延迟消息将丢失。
rabbitmq-plugins disable rabbitmq_delayed_message_exchange
所谓的脑裂问题,就是在多集群中节点与节点之间失联,都认为对方出现故障,而自身裂变为独立的个体,那么久出现了抢夺对方的资源,争抢启动,至此就发生了事故,RabbitMQ
RabbitListenerContainerFactory的代码如下:
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//设置线程并发数,默认1
factory.setConcurrentConsumers(2);
//最大线程数,默认null
factory.setMaxConcurrentConsumers(5);
//设置本地缓存消息数,默认250
factory.setPrefetchCount(1);
//连续成功处理消息数,进行扩容,默认10
factory.setConsecutiveActiveTrigger(3);
//连续空闲数,进行缩容,默认10
factory.setConsecutiveIdleTrigger(3);
//默认等待队列超时时间,默认1000ms
factory.setReceiveTimeout(1000L);
return factory;
}
concurrentConsumers默认为1,即每个Listener容器静静开启一个线程去处理消息
private final class AsyncMessageProcessingConsumer implements Runnable {
@Override // NOSONAR - complexity - many catch blocks
public void run() { // NOSONAR - line count
if (!isActive()) {
this.start.countDown();
return;
}
try {
initialize();
while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
mainLoop();
}
}
}
private void mainLoop() throws Exception { // NOSONAR Exception
try {
if (SimpleMessageListenerContainer.this.stopNow.get()) {
this.consumer.forceCloseAndClearQueue();
return;
}
boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
checkAdjust(receivedOk);
}
long idleEventInterval = getIdleEventInterval();
if (idleEventInterval > 0) {
if (receivedOk) {
updateLastReceive();
}
else {
long now = System.currentTimeMillis();
long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();
long lastReceive = getLastReceive();
if (now > lastReceive + idleEventInterval
&& now > lastAlertAt + idleEventInterval
&& SimpleMessageListenerContainer.this.lastNoMessageAlert
.compareAndSet(lastAlertAt, now)) {
publishIdleContainerEvent(now - lastReceive);
}
}
}
}
每个消费者会在MQ预取一些消息放入内存的LinkedBlockingQueue中进行消费,这个值越高,消息传递的越快,单非顺序处理消息的风险更高。如果ack模式为none,则忽略。将增加此值以匹配txSize或messagePerAck。从2.0开始默认为250;设置为1将还原为以前的行为。
不过在在有些情况下,尤其是处理速度比较慢的大消息,消息可能在内存中大量堆积,消耗大量内存,以及对于一些严格要求顺序的消息,prefetchCount应当设置为1
linux中以守护程序的形式在后台启动
rabbitmq-server -detached
新建一个用户
rabbitmqctl add_user root root
创建一个虚拟环境
rabbitmqctl add_vhost /study
设置管理员角色
rabbitmqctl set_user_tags root administrator
设置权限
rabbitmqctl set_permissions -p /study root “." ".” “.*”