• RabbitMQ:简单模式(Hello World)



    📃个人主页:不断前进的皮卡丘
    🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记
    🔥个人专栏:消息中间件

    基本介绍

    先来看看RabbitMQ架构图

    • Broker :接收和分发消息的应用, RabbitMQ Server 就是 Message Broker
    • Virtual host: 出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似 于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出 多个 vhost ,每个用户在自己的 vhost 创建 exchange / queue 等
    • Connection: publisher / consumer 和 broker 之间的 TCP 连接
    • Channel :如果每一次访问 RabbitMQ 都建立一个 Connection ,在消息量大的时候建立 TCP连接 的开销将是巨大的,效率也较低。 Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯, AMQP method 包含了 channel id 帮助客 户端和 message broker 识别 channel ,所以 channel 之间是完全隔离的。 Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP 连接 的开销
    • Exchange: message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key ,分发消息到 queue 中去。常用的类型有: direct (point-to-point), topic (publish-subscribe) and fanout(multicast)
    • Queue: 消息最终被送到这里等待 consumer 取走
    • Bindingexchange 和 queue 之间的虚拟连接, binding 中可以包含 routing key , Binding 信息被保 存到 exchange 中的查询表中,用于 message 的分发依据

    简单模式
    image.png

    生产者

    我们先创建一个maven工程,然后引入相关依赖

    <dependencies>
      <dependency>
        <groupId>com.rabbitmqgroupId>
        <artifactId>amqp-clientartifactId>
        <version>5.16.0version>
        <scope>compilescope>
      dependency>
    dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    public class Producer {
        //队列名称
        static final String QUEUE_NAME = "helo-queue";
    
        public static void main(String[] args) {
            try {
                //1.创建连接工厂
                ConnectionFactory factory = new ConnectionFactory();
                //设置连接参数
                //服务器IP地址
                factory.setHost("192.168.88.133");
                //连接端口
                factory.setPort(5672);
                //设置连接的虚拟机名称
                factory.setVirtualHost("/myhost");
                //用户名
                factory.setUsername("admin");
                //密码
                factory.setPassword("123456");
    
                //2.创建Connection对象
                Connection connection = factory.newConnection();
                //3.创建信道对象
                Channel channel = connection.createChannel();
                //4.声明队列(队列名称,是否持久化,是否独占连接,是否在不适用队列的时候自动删除,队列其他参数)
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);
                //5.准备发送信息
                String msg="hello rabbitmq!!!!!";
                /**
                 * 参数1:交换机名称,不填写交换机名称的话则使用默认的交换机
                 * 参数2:队列名称(路由key)
                 * 参数3:其他参数
                 * 参数4:消息内容
                 */
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
                System.out.println("已经发送消息到队列");
    
                // 关闭资源
                channel.close();
                connection.close();
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
    
    
        }
    
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52

    然后我们运行程序,在浏览器输入服务器地址以及对应端口15672,我们可以看到队列以及发送的消息
    image.png
    image.png

    消费者

    • 独占队列意思是只有一个连接可以操作改队列
    public class Consumer {
        //队列名称
        static final String QUEUE_NAME = "helo-queue";
    
        public static void main(String[] args) {
            try {
                //1.创建连接工厂
                ConnectionFactory factory = new ConnectionFactory();
                //设置连接参数
                //服务器IP地址
                factory.setHost("192.168.88.133");
                //连接端口
                factory.setPort(5672);
                //设置连接的虚拟机名称
                factory.setVirtualHost("/myhost");
                //用户名
                factory.setUsername("admin");
                //密码
                factory.setPassword("a87684009.");
    
                //2.创建Connection对象
                Connection connection = factory.newConnection();
                //3.创建信道对象
                Channel channel = connection.createChannel();
                //4.声明队列(队列名称,是否持久化,是否独占连接,是否在不使用队列的时候自动删除,队列其他参数)
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);
                //5.接收消息
                DefaultConsumer consumer=new DefaultConsumer(channel){
                    /**
                     * 消费回调函数,当收到消息以后,会自动执行这个方法
                     * @param consumerTag 消费者标识
                     * @param envelope    消息包的内容(比如交换机,路由key,消息id等)
                     * @param properties   属性信息
                     * @param body         消息数据
                     * @throws IOException
                     */
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("消息内容body:"+new String(body,"UTF-8"));
    
                    }
                };
    
                //监听消息(队列名称,是否自动确认消息,消费对象)
    
                channel.basicConsume(QUEUE_NAME,true,consumer);
    
    
    
    
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
    
    
        }
    
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62

    消费者不能关闭连接,生产者可以关闭连接
    image.png
    image.png

    简单模式总结

    image.png

    • P:生产者,也就是发送消息的程序
    • C:消费者,也就是消息的接收着,会一直等待消息的发送,不能关闭连接
    • Queue:消息队列,类似邮箱,可以缓存消息。生产者向其投递消息,消费者从中取出消息。
  • 相关阅读:
    SpringCloud学习笔记-Nacos服务分级存储模型
    SA实战 ·《SpringCloud Alibaba实战》专栏开篇
    实现图的遍历
    Python dcm转jpg与jpg转dcm
    【Svelte】-(5)DOM事件 / 事件修饰符 / 组件事件 / 事件转发 (组件之间的事件沟通)
    Interceptor的使用场景:拦截请求中的租户信息,注入到租户上下文中
    js日期排序
    【unity实战】Unity实现2D人物双击疾跑
    简单但现代的服务器仪表板Dashdot
    数据库的约束和设计
  • 原文地址:https://blog.csdn.net/qq_52797170/article/details/127134557