• 201、RabbitMQ 之 Exchange 典型应用模型 之 工作队列(Work Queue)


    ★ 工作队列介绍

    工作队列: 就是让多个消费者竞争消费同一个消息队列的消息,相当于多个消费者共享消息队列。

    RabbitMQ可以让多个消费者竞争消费同一个消息队列

    消息队列默认会将消息“均分”给每个消费者,但这样做往往并不合适:
    因为有的消费者需要更多时间处理一条消息,有的消费者只要更少时间即可处理一条消息,
    如果让它们“均分”这些消息,就会造成资源浪费。

    ▲ 比较理想的做法是“能者多劳”,让队列将消息多分给需要更少时间的消费者(快),
    将消息少分给需要更多时间的消费者(慢)。

    ▲ 调用Channel的basicQos(int prefetchCount)方法可控制消费者在同一时间点最多能得到的消息数量
    ——此时应该采用手动确认。

    在这里插入图片描述
    在这里插入图片描述

    这个就是上一篇写的采用自动确认策略
    注意: channel.basicConsume 的第二个参数 autoAck:true,就是表示自动确认消息已经被消费完成了。就是当消费者接收到消息之后,就立马返回一个已经确认消费的消息回去给消息队列。
    这样容易出现问题,就是消费者这边因为一收到消息就会自动确认消息被消费了并返回已经消费消息的结果回去给消息队列,但是可能消费者其实还没有把消息消费掉,而消息队列那边又以为消费者已经把消息消费了,所以就继续发消息给那个消费者。
    而消费者一收到消息又自动确认消费并返回,就会导致这个消息队列的消息越来越多,然后消费者消费不完。
    在这里插入图片描述

    代码演示

    在上一篇的代码基础上修改
    200、使用默认 Exchange 实现 P2P 消息 之 消息生产者(发送消息) 和 消息消费者(消费消息)

    思路:
    1、创建一个消息生产者和两个消息消费者。
    2、生产者发送20条消息
    3、消费者01 和 消费者 02 都用 channel.basicQos(3); 设置同一时间点只能获取3条消息来处理,只有这3条消息处理完才能再次获取3条消息
    4、每个消费者都在消息处理完之后添加 channel.basicAck() 这个方法来手动确认消息成功消费并返回确认成功消费的消息给消息队列。
    5、消费者01 每次消费完后,先睡眠个1秒,再手动确认消息已经消费,消费者02不需要,当消息消费完成后就马上手动确认。用于看两个消费者的消费情况

    代码如图:
    生产者 Producer
    生产者代码不变,只是设置发送20条消息
    在这里插入图片描述

    消费者01 Consumer01
    经过测试:同一时间点每次只能消费3条消息,只有这3条消息消费完成,并手动确认消费完成后,才能再获取3条消息进行消费。如果把手动确认消费的代码注释掉,那么这个消费者只能消费到3条消息。最后面有演示:

    在这里插入图片描述
    在这里插入图片描述

    消费者02 Consumer02
    多个了睡眠1秒再手动确认消息
    在这里插入图片描述
    在这里插入图片描述

    测试

    生产者发送20条消息,消费者01 和 消费者02 每次获取3条消息,消息消费并手动确认后才能再获取3条消息进行消费。
    然后消费者02 因为每次消费完都睡眠一秒,而消费者01没有。
    这个睡眠 用来演示消费者01的消息处理速度比消费者02 快的情况。
    所以那个消费者消费的快,哪个消费者处理的消息就越多
    这个就是工作队列:
    工作队列 就是让多个消费者竞争消费同一个消息队列的消息,相当于多个消费者共享消息队列。
    在这里插入图片描述

    注意点1:

    如图:这个 multiple 参数,设置为false,表示 不对之前未确认的的消息进行批量确认。
    可以经过测试,无论改成true还是false,只要消息队列里面有已消费未确认的消息,再次启动这个消费者,它还是会对之前已消费未确认的消息进行批量确认。
    在这里插入图片描述
    测试流程:
    1、首先,关闭消费者,然后生产者发送20条消息。
    现在就是消息队列有20条消息未被消费
    在这里插入图片描述

    2、这时候把确认消费的代码注释掉,然后如图,成功消费3条消息,但是未确认,还有17条消息待消费。
    在这里插入图片描述
    3、重新启动消费者01,这个时候正确应该是消费剩下的17条消息,但是那3条消费未确认的消息应该还在。

    但是结果却如图:
    重启消费者01,把自动确认的代码放开,multiple 为 false,但是最终还是把所有消息消费了,包括3条已消费未确认的消息。

    所以感觉这个 multiple 为 false 没起作用。
    在这里插入图片描述

    在这里插入图片描述

    注意点2:

    注释掉手动确认代码的演示: 经过测试:同一时间点每次只能消费3条消息,只有这3条消息消费完成,并手动确认消费完成后,才能再获取3条消息进行消费。如果把手动确认消费的代码注释掉,那么这个消费者只能消费到3条消息
    在这里插入图片描述

    完整代码:

    ConnectionUtil

    package cn.ljh.rabbitmq.util;
    
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    //连接工具
    public class ConnectionUtil
    {
        //获取连接的方法
        public static Connection getConnection() throws IOException, TimeoutException
        {
            //创建连接工厂----这个ConnectionFactory源码可以看出有构造器,所以直接new一个出来
            ConnectionFactory connectionFactory =  new ConnectionFactory();
            //设置连接信息
            connectionFactory.setHost("localhost");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("ljh");
            connectionFactory.setPassword("123456");
            connectionFactory.setVirtualHost("/"); //连接虚拟主机
            //从连接工厂获取连接
            Connection connection = connectionFactory.newConnection();
            //返回连接
            return connection;
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    Producer

    package cn.ljh.rabbitmq.producer;
    
    import cn.ljh.rabbitmq.consumer.Consumer01;
    import cn.ljh.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;
    
    //消息生产者--使用默认的exchange
    public class Producer
    {
        //(1)创建ConnectionFactory,设置连接信息,再通过ConnectionFactory获取Connection。
        //(2)通过Connection获取Channel。
        //(3)根据需要调用exchangeDeclare()、queueDeclare()方法声明Exchange和队列、并完成队列与Exchange的绑定。
        //    如果声明的Exchange还不存在,则创建该Exchange;否则直接使用已有的Exchange。
        //(4)调用Channel的basicPublish()方法发送消息,调用该方法的第一个参数是exchange,
        //    第二个参数为路由key,最后两个参数依次是消息属性和消息数据体。
        public static void main(String[] args) throws IOException, TimeoutException
        {
            //1、创建连接
            Connection conn = ConnectionUtil.getConnection();
            //2、通过Connection获取Channel。
            Channel channel = conn.createChannel();
            //3、调用exchangeDeclare()方法声明Exchange、调用queueDeclare()方法声明队列,并完成队列与Exchange的绑定
            //此处打算直接使用默认的Exchange来分发消息,因此无需声明 Exchange,只需声明队列
            channel.queueDeclare(Consumer01.QUEUE_NAME, true, false, false, null);
    
            //生产者发送20条消息
            for (int i = 1; i <= 20; i++)
            {
                String message = "生产者发送的第【 "+i+" 】条消息的内容";
    
                //4、调用Channel的basicPublish()方法发送消息
                channel.basicPublish(""/*默认的 Exchange 没有名字,所以用空的字符串*/,
                        Consumer01.QUEUE_NAME/*使用队列名作为路由key,表明该消息将会被路由到该队列*/,
                        null /*指定额外的消息的属性*/,
                        message.getBytes(StandardCharsets.UTF_8)/*消息体必须是字节数组类型-->byte[]*/
                );
            }
            //5、关闭资源
            //关闭通道
            channel.close();
            //关闭连接
            conn.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49

    Consumer01

    package cn.ljh.rabbitmq.consumer;
    
    import cn.ljh.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    //消息消费者1
    public class Consumer01
    {
        // 使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下:
        //(1)创建ConnectionFactory连接工厂,设置连接信息,再通过ConnectionFactory获取Connection连接。
        //(2)通过Connection获取Channel。
        //(3)根据需要、调用Channel的queueDeclare()方法声明队列,  Declare:声明、宣布
        //    如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。
        //(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,该参数相当于JMS中的消息监听器。
    
        //常量
        public final static String QUEUE_NAME = "myQueue01";
    
        public static void main(String[] args) throws IOException, TimeoutException
        {
            //1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接
            Connection conn = ConnectionUtil.getConnection();
    
            //2、通过Connection获取Channel 消息通道
            Channel channel = conn.createChannel();
    
            //工作队列的关键,设置该消费者在同一时间点,最多只会获取3条消息来处理
            //不要纠结同一时间点是多久,重点是这个消费者每次只能获取3条消息,只有这条消息处理完后,才能再获取3条信息来处理
            channel.basicQos(3);
    
    
            //3、调用 Channel 的 queueDeclare() 方法声明队列
            //如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列
            //参数1:声明的队列名; 参数2:消息队列是否持久化
            //参数3:是否只允许该消息消费者消费该队列的消息,为true,则其他消费者在这个myQueue01队列消息积堆过多的情况下,也无法帮忙消费。
            //参数4:是否自动删除(如果为true,在该队列没消息的情况下,会自动删除该队列) 参数5:填写额外的参数
            channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    
    
            //4、调用Channel 的 basicConsume()方法开始处理消费消息
            channel.basicConsume(QUEUE_NAME/*消费这个名字的消费队列里面的消息*/,
                    false/*消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,
                    new DefaultConsumer(channel)
                    {
                        //处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:
                        @Override
                        public void handleDelivery(String consumerTag,
                                                   Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,
                                                   AMQP.BasicProperties properties /*消息的那些属性*/,
                                                   byte[] body /*body:消息的消息体*/) throws IOException
                        {
                            //把消息体中的消息拿出来
                            String message = new String(body, "UTF-8");
                            //printf:格式化输出函数   %s:输出字符串  %n:换行
                            System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",
                                    envelope.getExchange(),envelope.getRoutingKey(),message);
    
                            //到这里消息已经处理完了=======
                            //因为 basicConsume 方法的 参数2 的 autoAck 为 false , 所以这里需要进行手动确认消息
                            //等消息处理完成后,确认本条消息
                            //getDeliveryTag() 获取这条消息的标志,作为参数,让basicAck()方法确认这条消息已经消费完成并返回给消息队列
                            channel.basicAck(envelope.getDeliveryTag(),
                                    false /* 指定是否对之前未确认的消息进行批量确认 */);
    
                        }
                    }
            );
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73

    Consumer02

    package cn.ljh.rabbitmq.consumer;
    
    import cn.ljh.rabbitmq.util.ConnectionUtil;
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    //消息消费者2
    public class Consumer02
    {
        // 使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下:
        //(1)创建ConnectionFactory连接工厂,设置连接信息,再通过ConnectionFactory获取Connection连接。
        //(2)通过Connection获取Channel。
        //(3)根据需要、调用Channel的queueDeclare()方法声明队列,  Declare:声明、宣布
        //    如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。
        //(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,该参数相当于JMS中的消息监听器。
        public static void main(String[] args) throws IOException, TimeoutException
        {
            //1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接
            Connection conn = ConnectionUtil.getConnection();
    
            //2、通过Connection获取Channel 消息通道
            Channel channel = conn.createChannel();
    
            //工作队列的关键,设置该消费者在同一时间点,最多只会获取3条消息来处理
            //不要纠结同一时间点是多久,重点是这个消费者每次只能获取3条消息,只有这条消息处理完后,才能再获取3条信息来处理
            channel.basicQos(3);
    
            //3、调用 Channel 的 queueDeclare() 方法声明队列
            //如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列
            //参数1:声明的队列名; 参数2:消息队列是否持久化
            //参数3:是否只允许该消息消费者消费该队列的消息,为true,则其他消费者在这个myQueue01队列消息积堆过多的情况下,也无法帮忙消费。
            //参数4:是否自动删除(如果为true,在该队列没消息的情况下,会自动删除该队列) 参数5:填写额外的参数
            channel.queueDeclare(Consumer01.QUEUE_NAME, true, false, false, null);
    
            //4、调用Channel 的 basicConsume()方法开始处理消费消息
            channel.basicConsume(Consumer01.QUEUE_NAME/*消费这个名字的消费队列里面的消息*/,
                    false/*消消息的确认模式:是否自动确认该消息已经被消费完成并返回确认消息给消息队列*/,
                    new DefaultConsumer(channel)
                    {
                        //处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:
                        @Override
                        public void handleDelivery(String consumerTag,
                                                   Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,
                                                   AMQP.BasicProperties properties /*消息的那些属性*/,
                                                   byte[] body /*body:消息的消息体*/) throws IOException
                        {
                            //把消息体中的消息拿出来
                            String message = new String(body, "UTF-8");
                            //printf:格式化输出函数   %s:输出字符串  %n:换行
                            System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",
                                    envelope.getExchange(),envelope.getRoutingKey(),message);
    
                            try
                            {
                                //模拟耗时的操作 1秒
                                Thread.sleep(1000);
                            } catch (InterruptedException e)
                            {
                                e.printStackTrace();
                            }
    
                            //等消息处理完成后,确认本条消息=======到这里消息已经处理完了
                            //因为 basicConsume 方法的 参数2 的 autoAck 为 false , 所以这里需要进行手动确认消息
                            //getDeliveryTag() 获取这条消息的标志,作为参数,让basicAck()方法确认这条消息已经消费完成并返回给消息队列
                            channel.basicAck(envelope.getDeliveryTag(),
                                    false /* 指定是否对之前未确认的消息进行批量确认 */);
                        }
                    }
            );
        }
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76

    pom.xml

    依赖不变

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>cn.ljh</groupId>
        <artifactId>workqueue</artifactId>
        <version>1.0.0</version>
        <name>workqueue</name>
    
        <!--  属性  -->
        <properties>
            <maven.compiler.source>11</maven.compiler.source>
            <maven.compiler.target>11</maven.compiler.target>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <java.version>11</java.version>
        </properties>
    
        <!--  依赖  -->
        <dependencies>
            <!-- RabbitMQ 的依赖库 -->
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.13.0</version>
            </dependency>
    
        </dependencies>
    
    </project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
  • 相关阅读:
    欧洲版“OpenAI”——Mistral 举办的 AI 大模型马拉松
    论坛介绍|COSCon'23 开源百宝箱(T)
    Oracle/PLSQL: NANVL Function
    hystrix 熔断器
    使用Tornado进行网络异步编程
    (原创)【B4A】一步一步入门09:xCustomListView,加强版列表、双行带图片、复选框按钮等自定义列表项(控件篇05)
    Docker(三)、Dockerfile探究
    【计算机图形学入门】笔记2:向量与线性代数(图形学中用到的线性代数)
    【Nginx】Windows10 平台下配置Nginx服务实现负载均衡
    【数据结构】二叉搜索树
  • 原文地址:https://blog.csdn.net/weixin_44411039/article/details/133774684