默认情况下RabbitMQ退出或由于某种原因崩溃的时候,队列和消息都将丢失,所以我们需要将队列和消息都标记为持久化。
我们之前创建的队列都是非持久化的,rabbitmq如果重启的话,没有持久化的队列会被删除掉,如果要实现队列的持久化,需要在声明队列的时候将durable参数设置为持久化(false 非持久化,true 持久化)。
//消息持久化,需要在声明的时候进行声明
boolean durable = true;
channel.queueDeclare(QueueName.Message_Queue.getName(), durable,false,false,null);
注意:如果报 but current is 'flase' 说明这个队列已经存在过,并且之前声明的队列不是持久化的,需要把原来的队列删除,或者重新创建一个新的不同名称持久化队列。
声明创建之后,可以在UI界面中看到(这时候即使重启rabbitMQ的服务器,队列还是存在的)

想要将消息实现持久化需要修改消息生产者代码,需要在发消息添加MessageProperties.PERSISTENT_TEXT_PLAIN属性,将消息标记为持久化并不能完全保证不会丢失消息。
/**
* 1. 交换机
* 2. 队列名
* 3. 参数 消息持久化(保存在磁盘上)
* 4. 消息
*/
channel.basicPublish("",QueueName.Ack_Queue.getName(), MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
如果需要更强的持久化策略,可以参考后面的发布确认模式。
当采用轮询分发的时候,是按照人人平等原则进行分发消息的,但是在某种场景下,这种策略并不是最好的策略。
比如当同时有两个消费者在处理任务,其中有个消费者1处理任务的速度非常快,而另一个消费者2的处理速度却很慢,这个时候使用轮询就会导致消费者1的工作不饱和,会存在大量闲置状态。而处理慢的消费者却一直在缓慢消费,造成消息堆积,这时候轮询就不太好。
为了避免这种情况,我们可以使用不公平分发。
通过设置参数channel.basicQos(1)来启用不公平分发。
//设置不公平分发 能者多劳 消费者端
int prefetchCount = 1;
channel.basicQos(prefetchCount);

策略:当慢的信道还有任务没有处理完成,就先暂时不要给慢的分配,然后rabbitmq就会将该任务分配给没有那么忙的空闲消费者,但是如果所有的消费者都没有完成手头上的任务,队列还在不停的添加新任务,队列就有可能被撑满,这个时候就能添加新的worker(消费者)或者增加新的存储任务的策略。
作用:为了限制次缓冲区的大小,以避免缓冲区里面无限制的存储未确认消息信息。
可以使用 basic.qos方法设置"预取计数"值来完成,该值定义通道上允许存在的未确认消息的最大数量
这种会增加消费者的RAM消耗(随机存取存储器),应该小心使用具有无限预处理的自动确认模式,或手动确认模式。
例子演示:
生产者 Task02
public class Task02 {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
/**
* 1。 队列名称
* 2. 是否持久化到磁盘 (队列持久化)
* 3. 是否共享
* 4. 是否自动删除
* 5. 队列参数
*/
boolean durable = true;//持久化
channel.queueDeclare(QueueName.Ack_Queue.getName(), durable,false,false,null);
//循环发送消息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
/**
* 1. 交换机
* 2. 队列名
* 3. 参数 消息持久化(保存在磁盘上)
* 4. 消息
*/
channel.basicPublish("",QueueName.Ack_Queue.getName(), MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发出消息: "+message);
}
}
}
消费者1 QuickWork(处理速度快) 预取值设置为 5
/**
* Created by IntelliJ IDEA.
* User: LvHaoIT (lvhao)
* Date: 2022/7/19
* Time: 14:46
* 快流程(消费者)
*/
public class QuickWork {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
//设置预取值,大于2
int prefetchCount = 5;
channel.basicQos(prefetchCount);
//声明接收消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody(), "UTF-8"));
try {
//通过线程睡眠来模拟快慢线程的情况
Thread.sleep(1000 * 4);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//采用手动应答
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
System.out.println("处理接收");
};
//取消消息回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println(consumerTag + "消息被中断");
};
System.out.println("QuickWork(执行时间较短)正在等待接收消息");
//采用手动应答
Boolean AutoAck = false;
channel.basicConsume(QueueName.Ack_Queue.getName(), AutoAck, deliverCallback, cancelCallback);
}
}
消费者2 SlowWork(处理速度慢) 预取值设置为 2
/**
* 慢流程(消费者)
*/
public class SlowWork {
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtil.getChannel();
//设置预取值,大于2
int prefetchCount = 2;
channel.basicQos(prefetchCount);
//声明接收消息
DeliverCallback deliverCallback = (consumerTag, message) -> {
System.out.println(new String(message.getBody(), "UTF-8"));
try {
//通过线程睡眠来模拟快慢线程的情况
Thread.sleep(1000 * 20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//采用手动应答
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
System.out.println("处理接收");
};
//取消消息回调
CancelCallback cancelCallback = consumerTag -> {
System.out.println(consumerTag + "消息被中断");
};
System.out.println("SlowWork(执行时间较长)正在等待接收消息");
//采用手动应答
Boolean AutoAck = false;
channel.basicConsume(QueueName.Ack_Queue.getName(), AutoAck, deliverCallback, cancelCallback);
}
}
演示结果
同时启动,生产者同时发送七条消息
生产端:

快消费端:(处理了五条信息)

慢消费端:(处理了两条信息)

生产者将信道设置为confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传 给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置
basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。
发布确认默认是没有开启的,如果要开启需要调用方法confirmSelect,没当需要使用发布确认模式,都需要在 channel 上调用该方法
//开启队列
channel.queueDeclare(QueueName.Message_Queue.getName(), true,false,false,null);
//开启发布确认模式
channel.confirmSelect()
这是一种简单的确认方式,它是一种同步确认发布的方式,也就是发布一个消息之后,只有它被确认发布后,后续的消息才能继续发布,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
这种确认发布方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。
//单个确认
private static void publishMessageIndividually() throws Exception {
Channel channel = RabbitMqUtil.getChannel();
//开启队列
channel.queueDeclare(QueueName.Message_Queue.getName(), true, false, false, null);
//开启发布确认模式
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", QueueName.Message_Queue.getName(), null, message.getBytes());
//单个消息进行确认
boolean flag = channel.waitForConfirms();
//不打印耗时15929ms
// if(flag)
// 打印这句话耗时 20439
// System.out.println(message+"消息发送成功");
// 打印这句话耗时 17249
// System.out.println("消息发送成功");
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "条消息 ,单个确认发布总耗时:" + (end - begin) + "ms");
}
与单个的消息的确认发布相比,先发布一批消息然后一起确认可以极大的提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样是阻塞消息的发布。
//批量确认 229ms
private static void publishMessageBatch() throws Exception {
Channel channel = RabbitMqUtil.getChannel();
//开启队列
channel.queueDeclare(QueueName.Message_Queue.getName(), true, false, false, null);
//开启发布确认模式
channel.confirmSelect();
//开始时间
long begin = System.currentTimeMillis();
//批量发送消息,批量发布确认
int batchSize = 100;
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "";
channel.basicPublish("", QueueName.Message_Queue.getName(), null, message.getBytes());
//判断达到100条消息进行一次确认
if ((i + 1) % batchSize == 0) {
// 发布确认
channel.waitForConfirms();
}
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "条消息 ,批量确认发布总耗时:" + (end - begin) + "ms");
}
异步确认发布的编程逻辑比上两个都要复杂,但是性价比最高,无论是可靠性还是效率都是最好的,它是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功。
//异步确认模式
private static void publishMessageAsync() throws Exception {
Channel channel = RabbitMqUtil.getChannel();
//开启队列
channel.queueDeclare(QueueName.Message_Queue.getName(), true, false, false, null);
//开启发布确认模式
channel.confirmSelect();
//创建线程安全有序的一个哈希表,适用于高并发的情况下
/**
* 1.轻松的将序号与消息进行关联
* 2.轻松批量删除条目,只需要给key
* 3.支持高并发(多线程)
*/
ConcurrentSkipListMap outstandingConfirms = new ConcurrentSkipListMap();
//开始时间
long begin = System.currentTimeMillis();
//消息成功回调函数 deliveryTag消息的标记 ,multiple 是否为批量确认
ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
if (multiple) {
ConcurrentNavigableMap confirmed = outstandingConfirms.headMap(deliveryTag);
confirmed.clear();
} else {
outstandingConfirms.remove(deliveryTag);
}
System.out.println("确认消息" + deliveryTag);
};
//消息失败回调函数
ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
System.out.println("未确认消息标记为:" + deliveryTag + " 未确认消息时是:" + outstandingConfirms.get(deliveryTag));
};
//准备消息的监听器,监听哪些消息成功了 哪些消息失败了
channel.addConfirmListener(ackCallback, nackCallback);
//批量发送消息
for (int i = 0; i < MESSAGE_COUNT; i++) {
String message = i + "qqq";
channel.basicPublish("", QueueName.Message_Queue.getName(), null, message.getBytes());
//1.此处需要记录下所有要发送的消息,消息的总和
outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
}
long end = System.currentTimeMillis();
System.out.println("发布" + MESSAGE_COUNT + "条消息 ,批量确认发布总耗时:" + (end - begin) + "ms");
}
| 模式 | 优点 | 缺点 | 发1000条时间比较 |
|---|---|---|---|
| 单个发布确认(同步) | 实现简单 | 吞吐量有限,速度慢 | 15929ms |
| 批量发布确认(同步) | 实现简单,合理的吞吐量 | 一旦出现问题,很难判断是哪里出现了问题 | 229ms |
| 异步发布确认(异步) | 最佳性能和资源使用,出现问题也可以很好的控制 | 代码实现复杂 | 84ms |