• Spring RabbitMQ那些事(1-交换机配置&消息发送订阅实操)


    一、序言

    在上一节 RabbitMQ中的核心概念和交换机类型 中我们介绍了RabbitMQ中的一些核心概念,尤其是各种交换机的类型,接下来我们将具体讲解各种交换机的配置和消息订阅实操。


    二、配置文件application.yml

    我们先上应用启动配置文件application.yml,如下:

    server:
      port: 8080
    spring:
      rabbitmq:
        addresses: localhost:5672
        username: admin
        password: admin
        virtual-host: /
        listener:
          type: simple
          simple:
            acknowledge-mode: auto
            concurrency: 5
            max-concurrency: 20
            prefetch: 5
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    备注:这里我们指定了RabbitListenerContainerFactory的类型为SimpleRabbitListenerContainerFactory,并且指定消息确认模式为自动确认

    三、RabbitMQ交换机和队列配置

    Spring官方提供了一套 流式API 来定义队列交换机绑定关系,非常的方便,接下来我们定义4种类型的交换机和相应队列的绑定关系。

    1、定义4个队列

    /**
     * 定义4个队列
     */
    @Configuration
    protected static class QueueConfig {
    
    	@Bean
    	public Queue queue1() {
    		return QueueBuilder.durable("queue-1").build();
    	}
    
    	@Bean
    	public Queue queue2() {
    		return QueueBuilder.durable("queue-2").build();
    	}
    
    	@Bean
    	public Queue queue3() {
    		return QueueBuilder.durable("queue-3").build();
    	}
    
    	@Bean
    	public Queue queue4() {
    		return QueueBuilder.durable("queue-4").build();
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    2、定义Fanout交换机和队列绑定关系

    /**
     * 定义Fanout交换机和对应的绑定关系
     */
    @Configuration
    protected static class FanoutExchangeBindingConfig {
    
    	@Bean
    	public FanoutExchange fanoutExchange() {
    		return ExchangeBuilder.fanoutExchange("fanout-exchange").build();
    	}
    
    	/**
    	 * 定义多个Fanout交换机和队列的绑定关系
    	 * @param fanoutExchange
    	 * @param queue1
    	 * @param queue2
    	 * @param queue3
    	 * @param queue4
    	 * @return
    	 */
    	@Bean
    	public Declarables bindQueueToFanoutExchange(FanoutExchange fanoutExchange, Queue queue1, Queue queue2, Queue queue3, Queue queue4) {
    		Binding queue1Binding = BindingBuilder.bind(queue1).to(fanoutExchange);
    		Binding queue2Binding = BindingBuilder.bind(queue2).to(fanoutExchange);
    		Binding queue3Binding = BindingBuilder.bind(queue3).to(fanoutExchange);
    		Binding queue4Binding = BindingBuilder.bind(queue4).to(fanoutExchange);
    		return new Declarables(queue1Binding, queue2Binding, queue3Binding, queue4Binding);
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    备注:这里我们将4个队列绑定到了名为fanout-exchange的交换机上。

    2、定义Direct交换机和队列绑定关系

    @Configuration
    protected static class DirectExchangeBindingConfig {
    
    	@Bean
    	public DirectExchange directExchange() {
    		return ExchangeBuilder.directExchange("direct-exchange").build();
    	}
    
    	@Bean
    	public Binding bindingQueue3ToDirectExchange(DirectExchange directExchange, Queue queue3) {
    		return BindingBuilder.bind(queue3).to(directExchange).with("queue3-route-key");
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    备注:这里我们定义了名为direct-exchange的交换机并通过路由keyqueue3-route-keyqueue-3绑定到了该交换机上。


    3、定义Topic交换机和队列绑定关系

    @Configuration
    protected static class TopicExchangeBindingConfig {
    
    	@Bean
    	public TopicExchange topicExchange() {
    		return ExchangeBuilder.topicExchange("topic-exchange").build();
    	}
    
    	@Bean
    	public Declarables bindQueueToTopicExchange(TopicExchange topicExchange, Queue queue1, Queue queue2) {
    		Binding queue1Binding = BindingBuilder.bind(queue1).to(topicExchange).with("com.order.*");
    		Binding queue2Binding = BindingBuilder.bind(queue2).to(topicExchange).with("com.#");
    		return new Declarables(queue1Binding, queue2Binding);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    这里我们定义了名为topic-exchange类型的交换机,该类型交换机支持路由key通配符匹配,*代表一个任意字符,#代表一个或多个任意字符。

    备注:

    1. 通过路由keycom.order.*queue-1绑定到了该交换机上。
    2. 通过路由key com.#queue-2也绑定到了该交换机上。

    4、定义Header交换机和队列绑定关系

    @Configuration
    protected static class HeaderExchangeBinding {
    
    	@Bean
    	public HeadersExchange headersExchange() {
    		return ExchangeBuilder.headersExchange("headers-exchange").build();
    	}
    
    	@Bean
    	public Binding bindQueueToHeadersExchange(HeadersExchange headersExchange, Queue queue4) {
    		return BindingBuilder.bind(queue4).to(headersExchange).where("function").matches("logging");
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    备注:这里我们定义了名为headers-exchange类型的交换机,并通过参数function=loggingqueue-4绑定到了该交换机上。


    四、RabbitMQ消费者配置

    Spring RabbitMQ中支持注解式监听端点配置,用于异步接收消息,如下:

    @Slf4j
    @Component
    public class RabbitMqConsumer {
    
    	@RabbitListener(queues = "queue-1")
    	public void handleMsgFromQueue1(String msg) {
    		log.info("Message received from queue-1, message body: {}", msg);
    	}
    
    	@RabbitListener(queues = "queue-2")
    	public void handleMsgFromQueue2(String msg) {
    		log.info("Message received from queue-2, message body: {}", msg);
    	}
    
    	@RabbitListener(queues = "queue-3")
    	public void handleMsgFromQueue3(String msg) {
    		log.info("Message received from queue-3, message body: {}", msg);
    	}
    
    	@RabbitListener(queues = "queue-4")
    	public void handleMsgFromQueue4(String msg) {
    		log.info("Message received from queue-4, message body: {}", msg);
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    备注:这里我们分别定义了4个消费者,分别用来接受4个队列的消息。

    五、RabbitMQ生产者

    @Slf4j
    @Component
    @RequiredArgsConstructor
    public class RabbitMqProducer {
    
    	private final RabbitTemplate rabbitTemplate;
    
    	public void sendMsgToFanoutExchange(String body) {
    		log.info("开始发送消息到fanout-exchange, 消息体:{}", body);
    
    		Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
    		rabbitTemplate.send("fanout-exchange", StringUtils.EMPTY, message);
    	}
    
    	public void sendMsgToDirectExchange(String body) {
    		log.info("开始发送消息到direct-exchange, 消息体:{}", body);
    
    		Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
    		rabbitTemplate.send("direct-exchange", "queue3-route-key", message);
    	}
    
    	public void sendMsgToTopicExchange(String routingKey, String body) {
    		log.info("开始发送消息到topic-exchange, 消息体:{}", body);
    
    		Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).build();
    		rabbitTemplate.send("topic-exchange", routingKey, message);
    	}
    
    	public void sendMsgToHeadersExchange(String body) {
    		log.info("开始发送消息到headers-exchange, 消息体:{}", body);
    
    		MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setHeader("function", "logging").build();
    		Message message = MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();
    		rabbitTemplate.send("headers-exchange", StringUtils.EMPTY, message);
    	}
    
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38

    六、测试用例

    这里写了个简单的Controller用来测试,如下:

    @RestController
    @RequiredArgsConstructor
    public class RabbitMsgController {
    
    	private final RabbitMqProducer rabbitMqProducer;
    
    	@RequestMapping("/exchange/fanout")
    	public ResponseEntity<String> sendMsgToFanoutExchange(String body) {
    		rabbitMqProducer.sendMsgToFanoutExchange(body);
    		return ResponseEntity.ok("广播消息发送成功");
    	}
    
    	@RequestMapping("/exchange/direct")
    	public ResponseEntity<String> sendMsgToDirectExchange(String body) {
    		rabbitMqProducer.sendMsgToDirectExchange(body);
    		return ResponseEntity.ok("消息发送到Direct交换成功");
    	}
    
    	@RequestMapping("/exchange/topic")
    	public ResponseEntity<String> sendMsgToTopicExchange(String routingKey, String body) {
    		rabbitMqProducer.sendMsgToTopicExchange(routingKey, body);
    		return ResponseEntity.ok("消息发送到Topic交换机成功");
    	}
    
    	@RequestMapping("/exchange/headers")
    	public ResponseEntity<String> sendMsgToHeadersExchange(String body) {
    		rabbitMqProducer.sendMsgToHeadersExchange(body);
    		return ResponseEntity.ok("消息发送到Headers交换机成功");
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    1、发送到FanoutExchage

    直接访问http://localhost:8080/exchange/fanout?body=hello,可以看到该消息广播到了4个队列上。

    2023-11-07 17:41:12.959  INFO 39460 --- [nio-8080-exec-9] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到fanout-exchange, 消息体:hello
    2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#1-5] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-1, message body: hello
    2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#0-4] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-4, message body: hello
    2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#3-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-3, message body: hello
    2023-11-07 17:41:12.972  INFO 39460 --- [ntContainer#2-4] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-2, message body: hello
    
    • 1
    • 2
    • 3
    • 4
    • 5

    2、发送到DirectExchage

    访问http://localhost:8080/exchange/direct?body=hello,可以看到消息通过路由keyqueue3-route-key发送到了queue-3上。

    2023-11-07 17:43:26.804  INFO 39460 --- [nio-8080-exec-1] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到direct-exchange, 消息体:hello
    2023-11-07 17:43:26.822  INFO 39460 --- [ntContainer#3-5] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-3, message body: hello
    
    • 1
    • 2

    3、发送到TopicExchange

    访问http://localhost:8080/exchange/topic?body=hello&routingKey=com.order.create,路由key为 com.order.create的消息分别发送到了queue-1queue-2上。

    2023-11-07 17:44:45.301  INFO 39460 --- [nio-8080-exec-4] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到topic-exchange, 消息体:hello
    2023-11-07 17:44:45.312  INFO 39460 --- [ntContainer#1-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-1, message body: hello
    2023-11-07 17:44:45.312  INFO 39460 --- [ntContainer#2-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-2, message body: hello
    
    • 1
    • 2
    • 3

    4、发动到HeadersExchage

    访问http://localhost:8080/exchange/headers?body=hello,消息通过头部信息function=logging发送到了headers-exchange上。

    2023-11-07 17:47:21.736  INFO 39460 --- [nio-8080-exec-9] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到headers-exchange, 消息体:hello
    2023-11-07 17:47:21.749  INFO 39460 --- [ntContainer#0-3] c.u.r.i.consumer.RabbitMqConsumer        : Message received from queue-4, message body: hello
    
    • 1
    • 2

    七、结语

    下一节我们将会介绍通过两种方式借由RabbitMQ实现延迟消息发送和订阅,敬请期待。
    在这里插入图片描述

  • 相关阅读:
    P1147 连续自然数和 【双指针(尺取法)】
    医学影像特征提取与导出
    MindSpore: mindspore.dataset.CocoDataset返回的dataset的‘category_id’数据是一维向量还是标量
    【2023】COMAP美赛数模中的大型语言模型LLM和生成式人工智能工具的使用
    八个提升编程体验的VS Code插件
    【OpenCV学习】第5课:图像模糊(均值滤波,高斯滤波)
    MyBatis-Plus DQL与其他知识点
    【C++类和对象】探索static成员、友元以及内部类
    ZYNQ自带I2S_IP核分析
    net-java-php-python-社会福利保障系统计算机毕业设计程序
  • 原文地址:https://blog.csdn.net/lingbomanbu_lyl/article/details/134249252