• RabbitMQ系列【16】AmqpTemplate接口详解


    有道无术,术尚可求,有术无道,止于术。

    前言

    RabbitTemplatespring-amqp提供的一个 RabbitMQ 消息操作模板类,在之前我们使用它完成了简单的消息发送。

    RabbitTemplate 主要提供了发送消息、接收消息以及其他附加功能,内部封装了RabbitMQ原生API,大大简化了使用 RabbitMQ操作。

    RabbitTemplate 主要实现了AmqpTemplateRabbitOperations接口:
    在这里插入图片描述

    AmqpTemplate

    AmqpTemplate接口主要声明了三类方法:

    public interface AmqpTemplate {
    	// 发送消息
        void send(Message var1) throws AmqpException;
    
        // 接收消息
        Message receive() throws AmqpException;
    	
        // 发送消息并接收回复
        Message sendAndReceive(Message var1) throws AmqpException;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    API

    首先,我们看下AmqpTemplate中声明的各种方法。

    send

    send 方法一共有三个,需要创建Message消息对象,将消息封装到该对象内发送,如果没有指定交换机、路由键,将使用默认值,也就是空字符串。

    	// 发送消息到默认交换机、默认路由KEY
    	void send(Message message) throws AmqpException;
    
    	// 发送消息到默认交换机、使用指定路由KEY
    	void send(String routingKey, Message message) throws AmqpException;
    
    	// 发送消息到指定交换机、使用指定路由KEY
    	void send(String exchange, String routingKey, Message message) throws AmqpException;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    示例:

            Message message = new Message("消息".getBytes());
            rabbitTemplate.send(message);
            rabbitTemplate.send("route.key", message);
            rabbitTemplate.send("exchange_name", "route.key", message);
    
    • 1
    • 2
    • 3
    • 4

    convertAndSend

    convertAndSend 方法可以转换对象并发送,并可以添加一个消息处理器MessagePostProcessor

    	// 将Java对象转换为Amqp{@link Message}并将其发送到默认交换机、使用默认路由KEY
    	void convertAndSend(Object message) throws AmqpException;
    
    	// 将Java对象转换为Amqp{@link Message}并将其发送到默认交换机、使用自定义路由KEY
    	void convertAndSend(String routingKey, Object message) throws AmqpException;
    
    	// 将Java对象转换为Amqp{@link Message}并将其发送到自定义交换机、使用自定义路由KEY
    	void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
    
    	 // 将Java对象转换为Amqp{@link Message}并将其发送到自定义交换机、使用自定义路由KEY
    	 // 在发送消息之前添加一个消息处理器MessagePostProcessor 
    	void convertAndSend(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
    
    	// 将Java对象转换为Amqp{@link Message}并将其发送到默认交换机、使用自定义路由KEY
    	// 在发送消息之前添加一个消息处理器MessagePostProcessor 
    	void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor)
    			throws AmqpException;
    
    	// 将Java对象转换为Amqp{@link Message}并将其发送到自定义交换机、使用自定义路由KEY
    	// 在发送消息之前添加一个消息处理器MessagePostProcessor
    	void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor)
    			throws AmqpException;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    MessagePostProcessor 是一个函数型接口,提供了一个postProcessMessage方法处理消息,由于直接发送的是对象,如果需要设置一些消息的属性,就需要使用该接口进行设置,例如:

            MessagePostProcessor messagePostProcessor = message1 -> {
                MessageProperties messageProperties = message1.getMessageProperties();
                messageProperties.setExpiration("1000");
                return message1;
            };
            rabbitTemplate.convertAndSend("","","消息",messagePostProcessor);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    receive

    一般获取消息有两种处理模式:

    • push:由RabbitMQ主动将消息推送给订阅队列的消费者,调用channel.basicConsume方法。
    • pull:主动从指定队列中拉取消息,需要消费者调用channel.basicGet方法。

    receive 方法,就是从主动队列中获取消息。

    	// 如果默认队列中有消息,则接收消息。立即返回,可能有NULL值
    	@Nullable
    	Message receive() throws AmqpException;
    
    	// 从指定队列中获取消息。立即返回,可能有NULL值
    	@Nullable
    	Message receive(String queueName) throws AmqpException;
    
    	// 如果默认队列中有消息,则接收消息。可能有NULL值,并指定一个超时时间
    	@Nullable
    	Message receive(long timeoutMillis) throws AmqpException;
    
    	// 从指定队列中获取消息。可能有NULL值,并指定一个超时时间
    	@Nullable
    	Message receive(String queueName, long timeoutMillis) throws AmqpException;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15

    receiveAndConvert

    receiveAndConvert可以拉取消息并进行对象转换。

    
    	// 如果默认队列中有消息,则接收消息并将其转换为Java对象。立即返回,可能为null 值。
    	@Nullable
    	Object receiveAndConvert() throws AmqpException;
    
    	// 从指定队列中接收消息并将其转换为Java对象。立即返回,可能为null 值。
    	@Nullable
    	Object receiveAndConvert(String queueName) throws AmqpException;
    
    	// 如果默认队列中有消息,则接收消息并将其转换为Java对象。立即返回,可能有NULL值,并指定一个超时时间
    	@Nullable
    	Object receiveAndConvert(long timeoutMillis) throws AmqpException;
    
    	// 从指定队列中接收消息并将其转换为Java对象,并指定一个超时时间,可能为null 值。
    	@Nullable
    	Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
    
    	//  如果默认队列中有消息,则接收消息并将其转换为Java对象。立即返回,可能为null 值。并可以添加一个消息转换器SmartMessageConverter。
    	@Nullable
    	<T> T receiveAndConvert(ParameterizedTypeReference<T> type) throws AmqpException;
    
    	// 从指定队列中接收消息并将其转换为Java对象。立即返回,可能为null 值。并可以添加一个消息转换器SmartMessageConverter。
    	@Nullable
    	<T> T receiveAndConvert(String queueName, ParameterizedTypeReference<T> type) throws AmqpException;
    
    	// 如果默认队列中有消息,则接收消息并将其转换为Java对象。可能有NULL值,并指定一个超时时间及消息转换器SmartMessageConverter。
    	@Nullable
    	<T> T receiveAndConvert(long timeoutMillis, ParameterizedTypeReference<T> type) throws AmqpException;
    
    	// 从指定队列中接收消息并将其转换为Java对象。可能为null 值。并指定一个超时时间及消息转换器SmartMessageConverter。
    	@Nullable
    	<T> T receiveAndConvert(String queueName, long timeoutMillis, ParameterizedTypeReference<T> type)
    			throws AmqpException;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33

    示例:

            // 接收消息,队列存在是,会报错;队列中没有消息,返回NULL
            Message bootQueue1= rabbitTemplate.receive("bizQueue");
            Message bootQueue2 = rabbitTemplate.receive("bizQueue",1000);
            Message bootQueue3= rabbitTemplate.receive("backupQueue");
    
    • 1
    • 2
    • 3
    • 4

    使用消息转换器,可以直接发送、接收对象:

            // User 需要实现Serializable
            User user = new User();
            user.setName("张三");
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            rabbitTemplate.convertAndSend(MqBizConfig.BIZ_EXCHANGE,MqBizConfig.BIZ_ROUTE_KEY,user);
            User receiveUser = rabbitTemplate.receiveAndConvert("bizQueue", new ParameterizedTypeReference<User>() {});
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    receiveAndReply

    receiveAndReply支持在获取消息时传入一个回调函数ReceiveAndReplyCallback,处理接收到消息和回复消息的业务逻辑。

    receiveAndReply应用于RPC模式Server端,Server收到消息,并回复消息给客户端:
    在这里插入图片描述
    该模式用的比较少,实现起来也比较麻烦,这里就不演示了。

        // 收到消息并回复,R:接收到的消息 S: 返回的消息
    	<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback) throws AmqpException;
    
    	<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback) throws AmqpException;
    
    	<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback, String replyExchange, String replyRoutingKey)
    			throws AmqpException;
    
    	<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback, String replyExchange,
    			String replyRoutingKey) throws AmqpException;
    
    	<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
    			ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
    
    	<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
    			ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    sendAndReceive

    sendAndReceive也属于RPC模式,发送消息并接收回复消息,属于Client端。

    	@Nullable
    	Message sendAndReceive(Message message) throws AmqpException;
    
    	@Nullable
    	Message sendAndReceive(String routingKey, Message message) throws AmqpException;
    
    	@Nullable
    	Message sendAndReceive(String exchange, String routingKey, Message message) throws AmqpException;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    convertSendAndReceive

    convertSendAndReceive以及convertSendAndReceiveAsType是对sendAndReceive的扩展,可以直接发送对象消息,并可以设置类型转换器:

    	@Nullable
    	Object convertSendAndReceive(Object message) throws AmqpException;
    
    	@Nullable
    	Object convertSendAndReceive(String routingKey, Object message) throws AmqpException;
    
    	@Nullable
    	Object convertSendAndReceive(String exchange, String routingKey, Object message) throws AmqpException;
    
    	@Nullable
    	Object convertSendAndReceive(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException;
    
    	@Nullable
    	Object convertSendAndReceive(String routingKey, Object message, MessagePostProcessor messagePostProcessor)
    			throws AmqpException;
    
    	@Nullable
    	Object convertSendAndReceive(String exchange, String routingKey, Object message,
    			MessagePostProcessor messagePostProcessor) throws AmqpException;
    
    	@Nullable
    	<T> T convertSendAndReceiveAsType(Object message, ParameterizedTypeReference<T> responseType)
    			throws AmqpException;
    
    	@Nullable
    	<T> T convertSendAndReceiveAsType(String routingKey, Object message,
    			ParameterizedTypeReference<T> responseType) throws AmqpException;
    
    	@Nullable
    	<T> T convertSendAndReceiveAsType(String exchange, String routingKey, Object message,
    			ParameterizedTypeReference<T> responseType) throws AmqpException;
    
    	@Nullable
    	<T> T convertSendAndReceiveAsType(Object message, MessagePostProcessor messagePostProcessor,
    			ParameterizedTypeReference<T> responseType) throws AmqpException;
    
    	@Nullable
    	<T> T convertSendAndReceiveAsType(String routingKey, Object message,
    			MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<T> responseType)
    			throws AmqpException;
    
    	@Nullable
    	<T> T convertSendAndReceiveAsType(String exchange, String routingKey, Object message,
    			MessagePostProcessor messagePostProcessor, ParameterizedTypeReference<T> responseType)
    			throws AmqpException;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
  • 相关阅读:
    二十三种设计模式
    Java并发编程实战基础概要
    第六章 搭建Vitest前端单元测试环境
    799. 香槟塔 : 简单线性 DP 运用题
    【ContextCapture建模精品教程】三维实景模型生成集群设置(2)——工程文件网络路径设置
    eNSP - BGP 查看命令
    【C++初阶(五)类和对象(上)】
    Centos修改系统时间
    Linux命令查看pcap包报文数量、包体包含内容、包长
    构建系列之webpack窥探上
  • 原文地址:https://blog.csdn.net/qq_43437874/article/details/128057272