• 手写RPC Day2 高性能客户端


    手写RPC Day2 高性能客户端

    传送门

    1.Call 的设计

    net/rpc 而言,一个函数需要能够被远程调用,需要满足如下五个条件:

    • the method’s type is exported.
    • the method is exported.
    • the method has two arguments, both exported (or builtin) types.
    • the method’s second argument is a pointer.
    • the method has return type error.

    更直观一些:

    func (t *T) MethodName(argType T1, replyType *T2) error
    
    • 1

    根据上述要求,首先我们封装了结构体 Call 来承载一次 RPC 调用所需要的信息。

    // Call represents an active RPC.
    type Call struct {
    	Seq           uint64
    	ServiceMethod string      // format "."
    	Args          interface{} // arguments to the function
    	Reply         interface{} // reply from the function
    	Error         error       // if error occurs, it will be set
    	Done          chan *Call  // Strobes when call is complete.
    }
    
    func (call *Call) done() {
    	call.Done <- call
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    2.实现客户端

    接下来,我们将实现 GeeRPC 客户端最核心的部分 Client。

    // Client represents an RPC Client.
    // There may be multiple outstanding Calls associated
    // with a single Client, and a Client may be used by
    // multiple goroutines simultaneously.
    type Client struct {
    	cc       codec.Codec
    	opt      *Option
    	sending  sync.Mutex // protect following
    	header   codec.Header
    	mu       sync.Mutex // protect following
    	seq      uint64
    	pending  map[uint64]*Call
    	closing  bool // user has called Close
    	shutdown bool // server has told us to stop
    }
    
    var _ io.Closer = (*Client)(nil)
    
    var ErrShutdown = errors.New("connection is shut down")
    
    // Close the connection
    func (client *Client) Close() error {
    	client.mu.Lock()
    	defer client.mu.Unlock()
    	if client.closing {
    		return ErrShutdown
    	}
    	client.closing = true
    	return client.cc.Close()
    }
    
    // IsAvailable return true if the client does work
    func (client *Client) IsAvailable() bool {
    	client.mu.Lock()
    	defer client.mu.Unlock()
    	return !client.shutdown && !client.closing
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • cc 是消息的编解码器,和服务端类似,用来序列化将要发送出去的请求,以及反序列化接收到的响应。
    • sending 是一个互斥锁,和服务端类似,为了保证请求的有序发送,即防止出现多个请求报文混淆。
    • header 是每个请求的消息头,header 只有在请求发送时才需要,而请求发送是互斥的,因此每个客户端只需要一个,声明在 Client 结构体中可以复用。
    • seq 用于给发送的请求编号,每个请求拥有唯一编号。这里本质是一个 记数器,为了方便给每个call的seq编号。
    • pending 存储未处理完的请求,键是编号,值是 Call 实例。
    • closing 和 shutdown 任意一个值置为 true,则表示 Client 处于不可用的状态,但有些许的差别,closing 是用户主动关闭的,即调用 Close 方法,而 shutdown 置为 true 一般是有错误发生。

    在 main 函数中使用了 client.Call 并发了 5 个 RPC 同步调用,参数和返回值的类型均为 string。

    func main() {
        log.SetFlags(0)
    	addr := make(chan string)
    	go startServer(addr)
    	client, _ := geerpc.Dial("tcp", <-addr)
    	defer func() { _ = client.Close() }()
    
    	time.Sleep(time.Second)
    	// send request & receive response
    	var wg sync.WaitGroup
    	for i := 0; i < 5; i++ {
    		wg.Add(1)
    		go func(i int) {
    			defer wg.Done()
    			args := fmt.Sprintf("geerpc req %d", i)
    			var reply string
    			if err := client.Call("Foo.Sum", args, &reply); err != nil {
    				log.Fatal("call Foo.Sum error:", err)
    			}
    			log.Println("reply:", reply)
    		}(i)
    	}
    	wg.Wait()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    3.流程

    client, _ := geerpc.Dial("tcp", <-addr) 创建client。

    调用client.Call发送请求,func (client *Client) receive()处理请求,与Day1类似。

    那么首先实现接收功能,接收到的响应有三种情况:

    • call 不存在,可能是请求没有发送完整,或者因为其他原因被取消,但是服务端仍旧处理了。
    • call 存在,但服务端处理出错,即 h.Error 不为空。
    • call 存在,服务端处理正常,那么需要从 body 中读取 Reply 的值。
    func (client *Client) receive() {
    	var err error
    	for err == nil {
    		var h codec.Header
    		if err = client.cc.ReadHeader(&h); err != nil {
    			break
    		}
    		call := client.removeCall(h.Seq)
    		switch {
    		case call == nil:
    			// it usually means that Write partially failed
    			// and call was already removed.
    			err = client.cc.ReadBody(nil)
    		case h.Error != "":
    			call.Error = fmt.Errorf(h.Error)
    			err = client.cc.ReadBody(nil)
    			call.done()
    		default:
    			err = client.cc.ReadBody(call.Reply)
    			if err != nil {
    				call.Error = errors.New("reading body " + err.Error())
    			}
    			call.done()
    		}
    	}
    	// error occurs, so terminateCalls pending calls
    	client.terminateCalls(err)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    请求处理完后,然后响应报文给client。


    这里支持并发的原因是因为client拥有请求的一个队列map[uint64]*Call,一个chan *Call通道,可以支持发送多个请求,保存多个响应。

    响应采用阻塞同步的形式。每次server这边响应后,然后client使用receive()函数接受对应call的reply,然后发送call.Done()表示成功接收响应。

    与Day1不同的是,Day2将client封装起来了。

    输出结果一样。

  • 相关阅读:
    【深度学习】Python 快速入门
    每日一学————基本配置和管理
    LeetCode:2. 两数之和
    微信小程序用canvasToTempFilePath压缩图片,开发工具压缩正常而真机上比例失调
    Nginx之正则表达式、location匹配简介及rewrite重写
    【Spring注解必知必会】深度解析@Configuration注解
    “AttackCombo-basketball“ app Tech Support(URL)
    位段 联合体 枚举
    契约测试(上):什么是契约测试
    项目完成 - 基于Django3.x版本 - 开发部署小结
  • 原文地址:https://blog.csdn.net/weixin_45750972/article/details/127766759