• 【RabbitMQ实战】 03 SpringBoot RabbitMQ生产者和消费者示例


    上一节我们写了一段原生API来进行生产和消费的例子。实际上SpringBoot对原生RabbitMQ客户端做了二次封装,让我们使用API的代价更低。

    一、配置文件

    依赖引入

    <dependencies>
        <dependency>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-amqpartifactId>
        dependency>
        <dependency>
            <groupId>org.springframework.bootgroupId>
            <artifactId>spring-boot-starter-webartifactId>
        dependency>
        <dependency>
            <groupId>com.google.code.gsongroupId>
            <artifactId>gsonartifactId>
            <version>2.8.6version>
        dependency>
    dependencies>
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    RabbitMQ的配置如下

    spring:
      rabbitmq:
        host: 192.168.56.201
        port: 5672
        username: hello
        password: world
        #虚拟host
        virtual-host: virtual01
        template:
          mandatory: true #当mandatory设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当为false时,则直接丢弃消息
        publisher-confirm-type: correlated #生产者回调确认机制,由回调来确定消息是否发布成功
        publisher-returns: true #是否开启生产者returns
        listener:
          simple:
            acknowledge-mode: manual #手动回复方式,一般建议手动回复,即需要我们自己调用对应的ACK方法
            prefetch: 10 #每个消费者可拉取的,还未ack的消息数量
            concurrency: 3 #消费端(每个Listener)的最小线程数
            max-concurrency: 10 #消费端(每个Listener)的最大线程数
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18

    每个配置的具体含义,详见配置

    二、生产者代码

    @Slf4j
    @RestController
    @RequestMapping("/rabbit")
    public class RabbitSendController implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
        private static final String EXCHANGE_NAME = "my_exchange";
        private static final String ROUTING_KEY = "my_routing";
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 正常发送并被broker接收
         * @return
         */
        @RequestMapping("send")
        public String send() {
            for (int i = 0; i < 10; i++) {
                OrderInfo orderInfo = new OrderInfo();
                orderInfo.setAddress("成都市高新区");
                orderInfo.setOrderId(String.valueOf(i));
                orderInfo.setProductName("华为P60:" + i);
    
                //设置回调关联的一个id
                String messageId = UUID.randomUUID().toString();
                log.info("开始发送消息,当前消息关联id为:{}", messageId);
                CorrelationData correlationData = new CorrelationData(messageId);
    
                MessageProperties messageProperties = new MessageProperties();
                messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                Message message = MessageBuilder.withBody(new Gson().toJson(orderInfo).getBytes(StandardCharsets.UTF_8))
                        .andProperties(messageProperties).build();
                //设置ack回调
                rabbitTemplate.setConfirmCallback(this);
                //退回消息的回调
                rabbitTemplate.setReturnCallback(this);
                rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message, correlationData);
            }
            return "ok";
        }
    
        /**
         * 设置一个非法的路由键,模拟消息被broker退回的情况,前提是
         * spring.rabbitmq.template.mandatory=true 当mandatory设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ会调用Basic.Return命令将消息返回给生产者。当为false时,则直接丢弃消息
         * 

    * spring.rabbitmq.publisher-returns=true 生产者回调确认机制,由回调来确定消息是否发布成功 * * @return */ @RequestMapping("send-return") public String sendAndReturn() { OrderInfo orderInfo = new OrderInfo(); orderInfo.setAddress("成都市高新区"); orderInfo.setOrderId("111"); orderInfo.setProductName("小米13"); //设置回调关联的一个id String messageId = UUID.randomUUID().toString(); log.info("开始发送消息,当前消息关联id为:{}", messageId); CorrelationData correlationData = new CorrelationData(messageId); MessageProperties messageProperties = new MessageProperties(); messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); Message message = MessageBuilder.withBody(new Gson().toJson(orderInfo).getBytes(StandardCharsets.UTF_8)) .andProperties(messageProperties).build(); //设置ack回调 rabbitTemplate.setConfirmCallback(this); //退回消息的回调 rabbitTemplate.setReturnCallback(this); //下面这个RoutingKey是没有绑定的,所以发不出去 rabbitTemplate.convertAndSend(EXCHANGE_NAME, "error.routing", message, correlationData); return "ok"; } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (correlationData == null) { return; } String messageId = correlationData.getId(); if (ack) { log.info("【confirm回调方法】,消息发布成功,messageId={}", messageId); } else { log.info("【confirm回调方法】,消息发布失败,messageId={}", messageId); } } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("【returnedMessage回调方法】,消息被退回,message={},replyCode:{},replyText:{},exchange:{},routingKey:{}", new String(message.getBody()), replyCode, replyText, exchange, routingKey); } }

    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94

    代码说明

    • 使用RabbitTemplate可以发送消息
    • 这个Controller定义了一个发送的接口,调用RabbitTemplate将消息发送出去
    • 实现了ConfirmCallback接口,对应着我们配置publisher-confirm-type: correlated #生产者回调确认机制,由回调来确定消息是否发布成功。发送成功时,会回调confirm方法。
    • 实现了ReturnCallback接口,对应着我们的配置
      • publisher-returns: true #是否开启生产者returns。开启生产者returns机制,被回退回来的消费,会调用returnedMessage方法
      • mandatory: true #当mandatory设置为true时,交换器无法根据自身的类型和路由键找到一个符合条件的队
    • 注意上面两个实现接口,要调用下面两行代码,才能生效。
      rabbitTemplate.setConfirmCallback(this);
      rabbitTemplate.setReturnCallback(this);

    访问:http://localhost:8080/rabbit/send
    输出日志如下

    开始发送消息,当前消息关联id为:b60196e7-4ff2-4926-8a1f-bd0872b236f8
    开始发送消息,当前消息关联id为:03232b2c-b755-4b46-9a8c-6b2bbf3b2bd6
    【confirm回调方法】,消息发布成功,messageId=b60196e7-4ff2-4926-8a1f-bd0872b236f8
    【confirm回调方法】,消息发布成功,messageId=03232b2c-b755-4b46-9a8c-6b2bbf3b2bd6
    ...
    
    • 1
    • 2
    • 3
    • 4
    • 5

    模拟一个发送失败,消息被Broker退回的情况
    访问:http://localhost:8080/rabbit/send-return
    输出日志如下
    可以发现,即使被退回的情况,confirm方法也会被成功执行。这个要注意。

    开始发送消息,当前消息关联id为:f89eb7c2-340b-42ef-9454-7dca03d895a9
    【returnedMessage回调方法】,消息被退回,message={"orderId":"111","productName":"小米13","address":"成都市高新区"},replyCode:312,replyText:NO_ROUTE,exchange:my_exchange,routingKey:error.routing
    【confirm回调方法】,消息发布成功,messageId=f89eb7c2-340b-42ef-9454-7dca03d895a9
    
    • 1
    • 2
    • 3

    三、消费者代码

    @Slf4j
    @Component
    public class RabbitOrderConsumer {
        private static final String EXCHANGE_NAME = "my_exchange";
        private static final String QUEUE_NAME = "my_queue";
        private static final String ROUTING_KEY = "my_routing";
    
        @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = QUEUE_NAME, durable = "true"),
                exchange = @Exchange(value = EXCHANGE_NAME, type = "topic", durable = "true"), key = ROUTING_KEY)})
        public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
            log.info("接收到消息:{},deliveryTag:{}", new String(message.getBody(), StandardCharsets.UTF_8), tag);
            channel.basicAck(tag, false);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    代码说明

    • 定义了@RabbitListener,将Exchange和Queue进行绑定
    • channel.basicAck(tag, false);是手动回复ack。注意手动回复ack表示该消息已被客户端成功消费,且在配置文件中要配置ack方式为手动,即上面配置文件中的acknowledge-mode: manual
      运行消费者代码,日志输出如下
    接收到消息:{"orderId":"1","productName":"华为P60:1","address":"成都市高新区"},deliveryTag:1
    接收到消息:{"orderId":"4","productName":"华为P60:4","address":"成都市高新区"},deliveryTag:2
    
    • 1
    • 2

    示例代码git仓库:https://gitee.com/syk1234/mqdmo

    四、更多关于@RabbitListener参数说明如下

    @Queue注解为我们提供了队列相关的一些属性,具体如下:

    • name: 队列的名称;

    • durable: 是否持久化;

    • exclusive: 是否独享、排外的;

    • autoDelete: 是否自动删除;

    • arguments:队列的其他属性参数,有如下可选项,可参看图2的arguments:

    • x-message-ttl:消息的过期时间,单位:毫秒;

    • x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;

    • x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;

    • x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;

    • x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;

    • x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;

    • x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值

    • x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)

    • x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;

    • x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;

    • x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。

    @RabbitListener 提供消费者配置

    • ackMode:覆盖容器工厂 AcknowledgeMode属性。
    • admin:参考AmqpAdmin.
    • autoStartup:设置为 true 或 false,以覆盖容器工厂中的默认设置。
    • QueueBinding[] bindings:QueueBinding提供监听器队列名称以及交换和可选绑定信息的数组。
    • concurrency:消费并发数。
    • containerFactory:RabbitListenerContainerFactory的bean名称 ,没有则使用默认工厂。
    • converterWinsContentType:设置为“false”以使用“replyContentType”属性的值覆盖由消息转换器设置的任何内容类型标头。
    • errorHandler:消息异常时调用的方法名。
    • exclusive:当为true时,容器中的单个消费者将独占使用 queues(),从而阻止其他消费者从队列接收消息。
    • executor:线程池bean的名称
    • group:如果提供,则此侦听器的侦听器容器将添加到以该值作为其名称的类型为 的 bean 中Collection。
    • id:为此端点管理的容器的唯一标识符。
    • messageConverter:消息转换器。
    • priority:此端点的优先级。
    • String[] queues:监听的队列名称
    • Queue[] queuesToDeclare:监听的队列Queue注解对象,与bindings()、queues()互斥。
    • replyContentType:用于设置回复消息的内容类型。
    • replyPostProcessor:在ReplyPostProcessor发送之前处理响应的 bean 名称 。
    • returnExceptions:设置为“true”以导致使用正常replyTo/@SendTo语义将侦听器抛出的异常发送给发送者。
  • 相关阅读:
    gopacket reassembly源码分析
    Auto-WEKA(Waikato Environment for Knowledge Analysis)
    基于SpringBoot和Vue的商品秒杀系统设计与实现
    iNFTnews | DAO在NFT领域中的作用
    2022最火接口测试神器【ApiFox】APIFox接口测试工具快速使用上手教程
    数据结构——快排与归并
    强化学习学习笔记
    salesforce零基础学习(一百一十二)项目中的零碎知识点小总结(四)
    2022年了!还在用定时器实现动画?赶紧试试requestAnimationFrame吧!
    【SI好文翻译】铜箔表面纹理对损耗的影响:一个有效的模型(四)
  • 原文地址:https://blog.csdn.net/suyuaidan/article/details/133217872