• RabbitMQ------简单队列模式以及工作队列模式以及消息应答的方式(三)


    RabbitMQ------简单队列模式以及工作队列模式以及消息应答的方式(三)
    创建maven项目,导入依赖

    <!--    指定jdk编译版本-->
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-eclipse-plugin</artifactId>
                    <version>2.8</version>
                     <configuration>
            			<source>1.8</source>
            			<target>1.8</target>
          			</configuration>
                </plugin>
            </plugins>
        </build>
    
        <dependencies>
    <!--   rabbitmq依赖客户端     -->
            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.8.0</version>
            </dependency>
    <!--操作文件流的一个依赖-->
            <dependency>
                <groupId>commons-io</groupId>
                <artifactId>commons-io</artifactId>
                <version>2.6</version>
            </dependency>
        </dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    简单队列模式

    通过信道连接交换机、再连接队列。
    简单模式,可以采用默认交换机,省略选择交换机,直接连接队列。
    生产者代码:
    1.首先需要创建连接工厂,ConnectionFactory
    2.配置IP、用户名、密码
    3.创建连接
    4.获取信道、或者说创建信道
    5.获取队列,或者说产生队列
    6.发送消息

    /**
     * 生产者
     */
    public class Producer {
        //队列名称
        public static final String QUEUE_NAME = "hello";
    
        //发消息,使用main函数
        public static void main(String[] args) throws Exception {
            //创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //工厂IP,连接RabbitMQ队列
            connectionFactory.setHost("192.168.200.129");
            //设置用户名
            connectionFactory.setUsername("admin");
            //设置密码
            connectionFactory.setPassword("123");
            //创建连接
            Connection connection = connectionFactory.newConnection();
            //一个连接中有多个信道,信道才是发消息的
            //获取信道
            Channel channel = connection.createChannel();
            //产生一个队列  声明一个队列
            //参数说明
            // 1.队列名称,
            // 2.队列是否需要保存消息(持久化存储到磁盘),默认存储在内存中,不持久化,
            // 3.是否需要排他(只共一个消费者消费),是否进行消息共享,true可以多个消费者,false不允许多个消费者消费
            // 4.是否自动删除,最后一个消费者断开连接之后,该队列是否自动删除,true自动删除,fasle不自动删除
            // 5.传递一些队列的参数(延迟消息,死信消息)
            //    (String queue, boolean durable, boolean exclusive, boolean autoDelete,
            //                                 Map<String, Object> arguments) throws IOException;
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //发消息
            String message = "hello world";
            //void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            //1.发送到哪个交换机
            //2.路由的key值是哪个,本质上是队列名称
            //3.其他参数信息
            //4.发送的消息的消息体 byte
            channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
            System.out.println("消息发送完毕");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    其中队列的配置参数说明:

    //参数说明
            // 1.队列名称,
            // 2.队列是否需要保存消息(持久化存储到磁盘),默认存储在内存中,不持久化,
            // 3.是否需要排他(只共一个消费者消费),是否进行消息共享,true可以多个消费者,false不允许多个消费者消费
            // 4.是否自动删除,最后一个消费者断开连接之后,该队列是否自动删除,true自动删除,fasle不自动删除
            // 5.传递一些队列的参数(延迟消息,死信消息),没有就为null
    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                     Map<String, Object> arguments) throws IOException;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    以及发消息的参数说明

    /void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
            //1.发送到哪个交换机
            //2.路由的key值是哪个,本质上是队列名称
            //3.其他参数信息
            //4.发送的消息的消息体 byte类型
    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    消费者消费消息:
    1.首先需要创建连接工厂,ConnectionFactory
    2.配置IP、用户名、密码
    3.创建连接
    4.获取信道
    5.获取消息

    /**
     * 消费者
     */
    public class Consumer {
        //队列名称
        public static final String QUEUE_NAME = "hello";
        
        //发消息,使用main函数
        public static void main(String[] args) throws Exception {
            //创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //工厂IP,连接RabbitMQ队列
            connectionFactory.setHost("192.168.200.129");
            //设置用户名
            connectionFactory.setUsername("admin");
            //设置密码
            connectionFactory.setPassword("123");
            //创建连接
            Connection connection = connectionFactory.newConnection();
            //一个连接中有多个信道,信道才是发消息的
            //获取信道
            Channel channel = connection.createChannel();
            /**
             * 消费者消费消息
             * 1.消费哪个队列
             * 2.消费成功后,是否要自动应答 true,false手动应答
             * 3.消费者成功消费的回调
             * 4.消费者取消消费的回调
             */
            //消息消费的时候如何处理消息
            DeliverCallback deliverCallback = (consumerTag,message)->{
                //消息有消息头、消息体
                System.out.println(message.getBody());
            };
            //取消消息时的回调
            CancelCallback cancelCallback = consumerTag->{
                System.out.println("消息消费被终断");
            };
            channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    //        String consume = channel.basicConsume(QUEUE_NAME,true,
    //                (consumerTag,message)->{
    //            System.out.println(message);
    //        },
    //                consumerTag->{
    //            System.out.println(consumerTag);
    //        });
    
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51

    工作队列模式 work queues

    工作队列又成任务队列,主要思想:避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。把任务封装为消息并将其发送到队列,在后台运行的工作进程将弹出任务,并最终执行作业。当有工作线程时,这些工作线程(多个消费者)将一起处理这些任务。
    大致意思就是:生产者发送大量消息到消息队列中,多个消费者轮询消费消息。
    注意:这种模式,一个消息只能被消费一次,不能被消费多次,消费多次,造成重复消费。
    特点:轮询分发消息。此时多个消费者是竞争关系。
    将建立连接、获取信道,抽取成工具类。

    public class RabbitMqUtiles {
    
        public static Channel getChannel() throws Exception {
            //创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            //工厂IP,连接RabbitMQ队列
            connectionFactory.setHost("192.168.200.129");
            //设置用户名
            connectionFactory.setUsername("admin");
            //设置密码
            connectionFactory.setPassword("123");
            //创建连接
            Connection connection = connectionFactory.newConnection();
            //一个连接中有多个信道,信道才是发消息的
            //获取信道
            Channel channel = connection.createChannel();
            return channel;
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    IDEA中允许一段代码多个线程运转。只需要将allow parallel run 打钩即可。
    在这里插入图片描述
    消费者:
    将work1运行。

    /**
     * 这是一个工作线程,相当于一个消费者
     */
    public class Wrok1 {
        //队列名称
        public static final String QUEUE_NAME = "hello";
    
        //发消息,使用main函数
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtiles.getChannel();
            /**
             * 消费者消费消息
             * 1.消费哪个队列
             * 2.消费成功后,是否要自动应答 true,false手动应答
             * 3.消费者成功消费的回调
             * 4.消费者取消消费的回调
             */
            //消息消费的时候如何处理消息
            DeliverCallback deliverCallback = (consumerTag, message)->{
                //消息有消息头、消息体
                System.out.println("接收到的消息"+message.getBody());
            };
            //取消消息时的回调
            CancelCallback cancelCallback = consumerTag->{
                System.out.println("消息消费被终断");
            };
            System.out.println("C1等待接受消息....");
            channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    
    
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    将C1等待接受消息改为C2,再次运行。模拟多线程的情况。
    生产者:

    public class Task {
        //队列名称
        public static final String QUEUE_NAME = "hello";
    
        //发消息,使用main函数
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtiles.getChannel();
            //发消息
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            StringBuffer message = new StringBuffer("hello world");
    
            for (int i = 0 ;i<100;i++){
                message.append(i);
                channel.basicPublish("", QUEUE_NAME, null, message.toString().getBytes());
                System.out.println("消息发送完毕"+message);
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    结果,消息被work1消费,就不会被work2消费,会轮询消费

    消息应答问题

    如果消费者接受到一个消息,需要进行长任务处理,但是完成部分后挂了,此时就会消息丢失,以及后续发送给这个消费者消息,因为它也无法接收到。
    为了解决这一问题,rabbitmq引入消费者应答机制。
    意味着:消费者在接收到消息并且处理该消息后,告诉rabbitmq该消息已被处理,rabbitmq才会将该消息删除。
    应答又分为手动应答和否定应答。

    自动应答

    消息发送后立即被认为已经传送成功,这种模式需要高吞吐量和数据传输安全性方面做权衡,自动应答可能会出现消息丢失的情况。因此仅限于各个方面可靠性都很高的情况下使用。

    手动应答的方法

    应答分为:肯定应答和否定应答

    1. Channel.basicAck(用于消息确认),RabbitMQ已知道该消息并且成功的处理消息,可以将其丢弃。
    2. Channel.basicNack(用于否定确认)
    3. Channel.basicReject(用于否定确认)
      与Channel.basicNack相比少了一个参数(是否批量处理的参数)。
      不处理该消息了直接拒绝,可以将其丢弃
    multiple解释

    手动应答的好处可以批量应答并且减少网络阻塞。

    //void basicAck(long deliveryTag, boolean multiple) throws IOException;
    channel.basicAck(deliveryTag,true);
    
    • 1
    • 2

    multiple的true和false代表不同意思,建议false,不进行批量应答。
    true:代表批量应答channel上未应答的消息
    比如:channel上有传送tag的消息5,6,7,8,当前tag是8,那么此时5-8的这些还未应答的消息都会被确认消息应答。
    false:则代表不批量应答
    只会应答tag=8的消息,5,6,7的这三个消息依然会被确认收到消息应答。

    消息重新入队

    如果消费者由于意外导致,消息未发送ACK确认,RabbitMQ将了解到消息未完全处理,并对其重新排队。如果此时其他消费者处理,它会很快被重新分发给另一个消费者。这样即使消费者偶尔宕机,也确保消息不会丢失。
    手动应答代码:
    变动点:
    1.将自动应答标记设为false
    2.在消费消费时候处理消息时,增加手动应答

    public class Wrok1 {
        //队列名称
        public static final String QUEUE_NAME = "hello";
    
        //发消息,使用main函数
        public static void main(String[] args) throws Exception {
            Channel channel = RabbitMqUtiles.getChannel();
            /**
             * 消费者消费消息
             * 1.消费哪个队列
             * 2.消费成功后,是否要自动应答 true,false手动应答
             * 3.消费者未成功消费的回调
             * 4.消费者取消消费的回调
             */
            //消息消费的时候如何处理消息
            DeliverCallback deliverCallback = (consumerTag, message)->{
                //消息有消息头、消息体
                System.out.println("接收到的消息"+message.getBody());
                //1.消费标记Tag
                //2.是否批量应答未应答的消息,fasle只应答当前消息
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            };
            //取消消息时的回调
            CancelCallback cancelCallback = consumerTag->{
                System.out.println("消息消费被终断");
            };
            System.out.println("C1等待接受消息....");
            //设置手动应答
            Boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    在消费者1、和消费者2中,分别增加线程休眠1s,和线程休眠30s。
    代码结果:消费者1消费第一条信息,消费者2消费第二条信息,由于消费者2有30s的休眠时间,此时将消费者2代码停止运行,能看到结果,消费者1会马上消费第2条信息。

  • 相关阅读:
    【嵌入式软件C编程】主函数free子函数malloc地址的两种方式以及注意事项
    新建SpringCloud电商前端Vant项目
    重磅:百度李彦宏、中科院曾毅入选,《时代周刊》AI最有影响力100人!
    解决:Glide 在回调中再次加载图片报错
    相控阵天线(八):圆环阵列天线和球面阵列天线
    19.SpringSecurity存在的问题及解决方案
    HDFS工作流程与机制
    C 语言通用MySQL 功能增删查改功能.
    BigEvent Demo
    【机器学习 之 Matplotlib】绘制折线图 基础练习
  • 原文地址:https://blog.csdn.net/cz_chen_zhuo/article/details/127559544