• RabbitMQ系列【17】RabbitOperations接口详解


    有道无术,术尚可求,有术无道,止于术。

    前言

    在上一篇,我们介绍了RabbitTemplate 实现AmqpTemplate接口的所有方法,接下来学习下其实现的另外一个接口RabbitOperations

    AmqpTemplate是对AMQP协议的支持,完成了基本的发送、接收消息,而RabbitOperations是对RabbitMQ的直接集成,提供了更细致的操作。

    send

    send 方法,主要是添加了一个CorrelationData参数。

    CorrelationData用于发布确认、退回模式时进行数据封装,该对象会返回ACK以及原因,开启了退回模式时,还会返回退回信息。

    public class CorrelationData implements Correlation {
    	// 异步执行的结果,Confirm表示返回结果的类型
    	private final SettableListenableFuture<Confirm> future = new SettableListenableFuture();
    	// 唯一ID,如果未提供id将自动设置为唯一值。
        private volatile String id;
        // 退回时返回信息
        private volatile ReturnedMessage returnedMessage;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    	// 发送消息,传递CorrelationData 对象
    	default void send(String routingKey, Message message, CorrelationData correlationData)
    			throws AmqpException {
    
    		throw new UnsupportedOperationException("This implementation does not support this method");
    	}
    	
    	// 指定交换机、路由、传递CorrelationData 对象
    	void send(String exchange, String routingKey, Message message, CorrelationData correlationData)
    			throws AmqpException;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    convertAndSend

    convertAndSendsend方法的基础上,可以直接发送JAVA 对象,并可以添加一个MessagePostProcessor 消息处理器。

    	// 使用自定义路由KEY。发送消息到默认交换机,并携带CorrelationData 
    	void convertAndSend(String routingKey, Object message, CorrelationData correlationData) throws AmqpException;
    
    	void convertAndSend(String exchange, String routingKey, Object message, CorrelationData correlationData)
    			throws AmqpException;
    
    	void convertAndSend(Object message, MessagePostProcessor messagePostProcessor, CorrelationData correlationData)
    			throws AmqpException;
    
    	void convertAndSend(String routingKey, Object message, MessagePostProcessor messagePostProcessor,
    			CorrelationData correlationData) throws AmqpException;
    			
    	void convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor,
    			CorrelationData correlationData) throws AmqpException;
    	
    	void correlationConvertAndSend(Object message, CorrelationData correlationData) throws AmqpException;
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    convertSendAndReceive

    convertSendAndReceiveAmqpTemplate接口中发送并接收消息一样,是RPC模式,区别是多了个CorrelationData 参数。

    	@Nullable
    	Object convertSendAndReceive(Object message, CorrelationData correlationData) throws AmqpException;
    
    	@Nullable
    	Object convertSendAndReceive(String routingKey, Object message, CorrelationData correlationData)
    			throws AmqpException;
    
    	@Nullable
    	Object convertSendAndReceive(String exchange, String routingKey, Object message,
    			CorrelationData correlationData) throws AmqpException;
    
    	@Nullable
    	Object convertSendAndReceive(Object message, MessagePostProcessor messagePostProcessor,
    			CorrelationData correlationData) throws AmqpException;
    
    	@Nullable
    	Object convertSendAndReceive(String routingKey, Object message,
    			MessagePostProcessor messagePostProcessor, CorrelationData correlationData) throws AmqpException;
    
    	@Nullable
    	Object convertSendAndReceive(String exchange, String routingKey, Object message,
    			MessagePostProcessor messagePostProcessor, CorrelationData correlationData)
    			throws AmqpException;
    
    	@Nullable
    	<T> T convertSendAndReceiveAsType(Object message, CorrelationData correlationData,
    			ParameterizedTypeReference<T> responseType) throws AmqpException;
    
    	@Nullable
    	<T> T convertSendAndReceiveAsType(String routingKey, Object message, CorrelationData correlationData,
    			ParameterizedTypeReference<T> responseType) throws AmqpException;
    
    	@Nullable
    	default <T> T convertSendAndReceiveAsType(String exchange, String routingKey, Object message,
    			@Nullable CorrelationData correlationData, ParameterizedTypeReference<T> responseType)
    					throws AmqpException {
    
    		return convertSendAndReceiveAsType(exchange, routingKey, message, null, correlationData, responseType);
    	}
    
    	@Nullable
    	<T> T convertSendAndReceiveAsType(Object message, MessagePostProcessor messagePostProcessor,
    			CorrelationData correlationData, ParameterizedTypeReference<T> responseType) throws AmqpException;
    
    	@Nullable
    	<T> T convertSendAndReceiveAsType(String routingKey, Object message,
    			MessagePostProcessor messagePostProcessor, CorrelationData correlationData,
    			ParameterizedTypeReference<T> responseType) throws AmqpException;
    
    	@Nullable
    	<T> T convertSendAndReceiveAsType(String exchange, String routingKey, Object message,
    			@Nullable MessagePostProcessor messagePostProcessor,
    			@Nullable CorrelationData correlationData,
    			ParameterizedTypeReference<T> responseType) throws AmqpException;
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55

    execute

    execute方法可以获取原生Channel执行操作,需要一个ChannelCallback参数。

    	@Nullable
    	<T> T execute(ChannelCallback<T> action) throws AmqpException;
    
    • 1
    • 2

    ChannelCallback是一个函数式接口,使用该接口,可以获取RabbitMQChannel,执行任意操作,并返回结果。

    @FunctionalInterface
    public interface ChannelCallback<T> {
    	/**
    	 * @param channel 通道
    	 * @return 返回结果
    	 */
        @Nullable
        T doInRabbit(Channel var1) throws Exception;
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    示例:

            ChannelCallback<Boolean> stringChannelCallback = new ChannelCallback<Boolean>() {
                @Override
                public Boolean doInRabbit(Channel channel) throws Exception {
                    // 调用Channel 发送消息
                    channel.basicPublish(MqBizConfig.BIZ_EXCHANGE,MqBizConfig.BIZ_ROUTE_KEY,null,"消息".getBytes());
                    System.out.println("doInRabbit");
                    return true;
                }
            };
            Boolean execute = rabbitTemplate.execute(stringChannelCallback);
            System.out.println("结果:"+execute);
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    invoke

    invoke方法需要一个OperationsCallback参数,在该对象的doInRabbit()方法中,任何操作都使用相同的专用通道,该通道将在结束时关闭(不会返回到缓存)。这种使用方式就叫做范围内操作。

    	@Nullable
    	default <T> T invoke(OperationsCallback<T> action) throws AmqpException {
    		ret
    
    	@Nullable
    	<T> T invoke(OperationsCallback<T> action, @Nullable com.rabbitmq.client.ConfirmCallback acks,
    			@Nullable com.rabbitmq.client.ConfirmCallback nacks);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    OperationsCallback操作回调,可以获取RabbitOperations 执行操作,并返回结果。

    	@FunctionalInterface
    	interface OperationsCallback<T> {
    		/**
    		 * @param operations RabbitOperations.
    		 * @return 结果.
    		 */
    		@Nullable
    		T doInRabbit(RabbitOperations operations);
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    waitForConfirms

    waitForConfirmswaitForConfirmsOrDie都是等待确认,但是必须在invoke方法中使用,

    	// 等待确认 
    	boolean waitForConfirms(long timeout) throws AmqpException;
    
    	// 等待确认,异常后信道被关闭,生产者发布不能继续发布消息
    	void waitForConfirmsOrDie(long timeout) throws AmqpException;
    
    • 1
    • 2
    • 3
    • 4
    • 5

    getConnectionFactory

    返回此操作的连接工厂。

    	ConnectionFactory getConnectionFactory();
    
    • 1

    Other

    startstop没有实现的方法,只是为了向后兼容。

    	@Override
    	default void start() {
    		// No-op - implemented for backward compatibility
    	}
    
    	@Override
    	default void stop() {
    		// No-op - implemented for backward compatibility
    	}
    
    	@Override
    	default boolean isRunning() {
    		return false;
    	}
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
  • 相关阅读:
    归一化原来这么重要!深入浅出详解Transformer中的Normalization
    CSDN客诉周报第11期|修复6个重大bug,解决32个次要bug
    webpack解析ol依赖的时候报错
    代码随想录算法训练营第四十三天| LeetCode1049. 最后一块石头的重量 II、LeetCode494. 目标和、LeetCode474. 一和零
    python图片分享平台毕业设计开题报告
    Artplayer视频JSON解析播放器源码|支持弹幕|json数据模式
    AI:75-基于生成对抗网络的虚拟现实场景增强
    Python 爬虫之scrapy 库
    如何在微服务中设计用户权限策略?
    五、Docker客户端和守护进程
  • 原文地址:https://blog.csdn.net/qq_43437874/article/details/128081590