• Actor model 的理解与 protoactor-go 的分析


    Overview

    Definition

    From wikipedia

    The actor model in computer science is a mathematical model of concurrent computation that treats actor as the universal primitive of concurrent computation. In response to a message it receives, an actor can: make local decisions, create more actors, send more messages, and determine how to respond to the next message received.

    将 actor 是并发计算中的一个基本原语,actor 接收消息并对收到的消息做出响应。

    下面说说我个人对 actor 的理解吧,不一定对,有不对的地方欢迎指正!

    为什么 Actor model 可以解决并发问题?

    首先想想并发问题是如何产生的,对同一个数据进行同时写(或者同时读写)会产生并发问题。所以主需要破坏“同时”这个条件,并发问题就迎刃而解了。

    常用的互斥锁就是这是这种思路,它保证同一时刻只有一个线程可以访问到数据,在锁的保护下,多个线程对数据的访问变成有序的了。

    senders 给 actor 发送消息时,actor 将消息放在了它的邮箱 (Mail) 中,并从邮箱中一条条取出消息来处理。同一时刻,不会有两条消息被处理,这次消息的处理顺序是有序的,因此自然不会有并发问题。

    Actor model 与 CSP

    两者的概念极为相似,都符合生产者消费者模型。

    在 CSP 里面,Producer 往 channel 里投递消息,comsumer 从 channel 里取出消息来处理。

    在 Actor model 中,sender 往 actor 的 mail 里投递信息,actor 从 mail 里取出消息来处理。

    但是两者的关注点不同,CSP 里面生产者关注的是消息队列本身,生产者只负责往消息队列里面投递消息,至于这些消息被谁消费(甚至至可能被多个消费者一起消费),它不 care。

    但是 Actor model 中,生产者关注的是特定的消费者,生产者往特定的消费者投递消息,消息只能被之前所指定的消费者消费。

    message queue 的缺点

    利用 message queue 可以有效的解决并发问题,但是它也有一个很明显的缺点,那就是调用方没法及时得到处理的结果。

    举个具体的例子:客户端发起一个请求,后端收到请求后,生成了一个对应的消息并放到了消息队列上,有一个消费者不停地从消息队列中取出消息并进行有序消费。但是消息者处理完消息后是无法将处理的结果告诉生产者的。

    这个问题一般有两种解决方法:

    1. 生产者不停对消费者进行轮询,询问消息的结果。
    2. 消费者消费完消息后,通知生产者。

    这两种方式都会增加系统的复杂度。

    由于 Actor model 在我看来也是基于消费队列的,所以我很好奇它是如何做到将消息的处理结果实时地告诉 senders 的?

    所以找到了 protoactor-go 这个库来学习学习

    protoactor-go 的分析

    在 _example 目录下面有很多关于 protoactor-go 的例子。我们明确目标,直接找到 requestreponse 这个目录,这里面的例子将是就是往 actor 对象发送一个消息,并得到消息的处理结果。

    首先说一下这个库里面的一些抽象

    • Actor: 一个处理消息的对象

      type Actor interface {
      	Receive(c Context)
      }
      
    • ActorSystem: 多个 Actor 对象在一个 ActorSystem 内,ActorSystem 与 Actor 是一对多的关系

    • PID: 用来标识一个 Actor 对象

      type PID struct {
      	Address string `protobuf:"bytes,1,opt,name=Address,proto3" json:"Address,omitempty"`
      	Id      string `protobuf:"bytes,2,opt,name=Id,proto3" json:"Id,omitempty"`
      
      	p *Process
      }
      

    requestreponse/main.go 里面的代码很简单

    type Hello struct{ Who string }
    
    func Receive(context actor.Context) {
    	switch msg := context.Message().(type) {
    	case Hello:
    		context.Respond("Hello " + msg.Who)
    	}
    }
    
    func main() {
    	system := actor.NewActorSystem()
    	rootContext := system.Root
    
    	props := actor.PropsFromFunc(Receive)
    	pid := rootContext.Spawn(props)
    
    	future := rootContext.RequestFuture(pid, Hello{Who: "Roger"}, -1)
    	result, _ := future.Result() // await result
      
    	fmt.Println(result)
    	_, _ = console.ReadLine()
    }
    
    • line 11 - 12

      	system := actor.NewActorSystem()
      	rootContext := system.Root
      

      新建一个 actor system

    • line 14 - 15

      	props := actor.PropsFromFunc(Receive)
      	pid := rootContext.Spawn(props)
      

      在 actor system 内构造了一个 actor,用 PID 来标识这个 actor,该 actor 收到消息后的处理逻辑是 Receive()

    • line 17 - 18

      	future := rootContext.RequestFuture(pid, Hello{Who: "Roger"}, -1)
      	result, _ := future.Result() // await result
      

      对之前新建的 actor 发送一个消息,得到一个 future 对象,并利用 future.Result() 方法取到结果。

    调用 future.Result() 是消息压根还没有被处理完咋办?

    先看看 Result() 方法的实现

    // Result waits for the future to resolve
    func (f *Future) Result() (interface{}, error) {
    	f.wait()
    	return f.result, f.err
    }
    
    func (f *Future) wait() {
    	f.cond.L.Lock()
    	for !f.done {
    		f.cond.Wait()
    	}
    	f.cond.L.Unlock()
    }
    

    可以看到它里面用到 sync.Cond,知道 f.done 变成 true,它才会返回。而只有消息处理完毕后,f.done 才会变成 true。

    所以 future.Result() 将会一直阻塞直到消息被处理完。

    actor 怎样将消息的处理结果返回到对应的 sender?

    其实 actor 并不会把直接把结果送回给 sender,而是将结果保存在了 Future 对象内,sender 通过调用 Future.Result() 方法取到消息的处理结果。

    自 sender 发送消息起,消息时如何流转的?
    RequestFuture()

    首先进入到 RequestFuture 方法内

    // RequestFuture sends a message to a given PID and returns a Future
    func (rc *RootContext) RequestFuture(pid *PID, message interface{}, timeout time.Duration) *Future {
    	future := NewFuture(rc.actorSystem, timeout)
    	env := &MessageEnvelope{
    		Header:  nil,
    		Message: message,
    		Sender:  future.PID(),
    	}
    	rc.sendUserMessage(pid, env)
    	return future
    }
    
    • line 3

      构建了一个 future 对象 (先不用管 timeout)

    • line 4 - 8

      构建了一个 MessageEnvelope (信封)对象,注意这里信封的 sender (发件人) 是 future.PID,也就是刚刚构建的 future 对象。

      由于 MessageEnvelope 里面记录了 sender 信息,所以 actor 在处理完消息后自然可以将结果发送给 sender

    • line 9

      将信封对象发送给了目标 actor

    • line 10

      返回 future 对象,调用者调用 future.Result() 将会陷入阻塞

    func (rc *RootContext) sendUserMessage(pid *PID, message interface{}) {
    	if rc.senderMiddleware != nil {
    		// Request based middleware
    		rc.senderMiddleware(rc, pid, WrapEnvelope(message))
    	} else {
    		// tell based middleware
    		pid.sendUserMessage(rc.actorSystem, message)
    	}
    }
    

    由于我们并没有为 RootContext 设置 senderMiddleware。所以将会直接调用

    		pid.sendUserMessage(rc.actorSystem, message)
    

    其中 pid 是我们的 target actor。

    // sendUserMessage sends a messages asynchronously to the PID
    func (pid *PID) sendUserMessage(actorSystem *ActorSystem, message interface{}) {
    	pid.ref(actorSystem).SendUserMessage(pid, message)
    }
    

    pid.ref() 将会从 ActorSystem 里面取出 pid 所对应的 Actor Process

    func (pid *PID) ref(actorSystem *ActorSystem) Process {
      // ...
      // 这里面用了两个原子操作,还蛮巧妙的
    }
    

    Process 是一个接口

    // A Process is an interface that defines the base contract for interaction of actors
    type Process interface {
    	SendUserMessage(pid *PID, message interface{})
    	SendSystemMessage(pid *PID, message interface{})
    	Stop(pid *PID)
    }
    

    ActorProcess 和 futureProcess 都实现了 Process 这个接口,在此处,将会进入 ActorProcess 的实现

    func (ref *ActorProcess) SendUserMessage(pid *PID, message interface{}) {
    	ref.mailbox.PostUserMessage(message)
    }
    
    func (m *defaultMailbox) PostUserMessage(message interface{}) {
    	for _, ms := range m.mailboxStats {
    		ms.MessagePosted(message)
    	}
    	m.userMailbox.Push(message)
    	atomic.AddInt32(&m.userMessages, 1)
    	m.schedule()
    }
    
    • line 2 - 4

      先忽略

    • line 5 - 6

      往 userMailbox 内推入一条消息,并将 userMessages + 1

    • line 7

      调用 schedule()

    func (m *defaultMailbox) schedule() {
    	if atomic.CompareAndSwapInt32(&m.schedulerStatus, idle, running) {
    		m.dispatcher.Schedule(m.processMessages)
    	}
    }
    
    func (goroutineDispatcher) Schedule(fn func()) {
    	go fn()
    }
    

    这里进行了一个 CAS 操作,首先判断是否是空闲状态,如果是 idle (空闲状态) 的话,那就调用 Schedule() 方法起一个协程去处理消息。

    协程中执行的就是 processMessages() 方法了

    func (m *defaultMailbox) processMessages() {
    process:
    	m.run()
    
    	// ...
    }
    
    func (m *defaultMailbox) run() {
    	var msg interface{}
    
    	// ...
    	for {
    		if msg = m.userMailbox.Pop(); msg != nil {
    			atomic.AddInt32(&m.userMessages, -1)
    			m.invoker.InvokeUserMessage(msg)
    			// ...
    		} else {
    			return
    		}
    	}
    }
    

    在 run() 里面会利用一个 for 循环不停从 userMailbox 中取消息,并调用 InvokeUserMessage() 去处理消息

    func (ctx *actorContext) InvokeUserMessage(md interface{}) {
    	// ...
    	ctx.processMessage(md)
    	// ...	
    }
    
    func (ctx *actorContext) processMessage(m interface{}) {
    	// ...
    	ctx.messageOrEnvelope = m
    	ctx.defaultReceive()
    	ctx.messageOrEnvelope = nil // release message
    }
    
    • line 9

      将 m (需要处理的 message)放在了 actorContext.messageOrEnvelope 里面

    • line 10

      调用 defaultReceive() 处理 message

    • line 11

      将 actorContext.messageOrEnvelope 置为 nil,代表此条消息处理完了

    有一个值得注意的地方是,在 processMessage() 里面,对 messageOrEnvelope 的赋值是没有使用 mutex 或者原子操作的。因此只有一个协程执行 processMessage() 这个方法,因此对 messageOrEnvelope 的访问时安全的。

    PS: 个人感觉在 Actor model 最主要的临界区是从 actor 的 mailbox 内存入/取出消息,一旦取出消息,后面对消息的处理就是一马平川了

    func (ctx *actorContext) defaultReceive() {
    	// ...
    	ctx.actor.Receive(Context(ctx))
    }
    
    func Receive(context actor.Context) {
    	switch msg := context.Message().(type) {
    	case Hello:
    		context.Respond("Hello " + msg.Who)
    	}
    }
    

    defaultReceive() 最终调用到了在 main.go 中声明的 Receive() 方法。

    Respond()

    接下来主要看看

    context.Respond("Hello " + msg.Who)
    

    这里面所做的工作

    func (ctx *actorContext) Respond(response interface{}) {
    	// ...
    	ctx.Send(ctx.Sender(), response)
    }
    

    ctx.Sender() 用于从 message 中获取 sender 信息

    func (ctx *actorContext) Sender() *PID {
    	return UnwrapEnvelopeSender(ctx.messageOrEnvelope)
    }
    
    func UnwrapEnvelopeSender(message interface{}) *PID {
    	if env, ok := message.(*MessageEnvelope); ok {
    		return env.Sender
    	}
    	return nil
    }
    

    接下来的步骤就和之前差不多了,都是往指定的 PID 发送消息

    func (ctx *actorContext) Send(pid *PID, message interface{}) {
    	ctx.sendUserMessage(pid, message)
    }
    
    func (ctx *actorContext) sendUserMessage(pid *PID, message interface{}) {
      // ...
    	pid.sendUserMessage(ctx.actorSystem, message)
    }
    
    func (pid *PID) sendUserMessage(actorSystem *ActorSystem, message interface{}) {
    	pid.ref(actorSystem).SendUserMessage(pid, message)
    }
    

    唯一不同的是 pid.ref(actorSystem) 得到的将是一个 futureProcess ,之前得到的是 ActorProcess

    func (ref *futureProcess) SendUserMessage(pid *PID, message interface{}) {
      // ...
    	_, msg, _ := UnwrapEnvelope(message)
    	// ...
      ref.result = msg
    	ref.Stop(pid)
    }
    
    func UnwrapEnvelope(message interface{}) (ReadonlyMessageHeader, interface{}, *PID) {
    	if env, ok := message.(*MessageEnvelope); ok {
    		return env.Header, env.Message, env.Sender
    	}
    	return nil, message, nil
    }
    
    func (ref *futureProcess) Stop(pid *PID) {
    	ref.cond.L.Lock()
    	if ref.done {
    		ref.cond.L.Unlock()
    		return
    	}
    
    	ref.done = true
    	// ...
    	ref.cond.L.Unlock()
    	ref.cond.Signal()
    }
    

    如上,futureProcess.SendUserMessage() 先 msg 存入 result 中,将 done 置为 true,并唤醒在 cond 上沉睡的协程 (也就是调用 ·future.Result() 的协程)

    启发

    从 protoactor-go 中获得的启发
    1. Future 对象

      Future 这个抽象在异步编程的世界里面挺常见的,C++ 里面也是有 Future 对象的。

      想要获取异步操作的结果时,可以先让异步操作返回一个 Future 对象,并 await Future 完成即可。

      仔细一想,go 里面的 channel 不就是一个天然的 Future 对象?

    2. 将 Future 抽象为了一个 Actor Process

      这一个抽象我感觉挺巧妙的,将给 Future 赋值这个操作巧妙低转换成了给 Future Actor 发消息

    看源码的启发
    1. 带着问题看源码,要想带着问题,前提是你需要能够提出问题,这就要求你源码的功能提前就得有一些了解
    2. 抓住主干

    其他疑惑 (TODO)

    1. protoactor-go 是怎样处理垃圾消息 (没有 actor 可以处理的消息) 的 (dead letter)
    2. protoactor-go 是怎样处理跨进程调用的?
    3. protoactor-go 是如何处理 userMessage 和 sysMessage 的优先级?
  • 相关阅读:
    Springboot 配置线程池创建线程和配置 @Async 异步操作线程池
    day03 Spring-AOP面向切面编程
    [附源码]计算机毕业设计基于SpringBoot的小说阅读系统
    【MYSQL】表的综合查询
    十大经典排序算法(java实现、配图解,附源码)
    《吐血整理》保姆级系列教程-玩转Fiddler抓包教程(5)-Fiddler监控面板详解
    【Linux开发基础知识】Makefile语法
    鸿蒙OS开发:【一次开发,多端部署】(app市场首页)项目
    谷歌在以色列的路口装上了 AI 红绿灯
    洛谷P1223 排队接水
  • 原文地址:https://www.cnblogs.com/XiaoXiaoShuai-/p/16001285.html