• RabbitMQ-java使用消息队列


    1 java操作消息队列

    1.1 java实现生产者

    新建一个springboot项目,导入依赖

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.14.2</version>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述
    导入依赖后,实现生产者和消费者,首先是生产者,生产者负责将消息发送到消息队列

        public static void main(String[] args) {
            //使用ConnectionFactory来创建连接
            ConnectionFactory factory = new ConnectionFactory();
    
            //设定连接信息,基操
            factory.setHost("8.130.172.119");
            factory.setPort(5672);  //注意这里写5672,是amqp协议端口
            factory.setUsername("admin");
            factory.setPassword("password");
            factory.setVirtualHost("/test");
    
            //创建连接
            try(Connection connection = factory.newConnection()){
    
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    在这里插入图片描述
    直接在程序种定义并创建消息队列,客户端需要通过连接(connection)创建一个新的通道(Channel),同一个连接下可以很多个通道,这样就不用创建很多个连接也能支持分开发送。

    try(Connection connection = factory.newConnection();
        Channel channel = connection.createChannel()){   //通过Connection创建新的Channel
      	//声明队列,如果此队列不存在,会自动创建
        channel.queueDeclare("yyds", false, false, false, null);
      	//将队列绑定到交换机
        channel.queueBind("yyds", "amq.direct", "my-yyds");
      	//发布新的消息,注意消息需要转换为byte[]
        channel.basicPublish("amq.direct", "my-yyds", null, "Hello World!".getBytes());
    }catch (Exception e){
        e.printStackTrace();
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    其中queeuDeclare方法的参数如下:

    • queue: 队列的名称 (默认创建后routingKey和队列名称一致)
    • durable: 是否持久化。
    • exclusive: 是否排他,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。排他队列是基于Connection可见,同-个Connection的不同Channel是可以同时访问同-个连接创建的排他队列,并且,如果一个Connection已经声明了一个排他队列,其他的Connection是不允许建立同名的排他队列的,即使该队列是持久化的,一旦Connection关闭或者客户端退出,该排他队列都会自动被删除。
    • autoDelete: 是否自动删除
    • arguments:设置队列的其他一些参数,这里我们暂时不需要什么其他参数

    其中queueBind方法的参数如下:

    • queue: 需要绑定的队列名称
    • exchange: 需要绑定的交换机名称。
    • routingKey:

    其中basicPublic方法的参数如下:

    • exchange: 对应的Exchange名称,我们这里就使用第二个直连交换机。
    • routingKey: 这里我们填写绑定时指定的routingKey,其实和之前在管理页面操作一样
    • props: 其他的配置。
    • body: 消息本体

    当前队列状态
    在这里插入图片描述
    运行测试类
    在这里插入图片描述
    出现新的队列
    在这里插入图片描述
    点击队列名称进入详情
    在这里插入图片描述
    获取到java发送的消息
    在这里插入图片描述

    1.2 java实现消费者

    直接从指定的队列去取出数据,不需要再管交换机,但是还是通过connection去创建channel。消费者代码如下

    package com.example.rbbitmqtest;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
        public static void main(String[] args) throws IOException, TimeoutException {
            //使用ConnectionFactory来创建连接
            ConnectionFactory factory = new ConnectionFactory();
    
            //设定连接信息,基操
            factory.setHost("8.130.172.119");
            factory.setPort(5672);  //注意这里写5672,是amqp协议端口
            factory.setUsername("admin");
            factory.setPassword("password");
            factory.setVirtualHost("/test");
    
            //这里不使用try-with-resource,因为消费者是一直等待新的消息到来,然后按照
            //我们设定的逻辑进行处理,所以这里不能在定义完成之后就关闭连接
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            //创建一个基本的消费者
            channel.basicConsume("yyds", false, (s, delivery) -> {
                System.out.println(new String(delivery.getBody()));
                //basicAck是确认应答,第一个参数是当前的消息标签,后面的参数是
                //是否批量处理消息队列中所有的消息,如果为false表示只处理当前消息
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                //basicNack是拒绝应答,最后一个参数表示是否将当前消息放回队列,如果
                //为false,那么消息就会被丢弃
                //channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, false);
                //跟上面一样,最后一个参数为false,只不过这里省了
                //channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
            }, s -> {});
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    当创建完basicConsume后会一直等待消息,不会自动关闭。

    其中basucConsume方法参数如下:

    • queue - 消息队列名称,直接指定。
    • autoAck今自动应答,消费者从消息队列取出数据后,需要跟服务器进行确认应答,当服务器收到确认后,会自动将消息删除,如果开启自动应答,那么消息发出后会直接删除。
    • deliver - 消息接收后的函数回调,我们可以在回调中对消息进行处理,处理完成后,需要给服务器确认应答
    • cancel - 当消费者取消订阅时进行的函数回调,这里暂时用不到。
      其中第二个参数为false时,需要手动调用ack的四种方式,若为true,则会默认为ack中的第二个方式,即拿到消息后就把消息消耗掉队列就不存在。
      在这里插入图片描述

    测试channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false,false);
    其中第一个false代表是否把后边消息全部处理了,false代表不处理
    第二个false代表是否丢回消息队列,若为true,得到消息后还会把消息返回给消息队列

    在这里插入图片描述
    在这里插入图片描述
    此时消息队列没有消息了
    在这里插入图片描述
    在界面把队列删除
    在这里插入图片描述
    以上为RabbitMQ的简单使用,通过java连接到服务器。

    2 SpringBoot整合消息队列客户端

    新建一个springboot项目

    导入依赖

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    • 1
    • 2
    • 3
    • 4

    在这里插入图片描述

    修改配置文件

    在这里插入图片描述

    创建配置类

    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    @Configuration
    public class RabbitConfiguration {
        @Bean("directExchange")  //定义交换机Bean,可以很多个
        public Exchange exchange(){
            return ExchangeBuilder.directExchange("amq.direct").build();
        }
    
        @Bean("yydsQueue")     //定义消息队列
        public Queue queue(){
            return QueueBuilder
              				.nonDurable("yyds")   //非持久化类型
              				.build();
        }
    
        @Bean("binding")
        public Binding binding(@Qualifier("directExchange") Exchange exchange,
                               @Qualifier("yydsQueue") Queue queue){
          	//将我们刚刚定义的交换机和队列进行绑定
            return BindingBuilder
                    .bind(queue)   //绑定队列
                    .to(exchange)  //到交换机
                    .with("my-yyds")   //使用自定义的routingKey
                    .noargs();
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29

    在这里插入图片描述

    2.1 创建一个生产者

    当前的管理端队列为空
    在这里插入图片描述

    直接在测试类中:

    @SpringBootTest
    class SpringCloudMqApplicationTests {
    
      	//RabbitTemplate为我们封装了大量的RabbitMQ操作,已经由Starter提供,因此直接注入使用即可
        @Resource
        RabbitTemplate template;
    
        @Test
        void publisher() {
          	//使用convertAndSend方法一步到位,参数基本和之前是一样的
          	//最后一个消息本体可以是Object类型,非常方便
            template.convertAndSend("amq.direct", "my-yyds", "Hello World11!");
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    运行测试类
    在这里插入图片描述
    新的消息队列
    在这里插入图片描述
    查看详情
    在这里插入图片描述
    取出消息
    在这里插入图片描述

    2.2 创建消费者

    因为消费者实际上就是一直等待消息然后处理的角色,因此只需要创建一个监听器就行了,它会一直等待消息到来然后在进行处理:

    @Component  //注册为Bean
    public class TestListener {
    
        @RabbitListener(queues = "yyds")   //定义此方法为队列yyds的监听器,一旦监听到新的消息,就会接受并处理
        public void test(Message message){
            System.out.println(new String(message.getBody()));
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    在这里插入图片描述

    修改pom文件,改变依赖

    在这里插入图片描述
    在这里插入图片描述

    修改依赖改为web方式后会一直启动,这样监听器也会一直监听

    在这里插入图片描述

    启动后可以看到队列的消息即被监听到了。

    在这里插入图片描述

    测试监听效果

    在这里插入图片描述

    控制台不用重启可以监听到消息

    在这里插入图片描述

    若需要确保消息能够被消费者接收并处理,然后得到消费者的反馈,也可以,修改测试类中生产者测试代码:

    @Test
    void publisher() {
      	//会等待消费者消费然后返回响应结果
        Object res = template.convertSendAndReceive("amq.direct", "my-yyds", "Hello World!");
        System.out.println("收到消费者响应:"+res);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    在这里插入图片描述

    修改消费者监听代码

        @RabbitListener(queues = "yyds")   //定义此方法为队列yyds的监听器,一旦监听到新的消息,就会接受并处理
        public String test(String message){
            System.out.println(message);
            return "响应成功";
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    在这里插入图片描述
    重启application,然后运行生产者发送消息测试类.在这里插入图片描述

    如果说需要直接接收一个JSON格式消息,并希望得到实体类。

    在这里插入图片描述
    在这里插入图片描述

    创建用于JSON转换的Bean,这样就可以接收的JSON格式转化为实体类对象,发送的时候也是以JSON格式发送

    在这里插入图片描述

    消费者指定转换器,修改接收对象。然后重启application服务

    在这里插入图片描述

    在rabbitmq网页管理端发送json数据{"id":1,"name":"LB"}
    在这里插入图片描述

    接收成功

    在这里插入图片描述

    测试类发送实体类格式信息

    在这里插入图片描述

    接收到消息

    在这里插入图片描述
    这样就实现了Springboot操作rabbitmq消息队列

  • 相关阅读:
    【云原生 • Kubernetes】k8s功能特性、k8s集群架构介绍
    MySQL的事务隔离是如何实现的?
    滑动页面滚动条,获取div显示顶部是那个div
    Wayland:推动Linux桌面进入下一代图形显示时代
    简述人工智能,及其三大学派:符号主义、连接主义、行为主义
    Leetcode 454 四数相加II(哈希表 + getOrDefault方法用于获取Map中指定键的值,如果键不存在,则返回一个默认值)
    Linux操作-1之内容切割命令cut
    Liunx 服务器安装Oracle JDK
    【校招VIP】产品思维设计之需求分析
    React 组件性能优化
  • 原文地址:https://blog.csdn.net/weixin_43917045/article/details/133527616