• 一、Flink 1.13 源码解析前导——Akka通信模型


    点击这里查看 Flink 1.13 源码解析 目录汇总

    目录

    前言

    一、Akka通信模型的概念

    二、Akka编程模型

    三、与FlinkRpc的联系


    前言

            虽然Flink已经打算开始转向Netty来实现内部通信,不过截止目前版本(Flink 1.15)Flink内部的心跳、组件通信等依然是基于Akka的模型来实现的。之所以要先来了解一下Akka,就是为了了解一些Akka模型的回调机制,防止在Flink源码里迷路。话不多说,开始来聊正题。

    一、Akka通信模型的概念

            首先Akka的Actor编程模型如下图所示:

            这里介绍一下上图中的几个概念:

    1. Actor:负责进行通信的组件
    2. ActorSystem:是管理Actor生命周期的组件
    3. MailBox:Actor 发送给它的消息都首先储存在 MailBox 中,通过这种方式可以实现异步通信。  每个 Actor 是单线程的处理方式,不断的从 MailBox 拉取消息执行处理,所以对于 Actor 的消息处理,不适合调用会阻塞的处理方法。 

            除此以外,还有一些重要概念要了解:

    1. Actor具有两个重要的生命周期方法:perStart和receive,在Actor被创建时会调用perStart方法,在Actor收到消息时会调用receive方法  
    2. Actor 可以改变他自身的状态,可以接收消息,也可以发送消息,还可以生成新的 Actor  
    3. 每一个 ActorSystem 和 Actor都在启动的时候会给定一个 name,如果要从 ActorSystem 中,获取一个 Actor,则通过以下的方式来进行 Actor 的 获取:akka.tcp://actorsystem_name@bigdata02:9527/user/actor_name 来进行定位  
    4. 如果一个 Actor 要和另外一个 Actor 进行通信,则必须先获取对方 Actor 的 ActorRef 对象,然后通过该对象发送消息即可。  
    5. 通过 tell 发送异步消息,不接收响应,通过 ask 发送异步消息,得到 Future 返回,通过异步回到返回处理结果。  

    二、Akka编程模型

            概念看困了吧,下面来看看编程模型,一个炒鸡简单的Demo:
            场景:创建两个Actor并向它们发送消息
            首先亮出我们案例使用的依赖:

    1. <dependency>
    2. <groupId>com.typesafe.akkagroupId>
    3. <artifactId>akka-bom_2.12artifactId>
    4. <version>2.6.19version>
    5. <type>pomtype>
    6. <scope>importscope>
    7. dependency>
    8. <dependency>
    9. <groupId>com.typesafe.akkagroupId>
    10. <artifactId>akka-actor-typed_2.12artifactId>
    11. <version>2.6.19version>
    12. dependency>
    13. <dependency>
    14. <groupId>com.typesafe.akkagroupId>
    15. <artifactId>akka-actor-testkit-typed_2.12artifactId>
    16. <version>2.6.19version>
    17. <scope>testscope>
    18. dependency>
    19. <dependency>
    20. <groupId>com.typesafe.akkagroupId>
    21. <artifactId>akka-testkit_2.12artifactId>
    22. <scope>testscope>
    23. <version>2.6.19version>
    24. dependency>
    25. <dependency>
    26. <groupId>com.typesafe.akkagroupId>
    27. <artifactId>akka-remote_2.12artifactId>
    28. <version>2.6.19version>
    29. dependency>

            先创建两个样例类,来定义消息类型,顾名思义其中SubmitTaskMessage为发送消息的类型,SuccessSubmitTaskMessage为消息发送成功的类型,我们在后续的消息发送环节会使用这两种消息类型进行发送。

    1. case class SubmitTaskMessage(message: String)
    2. case class SuccessSubmitTaskMessage(message: String)

            接下来创建第一个Actor:SenderActor,我们需要继承akka.actor.Actor类。正如上面聊概念时我们所说的,Actor有两个重要的生命周期方法:

    • perStart方法:在下面这段代码里,当Actor创建完成会调用 perStart方法,在perStart方法里我们打印一句话。
    • receive方法:当Actor接收到消息时,会调用receive方法,在receive方法内部,当SenderActor接收到“start”消息后,将向另一个Actor发送一个SubmitTaskMessage消息类型。
    1. object SenderActorObject extends Actor {
    2. // 当Actor初次被调用时
    3. override def preStart(): Unit = {
    4. println("执行SenderActorObject PreStart()方法")
    5. }
    6. override def receive: Receive = {
    7. case "start" =>
    8. val receiveActor = this.context.actorSelection("/user/receiverActor")
    9. // 向第二个actor发送消息
    10. receiveActor ! SubmitTaskMessage("请完成#001任务!")
    11. case SuccessSubmitTaskMessage(msg) =>
    12. println(s"接收到来自${sender.path}的消息: $msg")
    13. }
    14. }

            然后来创建第二个Actor,在perStart方法里依然只打印一句话,在receive方法里,当接收到第一个Actor发来的SubmitTaskMessage类型消息时,会向第一个Actor发送一个SuccessSubmitTaskMessage类型消息。
            这里有两个重点:

    1. receive方法回复消息时会回复给触发当前receive方法的Actor,换句话说A向B发送了消息,触发了B的receive方法,那么B的receive方法里的回复消息会直接回复给A
    2.  “!”是akka定义的一个发送消息的方法,这是一个方法!
    1. object ReceiverActor extends Actor {
    2. override def preStart(): Unit = {
    3. println("执行ReceiverActor()方法")
    4. }
    5. // 执行receive方法前会先执行preStart方法
    6. override def receive: Receive = {
    7. case SubmitTaskMessage(msg) =>
    8. println(s"接收到来自${sender.path}的消息: $msg")
    9. // 又向第一个sender发送消息
    10. sender ! SuccessSubmitTaskMessage("完成提交")
    11. case _ => println("未匹配的消息类型")
    12. }
    13. }

            在完成两个Actor 的创建之后,我们来写主类:
            在主类中,想要使用Actor,我们需要先创建ActorSystem,在完成actor 的创建之后,我们向senderActor的Ref发送一条消息“start”,正如我们上面所描述的,这个消息会去触发senderActor的receive方法,并会使SenderActor向ReceiverActor去发送一条SubmitTaskMessage类型的消息。

    1. object SimpleAkkaDemo {
    2. def main(args: Array[String]): Unit = {
    3. // 创建一个actor系统
    4. val actorSystem = ActorSystem("SimpleAkkaDemo", ConfigFactory.load())
    5. //创建一个actor
    6. val senderActor: ActorRef = actorSystem.actorOf(Props(SenderActorObject), "senderActor")
    7. //创建一个actor
    8. val receiverActor: ActorRef = actorSystem.actorOf(Props(ReceiverActor), "receiverActor")
    9. // 使用actor的引用向actor发送消息
    10. senderActor ! "start"
    11. }
    12. }

            我们来运行一下:

            正如我们所设计的,在actor初始化完成后会先去执行perStart方法,然后再收到消息后会去执行receive方法。
            下面放上完整代码: 

    1. package akka
    2. import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    3. import com.typesafe.config.ConfigFactory
    4. object SenderActorObject extends Actor {
    5. // 当Actor初次被调用化时
    6. override def preStart(): Unit = {
    7. println("执行SenderActorObject PreStart()方法")
    8. }
    9. override def receive: Receive = {
    10. case "start" =>
    11. val receiveActor = this.context.actorSelection("/user/receiverActor")
    12. // 向第二个actor发送消息
    13. receiveActor ! SubmitTaskMessage("请完成#001任务!")
    14. case SuccessSubmitTaskMessage(msg) =>
    15. println(s"接收到来自${sender.path}的消息: $msg")
    16. }
    17. }
    18. object ReceiverActor extends Actor {
    19. override def preStart(): Unit = {
    20. println("执行ReceiverActor()方法")
    21. }
    22. // 执行receive方法前会先执行preStart方法
    23. override def receive: Receive = {
    24. case SubmitTaskMessage(msg) =>
    25. println(s"接收到来自${sender.path}的消息: $msg")
    26. // 又向第一个sender发送消息
    27. sender ! SuccessSubmitTaskMessage("完成提交")
    28. case _ => println("未匹配的消息类型")
    29. }
    30. }
    31. object SimpleAkkaDemo {
    32. def main(args: Array[String]): Unit = {
    33. // 创建一个actor系统
    34. val actorSystem = ActorSystem("SimpleAkkaDemo", ConfigFactory.load())
    35. //创建一个actor
    36. val senderActor: ActorRef = actorSystem.actorOf(Props(SenderActorObject), "senderActor")
    37. //创建一个actor
    38. val receiverActor: ActorRef = actorSystem.actorOf(Props(ReceiverActor), "receiverActor")
    39. // 使用actor的引用向actor发送消息
    40. senderActor ! "start"
    41. }
    42. }
    43. //消息封装样例类
    44. case class SubmitTaskMessage(message: String)
    45. case class SuccessSubmitTaskMessage(message: String)

    到此,如果上面这个案例搞明白了的话,FlinkRPC源码中的各种Akka回调就不会迷路了。

    三、与FlinkRpc的联系

    Flink并没有使用原生的Akka模型,而是进行了封装,封装后的对象和Akka模型之间的对象关系为:

    FlinkRPC组件Akka原生意义
    RpcGatewayActorRef用于远程调用的代理接口。 RpcGateway 提供了获取其所代理的 RpcEndpoint 的地址的方法。在实现一个提供 RPC 调用的组件时,通常需要先定一个接口, 该接口继承 RpcGateway 并约定好提供的远程调用的方法。
    RpcServer自身的ActorRef相当于 RpcEndpoint 自身的的代理对象(self gateway)。RpcServer 是 RpcService 在启动了 RpcEndpoint 之后返回的对象,每一个 RpcEndpoint 对 象内部都有一个 RpcServer 的成员变量,通过 getSelfGateway 方法就可以获得 自身的代理,然后调用该 Endpoint 提供的服务。
    RpcEndpointActor对 RPC 框架中提供具体服务的实体的抽象,所有提供远程调用方法的组件都需 要继承该抽象类。另外,对于同一个 RpcEndpoint 的所有 RPC 调用都会在同一 个线程(RpcEndpoint 的“主线程”)中执行,因此无需担心并发执行的线程安全 问题。
    RpcServiceActorSystem是 RpcEndpoint 的运行时环境,RpcService 提供了启动 RpcEndpoint , 连接到 远端 RpcEndpoint 并返回远端 RpcEndpoint 的代理对象等方法。此外, RpcService 还提供了某些异步任务或者周期性调度任务的方法。
    perStart()onStart()RpcEndpoint(Actor)对象初始化后调用的生命周期方法,在flink中为onStart方法

    在Flink中:继承自RPCEndpoint的类有以下四个核心组件

    1、TaskExecutor 集群中从节点中最重要的角色,负责资源管理

    2、Dispatcher 主节点中的一个工作角色,负责 job 调度执行

    3、JobMaster 应用程序中的主控程序,类似于 Spark 中的 Driver 的作用,或者 MapReduce 中的 ApplicationMaster

    4、ResourceManager 集群中的主节点 JobManager 中的负责资源管理的角色,和 TaskExecutor 一 起构成资源管理的主从架构

    当在任意地方发现要创建这四个组件的任何一个组件的实例对象的时候,创建成功了之后,都会要去执 行他的 onStart() ,因为他们都是 RpcEndpoint 的子类,在集群启动的源码分析中,其实这些组件的很 多的工作流程,都被放在 onStart() 里面。

  • 相关阅读:
    html 笔记:CSS
    想要精通算法和SQL的成长之路 - 受限条件下可到达节点的数目
    小柏实战学习Liunx(图文教程二十二)
    浅谈一下Java锁机制
    Trino Worker 规避 OOM 思路
    Ollama--本地大语言模型LLM运行专家
    数据库DQL语句归纳及练习
    【mindspore1.5.0】安装mindspore报libcuda.so没找到和libcudnn没找到
    判定转状态+序列问题上树形dp:0909T2
    Android 13.0 屏蔽Launcher3桌面app图标的长按功能
  • 原文地址:https://blog.csdn.net/EdwardWong_/article/details/126517741