目录
虽然Flink已经打算开始转向Netty来实现内部通信,不过截止目前版本(Flink 1.15)Flink内部的心跳、组件通信等依然是基于Akka的模型来实现的。之所以要先来了解一下Akka,就是为了了解一些Akka模型的回调机制,防止在Flink源码里迷路。话不多说,开始来聊正题。
首先Akka的Actor编程模型如下图所示:
这里介绍一下上图中的几个概念:
除此以外,还有一些重要概念要了解:
概念看困了吧,下面来看看编程模型,一个炒鸡简单的Demo:
场景:创建两个Actor并向它们发送消息
首先亮出我们案例使用的依赖:
- <dependency>
- <groupId>com.typesafe.akkagroupId>
- <artifactId>akka-bom_2.12artifactId>
- <version>2.6.19version>
- <type>pomtype>
- <scope>importscope>
- dependency>
- <dependency>
- <groupId>com.typesafe.akkagroupId>
- <artifactId>akka-actor-typed_2.12artifactId>
- <version>2.6.19version>
- dependency>
- <dependency>
- <groupId>com.typesafe.akkagroupId>
- <artifactId>akka-actor-testkit-typed_2.12artifactId>
- <version>2.6.19version>
- <scope>testscope>
- dependency>
- <dependency>
- <groupId>com.typesafe.akkagroupId>
- <artifactId>akka-testkit_2.12artifactId>
- <scope>testscope>
- <version>2.6.19version>
- dependency>
- <dependency>
- <groupId>com.typesafe.akkagroupId>
- <artifactId>akka-remote_2.12artifactId>
- <version>2.6.19version>
- dependency>
先创建两个样例类,来定义消息类型,顾名思义其中SubmitTaskMessage为发送消息的类型,SuccessSubmitTaskMessage为消息发送成功的类型,我们在后续的消息发送环节会使用这两种消息类型进行发送。
- case class SubmitTaskMessage(message: String)
-
- case class SuccessSubmitTaskMessage(message: String)
接下来创建第一个Actor:SenderActor,我们需要继承akka.actor.Actor类。正如上面聊概念时我们所说的,Actor有两个重要的生命周期方法:
- object SenderActorObject extends Actor {
-
- // 当Actor初次被调用时
- override def preStart(): Unit = {
- println("执行SenderActorObject PreStart()方法")
- }
-
- override def receive: Receive = {
- case "start" =>
- val receiveActor = this.context.actorSelection("/user/receiverActor")
- // 向第二个actor发送消息
- receiveActor ! SubmitTaskMessage("请完成#001任务!")
- case SuccessSubmitTaskMessage(msg) =>
- println(s"接收到来自${sender.path}的消息: $msg")
- }
-
- }
然后来创建第二个Actor,在perStart方法里依然只打印一句话,在receive方法里,当接收到第一个Actor发来的SubmitTaskMessage类型消息时,会向第一个Actor发送一个SuccessSubmitTaskMessage类型消息。
这里有两个重点:
- object ReceiverActor extends Actor {
-
- override def preStart(): Unit = {
- println("执行ReceiverActor()方法")
- }
- // 执行receive方法前会先执行preStart方法
- override def receive: Receive = {
- case SubmitTaskMessage(msg) =>
- println(s"接收到来自${sender.path}的消息: $msg")
- // 又向第一个sender发送消息
- sender ! SuccessSubmitTaskMessage("完成提交")
- case _ => println("未匹配的消息类型")
- }
- }
在完成两个Actor 的创建之后,我们来写主类:
在主类中,想要使用Actor,我们需要先创建ActorSystem,在完成actor 的创建之后,我们向senderActor的Ref发送一条消息“start”,正如我们上面所描述的,这个消息会去触发senderActor的receive方法,并会使SenderActor向ReceiverActor去发送一条SubmitTaskMessage类型的消息。
- object SimpleAkkaDemo {
- def main(args: Array[String]): Unit = {
-
- // 创建一个actor系统
- val actorSystem = ActorSystem("SimpleAkkaDemo", ConfigFactory.load())
-
- //创建一个actor
- val senderActor: ActorRef = actorSystem.actorOf(Props(SenderActorObject), "senderActor")
-
- //创建一个actor
- val receiverActor: ActorRef = actorSystem.actorOf(Props(ReceiverActor), "receiverActor")
-
- // 使用actor的引用向actor发送消息
- senderActor ! "start"
- }
- }
我们来运行一下:
正如我们所设计的,在actor初始化完成后会先去执行perStart方法,然后再收到消息后会去执行receive方法。
下面放上完整代码:
- package akka
-
- import akka.actor.{Actor, ActorRef, ActorSystem, Props}
- import com.typesafe.config.ConfigFactory
-
- object SenderActorObject extends Actor {
-
- // 当Actor初次被调用化时
- override def preStart(): Unit = {
- println("执行SenderActorObject PreStart()方法")
- }
-
- override def receive: Receive = {
-
- case "start" =>
- val receiveActor = this.context.actorSelection("/user/receiverActor")
- // 向第二个actor发送消息
- receiveActor ! SubmitTaskMessage("请完成#001任务!")
- case SuccessSubmitTaskMessage(msg) =>
- println(s"接收到来自${sender.path}的消息: $msg")
- }
-
- }
-
- object ReceiverActor extends Actor {
-
- override def preStart(): Unit = {
- println("执行ReceiverActor()方法")
- }
- // 执行receive方法前会先执行preStart方法
- override def receive: Receive = {
- case SubmitTaskMessage(msg) =>
- println(s"接收到来自${sender.path}的消息: $msg")
- // 又向第一个sender发送消息
- sender ! SuccessSubmitTaskMessage("完成提交")
- case _ => println("未匹配的消息类型")
- }
- }
-
- object SimpleAkkaDemo {
- def main(args: Array[String]): Unit = {
-
- // 创建一个actor系统
- val actorSystem = ActorSystem("SimpleAkkaDemo", ConfigFactory.load())
-
- //创建一个actor
- val senderActor: ActorRef = actorSystem.actorOf(Props(SenderActorObject), "senderActor")
-
- //创建一个actor
- val receiverActor: ActorRef = actorSystem.actorOf(Props(ReceiverActor), "receiverActor")
-
- // 使用actor的引用向actor发送消息
- senderActor ! "start"
- }
- }
-
- //消息封装样例类
- case class SubmitTaskMessage(message: String)
-
- case class SuccessSubmitTaskMessage(message: String)
-
到此,如果上面这个案例搞明白了的话,FlinkRPC源码中的各种Akka回调就不会迷路了。
Flink并没有使用原生的Akka模型,而是进行了封装,封装后的对象和Akka模型之间的对象关系为:
FlinkRPC组件 | Akka原生 | 意义 |
RpcGateway | ActorRef | 用于远程调用的代理接口。 RpcGateway 提供了获取其所代理的 RpcEndpoint 的地址的方法。在实现一个提供 RPC 调用的组件时,通常需要先定一个接口, 该接口继承 RpcGateway 并约定好提供的远程调用的方法。 |
RpcServer | 自身的ActorRef | 相当于 RpcEndpoint 自身的的代理对象(self gateway)。RpcServer 是 RpcService 在启动了 RpcEndpoint 之后返回的对象,每一个 RpcEndpoint 对 象内部都有一个 RpcServer 的成员变量,通过 getSelfGateway 方法就可以获得 自身的代理,然后调用该 Endpoint 提供的服务。 |
RpcEndpoint | Actor | 对 RPC 框架中提供具体服务的实体的抽象,所有提供远程调用方法的组件都需 要继承该抽象类。另外,对于同一个 RpcEndpoint 的所有 RPC 调用都会在同一 个线程(RpcEndpoint 的“主线程”)中执行,因此无需担心并发执行的线程安全 问题。 |
RpcService | ActorSystem | 是 RpcEndpoint 的运行时环境,RpcService 提供了启动 RpcEndpoint , 连接到 远端 RpcEndpoint 并返回远端 RpcEndpoint 的代理对象等方法。此外, RpcService 还提供了某些异步任务或者周期性调度任务的方法。 |
perStart() | onStart() | RpcEndpoint(Actor)对象初始化后调用的生命周期方法,在flink中为onStart方法 |
在Flink中:继承自RPCEndpoint的类有以下四个核心组件
1、TaskExecutor 集群中从节点中最重要的角色,负责资源管理
2、Dispatcher 主节点中的一个工作角色,负责 job 调度执行
3、JobMaster 应用程序中的主控程序,类似于 Spark 中的 Driver 的作用,或者 MapReduce 中的 ApplicationMaster
4、ResourceManager 集群中的主节点 JobManager 中的负责资源管理的角色,和 TaskExecutor 一 起构成资源管理的主从架构
当在任意地方发现要创建这四个组件的任何一个组件的实例对象的时候,创建成功了之后,都会要去执 行他的 onStart() ,因为他们都是 RpcEndpoint 的子类,在集群启动的源码分析中,其实这些组件的很 多的工作流程,都被放在 onStart() 里面。