• RabbitMQ第一个实操小案例


    二、RabbitMQ实操小案例

    进入RabbitMQ的官网。选择 Document 菜单项,然后点击Tutorials下的 Get Started。我们可以看看官网给我们的案例实操。
    在这里插入图片描述
    里面一共有7个Demo,下面我们就根据他给的Demo进行实操,熟悉一下我们的RabbitMq。
    前面的两种,是最基本的,只有发布者和订阅者和一个队列,并没有我们的交换机。
    其中,
    第一种属于基本消息队列
    第二种属于工作消息队列
    在这里插入图片描述
    后面这三种是有交换机的,根据交换机类型的不同,发布和订阅的方式可以分为三种:

    1. 广播(对应官网的案例3——Publish/Subscribe)
    2. 路由(对应官网的案例4——Routing)
    3. 主题(对应官网的案例5——Topics)

    下面,我们就来跑一下官网的这些案例。

    2.1 Hello World!

    官方的Hello World案例,只有三个角色,Publisher(发布者)、Queue(队列)、Consumer(消费者)。
    在这里插入图片描述

    其中,

    • Publisher(发布者)只负责把消息不停地发送到Queue(队列)中
    • Queue(队列)负责接收并缓存消息
    • Consumer(消费者)负责订阅队列中的消息

    下面,我们来写一个最简单的实例,首先,创建一个空白的Module作为父工程。(结构要多干净就多干净)
    在这里插入图片描述
    然后我们写一下父工程的pom.xml文件如下:

    
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0modelVersion>
    
        <groupId>org.examplegroupId>
        <artifactId>mq-demo1artifactId>
        <version>1.0-SNAPSHOTversion>
    
        <packaging>pompackaging>
    
        <parent>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-parentartifactId>
            <version>2.3.9.RELEASEversion>
            <relativePath/>
        parent>
    
        <properties>
            <maven.compiler.source>8maven.compiler.source>
            <maven.compiler.target>8maven.compiler.target>
        properties>
    
        <dependencies>
            <dependency>
                <groupId>org.projectlombokgroupId>
                <artifactId>lombokartifactId>
            dependency>
            
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-amqpartifactId>
            dependency>
            
            <dependency>
                <groupId>org.springframework.bootgroupId>
                <artifactId>spring-boot-starter-testartifactId>
            dependency>
        dependencies>
    
    project>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    然后,我们创建两个Module,一个叫publisher(发布者),一个叫consumer(消费者),为了尽可能地干净,我用IDEA,直接创建了两个空白的Maven工程。
    在这里插入图片描述
    然后,我们先写Publisher这个Module。SpringBoot的流程应该都滚瓜烂熟了吧。首先写我们SpringBoot的启动类 PublisherApplication.java ,如下:

    package com.demo.mq;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class PublisherApplication {
        public static void main(String[] args) {
            SpringApplication.run(PublisherApplication.class, args);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    然后我们写我们的测试文件 PublisherTest.java 如下:

    package com.demo.mq.test;
    
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class PublisherTest {
        @Test
        public void sendMsg() throws IOException, TimeoutException{
            //1、建立连接
            ConnectionFactory factory = new ConnectionFactory();
            //1.1 设置连接参数(主机号、端口号、vhost、用户名、密码)
            factory.setHost("192.168.10.20");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("admin");
            factory.setPassword("root");
    
            //1.2 建立连接
            Connection connection = factory.newConnection();
    
            //2. 创建通道 Channel
            Channel channel = connection.createChannel();
    
            //3. 创建队列
            String queueName = "simple.queue";
            channel.queueDeclare(queueName, false, false, false, null);
    
            //4. 发送消息
            String msg = "Hello,World!";
            channel.basicPublish("", queueName, null, msg.getBytes());
            System.out.println("消息发送成功:"+msg);
    
            //5. 关闭通道和连接
            channel.close();
            connection.close();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    运行该测试方法后,我们查看我们的RabbitMQ控制台可以看到,此时在Queues下有一个叫simple.queue的队列,该队列的虚拟主机是/,此时有一条记录(Ready)正准备被别人收听:
    在这里插入图片描述
    然后,我们写我们的消费者的代码:
    首先,先写我们的SpringBoot启动文件 ConsumerApplication.java

    package com.demo.mq;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class ConsumerApplication {
        public static void main(String[] args) {
            SpringApplication.run(ConsumerApplication.class, args);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    然后写我们的测试类 ConsumerTest.java

    package com.demo.mq.test;
    
    import com.rabbitmq.client.*;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class ConsumerTest {
        @Test
        public void testConsumer() throws IOException, TimeoutException {
            // 1.建立连接
            ConnectionFactory factory = new ConnectionFactory();
            // 1.1. 设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
            factory.setHost("192.168.10.20");
            factory.setPort(5672);
            factory.setVirtualHost("/");
            factory.setUsername("admin");
            factory.setPassword("root");
            // 1.2. 建立连接
            Connection connection = factory.newConnection();
    
            // 2. 创建通道Channel
            Channel channel = connection.createChannel();
    
            // 3. 创建队列
            String queueName = "simple.queue";
            channel.queueDeclare(queueName, false, false, false, null);
    
            // 4. 订阅消息
            channel.basicConsume(queueName, true, new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // 5. 处理消息
                    String message = new String(body);
                    System.out.println("接收到消息:【" + message + "】");
                }
            });
            System.out.println("等待接收消息。。。。");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    然后运行我们的启动文件,结果如下:
    在这里插入图片描述
    这里,之所以先输出等待接收消息,是因为RabbitMQ消息队列传输是异步的,在System.out.println(“等待接收消息。。。。”); 执行完后,Hello,World!才从RabbitMQ中被消费者接收到并打印在控制台上。

    此时,我们的RabbitMQ的Queue如下,可以看到,已经等待接收的消息了:
    在这里插入图片描述

    2.2 Spring AMQP

    在介绍第二个案例前,为了简化我们消息的发送和接收,我们先介绍一下SpringAMQP。那么,什么是AMQP,什么又是Spring AMQP呢?

    • AMQP(Advanced Message Queuing Protocol):用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
    • Spring AMQP:基于** AMQP 协议** 定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中,spring-amqp 是基础抽象spring-rabbit 是底层的默认实现

    Spring AMQP 是Spring 全家桶里面的一个“小辣翅”。官网地址:https://spring.io/projects/spring-amqp

    官方说他的特征主要有三:

    1. 用于异步处理入站消息的侦听器容器。
    2. RabbitTemplate,发送和接收消息。
    3. RabbitAdmin,用于自动声明队列,交换和绑定。

    说完 SpringAMQP,我们用他来实现上面的Hello World!消息的发送和接收。

    步骤一、引入 SpringAMQP 依赖。

    
     <dependency>
         <groupId>org.springframework.bootgroupId>
         <artifactId>spring-boot-starter-amqpartifactId>
     dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    步骤二、配置RabbitMQ地址。
    使用springAMQP后,我们就不需要每次都自己去配置我们的rabbitMQ的IP地址、端口号、账号、密码和虚拟主机了。我们只需要在application.yaml文件中配置即可:

    spring:
      rabbitmq:
        host: 192.168.83.129
        port: 5672
        virtual-host: /
        username: admin
        password: root
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    步骤三、在publisher服务中编写测试方法,向simple.queue发送消息。

    配置好我们的 rabbitMQ 地址后,我们编写我们的测试方法如下:

    package com.demo.mq.test;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    import javax.annotation.Resource;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringAmqpTest {
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void testPublish() {
            String queueName = "simple.queue";
            String message = "Hello,World!";
            rabbitTemplate.convertAndSend(queueName, message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    编写完毕,我们跑一下这个测试方法,结果如下:
    在这里插入图片描述
    我们再看一下我们的rabbitMQ客户端,可以看到,他确确实实有收到一条消息。
    在这里插入图片描述

    OK,发送我们会了,那么接收呢?其实很简单,也是三个步骤:

    步骤一、引入 spring-amqp 依赖。

    
     <dependency>
         <groupId>org.springframework.bootgroupId>
         <artifactId>spring-boot-starter-amqpartifactId>
     dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    步骤二、配置 RabbitMQ 地址。

    spring:
      rabbitmq:
        host: 192.168.83.129
        port: 5672
        virtual-host: /
        username: admin
        password: root
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    步骤三、在 consumer 服务中编写方法,接收simple.queue的消息。(记得将服务注册到Spring容器中)

    package com.demo.mq.listener;
    
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class SpringRabbitListener {
    
        @RabbitListener(queues = "simple.queue")
        public void listenQueueMessage(String msg) throws InterruptedException{
            System.out.println("监听到的消息为:【"+ msg +"】");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    然后我们运行我们的 ConsumerApplication,可以看到,成功接收到了RabbitMQ请求队列刚才接收到的消息。
    在这里插入图片描述
    我们甚至可以再去publisher里面再发送几次消息,看看是不是能正常收到。这里,我又跑去发送了两次,可以看到,我们都可以正常的接收到消息。
    在这里插入图片描述

  • 相关阅读:
    反向传播back propagation
    dva搭建项目实例
    CV每日论文--2024.6.25
    大浪淘沙,自动驾驶迎来下半场
    算法通关村第九关-白银挑战二分查找与高频搜索树
    Flask 学习-27.flask_jwt_extended插件学习current_user的使用
    算法工程题(二叉树递归)
    LQ0190 李白打酒【填空题】
    排序(3)【归并排序】【计数排序】【排序算法度及其稳定性分析】
    传输层——TCP协议
  • 原文地址:https://blog.csdn.net/weixin_44741023/article/details/127154722