
交换机有四类:
1、Fanout Exchange
投递到所有绑定的队列,不需要规则,不需要匹配,相当于广播、群发;

2、Direct Exchange
根据路由键精确匹配进行路由消息队列;

3、Topic Exchange
通配符匹配,相当于模糊匹配;
# 匹配多个单词,* 匹配一个单词,用 . 隔开的为一个单词:

4、Headers Exchange
基于消息内容中的headers属性进行匹配;


要确保一下四个步骤全部成功;
① 代表消息从生产者发送到Exchange;
② 代表消息从Exchange路由到Queue;
③ 代表消息在Queue中存储;
④ 代表消费者监听Queue并消费消息;
① 代表消息从生产者发送到Exchange;
Confirm确认模式;// 实现接口
implements RabbitTemplate.ConfirmCallback
// 重写方法
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause)
② 代表消息从Exchange路由到Queue;
Return返回模式;// 实现接口
implements RabbitTemplate.ReturnsCallback
/**
* 消息从 交换机 --> 到 --> 队列,如果失败了,就会回调该方法
* (失败了才触发该方法,成功是不会触发该方法的)
*
* 比如说磁盘满
*
* @param returned
*/
@Override
public void returnedMessage(ReturnedMessage returned)
③ 代表消息在Queue中存储;
/**
* 声明创建一个FanoutExchange交换机
*
* @return
*/
@Bean
public DirectExchange directExchange() {
return ExchangeBuilder.directExchange(DIRECT_EXCHANGE_NAME)
.durable(true) // 持久化
.build();
}
/**
* 声明创建一个队列
*
* @return
*/
@Bean
public Queue directQueue() {
return QueueBuilder.durable(DIRECT_QUEUE_NAME).build();
}
//消息体
Message message = MessageBuilder.withBody(json.getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT) //消息持久化
.build();
④ 代表消费者监听Queue并消费消息;
@Slf4j
@Component
public class MyRabbitListener {
@RabbitListener(queues = {RabbitConfig.DIRECT_QUEUE_NAME})
public void onMessage(String msg, @Headers Map<String, Object> header, Message message, Channel channel) {
try {
System.out.println("[RabbitListener]接收到的消息: " + msg);
//处理业务
//业务处理成功,手动确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//业务处理失效,不确认消息,并且重新入队,这样又可以重新消费
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//拒绝消息
// channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
e.printStackTrace();
}
}
}
幂等性:多次操作,重复操作,对系统不会造成影响; 比如:消息重复发送两次或多次,消息重复消费两次或多次;
保证MQ的幂等性,只要保证消费者不会重复消费相同的消息即可;
setnx 命令:SET id 1 NX,若返回OK,说明该消息之前没有消费过,正常消费;若返回 nil,说明该消息之前已消费过,那就不用处理;死信队列即DLX,全称为Dead-Letter-Exchange,翻译为:死信交换机。当一个消息在队列中变成死信 (dead message) 之后,它能被重新发送到另外一个交换机中,这个交换机就是DLX,绑定到DLX的队列就称为死信队列;
死信队列本身也是一个普通的消息队列,可以通过设置一些参数将其设置为死信队列;
死信队列是一个用于存放无法被消费的消息的队列,这些消息被称为死信,死信队列可以避免消息一直被消费却无法消费成功的情况;

RabbitMQ导致死信的几种原因:
1、消息被拒
2、消息 TTL 过期;
3、队列达到最大长度,即队列满了;

RabbitMQ 本身是没有直接可以使用的延迟队列;要实现延迟队列,一般有两种方式:
使用TTL消息过期 + DLX死信队列来实现;

使用使用 rabbitmq-delayed-message-exchange 延迟插件来实现;

RabbitMQ有三种模式:
1)单机模式
2)普通集群模式

3)镜像集群模式 (高可用)


1、提前预防;
2、应急处理;
3、事后优化;
消息堆积,导致消息大量存储在消息队列中得不到消费,而消息又设置了TTL过期时间,当到达过期时间时,消息被过期丢弃;
解决:
1、让消息不要被堆积;
2、不设置TTL过期时间或者增加过期时间时长;
3、设置死信队列
4、编写临时程序补发消息;