• RabbitMQ 学习(二)---- HelloWorld 简单模型


    RabbitMQ 学习(二)---- HelloWorld 简单模型

    在这里插入图片描述

    开放 rabbitMQ 端口号 5672


    之前我们使用rabbitMq 网页客户端 开放了 15672 的端口,要想是的 java客户端访问服务器成功,需要开放 5672 的端口号。在服务器安全组设置


    还要再pom.xml中加载 ampq客户端的依赖

      
            <dependency>
                <groupId>com.rabbitmqgroupId>
                <artifactId>amqp-clientartifactId>
                <version>5.14.2version>
            dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    如果想要在控制台不日志的报错信息,还要加载 slf4j 的依赖

            <dependency>
                <groupId>org.slf4jgroupId>
                <artifactId>slf4j-simpleartifactId>
                <version>1.7.25version>
            dependency>
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    发送者


    (1)创建连接工厂 connectionFactory


    在工厂中需要设置连接的主机名、端口号、客户端的用户名、密码、虚拟主机等,为之后的连接做好预先准备。


    虚拟主机就是相当于 我们在数据库软件中 一个系统对应的数据库一样,对应这个一个单独的节点

     //1、创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("XXXX.XXX.XXX.XXX"); // 部署rabbitMQ的
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("123456");
            // 虚拟主机相当于一个节点,作用相当于数据库软件中的 存储各种表数据的数据库
            // 发送消息接收消息 (channel、exchange、routinekey、queue)都在虚拟主机中完成
            connectionFactory.setVirtualHost("/test");
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    (2)创建 与 rabbitmq 的连接 connection


    通过工厂实例拿到与rabbitMq的连接对象,只有拿到了连接,才能进行后续的所有操作

       //2、根据连接工厂创建连接对象
                connection = connectionFactory.newConnection();
    
    • 1
    • 2

    (3)通过连接创建信道 channel


    这里有一个经典面试题,为什么我们在channel中完成消息的发送接收,而不是直接在connection中呢?


    看看这一个回答,能够很清楚的说明白。

    在这里插入图片描述


    (4)在信道中传递数据


    如何在信道中传递数据呢?


    我们需要明确一点,不管是什么模型,其实都需要 交换机(exchange)、路由器(routingkey)、队列(queue).


    虽然这里的简单模型是点对点的,只需要 队列来传递数据的

    • Exchanger交换机,用来接收生产者发布的消息并将这些消息路由给服务器中的队列。

    • 默认的交换机有一个特点,只要你的routerKey与这个交换机中有同名的队列,他就会自动路由上。

    这里的发送接收的规则捋了一下,是这样的,因为该模型不需要指定交换机与路由规则,只需要队列就行了所以使用默认交换机,交换机是用来接收生产者消息,并根据路由规则将消息分发给服务器中的队列中的,在此之前生产着声明了队列,但是在传递的时候仍然需要交换机与路由规则 给队列分发 消息, 巧了默认交换机有一个规则,如果路由规则与队列同名的话,那么路由与队列会自动绑定上,所以需要将 routingKey 写成 与 队列同名,让他们绑定上,然后生产者才能在队列中取到信息


    一个很重要的信息传递的规则

    • 生产者声明队列

    • 生产者在发送消息的时候 使用 交换机 接收消息,通过 路由规则 分发消息到 指定的 队列 中等待接收

    • 消费者声明队列

    • 消费者在接收消息的时候通过队列(与生产者队列一致)进行接收消息,接收成功并选择是否回应


    声明队列相关信息

     //4、在信道中声明队列
        /**
         * @Params1 queue:队列的名字
         * @Params2 durable: 是否支持队列持久化? 这里的持久化以及就是队列信息写入磁盘,如果rabbitmq服务器重启也会恢复队列信息,但不是信息持久化,信息会失去
         * @Params3 exclusive: 该队列是否支持独占? 就是这个队列在被一个信道占用的时候不能被其他进行访问 
         * @Params4 autoDelete: 该队列是否自动删除? 就是说这个队列中的信息被接收方拿完之后要自动删除
         * @Params5 arguments: map类型,一些额外的参数,比如说过期时间设置等
         */
                channel.queueDeclare("queue",false,false,false,null);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    通过交换机、路由器、队列进行发送信息

            /**
             * 参数1:exchange 写交换机的名字,如果不写说明使用默认default amqp
             * 参数2:routinekey 路由器,如果没有路由器的话默认路由器和队列同名,需要写上队列的名字
             * 参数3:props 这里写发送的消息是否支持持久化
             * 参数4: bytes 类型,这是我们要传递的具体消息
             */ 
    
    //5、使用该信道进行发送消息 
        for(int i=0;i<10;i++){
                    channel.basicPublish("", "queue", MessageProperties.PERSISTENT_TEXT_PLAIN, ("简单模型信息传递:"+i).getBytes());
                }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    (6)关闭资源


    在发送完毕之后,关闭信道,关闭连接


    finally {
                if(channel!=null){
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
    
                if(connection!=null){
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    完整的过程代码


    package hello;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Provider {
    
    
        public static void main(String[] args) {
            //1、创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("120.46.143.156");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("123456");
    
            // 虚拟主机相当于一个节点,作用相当于数据库软件中的 存储各种表数据的数据库
            // 发送消息接收消息 (channel、exchange、routinekey、queue)都在虚拟主机中完成
            connectionFactory.setVirtualHost("/test");
    
            Connection connection = null;
            Channel channel =null;
    
            try {
                //2、根据连接工厂创建连接对象
                connection = connectionFactory.newConnection();
    
                //3、根据连接对象创建信道
                channel = connection.createChannel();
    
                // 这里使用的是 helloworld 简单模型,不需要交换机,不需要路由,只需要队列
    
                //4、在信道中声明队列
                /**
                 * @Params1 queue:队列的名字
                 * @Params1 durable: 是否支持队列持久化? 这里的持久化以及就是队列信息写入磁盘,如果rabbitmq服务器重启也会恢复队列信息,但不是信息持久化,信息会失去
                 * @Params1 exclusive: 该队列是否支持独占? 就是这个队列在被一个信道占用的时候不能被其他进行访问
                 * @Params1 autoDelete: 该队列是否自动删除? 就是说这个队列中的信息被接收方拿完之后要自动删除
                 * @Params1 arguments: map类型,一些额外的参数,比如说过期时间设置等
                 */
                channel.queueDeclare("queue",false,false,false,null);
    
                //5、使用该信道使用exchange、routineKey 进行发送消息 Bytes,
    
                // 默认的交换机有一个特点,routineKey 如果和 队列名一致的话 ,那么匹配成功
    
                // 生产者不是将消息放在queue队列中,而是放在默认交换机中等待符合routingkey的队列匹配,routineKey 名字和队列名一致则匹配成功
                // queue在消费者这里生成,匹配成功之后消费者在queue中取消息
    
                /**
                 * 参数1:exchange 写交换机的名字,如果不写说明使用默认default amqp
                 * 参数2:routinekey 路由器,如果没有路由器的话默认路由器和队列同名,需要写上队列的名字
                 * 参数3:props 这里写发送的消息是否支持持久化
                 * 参数4: bytes 类型,这是我们要传递的具体消息
                 */
                for(int i=0;i<10;i++){
                    channel.basicPublish("", "queue", MessageProperties.PERSISTENT_TEXT_PLAIN, ("简单模型信息传递:"+i).getBytes());
                }
    
            } catch (Exception e){
                e.printStackTrace();
            }finally {
                if(channel!=null){
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }
    
                if(connection!=null){
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
    
        }
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91

    接收者


    前面的几个步骤 都一样,

    需要注意的一点是,接收方在接收queue中的数据的时候,声明queue必须和 发送方的保持一致,所有条件都得保持一致,否则接收不到。


    (1)发送消息


    从使用信道发送消息开始,使用 basicConsume()

     /**
                 * 参数1 : 队列的名字
                 * 参数2 : 是否自动确认,如果接收方接受了消息之后是否确认收到
                 * 参数3 : 接收到消息之后的业务操作
                 */
    
    //5、使用该信道进行发送消息
                channel.basicConsume("queue", true, new DefaultConsumer(channel){
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("接受了发送方的消息:"+ new String(body));
                    }
                });
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    这里需要说一下,在最后的业务操作的参数是一个 Consumer,设置一个接口,我们需要写一个实现类,重写其中的 方法,对接收的message进行后续的业务操作。

    在这里插入图片描述

    在这里插入图片描述


    接受方完整代码

    package hello;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    
    public class Customer {
        public static void main(String[] args) {
            //1、创建连接工厂
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost("120.46.143.156");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("admin");
            connectionFactory.setPassword("123456");
    
            // 虚拟主机相当于一个节点,作用相当于数据库软件中的 存储各种表数据的数据库
            // 发送消息接收消息 (channel、exchange、routinekey、queue)都在虚拟主机中完成
            connectionFactory.setVirtualHost("/test");
    
            Connection connection = null;
            Channel channel =null;
    
            try {
                //2、根据连接工厂创建连接对象
                connection = connectionFactory.newConnection();
                //3、根据连接对象创建信道
                channel = connection.createChannel();
                // 这里使用的是 helloworld 简单模型,不需要交换机,不需要路由,只需要队列
                //4、在信道中声明队列
                channel.queueDeclare("queue",false,false,false,null);
                //5、使用该信道进行发送消息
                /**
                 * 参数1 : 队列的名字
                 * 参数2 : 是否自动确认,如果接收方接受了消息之后是否确认收到
                 * 参数3 : 接收到消息之后的业务操作
                 */
                channel.basicConsume("queue", true, new DefaultConsumer(channel){
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("接受了发送方的消息:"+ new String(body));
                    }
                });
            } catch (Exception e){
                e.printStackTrace();
            }
        }
    }
    
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
  • 相关阅读:
    流程控制语句 流程控制开关
    静态代理IP是什么?一文看懂静态代理IP
    作为一名软件测试工程师,需要具备哪些能力?
    B站视频下载工具的分享
    WPF 笔迹算法 从点集转笔迹轮廓
    【电源专题】BUCK电源效率和输出电压的关系
    C++:迭代器
    音频转文字怎么操作?快来看看这几个方法吧
    C++ Reference: Standard C++ Library reference: C Library: cstdio: fgetpos
    Kubernetes---使用 Deployment 运行一个无状态应用
  • 原文地址:https://blog.csdn.net/rain67/article/details/126881612