In our usual microservice architecture, the service must handle requests from the application client. In addition, services sometimes need to collaborate with each other to handle these requests, so they must use interprocess communication protocols.
我们常用的微服务架构中,服务必须处理来自应用程序客户端的请求。此外,服务与服务之间有时还需要协作来处理这些请求,因此它们必须使用进程间通信协议。
Use synchronous request/response based IPC for inter-service communication. The client issues a request to the service using a request/reply based protocol.
使用同步的基于请求/响应的IPC 进行服务间通信。客户端使用基于请求/回复的协议向服务发出请求。
Common case(常用案例):
Inter-service communication using asynchronous messaging. A service that communicates by exchanging messages over a messaging channel.
使用异步消息传递进行服务间通信。通过消息传递通道交换消息进行通信的服务。
There are several different styles of asynchronous communication :
有几种不同风格的异步通信:
Common case(常用案例):
Use domain-specific protocols for inter-service communication.
使用特定于域的协议进行服务间通信。
Common case(常用案例):
In enterprise applications, to ensure that the program executes correctly, at least one messaging pass is used to ensure that the message broker delivers the message to the consumer even if the program fails. Consumer consumption messages must be idempotent: processing the same message repeatedly must produce the same result as processing the message once. If the consumer is not idempotent, multiple calls may result in errors. While some consumers are inherently idempotent, others must keep track of the messages they have processed in order to detect and discard duplicate messages.
在企业应用程序中,为了保证程序执行的正确性,至少用一次消息传递来保证程序即使发生错误,消息代理也会将消息传递给消费者。消费者消费消息必须是幂等的:重复处理同一条消息的结果必须与处理一次消息的结果相同。如果消费者不是幂等的,则多次调用可能会导致错误。有些消费者天生是幂等的,其他消费者必须跟踪他们已处理的消息,以便检测和丢弃重复消息。
You can make the consumer idempotent by recording in the database the ID of the message it has successfully processed. When processing messages, consumers can detect and discard duplicate messages by querying the database. There are several different places to store message ids. One option is to have consumers use a separate PROCESSED_MESSAGES table. Another option is to let the consumer store the ID in the business entity it creates or updates.
您可以通过在数据库中记录它已成功处理的消息的 ID 来使消费者具有幂等性。在处理消息时,消费者可以通过查询数据库来检测和丢弃重复消息。有几个不同的地方可以存储消息 ID。一种选择是让消费者使用单独的PROCESSED_MESSAGES表。另一种选择是让消费者将 ID 存储在它创建或更新的业务实体中。
Remote Produce Call (RPC) is a technical term. HTTP is a protocol. RPC can be implemented through HTTP or a Socket that implements a set of protocols. So another way of thinking about RPC is why there is an implementation other than HTTP, and why, since proprietary protocols are not universal except for HTTP implementations.
RPC ( Remote Produce Call ) 是一种技术的概念名词,HTTP是一种协议,RPC可以通过 HTTP 来实现,也可以通过Socket自己实现一套协议来实现.所以可以换一种理解,为何 RPC 还有除 HTTP 之外的实现法,有何必要,毕竟除了HTTP实现外,私有协议不具备通用性.
PROS(优点):
CONS(缺点):
PROS(优点):
CONS(缺点):
Problems in existence(存在的问题):
Why use RPC(为什么使用 RPC)?
HTTP interface is a communication method often used in the early stage of information islanding when there are not many interfaces and less interaction between the system and the system.
http接口是在接口不多、系统与系统交互较少的情况下,解决信息孤岛初期常使用的一种通信手段;
The advantage is simple, direct, convenient development.
优点就是简单、直接、开发方便。
If you have a large website with many internal subsystems and interfaces, the benefits of RPC framework can be seen:
如果是一个大型的网站,内部子系统较多、接口非常多的情况下,RPC框架的好处就显示出来了:
First of all, it is a long link, so it does not have to shake hands 3 times like HTTP for each communication, which reduces the network overhead;
The second is that RPC framework generally has a registry, rich monitoring management; Publishing, offline interfaces, dynamic extensions, etc., are unaware, unified operations for the caller.
And finally, security.
首先就是长链接,不必每次通信都要像http一样去3次握手什么的,减少了网络开销;
其次就是RPC框架一般都有注册中心,有丰富的监控管理;发布、下线接口、动态扩展等,对调用方来说是无感知、统一化的操作。
最后是安全性。
RPC can decouple services(RPC能解耦服务)
RPC: remote procedure call. The core of RPC is not what protocol to use. The purpose of RPC is to let you call a remote method locally, but the call is transparent to you, and you don’t know where the called method is deployed.
RPC:远程过程调用。RPC的核心并不在于使用什么协议。RPC的目的是让你在本地调用远程的方法,而对你来说这个调用是透明的,你并不知道这个调用的方法是部署哪里。
Services can be decoupled through RPC, which is the real purpose of using RPC. The principle of RPC mainly uses the dynamic proxy mode, as for HTTP protocol, it is just a transport protocol. Refer to Spring Remoting for simple implementations and Dubbo for more complex implementations.
通过RPC能解耦服务,这才是使用RPC的真正目的。RPC的原理主要用到了动态代理模式,至于http协议,只是传输协议而已。简单的实现可以参考spring remoting,复杂的实现可以参考dubbo。
RPC =socket + dynamic proxy
rpc=socket + 动态代理
When selecting inter-process Communication (IPC) for a service, consider how the services interact. There are many interaction patterns between client and server, which can be categorized in two dimensions.
当为某一个服务选择IPC(Inter-Process Communication,进程间通信)时,首先需要考虑服务之间如何交互。客户端和服务器之间有很多的交互模式,我们可以从两个维度进行归类。
Is the first dimension one-to-one or one-to-many:
第一个维度是一对一还是一对多:
The second dimension is whether these interactions are synchronous or asynchronous:
第二个维度是这些交互式同步还是异步:
training项目中的使用:
我们的一些业务场景需要实时获取结果,强调时效性,服务间的通信可以采用同步通信。项目中通过openfeign这个组件来实现。
Some of our business scenarios require real-time access to results, emphasizing timeliness, and communication between services can use synchronous communication. The project is implemented through the component openfeign.
比如我们transaction服务查询交易信息的时候会调用product服务来获取产品的一些详细信息。
For example, when our transaction service queries transaction information, it calls the product service to get some details of the product.
@EnableConfigurationProperties({LiquibaseProperties.class, ApplicationProperties.class})
@ComponentScan(basePackages = "com.pg")
@EnableFeignClients("com.pg") //
@EnableCaching
@SpringBootApplication
public class PgTransactionApp {
private static final Logger log = LoggerFactory.getLogger(PgTransactionApp.class);
public static void main(String[] args) throws UnknownHostException {
SpringApplication app = new SpringApplication(PgTransactionApp.class);
DefaultProfileUtil.addDefaultProfile(app);
Environment env = app.run(args).getEnvironment();
}
}
@AuthorizedFeignClient(name = "pg-products", url = "${gcspa.feign-client.pg-gcspa-products-uri}",
contextId = "ProductServiceClient")
public interface ProductServiceClient {
@Timed
@RequestMapping(value = "/api/v1/{marketcode}/merchandises/productcode/{productcode}", method = RequestMethod.GET)
ProductFormProductServiceDto getProductByProductCode(@PathVariable(value = "marketcode") String marketcode,@PathVariable(value = "productcode") String productcode);
}
有些业务场景对时效性要求不是很高,服务间的通讯可以采用异步通信的方式,异步调用可以提高系统的吞吐量,项目中是使用Microsoft Azure Service Bus来实现。
Some business scenarios do not require high timeliness, and communication between services can use asynchronous communication, asynchronous invocation can improve the system throughput, and the project is using Microsoft Azure Service Bus to achieve this.
比如transaction服务创建交易的时候,要推送消息到campaign服务,campaign做验证以及后续逻辑处理
For example, when the transaction service creates a transaction, it pushes a message to the campaign service, which does the validation and subsequent logic processing.
producer:
@Service
public class TransactionToCampaignMessageService {
@Autowired
private ObjectMapper objectMapper;
@Autowired
private ISendEvent sendEvent;
@Value("${messaging.topics.***}")
private String ValidateCampaignTopic;
public void sendCampaignValidateMessage(TransactionDTO transactionDTO)
throws JsonProcessingException {
String payload = objectMapper.writeValueAsString(transactionDTO);
sendEvent.sendEventToEventHub(EventType.ORDER_CREATED, transactionDTO.getId(),
DomainType.ORDER, payload, ValidateCampaignTopic);
log.info("=== Send Transaction To Campaign === [{}]", transactionDTO);
}
}
consumer:
@Component
@ConditionalOnProperty(name = "messaging.status", havingValue = "enabled")
public class TransactionValidationConsumer extends AsyncServiceBusTopicConsumer {
/**
* Establishes a connection with the Azure Service Bus and starts consuming messages from the
* given topic with the given subscription (consumer-group).
*
* @param topicName - topic to consume messages from;
* @param subscriptionName - name of subscription (consumer-group);
* @param receiver - implementation of {@link SparcReceiver} to process messages.
*/
public TransactionValidationConsumer(
@Value("${messaging.topics.****}")
String topicName,
@Value("${messaging.topics.*****}")
String subscriptionName,
TransactionValidationReceiver receiver) {
super(topicName, subscriptionName, receiver);
}
}
@Slf4j
@RequiredArgsConstructor
@Component
@ConditionalOnProperty(name = "messaging.status", havingValue = "enabled")
public class TransactionValidationReceiver implements SparcReceiver {
@Autowired
private ObjectMapper objectMapper;
@Autowired
private CommonCombinationCampaignRule commonCombinationCampaignRule;
@Autowired
private ISendEvent sendEvent;
@Value("${messaging.topics.****}")
private String transactionValidationReturnTopic;
@Override
public void processMessage(CommonServiceBusMessage message) {
log.info("=== TransactionValidationReceiver === [{}]", message);
ContextUtil contextUtil = (ContextUtil) SpringUtil.getBean("contextUtil");
contextUtil.put(MARKET_CODE,message.getMarketCode());
try {
EventDTO eventDTO = objectMapper.readValue(((String) message.getPayload()),
EventDTO.class);
MTransactionValidationDTO dto = objectMapper.readValue(eventDTO.getPayLoad(),
MTransactionValidationDTO.class);
TransactionValidationResultDTO resultDTO = new TransactionValidationResultDTO();
boolean flag = commonCombinationCampaignRule.evaluateRule(dto);
resultDTO.setId(dto.getId());
resultDTO.setValid(flag);
Optional.of(flag).filter(f->!f).ifPresent(
f -> resultDTO.setMessage(ExpressionErrorMsgHandler.getErrorMsgs()));
String payload = objectMapper.writeValueAsString(resultDTO);
boolean result = sendEvent.sendEventToEventHub(EventType.CAMPAIGN_VALIDATED,
dto.getId(),
DomainType.CAMPAIGN, objectMapper.writeValueAsString(resultDTO),
transactionValidationReturnTopic);
log.info("=== TransactionValidationSend === [{}]", payload);
if (!result) {
throw new SystemErrorException(MESSAGE_SEND_ERROR.getStatusCode());
}
} catch (JsonProcessingException e) {
throw new SystemErrorException(MESSAGE_SEND_ERROR.getStatusCode());
}
}
}
messaging:
status: enabled
service-bus-connection-string: ******
topics:
to_produce:
test1: ${test1:test1111}
to_consume:
test_demo: ${test_demo111}