• MQ简单入门案例


    目录

    1、建立项目

    2、引入依赖

    3、代码

    Helloworld简单队列模式

    WorkQueue工作队列模式

    概念1:轮询分发消息

    概念2:消息应答

    概念3:消息自动重新入队

    概念4:队列持久化

     概念5:消息持久化(生产者方设置)

     概念6:不公平分发(消费者方设置)

     概念7:预取值

    发布确认模式

    单个确认发布

    批量发布确认

    异步发布确认


    前面mq服务器的安装就相当于交换机与队列已经安装好了,接下来用代码的方式来编写两个程序,分别作为生产者与消费者,实现消息的通信。

    1、建立项目

    2、引入依赖

    1. <!--指定 jdk 编译版本-->
    2. <build>
    3. <plugins>
    4. <plugin>
    5. <groupId>org.apache.maven.plugins</groupId>
    6. <artifactId>maven-compiler-plugin</artifactId>
    7. <configuration>
    8. <source>8</source>
    9. <target>8</target>
    10. </configuration>
    11. </plugin>
    12. </plugins>
    13. </build>
    14. <dependencies>
    15. <!--rabbitmq 依赖客户端-->
    16. <dependency>
    17. <groupId>com.rabbitmq</groupId>
    18. <artifactId>amqp-client</artifactId>
    19. <version>5.8.0</version>
    20. </dependency>
    21. <!--操作文件流的一个依赖-->
    22. <dependency>
    23. <groupId>commons-io</groupId>
    24. <artifactId>commons-io</artifactId>
    25. <version>2.6</version>
    26. </dependency>
    27. </dependencies>

    3、代码

    Helloworld简单队列模式

    消息生产者端代码:

    1. package com.areio.rabbitmq.one;
    2. import com.rabbitmq.client.Channel;
    3. import com.rabbitmq.client.Connection;
    4. import com.rabbitmq.client.ConnectionFactory;
    5. import java.io.IOException;
    6. import java.util.concurrent.TimeoutException;
    7. //生产者:发消息到队列
    8. public class Producer {
    9. //转为大写字母的快捷键ctr+shift+u
    10. public static final String QUEUE_NAME="hello";
    11. public static void main(String[] args) throws IOException, TimeoutException {
    12. //1、创建一个连接工厂
    13. ConnectionFactory factory = new ConnectionFactory();
    14. //2、设置工厂ip,即rabbitmq服务所在的地址
    15. factory.setHost("192.168.30.100");
    16. //3、设置rabbitmq的用户名
    17. factory.setUsername("admin");
    18. //4、设置rabbitmq的密码
    19. factory.setPassword("123");
    20. //5、创建连接
    21. Connection connection = factory.newConnection();
    22. //6、获取连接中的信道
    23. Channel channel = connection.createChannel();
    24. //7、创建队列(使用默认交换机)
    25. //队列名称、队列里的消息是否持久化(默认存储在内存即不持久化,持久化则存于磁盘)、队列是否可供多个消费者消费、最后一个消费者端开链接后是否自动删除、其他参数
    26. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    27. String message = "hello world";
    28. //8、发消息
    29. //发送到哪个交换机、路由的key(队列名)、其他参数、消息体
    30. channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
    31. System.out.println("消息发送完毕");
    32. }
    33. }

     消息消费者端代码:

    1. package com.areio.rabbitmq.one;
    2. import com.rabbitmq.client.*;
    3. import java.io.IOException;
    4. import java.util.concurrent.TimeoutException;
    5. public class Consumer {
    6. public static final String QUEUE_NAME="hello";
    7. public static void main(String[] args) throws IOException, TimeoutException {
    8. //1、创建一个连接工厂
    9. ConnectionFactory factory = new ConnectionFactory();
    10. //2、设置工厂ip,即rabbitmq服务所在的地址
    11. factory.setHost("192.168.30.100");
    12. //3、设置rabbitmq的用户名
    13. factory.setUsername("admin");
    14. //4、设置rabbitmq的密码
    15. factory.setPassword("123");
    16. //5、创建连接
    17. Connection connection = factory.newConnection();
    18. //6、获取连接中的信道
    19. Channel channel = connection.createChannel();
    20. //7、创建队列(使用默认交换机)
    21. //队列名称、队列里的消息是否持久化(默认存储在内存即不持久化,持久化则存于磁盘)、队列是否可供多个消费者消费、最后一个消费者端开链接后是否自动删除、其他参数
    22. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    23. //8、接受消息
    24. DeliverCallback deliverCallback = (consumerTag,message)->{
    25. System.out.println(new String(message.getBody()));
    26. };
    27. CancelCallback cancelCallback = consumerTag->{
    28. System.out.println("消费被中断");
    29. };
    30. //消费哪个队列、消费成功后是否自动应答、消费失败时的回调、消费者取消消费时回调
    31. channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    32. System.out.println("消息接收完毕");
    33. }
    34. }

    WorkQueue工作队列模式

    工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

     注意:即使有多个工作线程,一个消息也只会被处理一次,因此一个消息被某个工作线程接收了,其他线程就不会收到该消息了(线程之间是竞争关系)

    消息生产者端代码:

    1. package com.areio.rabbitmq.workqueue;
    2. import com.areio.rabbitmq.utils.RabbitMqUtils;
    3. import com.rabbitmq.client.Channel;
    4. import java.util.Scanner;
    5. public class Producer {
    6. public static final String QUEUE_NAME = "hello";
    7. public static void main(String[] args) throws Exception {
    8. Channel channel = RabbitMqUtils.getChannel();
    9. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    10. Scanner scanner = new Scanner(System.in);
    11. while (scanner.hasNext()){
    12. String mesg = scanner.next();
    13. channel.basicPublish("",QUEUE_NAME,null,mesg.getBytes());
    14. System.out.println("发送消息完成:"+mesg);
    15. }
    16. }
    17. }

    消息消费者端代码:

    1. package com.areio.rabbitmq.workqueue;
    2. import com.areio.rabbitmq.utils.RabbitMqUtils;
    3. import com.rabbitmq.client.CancelCallback;
    4. import com.rabbitmq.client.Channel;
    5. import com.rabbitmq.client.DeliverCallback;
    6. //这是一个工作线程,即消费者
    7. public class Worker01 {
    8. public static final String QUEUE_NAME = "hello";
    9. public static void main(String[] args) throws Exception {
    10. Channel channel = RabbitMqUtils.getChannel();
    11. DeliverCallback deliverCallback = (consumerTag, message)->{
    12. System.out.println("接收到的消息:"+new String(message.getBody()));
    13. };
    14. CancelCallback cancelCallback = consumerTag->{
    15. System.out.println(consumerTag+"消费者取消消费接口时回调此函数");
    16. };
    17. System.out.println("C2等待接收消息......");
    18. channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    19. }
    20. }

    此时如果我们运行两个工作线程,首先运行Worker01这个类,要再运行一次,其操作如下:

     

     接着运行消息生产者,从控制台分别输入发送的消息AA,BB,CC,DD,此时两个工作线程轮询接收到消息:

     

     

    概念1:轮询分发消息

    比如有五个消息过来,第一个消息被线程1拿了,接下来第二消息就被线程2拿,第三个消息被线程3拿,一个线程分配一个消息。

    概念2:消息应答

    消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费这的消息,因为它无法接收到。
    为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是: 消费者在接 收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 才可以把该消息删除。

     ①自动应答

    消息发送后立即被认为已经传送成功,这种模式需要在 高吞吐量和数据传输安全性方面做权 ,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制 ,这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,那么这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并 以某种速率能够处理这些消息的情况下使用 。(接收到消息它就自动应答,但未必任务未必完全处理,不靠谱)

    ②手动应答

    A .Channel.basicAck(用于肯定确认)
    RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
    B .Channel.basicNack(用于否定确认)
    C .Channel.basicReject(用于否定确认)
    与 Channel.basicNack 相比少一个参数
    不处理该消息了直接拒绝,可以将其丢弃了
    手动应答的好处是可以批量应答并且减少网络拥堵
    Channel.basicAck(deliveryTag,true);
    multiple 的 true 和 false 代表不同意思
    true 代表批量应答 channel 上未应答的消息
    比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时
    5-8 的这些还未应答的消息都会被确认收到消息应答(不推荐批量应答,因为8处理完成了不代表其他也完成了)
    false 同上面相比,
    只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

     生产者代码:

    1. public class ManualAckPro01 {
    2. public static final String QUEUE_NAME="ack_queue";
    3. public static void main(String[] args) throws Exception {
    4. Channel channel = RabbitMqUtils.getChannel();
    5. /**
    6. * 队列名称
    7. * 队列里的消息是否持久化(默认存储在内存即不持久化,持久化则存于磁盘)
    8. * 队列是否可供多个消费者消费
    9. * 最后一个消费者端开链接后是否自动删除
    10. * 其他参数
    11. */
    12. channel.queueDeclare(QUEUE_NAME,false,false,false,null);
    13. Scanner scanner = new Scanner(System.in);
    14. while (scanner.hasNext()){
    15. String mesg = scanner.next();
    16. channel.basicPublish("",QUEUE_NAME,null,mesg.getBytes("UTF-8"));
    17. System.out.println("发送消息完成:"+mesg);
    18. }
    19. }
    20. }

     消费者1代码:

    1. /*
    2. 自动应答:收到消息后不管任务是否处理完成都会立马应答并在队列中将消息删除
    3. 手动应答:任务处理完成才应答
    4. 实验目标:消息在手动应答模式下是不会丢失的,因为一旦消息在某个消费者处处理失败,它会将这个消息重新放回队列中,给其他队列消费,处理完成后,消费者才给出应答,此时消息才会被删除
    5. 异同:
    6. 自动应答 消费者收到消息后立马应答-->应答后删除消息(未处理完成的消息被删除则丢失消息)
    7. 立马应答 消费者处理完成后才会应答-->应答后删除消息(消息已处理完成,不丢失)
    8. */
    9. public class ManualAckCon01 {
    10. public static final String QUEUE_NAME="ack_queue";
    11. public static void main(String[] args) throws Exception {
    12. Channel channel = RabbitMqUtils.getChannel();
    13. System.out.println("C1等待接收消息处理时间较短");
    14. //采用手动应答
    15. boolean autoAck = false;
    16. channel.basicConsume(QUEUE_NAME,autoAck,(consumerTag,message)->{
    17. SleepUtils.sleep(1);
    18. System.out.println("接收到消息:"+new String(message.getBody(),"UTF-8"));
    19. /**
    20. * 手动应答
    21. *
    22. * 1、消息的标记
    23. * 2、是否批量应答
    24. * 3、
    25. */
    26. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
    27. },(consumerTag)->{
    28. System.out.println(consumerTag+":消费者取消消费时接口回调");
    29. });
    30. }
    31. }

     消费者2代码: 

    1. /*
    2. 自动应答:收到消息后不管任务是否处理完成都会立马应答并在队列中将消息删除
    3. 手动应答:任务处理完成才应答
    4. 实验目标:消息在手动应答模式下是不会丢失的,因为一旦消息在某个消费者处处理失败,它会将这个消息重新放回队列中,给其他队列消费,处理完成后,消费者才给出应答,此时消息才会被删除
    5. 异同:
    6. 自动应答 消费者收到消息后立马应答-->应答后删除消息(未处理完成的消息被删除则丢失消息)
    7. 立马应答 消费者处理完成后才会应答-->应答后删除消息(消息已处理完成,不丢失)
    8. */
    9. public class ManualAckCon02 {
    10. public static final String QUEUE_NAME="ack_queue";
    11. public static void main(String[] args) throws Exception {
    12. Channel channel = RabbitMqUtils.getChannel();
    13. System.out.println("C2等待接收消息处理时间较长");
    14. //采用手动应答
    15. boolean autoAck = false;
    16. channel.basicConsume(QUEUE_NAME,autoAck,(consumerTag,message)->{
    17. SleepUtils.sleep(30);
    18. System.out.println("接收到消息:"+new String(message.getBody(),"UTF-8"));
    19. /**
    20. * 手动应答
    21. *
    22. * 1、消息的标记
    23. * 2、是否批量应答
    24. * 3、
    25. */
    26. channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
    27. },(consumerTag)->{
    28. System.out.println(consumerTag+":消费者取消消费时接口回调");
    29. });
    30. }
    31. }

    操作:启动生产者,再启动两个消费者,接着在控制台分别输入AA、BB,看到C1收到AA,此时C2在30s后可看到BB。如果再发送CC、DD,且把C2的程序在30s内关闭,此时C1能收到CC,DD(C2没有处理完成DD,此时消息放回队列重新被消费)

    预期:生产者发送AA、BB,此时消费者1间隔1s后收到AA,如果未出现错误,消费者2在30s后可收到BB,如果宕机等,那么BB回到队列,由消费者1来处理

    概念3:消息自动重新入队

    如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

    概念4:队列持久化

    前面我们已经知道了如何处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它会忽视(丢失)队列和消息,除非告知它不要这样做。确保消息不会丢失需要做两件事:我们需要将队列和消息都标 记为持久化

    操作:只需要在生成队列时将durable标志设为true

    注意: 如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误

    此时这个队列在mq管理界面可以看到字母D

     概念5:消息持久化(生产者方设置)

    要想让消息实现持久化需要在消息生产者修改代码,在发布消息时的channel.basicPlush()方法 添加MessageProperties.PERSISTENT_TEXT_PLAIN 这个属性。
    将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。如果需要更强有力的持久化策略,参考后边发布确认章节。

     概念6:不公平分发(消费者方设置)

    在最开始的时候我们学习到 RabbitMQ 分发消息采用的轮训分发,但是在某种场景下这种策略并不是 很好,但是有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,此时采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间 处于空闲状态,而处理慢的那个消费者一直在干活。
    为了避免这种情况,我们可以设置参数 channel.basicQos(1);

     概念7:预取值

    本身消息的发送就是异步发送的,所以在任何时候, channel 上肯定不止只有一个消息,另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此 缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题 。这个时候就可以通过使用 basic.qos 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量 。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 5 6 7 8 ,并且通道的预取计数设置为 4 ,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack 。比方说 tag=6 这个消息刚刚被确认 ACK RabbitMQ 将会感知这个情况到并再发送一条消息。

    这里的2、5是堆积在信道的消息数,而不是最终处理的消息数,假如11 22分别进入C1、C2的信道,接着C1把11处理完成,22堆积在C2信道中,此时生产者再发送33 44 55, 33 44堆积在C1信道,(此时信道C1只有两个,因为前面的11已经被处理完成) 

    发布确认模式

     单个确认发布

    它是一种 同步确认发布 的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。
    最大的缺点就是: 发布速度特别的慢, 因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量。
    1. //发布1000条单独确认的消息,耗时:1207ms
    2. public class SingleConfirmPro {
    3. //批量发消息的个数
    4. public static final int MESSAGE_COUNT=1000;
    5. public static void main(String[] args) throws Exception {
    6. Channel channel = RabbitMqUtils.getChannel();
    7. String queueName = UUID.randomUUID().toString();
    8. channel.queueDeclare(queueName,true,false,false,null);
    9. //开启发布确认
    10. channel.confirmSelect();
    11. //开始时间
    12. long begin = System.currentTimeMillis();
    13. //批量发消息
    14. for(int i=0;i<MESSAGE_COUNT;i++){
    15. String message = i+"";
    16. channel.basicPublish("",queueName,null,message.getBytes());
    17. //单个消息就立马进行确认
    18. boolean flag = channel.waitForConfirms();
    19. if (flag){
    20. System.out.println("消息发送成功");
    21. }
    22. }
    23. //结束时间
    24. long end = System.currentTimeMillis();
    25. System.out.println("发布"+MESSAGE_COUNT+"条单独确认的消息,耗时:"+(end-begin)+"ms");
    26. }
    27. }

    批量发布确认

    先发布一批消息然后一起确认可以极大地 提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布。
    1. //发布1000条消息,间隔消息数量为100时批量确认,耗时:123ms
    2. public class BatchConfirmPro {
    3. //批量发消息的个数
    4. public static final int MESSAGE_COUNT=1000;
    5. public static void main(String[] args) throws Exception {
    6. Channel channel = RabbitMqUtils.getChannel();
    7. String queueName = UUID.randomUUID().toString();
    8. channel.queueDeclare(queueName,true,false,false,null);
    9. //开启发布确认
    10. channel.confirmSelect();
    11. //开始时间
    12. long begin = System.currentTimeMillis();
    13. //多少条就批量确认一次
    14. int batchSize=100;
    15. //批量发消息并进行批量确认
    16. for(int i=0;i<MESSAGE_COUNT;i++){
    17. String message = i+"";
    18. channel.basicPublish("",queueName,null,message.getBytes());
    19. //消息积累100条再批量确认一次
    20. if (i%batchSize==0){
    21. channel.waitForConfirms();
    22. }
    23. }
    24. //结束时间
    25. long end = System.currentTimeMillis();
    26. System.out.println("发布"+MESSAGE_COUNT+"条消息,间隔消息数量为100时批量确认,耗时:"+(end-begin)+"ms");
    27. }
    28. }

    异步发布确认

    异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,他是利用回调函数来达到消息可靠性传递的,这里的监听器有两个回调函数,一个是消息确认,一个是消息未确认,我们可以在确认成功或者确认失败时做一些处理。

  • 相关阅读:
    xss 漏洞
    【笔者感悟】笔者的学习感悟【四】
    每天五分钟机器学习:机器学习之非监督学习中的聚类算法的介绍
    JAVA基础(二十五)——枚举
    ssh宿舍管理系统
    首个中文Stable Diffusion模型开源,玩转中文-图片”的跨模态生成任务
    SpringBoot如何实现热部署
    2024.1IDEA 到2026年
    kali linux将默认的非root提升为root权限
    高级深入--day39
  • 原文地址:https://blog.csdn.net/qq_45382625/article/details/125417786