工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务
假设消息队列中依次加入消息AA BB CC DD,并且此时有工作线程1和工作线程2,工作线程1会接收到消息AA,CC,工作线程2会接收到消息BB,DD
代码示例如下:
建立一个工具类,用于创建channel
public class RabbitMQUtils {
public static Channel getChannerl() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
return connection.createChannel();
}
}
工作线程类:
public class WorkOne {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel=RabbitMQUtils.getChannerl();
//推送的消息如何进行消费的接口回调
//lambda表达式实现
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("接受到的消息:"+message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = s -> {
System.out.println("消息被中断");
};
System.out.println("工作线程2等待接收消息...");
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
消息队列类:
public class TaskOne {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel= RabbitMQUtils.getChannerl();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Scanner scanner=new Scanner(System.in);
while(scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("发送消息完成:"+message);
}
}
}
在运行设置中选择WorkOne可以多线程运行,然后运行两次WokkOne
运行TaskOne,并在控制台输入AA BB CC DD
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。为了保证消息在发送过程中不丢失,rabbitmq
引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了
1. 自动应答
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制, 当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用
2. 手动应答
Channel.basicAck: 肯定确认
void basicAck(long var1, boolean var3) throws IOException;
第2个boolean类型的参数用来表示是否批量应答
Channel.basicNack :用于否定确认
Channel.basicReject: 用于否定确认,与 Channel.basicNack 相比少一个参数不处理该消息了直接拒绝,可以将其丢弃了
3. 消息重新入队
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息
4. 手动应答代码示例
TakkOne(发送消息)
package cn.edu.xd.three;
import cn.edu.xd.util.RabbitMQUtils;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class Task {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
Channel channel= RabbitMQUtils.getChannerl();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Scanner scanner=new Scanner(System.in);
while(scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("发送消息完成:" + message);
}
}
}
WoekOne: 工作线程1
package cn.edu.xd.three;
import cn.edu.xd.util.RabbitMQUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class WorkOne {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel=RabbitMQUtils.getChannerl();
System.out.println("线程1处理消息的时间较短");
//推送的消息如何进行消费的接口回调
//lambda表达式实现
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
Thread.sleep(1000);//睡眠1s
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody());
System.out.println("接受到的消息:"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = s -> {
System.out.println("消息被中断");
};
System.out.println("工作线程1等待接收消息...");
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
WoekTwo: 工作线程2
package cn.edu.xd.three;
import cn.edu.xd.util.RabbitMQUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class WorkTwo {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel=RabbitMQUtils.getChannerl();
System.out.println("线程2处理消息的时间较长");
//推送的消息如何进行消费的接口回调
//lambda表达式实现
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
Thread.sleep(15*1000);//睡眠15s
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(delivery.getBody());
System.out.println("接受到的消息:"+message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = s -> {
System.out.println("消息被中断");
};
System.out.println("工作线程2等待接收消息...");
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
}
}
运行这三个类,当输入aa时,线程1接收到,当输入bb时,15s后线2接收到,当输入cc时线程1接收到,当输入dd时,在线程2应答之前,关闭线程2,此时消息dd由线程a处理,即消息重新入队了
boolean durable=true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);//参数设置为true表示创建一个持久化的队列
//修改第3个参数使得消息持久化
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
之前的轮循分发是你一个我一个的方式处理消息的,但是按上面的代码,线程1处理一条消息1s, 线程2处理一条消息15s, 线程1处理消息的能力较强,那就让线程1多处理一些消息
//线程1
channel.basicQos(1);//参数为1表示不公平分发
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
//线程2
channel.basicQos(1);//参数为1表示不公平分发
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
可以看到发送了5条消息,线程1处理了4条,线程2处理了1条
假设我已经提前知道线程1处理消息的能力较强,线程2处理消息的能力较弱,可以设置线程1和线程2的channel中最多可以积累多少条消息
//线程1设置5条
channel.basicQos(5);
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
//线程2设置2条
channel.basicQos(2);
boolean autoAck=false;
channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, cancelCallback);
该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。RabbitMQ 将停止在通道上传递更多消息。