工作队列(又称为任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。 相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进 程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务
我们启动两个工作线程,一个消息发送线程,一个用来接受线程,我们来看看它们两个工作线程是如何工作的
我们将获取信道这个重复的代码封装为一个类,当时用的时候直接调用
- /**
- * 连接工厂创建信道工具类
- */
- public class RabbitMqUtils {
- public static Channel getChannel(){
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("118.31.6.132");
- factory.setUsername("admin");
- factory.setPassword("123");
- Connection connection = null;
- try {
- try {
- connection = factory.newConnection();
- } catch (IOException e) {
- e.printStackTrace();
- }
- } catch (TimeoutException e) {
- e.printStackTrace();
- }
- Channel channel = null;
- try {
- channel = connection.createChannel();
- } catch (IOException e) {
- e.printStackTrace();
- }
- return channel;
- }
- }
这里使用try...catch防止之后每次调用都需要抛出异常
生产者代码
- /**
- * 生产者 发送大量消息
- */
- public class Task01 {
- //队列名称
- public static final String QUEUE_NAME = "hello";
- //发送大量的消息
- public static void main(String[] args) throws IOException, TimeoutException {
- Channel channel = RabbitMqUtils.getChannel();
- /*
- *生成一个队列
- * 1.队列名称
- * 2.队列里面的信息是否持久化(磁盘)默认情况时在内存
- * 3.该队列是否只供一个消费者进行消费 是否消费共享 true是允许
- * 4.是否自动删除 最后一个消费者断开连接之后 该队列是否自动删除 true自动删除 false不自动删除
- * 5.其他参数 延迟消息等
- */
- channel.queueDeclare(QUEUE_NAME,false,false,false,null);
- //从控制台接受消息
- Scanner scanner = new Scanner(System.in);
- /**
- * 发送一个消息
- * 1.发送到那个交换机
- * 2.路由的KEY值是哪个 本次是队列的名称
- * 3.其他参数信息
- * 4.发送消息的消息体
- */
- while(scanner.hasNext()){
- String message = scanner.next();
- channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
- System.out.println("发送"+message+"完成");
- }
- }
- }
消费者代码
- /**
- * 这是一个工作线程(消费者)
- */
- public class Worker01 {
- //队列名称
- public static final String QUEUE_NAME = "hello";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- Channel channel = RabbitMqUtils.getChannel();
- //消息接受
- DeliverCallback deliverCallback = (consumerTag,message)->{
- System.out.println(new String(message.getBody()));
- };
- //消息接受被取消时
- CancelCallback cancelCallback = (consumerTag)->{
- System.out.println(consumerTag+"消息取消消费接口回调");
- };
- /**
- * 消费者信息
- * 1.消费哪个队列
- * 2.消费成功之后是否要自动应答 true自动应答 false手动应答
- * 3.消费者微车才能更改消费的回调
- * 4.消费者取消消费回调
- */
- System.out.println("C2等待接受消息......................");
- channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
- }
- }
在运行之前我们要修改一个选项,这样我们就不需要重复的写消费者2了
启动程序,用生产者发送四条消息,
通过程序执行发现生产者总共发送 4 个消息,消费者 1 和消费者 2 分别分得两个消息,并且 是按照有序的一个接收一次消息
消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成 了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消 息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续 发送给该消费这的消息,因为它无法接收到。 为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了
消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢 失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用
手动应答的一个好处就是可以批量应答并且减少网络拥堵
multiple 的 true 和 false 代表不同意思:
true:代表批量应答channel 上未应答的消息,比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时 5-8 的这些还未应答的消息都会被确认收到消息应答
false:同上面相比 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息 未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者 可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确 保不会丢失任何消息
默认消息采用的是自动应答,所以我们要想实现消息消费过程中不丢失,需要把自动应答改 为手动应答,消费者在上面代码的基础上增加下面画红色部分代码
睡眠工具类:
- public class SleepUtils {
- public static void sleep(int second){
- try {
- Thread.sleep(1000*second);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
生产者:
- /*
- * 消息再手动应答不丢失、放回消息队列重新消费
- */
- public class Task2 {
- //队列名称
- public static final String TASK_QUEUE_NAME = "ack_queue";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- Channel channel = RabbitMqUtils.getChannel();
- //声明队列
- channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);
- //从控制台中输入信息
- Scanner scanner = new Scanner(System.in);
- while(scanner.hasNext()){
- String message = scanner.next();
- channel.basicPublish("",TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes(StandardCharsets.UTF_8));
- System.out.println("生产者发出消息:"+message);
- }
- }
- }
消费者01:
- public class Work01 {
- private static final String ACK_QUEUE_NAME = "ack_queue";
-
- public static void main(String[] args) throws IOException {
- Channel channel = RabbitMqUtils.getChannel();
- System.out.println("C1等待接收消息短");
- //消息消费的时候如何处置消息
- DeliverCallback deliverCallback = (consumerTag,delivery)->{
- String message = new String(delivery.getBody());
- SleepUtils.sleep(1);
- System.out.println("接收到消息:"+message);
- /**
- * 1.消息标记tag
- * 2.是否批量应答未应答的消息
- */
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
- };
- //取消消息的回调
- CancelCallback cancelCallback = consumerTag->{
- System.out.println("消息消费被中断");
- };
- /**
- * 消费者信息
- * 1.消费哪个队列
- * 2.消费成功之后是否要自动应答 true自动应答 false手动应答
- * 3.消费者微车才能更改消费的回调
- * 4.消费者取消消费回调
- */
- boolean autoAck = false;
- channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
- }
- }
消费者02:
- public class Work02 {
- private static final String ACK_QUEUE_NAME = "ack_queue";
-
- public static void main(String[] args) throws IOException {
- Channel channel = RabbitMqUtils.getChannel();
- System.out.println("C2等待接收消息长");
- //消息消费的时候如何处置消息
- DeliverCallback deliverCallback = (consumerTag,delivery)->{
- String message = new String(delivery.getBody());
- SleepUtils.sleep(30);
- System.out.println("接收到消息:"+message);
- /**
- * 1.消息标记tag
- * 2.是否批量应答未应答的消息
- */
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
- };
- //取消消息的回调
- CancelCallback cancelCallback = consumerTag->{
- System.out.println("消息消费被中断");
- };
- /**
- * 消费者信息
- * 1.消费哪个队列
- * 2.消费成功之后是否要自动应答 true自动应答 false手动应答
- * 3.消费者微车才能更改消费的回调
- * 4.消费者取消消费回调
- */
- boolean autoAck = false;
- channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
- }
- }
在发送者发送消息 dd,发出消息之后的把 C2 消费者停掉,按理说该 C2 来处理该消息,但是 由于它处理时间较长,在还未处理完,也就是说 C2 还没有执行 ack 代码的时候,C2 被停掉了, 此时会看到消息被 C1 接收到了,说明消息 dd 被重新入队,然后分配给能处理消息的 C1 处理了