• 8.RabbitMQ系列之RPC


    1. RPC

    Remote Procedure Call:远程过程调用,一次远程过程调用的流程即客户端发送一个请求到服务端,服务端根据请求信息进行处理后返回响应信息,客户端收到响应信息后结束

    2. Client interface客户端接口

    为了说明如何使用RPC服务,我们将把“发送方”和“接收方”更改为“客户端”和“服务器”。当我们调用服务时,我们将得到我们对应的斐波那契值

    Integer response = (Integer) template.convertSendAndReceive
        (exchange.getName(), "rpc", 5);
    System.out.println(" [.] Got '" + response + "'");
    
    • 1
    • 2
    • 3
    3. Callback queue回调队列

    一般来说,通过RabbitMQ执行RPC很容易。客户端发送请求消息,服务器进行回复。为了接收响应,我们需要随请求一起发送“回调”队列地址。当我们使用上述convertSendAndReceive()方法时,Spring AMQP的RabbitTemplate为我们处理回调队列。使用RabbitTemplate时,无需执行任何其他设置

    Message properties消息属性

    • deliveryMode:将消息标记为持久(值为2)或瞬态(任何其他值)
    • contentType: 用于描述编码的mime-type。例如,对于经常使用的JSON编码,将此属性设置为:application/JSON是一种很好的做法
    • replyTo:常用于命名回调队列
    • correlationId: 用于将RPC响应与请求关联
    4. Correlation Id关联ID

    Spring AMQP允许您关注正在使用的消息方式,并隐藏支持此方式所需的消息管道的详细信息。例如,本机客户端通常会为每个RPC请求创建一个回调队列。这效率很低,所以另一种方法是为每个客户端创建一个回调队列

    这引发了一个新问题,在该队列中收到响应后,不清楚响应属于哪个请求。此时将使用correlationId属性。Spring AMQP自动为每个请求设置唯一值。此外,它还处理拥有正确correlationID的响应

    Spring AMQP使RPC样式更容易的一个原因是,有时您可能希望忽略回调队列中的未知消息,而不是由于错误而失败。这是由于服务器端可能存在竞争条件。虽然不太可能,但RPC服务器可能会在向我们发送应答之后,但在发送请求的确认消息之前死亡。如果发生这种情况,重新启动的RPC服务器将再次处理该请求。Spring AMQP客户端优雅地处理重复的响应,理想情况下RPC应该是幂等的

    4. 总结

    1

    RPC工作流程如下:

    1. RpcConfig新建一个直连交换器rpc并绑定队列
    2. 客户端调用convertSendAndReceive方法,设置exchange、routing key、message
    3. 请求并发送至队列rpc.requests
    4. 服务端监听队列中的客户端请求,请求一旦进入队列,服务端开始工作并使用队列的replyTo字段进行响应,
    5. 客户端等待回调队列中消息,当消息到达,其核查correlationId字段,如果匹配,则返回响应消息给应用。这在RabbitTemplate自动实现
    5. 完整代码
    @Configuration
    public class RpcConfig {
    
        @Bean
        public DirectExchange rpc() {
            return new DirectExchange("rpc");
        }
    
        private static class ServerConfig {
    
            @Bean
            public Queue rpcQueue() {
                return new Queue("rpc.requests");
            }
    
            @Bean
            public Binding binding(DirectExchange rpc,
                                   Queue rpcQueue) {
                return BindingBuilder.bind(rpcQueue)
                        .to(rpc)
                        .with("rpc");
            }
        }
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    @Component
    public class RpcClient {
    
        private RabbitTemplate rabbitTemplate;
    
        public RpcClient(RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
        }
    
        public void send() {
            System.out.println(" [客户端] 发送请求10");
            Integer response = (Integer) rabbitTemplate.convertSendAndReceive("rpc", "rpc", 10);
            System.out.println(" [客户端] 收到响应 " + response + "");
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    @Component
    public class RpcServer {
    
        @RabbitListener(queues = "rpc.requests")
        public int fibonacci(int n) {
            System.out.println(" [服务端] 收到请求" + n);
            int result = fib(n);
            System.out.println(" [服务端] 响应请求 " + result);
            return result;
        }
    
        public int fib(int n) {
            return n == 0 ? 0 : n == 1 ? 1 : (fib(n - 1) + fib(n - 2));
        }
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    @SpringBootTest
    public class RabbitTest {
        @Autowired
        private RpcClient rpcClient;
    
        @Test
        public void testRpc() {
            rpcClient.send();
        }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    2
    欢迎关注公众号算法小生或沈健的技术博客shenjian.online

  • 相关阅读:
    负载均衡算法
    关于ETL的两种架构(ETL架构和ELT架构)
    前端实现 查询包含分页 以及封装table表格 上手即用!
    python--转换wrf输出的风场数据为网页可视化的json格式
    vue中使用swiper 已配置loop 不轮播问题
    C语言数组在内存中是怎样表示的?
    2023.9.8 基于传输层协议 UDP 和 TCP 编写网络通信程序
    哈哈Niushop v5值得我们期待吗?新版本这几点我爱了!
    论文《Sequential Recommendation with Graph Neural Networks》阅读
    理解内存,让Android性能没有问题
  • 原文地址:https://blog.csdn.net/SJshenjian/article/details/127338110