• Spring-RabbitMQ 异步消息接收实践


    Springboot 版本: 2.7.0

    核心组件

    Spring-RabbitMQ 异步消息接收实现主要有2个核心组件:

    1. MessageListener (package org.springframework.amqp.core.MessageListener):用于接收AMQP消息异步投递的一个接口。用户需要实现该接口去与消息中间件进行交互。
    2. Container: 是 AMQP 队列和 MessageListener 实例之间的一个桥梁,必须为其提供一个 ConnectionFactory、 一个MessageListener 、 一个或多个Queue。

    代码实现

    案例一、 实现MessageListener接口接收消息

    RabbitMQ 配置类:

    @Slf4j
    @Configuration
    public class RabbitConfiguration {
        public final static String TOPIC_EXCHANGE = "myExchange";
        public final static String QUEUE_NAME = "myPushQueue";
    
        @Bean
        public RabbitAdmin amqpAdmin(ConnectionFactory connectionFactory) {
            return new RabbitAdmin(connectionFactory);
        }
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setMessageConverter(new Jackson2JsonMessageConverter());
            template.setExchange(TOPIC_EXCHANGE);
            template.setConfirmCallback((correlationData, ack, cause) -> {
                if (ack) {
                    log.info("消息:{}发送成功", correlationData.getId());
                } else {
                    log.error("消息:{}发送失败,失败原因为:{}", correlationData.getId(), cause);
                }
            });
    
            template.setMandatory(true);
            template.setReturnsCallback(returned -> {
                log.error("消息:{}路由失败, 失败原因为:{}", returned.getMessage().toString(), returned.getReplyText());
            });
            return template;
        }
    
        @Bean
        public TopicExchange topicExchange() {
            return new TopicExchange(TOPIC_EXCHANGE, true, false);
        }
    
        @Bean
        public Queue queue() {
            return new Queue(QUEUE_NAME);
        }
    
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(queue()).to(topicExchange()).with("my.test.*");
        }
    
        @Bean
        public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueues(queue());
            container.setMessageListener(exampleListener());
            return container;
        }
    
    	// 消费者接收消息
        @Bean
        public MessageListener exampleListener() {
            return message -> log.info("received -> {}", message);
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61

    配置文件:

    spring:
      rabbitmq:
        host: localhost
        port: 5672
        username: admin
        password: admin
        virtual-host: my_vhost
        # 消息确认(ACK)
        publisher-confirm-type: CORRELATED #correlated #确认消息已发送到交换机(Exchange)
        publisher-returns: true #确认消息已发送到队列(Queue)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    生产者:

        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void send(){
            CorrelationData correlationData = new CorrelationData();
            rabbitTemplate.convertAndSend("my.test.msg", new User("Kleven", 18), correlationData);
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    @ToString
    public class User implements Serializable {
        private static final long serialVersionUID = -5079682733940745661L;
    
        private String name;
        private Integer age;
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    结果:

        : received -> (Body:'{"name":"Kleven","age":18}' MessageProperties [headers={spring_listener_return_correlation=4c01ac5f-196e-498f-973b-429d023087e3, spring_returned_message_correlation=bd9d577f-c427-4fa2-b37e-423665f59e4f, __TypeId__=info.kleven.javacamp.spring.rabbitmq.domain.User}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myExchange, receivedRoutingKey=my.test.msg, deliveryTag=1, consumerTag=amq.ctag-UvD_6hpDGZVjIeGDtoOtXw, consumerQueue=myPushQueue])
        : 消息:bd9d577f-c427-4fa2-b37e-423665f59e4f发送成功
    
    • 1
    • 2

    案例二、 实现ChannelAwareMessageListener接口接收消息

    可以同时获得Channel对象实例,只修改案例一配置中的消费者代码。
    配置类:

        @Bean
        public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            container.setQueues(queue());
            container.setMessageListener(exampleChannelAwareMessageListener());
            return container;
        }
    
    	//消费者
        @Bean
        public ChannelAwareMessageListener exampleChannelAwareMessageListener() {
            return (message, channel) -> {
                log.info("channel -> {}", channel);
                log.info("received -> {}", message);
            };
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    结果:

         : channel -> Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://admin@127.0.0.1:5672/my_vhost,1), conn: Proxy@5101669b Shared Rabbit Connection: SimpleConnection@bc510fd6 [delegate=amqp://admin@127.0.0.1:5672/my_vhost, localPort= 56290]
         : received -> (Body:'{"name":"Kleven","age":18}' MessageProperties [headers={spring_listener_return_correlation=71e02a0e-2067-43ac-bf4e-6eb561df9a6b, spring_returned_message_correlation=2cab0928-19c1-49fb-a02a-e75331665207, __TypeId__=info.kleven.javacamp.spring.rabbitmq.domain.User}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=myExchange, receivedRoutingKey=my.test.msg, deliveryTag=1, consumerTag=amq.ctag-q-VGpyVJmm9ikD_Ku4jMZg, consumerQueue=myPushQueue])
         : 消息:2cab0928-19c1-49fb-a02a-e75331665207发送成功
    
    • 1
    • 2
    • 3

    案例三、使用@RabbitListener标签

    多数情况下都会使用这个方案,很方便也很灵活。下面是个简单的例子。
    从案例一的配置类中移除消费者相关代码,其他不变。

    消费者:

    @RabbitListener(queues = "myPushQueue")
    public void processOrder(String message) {
        log.info("received -> {}", message);
    }
    
    • 1
    • 2
    • 3
    • 4

    结果:

         : 消息:9bacf20e-3bb1-4f5e-95a9-a5fb9cd7ff9a发送成功
         : received -> {"name":"Kleven","age":18}
    
    • 1
    • 2
  • 相关阅读:
    【Linux-Windows】通过浏览器批量下载图像数据
    【状语从句练习题】从句时态补充
    mksh linux
    WEB 渗透之信息收集
    老嫂子的保姆级科普 选择视频剪辑软件就从阅读本文开始
    ChatGLM2 源码解析:`ChatGLMTokenizer`
    查找最大元素
    【数据结构】栈和队列
    Milvus向量数据库:高效处理海量非结构化数据的利器
    Linux入门操作介绍
  • 原文地址:https://blog.csdn.net/u012359704/article/details/126273872