• Microservices communication


    1、What

    1.1 Introduction to Microservice communication(微服务通信简介)

    In our usual microservice architecture, the service must handle requests from the application client. In addition, services sometimes need to collaborate with each other to handle these requests, so they must use interprocess communication protocols.
    我们常用的微服务架构中,服务必须处理来自应用程序客户端的请求。此外,服务与服务之间有时还需要协作来处理这些请求,因此它们必须使用进程间通信协议。

    • In monolithic applications, calls between modules are made through methods or functions at the programming language level.
      在单体式应用中,各个模块之间的调用是通过编程语言级别的方法或者函数来实现的。
    • But a distributed application based on microservices runs on multiple machines. In general, each service instance is a process.
      但是一个基于微服务的分布式应用是运行在多台机器上的。一般来说,每个服务实例都是一个进程。
    • Microservices-based applications are distributed systems that run on multiple processes or services, often even across multiple servers or hosts. Each service instance is typically a process.
      基于微服务的应用程序是在多个进程或服务上运行的分布式系统,通常甚至跨多个服务器或主机。 每个服务实例通常是一个进程。
    • Therefore, microservices must interact using either an in-process communication protocol (such as HTTP, AMQP) or a binary protocol (such as TCP), depending on the nature of each service.
      因此,微服务必须使用进程内通信协议(如 HTTP、AMQP)或二进制协议(如 TCP)进行交互,具体取决于每个服务的性质。

    1.2 Several modes of communication(几种通信方式

    1.2.1 Remote Procedure Invocation(远程过程调用RPI)

    Use synchronous request/response based IPC for inter-service communication. The client issues a request to the service using a request/reply based protocol.
    使用同步的基于请求/响应的IPC 进行服务间通信。客户端使用基于请求/回复的协议向服务发出请求。

    Common case(常用案例):

    • REST
    • gRPC
    • Apache Thrift

    1.2.2 Messaging(消息传递)

    Inter-service communication using asynchronous messaging. A service that communicates by exchanging messages over a messaging channel.
    使用异步消息传递进行服务间通信。通过消息传递通道交换消息进行通信的服务。

    There are several different styles of asynchronous communication :
    有几种不同风格的异步通信

    • Request/Response - The service sends a request message to the receiver and expects to receive a reply message immediately
      请求/响应 - 服务向接收者发送请求消息并期望立即收到回复消息
    • Notification - The sender sends a message to the recipient without expecting a reply. It was not sent.
      通知 - 发件人向收件人发送消息,但不期待回复。也没有发送。
    • Request/Asynchronous Response - The service sends a request message to the recipient and expects to eventually receive a reply message
      请求/异步响应 - 服务向接收者发送请求消息并期望最终收到回复消息
    • Publish/Subscribe - The service publishes messages to zero or more recipients
      发布/订阅 - 服务向零个或多个收件人发布消息
    • Publish/Respond asynchronously - The service publishes a request to one or more recipients, some of whom send back a reply
      发布/异步响应 - 服务向一个或多个收件人发布请求,其中一些人发回回复

    Common case(常用案例):

    • Apache Kafka
    • RabbitMQ

    1.2.3 Domain-specific protocol(特定与域的协议)

    Use domain-specific protocols for inter-service communication.
    使用特定于域的协议进行服务间通信。

    Common case(常用案例):

    • 电子邮件协议,例如 SMTP 和 IMAP
    • RTMP、HLS、HDS等流媒体协议

    1.2.4 Idempotent Consumer(幂等消费者)

    In enterprise applications, to ensure that the program executes correctly, at least one messaging pass is used to ensure that the message broker delivers the message to the consumer even if the program fails. Consumer consumption messages must be idempotent: processing the same message repeatedly must produce the same result as processing the message once. If the consumer is not idempotent, multiple calls may result in errors. While some consumers are inherently idempotent, others must keep track of the messages they have processed in order to detect and discard duplicate messages.
    在企业应用程序中,为了保证程序执行的正确性,至少用一次消息传递来保证程序即使发生错误,消息代理也会将消息传递给消费者。消费者消费消息必须是幂等的:重复处理同一条消息的结果必须与处理一次消息的结果相同。如果消费者不是幂等的,则多次调用可能会导致错误。有些消费者天生是幂等的,其他消费者必须跟踪他们已处理的消息,以便检测和丢弃重复消息。

    You can make the consumer idempotent by recording in the database the ID of the message it has successfully processed. When processing messages, consumers can detect and discard duplicate messages by querying the database. There are several different places to store message ids. One option is to have consumers use a separate PROCESSED_MESSAGES table. Another option is to let the consumer store the ID in the business entity it creates or updates.
    您可以通过在数据库中记录它已成功处理的消息的 ID 来使消费者具有幂等性。在处理消息时,消费者可以通过查询数据库来检测和丢弃重复消息。有几个不同的地方可以存储消息 ID。一种选择是让消费者使用单独的PROCESSED_MESSAGES表。另一种选择是让消费者将 ID 存储在它创建或更新的业务实体中。

    1.3 What is the RPC(什么是RPC) ?

    Remote Produce Call (RPC) is a technical term. HTTP is a protocol. RPC can be implemented through HTTP or a Socket that implements a set of protocols. So another way of thinking about RPC is why there is an implementation other than HTTP, and why, since proprietary protocols are not universal except for HTTP implementations.
    RPC ( Remote Produce Call ) 是一种技术的概念名词,HTTP是一种协议,RPC可以通过 HTTP 来实现,也可以通过Socket自己实现一套协议来实现.所以可以换一种理解,为何 RPC 还有除 HTTP 之外的实现法,有何必要,毕竟除了HTTP实现外,私有协议不具备通用性.

    2、Why

    2.1 Advantages and disadvantages of remote procedure calls(远程过程调用 的优缺点)

    PROS(优点):

    • Simple and familiar
      简单而熟悉
    • It’s easy to request/reply
      请求/回复很容易
    • Simpler system because there are no intermediate agents
      更简单的系统,因为没有中间代理

    CONS(缺点):

    • Generally, only request/reply is supported, and other interaction modes such as notification, request/asynchronous response, publish/subscribe, publish/asynchronous response are not supported
      通常只支持请求/回复,不支持通知、请求/异步响应、发布/订阅、发布/异步响应等其他交互模式
    • Availability is reduced because the client and service must be available during the interaction
      可用性降低,因为客户端和服务必须在交互期间可用

    2.2 Pros and cons of messaging(消息传递的优缺点)

    PROS(优点):

    • The runtime is loosely coupled because it separates the message sender from the consumer
      运行时耦合松散,因为它将消息发送者与消费者分离
    • Improved availability because message brokers buffer messages until consumers can process them
      提高了可用性,因为消息代理会缓冲消息,直到消费者能够处理它们
    • Supports multiple communication modes, including request/reply, notification, request/asynchronous response, publish/subscribe, publish/asynchronous response, and so on
      支持多种通信模式,包括请求/回复、通知、请求/异步响应、发布/订阅、发布/异步响应等

    CONS(缺点):

    • Additional complexity of the message broker, which must be highly available
    • 消息代理的额外复杂性,必须具有高可用性

    Problems in existence(存在的问题):

    • Request/reply communication is more complex
      请求/回复式的沟通更复杂

    Why use RPC(为什么使用 RPC)?
    HTTP interface is a communication method often used in the early stage of information islanding when there are not many interfaces and less interaction between the system and the system.
    http接口是在接口不多、系统与系统交互较少的情况下,解决信息孤岛初期常使用的一种通信手段;
    The advantage is simple, direct, convenient development.
    优点就是简单、直接、开发方便。

    If you have a large website with many internal subsystems and interfaces, the benefits of RPC framework can be seen:
    如果是一个大型的网站,内部子系统较多、接口非常多的情况下,RPC框架的好处就显示出来了:

    First of all, it is a long link, so it does not have to shake hands 3 times like HTTP for each communication, which reduces the network overhead;
    The second is that RPC framework generally has a registry, rich monitoring management; Publishing, offline interfaces, dynamic extensions, etc., are unaware, unified operations for the caller.
    And finally, security.
    首先就是长链接,不必每次通信都要像http一样去3次握手什么的,减少了网络开销;
    其次就是RPC框架一般都有注册中心,有丰富的监控管理;发布、下线接口、动态扩展等,对调用方来说是无感知、统一化的操作。
    最后是安全性。

    RPC can decouple services(RPC能解耦服务)
    RPC: remote procedure call. The core of RPC is not what protocol to use. The purpose of RPC is to let you call a remote method locally, but the call is transparent to you, and you don’t know where the called method is deployed.
    RPC:远程过程调用。RPC的核心并不在于使用什么协议。RPC的目的是让你在本地调用远程的方法,而对你来说这个调用是透明的,你并不知道这个调用的方法是部署哪里。

    Services can be decoupled through RPC, which is the real purpose of using RPC. The principle of RPC mainly uses the dynamic proxy mode, as for HTTP protocol, it is just a transport protocol. Refer to Spring Remoting for simple implementations and Dubbo for more complex implementations.
    通过RPC能解耦服务,这才是使用RPC的真正目的。RPC的原理主要用到了动态代理模式,至于http协议,只是传输协议而已。简单的实现可以参考spring remoting,复杂的实现可以参考dubbo。

    RPC =socket + dynamic proxy
    rpc=socket + 动态代理

    3、How

    3.1 How to choose the appropriate communication method for interaction(如何选择交互合适的通信方式)

    When selecting inter-process Communication (IPC) for a service, consider how the services interact. There are many interaction patterns between client and server, which can be categorized in two dimensions.
    当为某一个服务选择IPC(Inter-Process Communication,进程间通信)时,首先需要考虑服务之间如何交互。客户端和服务器之间有很多的交互模式,我们可以从两个维度进行归类。

    Is the first dimension one-to-one or one-to-many:
    第一个维度是一对一还是一对多:

    • One-to-one: Each client request has a service instance to respond.
      一对一: 每个客户端请求有一个服务实例来响应。
    • One-to-many: Each client request has multiple service instances to respond to
      一对多: 每个客户端请求有多个服务实例来响应

    The second dimension is whether these interactions are synchronous or asynchronous:
    第二个维度是这些交互式同步还是异步:

    • Synchronous mode: Client requests require an immediate response from the server and may even be blocked while waiting.
      同步模式: 客户端请求需要服务端即时响应,甚至可能由于等待而阻塞。
    • Asynchronous mode: Client requests do not block processes, and server responses can be non-immediate.
      异步模式: 客户端请求不会阻塞进程,服务端的响应可以是非即时的。

    3.1.1 There are several ways of one-to-one interaction(一对一的交互模式有以下几种方式)

    • Request/Response: A client sends a request to the server and waits for a response. The client expects this response to arrive immediately. In a thread-based application, the waiting process can cause the thread to block.
      请求/响应: 一个客户端向服务器端发起请求,等待响应。客户端期望此响应即时到达。在一个基于线程的应用中,等待过程可能造成线程阻塞。
    • Notification (also known as a one-way request) : A client request is sent to a server without expecting a response from the server.
      通知(也就是常说的单向请求): 一个客户端请求发送到服务端,但是并不期望服务端响应。
    • Request/Asynchronous response: The client sends a request to the server, and the server responds to the request asynchronously. The client does not block and is designed so that the default response does not arrive immediately.
      请求/异步响应: 客户端发送请求到服务端,服务端异步响应请求。客户端不会阻塞,而且被设计成默认响应不会立刻到达。

    3.1.2 One-to-many interaction patterns can occur in the following ways(一对多的交互模式有以下几种方式)

    • Publish/subscribe mode: The client publishes notification messages that are consumed by zero or more interested services.
      发布/ 订阅模式: 客户端发布通知消息,被零个或者多个感兴趣的服务消费。
    • Publish/asynchronous response mode: The client publishes a request message and then waits for a response from the service of interest.
      发布/异步响应模式: 客户端发布请求消息,然后等待从感兴趣服务发回的响应。

    4、Sample

    training项目中的使用:

    4.1 sync

    我们的一些业务场景需要实时获取结果,强调时效性,服务间的通信可以采用同步通信。项目中通过openfeign这个组件来实现。
    Some of our business scenarios require real-time access to results, emphasizing timeliness, and communication between services can use synchronous communication. The project is implemented through the component openfeign.

    比如我们transaction服务查询交易信息的时候会调用product服务来获取产品的一些详细信息。
    For example, when our transaction service queries transaction information, it calls the product service to get some details of the product.

    @EnableConfigurationProperties({LiquibaseProperties.class, ApplicationProperties.class})
    @ComponentScan(basePackages = "com.pg")
    @EnableFeignClients("com.pg") // 
    @EnableCaching
    @SpringBootApplication
    public class PgTransactionApp {
    
        private static final Logger log = LoggerFactory.getLogger(PgTransactionApp.class);
    
        public static void main(String[] args) throws UnknownHostException {
            SpringApplication app = new SpringApplication(PgTransactionApp.class);
            DefaultProfileUtil.addDefaultProfile(app);
            Environment env = app.run(args).getEnvironment();
        }
    }
    
    @AuthorizedFeignClient(name = "pg-products", url = "${gcspa.feign-client.pg-gcspa-products-uri}",
        contextId = "ProductServiceClient")
    public interface ProductServiceClient {
    
        @Timed
        @RequestMapping(value = "/api/v1/{marketcode}/merchandises/productcode/{productcode}", method = RequestMethod.GET)
        ProductFormProductServiceDto getProductByProductCode(@PathVariable(value = "marketcode") String marketcode,@PathVariable(value = "productcode")  String productcode);
    }
    

    4.2 async

    有些业务场景对时效性要求不是很高,服务间的通讯可以采用异步通信的方式,异步调用可以提高系统的吞吐量,项目中是使用Microsoft Azure Service Bus来实现。
    Some business scenarios do not require high timeliness, and communication between services can use asynchronous communication, asynchronous invocation can improve the system throughput, and the project is using Microsoft Azure Service Bus to achieve this.

    比如transaction服务创建交易的时候,要推送消息到campaign服务,campaign做验证以及后续逻辑处理
    For example, when the transaction service creates a transaction, it pushes a message to the campaign service, which does the validation and subsequent logic processing.

    producer:

    @Service
    public class TransactionToCampaignMessageService {
    
        @Autowired
        private ObjectMapper objectMapper;
    
        @Autowired
        private ISendEvent sendEvent;
    
        @Value("${messaging.topics.***}")
        private String ValidateCampaignTopic;
    
        public void sendCampaignValidateMessage(TransactionDTO transactionDTO)
            throws JsonProcessingException {
            String payload = objectMapper.writeValueAsString(transactionDTO);
            sendEvent.sendEventToEventHub(EventType.ORDER_CREATED, transactionDTO.getId(),
                DomainType.ORDER, payload, ValidateCampaignTopic);
            log.info("=== Send Transaction To Campaign === [{}]", transactionDTO);
        }
    }
    

    consumer:

    @Component
    @ConditionalOnProperty(name = "messaging.status", havingValue = "enabled")
    public class TransactionValidationConsumer extends AsyncServiceBusTopicConsumer {
    
        /**
         * Establishes a connection with the Azure Service Bus and starts consuming messages from the
         * given topic with the given subscription (consumer-group).
         *
         * @param topicName        - topic to consume messages from;
         * @param subscriptionName - name of subscription (consumer-group);
         * @param receiver         - implementation of {@link SparcReceiver} to process messages.
         */
        public TransactionValidationConsumer(
            @Value("${messaging.topics.****}")
            String topicName,
            @Value("${messaging.topics.*****}")
            String subscriptionName,
            TransactionValidationReceiver receiver) {
            super(topicName, subscriptionName, receiver);
        }
    
    }
    
    @Slf4j
    @RequiredArgsConstructor
    @Component
    @ConditionalOnProperty(name = "messaging.status", havingValue = "enabled")
    public class TransactionValidationReceiver implements SparcReceiver {
    
        @Autowired
        private ObjectMapper objectMapper;
        @Autowired
        private CommonCombinationCampaignRule commonCombinationCampaignRule;
        @Autowired
        private ISendEvent sendEvent;
        @Value("${messaging.topics.****}")
        private String transactionValidationReturnTopic;
    
    
        @Override
        public void processMessage(CommonServiceBusMessage message) {
            log.info("=== TransactionValidationReceiver === [{}]", message);
            ContextUtil contextUtil = (ContextUtil) SpringUtil.getBean("contextUtil");
            contextUtil.put(MARKET_CODE,message.getMarketCode());
            try {
                EventDTO eventDTO = objectMapper.readValue(((String) message.getPayload()),
                    EventDTO.class);
                MTransactionValidationDTO dto = objectMapper.readValue(eventDTO.getPayLoad(),
                    MTransactionValidationDTO.class);
                TransactionValidationResultDTO resultDTO = new TransactionValidationResultDTO();
                boolean flag = commonCombinationCampaignRule.evaluateRule(dto);
                resultDTO.setId(dto.getId());
                resultDTO.setValid(flag);
                Optional.of(flag).filter(f->!f).ifPresent(
                    f -> resultDTO.setMessage(ExpressionErrorMsgHandler.getErrorMsgs()));
                String payload = objectMapper.writeValueAsString(resultDTO);
                boolean result = sendEvent.sendEventToEventHub(EventType.CAMPAIGN_VALIDATED,
                    dto.getId(),
                    DomainType.CAMPAIGN, objectMapper.writeValueAsString(resultDTO),
                    transactionValidationReturnTopic);
                log.info("=== TransactionValidationSend === [{}]", payload);
                if (!result) {
                    throw new SystemErrorException(MESSAGE_SEND_ERROR.getStatusCode());
                }
            } catch (JsonProcessingException e) {
                throw new SystemErrorException(MESSAGE_SEND_ERROR.getStatusCode());
            }
        }
    }
    
    
    messaging:
      status: enabled
      service-bus-connection-string: ******
      topics:
        to_produce:
          test1: ${test1:test1111}
        to_consume:
          test_demo: ${test_demo111}
    
  • 相关阅读:
    ROS2自定义接口Python实现机器人移动
    独立站必看攻略
    centos7安装后的基础配置
    PyTorch : 了解Tensor(张量)及其创建方法
    SpringBoot 刷新上下文6--加载并注册BeanDefinition
    R 方差分析 analysis of variance
    spring IOC 为什么能降低耦合
    结构体定义struct和typedef struct的区别(重新整理版)
    四台kvm虚拟机搭建Hadoop HA集群
    服务器使用ssl证书有哪些好处
  • 原文地址:https://blog.csdn.net/qq_34377273/article/details/127093456