• 手写RPC框架-第四天超时处理


    手写RPC框架-第四天超时处理

    1.为什么需要超时处理机制

    超时处理是 RPC 框架一个比较基本的能力,如果缺少超时处理机制,无论是服务端还是客户端都容易因为网络或其他错误导致挂死,资源耗尽,这些问题的出现大大地降低了服务的可用性。因此,我们需要在 RPC 框架中加入超时处理的能力。

    纵观整个远程调用的过程,需要客户端处理超时的地方有:

    • 与服务端建立连接,导致的超时
    • 发送请求到服务端,写报文导致的超时
    • 等待服务端处理时,等待处理导致的超时(比如服务端已挂死,迟迟不响应)
    • 从服务端接收响应时,读报文导致的超时

    需要服务端处理超时的地方有:

    • 读取客户端请求报文时,读报文导致的超时
    • 发送响应报文时,写报文导致的超时
    • 调用映射服务的方法时,处理报文导致的超时

    GeeRPC 在 3 个地方添加了超时处理机制。分别是:

    1)客户端创建连接时
    2)客户端 Client.Call() 整个过程导致的超时(包含发送报文,等待处理,接收报文所有阶段)
    3)服务端处理报文,即 Server.handleRequest 超时。

    2.创建连接超时

    为了实现上的简单,将超时设定放在了 Option 中。ConnectTimeout 默认值为 10s,HandleTimeout 默认值为 0,即不设限。

    type Option struct {
    	MagicNumber    int           // MagicNumber marks this's a geerpc request
    	CodecType      codec.Type    // client may choose different Codec to encode body
    	ConnectTimeout time.Duration // 0 means no limit
    	HandleTimeout  time.Duration
    }
    
    var DefaultOption = &Option{
    	MagicNumber:    MagicNumber,
    	CodecType:      codec.GobType,
    	ConnectTimeout: time.Second * 10,
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    客户端连接超时,只需要为 Dial 添加一层超时处理的外壳即可。

    type clientResult struct {
    	client *Client
    	err    error
    }
    
    type newClientFunc func(conn net.Conn, opt *Option) (client *Client, err error)
    
    func dialTimeout(f newClientFunc, network, address string, opts ...*Option) (client *Client, err error) {
    	opt, err := parseOptions(opts...)
    	if err != nil {
    		return nil, err
    	}
    	conn, err := net.DialTimeout(network, address, opt.ConnectTimeout)
    	if err != nil {
    		return nil, err
    	}
    	// close the connection if client is nil
    	defer func() {
    		if err != nil {
    			_ = conn.Close()
    		}
    	}()
    	ch := make(chan clientResult)
    	go func() {
    		client, err := f(conn, opt)
    		ch <- clientResult{client: client, err: err}
    	}()
    	if opt.ConnectTimeout == 0 {
    		result := <-ch
    		return result.client, result.err
    	}
    	select {
    	case <-time.After(opt.ConnectTimeout):
    		return nil, fmt.Errorf("rpc client: connect timeout: expect within %s", opt.ConnectTimeout)
    	case result := <-ch:
    		return result.client, result.err
    	}
    }
    
    // Dial connects to an RPC server at the specified network address
    func Dial(network, address string, opts ...*Option) (*Client, error) {
    	return dialTimeout(NewClient, network, address, opts...)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    在这里实现了一个超时处理的外壳 dialTimeout,这个壳将 NewClient 作为入参,在 2 个地方添加了超时处理的机制。

    1. net.Dial 替换为 net.DialTimeout,如果连接创建超时,将返回错误。
      2)使用子协程执行 NewClient,执行完成后则通过信道 ch 发送结果,如果 time.After() 信道先接收到消息,则说明 NewClient 执行超时,返回错误。

    3.Client.Call 超时

    Client.Call 的超时处理机制,使用 context 包实现,控制权交给用户,控制更为灵活。

    // Call invokes the named function, waits for it to complete,
    // and returns its error status.
    func (client *Client) Call(ctx context.Context, serviceMethod string, args, reply interface{}) error {
    	call := client.Go(serviceMethod, args, reply, make(chan *Call, 1))
    	select {
    	case <-ctx.Done():
    		client.removeCall(call.Seq)
    		return errors.New("rpc client: call failed: " + ctx.Err().Error())
    	case call := <-call.Done:
    		return call.Error
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    用户可以使用 context.WithTimeout 创建具备超时检测能力的 context 对象来控制。例如:

    ctx, _ := context.WithTimeout(context.Background(), time.Second)
    var reply int
    err := client.Call(ctx, "Foo.Sum", &Args{1, 2}, &reply)
    ...
    
    • 1
    • 2
    • 3
    • 4

    4.服务端处理超时

    这一部分的实现与客户端很接近,使用 time.After() 结合 select+chan 完成。

    day4-timeout/server.go

    func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup, timeout time.Duration) {
    	defer wg.Done()
    	called := make(chan struct{})
    	sent := make(chan struct{})
    	go func() {
    		err := req.svc.call(req.mtype, req.argv, req.replyv)
    		called <- struct{}{}
    		if err != nil {
    			req.h.Error = err.Error()
    			server.sendResponse(cc, req.h, invalidRequest, sending)
    			sent <- struct{}{}
    			return
    		}
    		server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
    		sent <- struct{}{}
    	}()
    
    	if timeout == 0 {
    		<-called
    		<-sent
    		return
    	}
    	select {
    	case <-time.After(timeout):
    		req.h.Error = fmt.Sprintf("rpc server: request handle timeout: expect within %s", timeout)
    		server.sendResponse(cc, req.h, invalidRequest, sending)
    	case <-called:
    		<-sent
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    这里需要确保 sendResponse 仅调用一次,因此将整个过程拆分为 calledsent 两个阶段,在这段代码中只会发生如下两种情况:

    1. called 信道接收到消息,代表处理没有超时,继续执行 sendResponse。
    2. time.After() 先于 called 接收到消息,说明处理已经超时,called 和 sent 都将被阻塞。在 case <-time.After(timeout) 处调用 sendResponse

    这里只有函数sendResponse执行成功,才会往sent信道中写入。

    因此下面的select可以判断是否超时。


    关于time.After 和 context.WithTimeout怎么使用,可以参考我下面的文章。

    传送门

  • 相关阅读:
    【软考】系统集成项目管理工程师(十)项目质量管理【3分】
    让交互更加生动!有意思的鼠标跟随 3D 旋转动效
    linux源码安装postgresql以及smlar插件
    [南京大学2022操作系统-P11] 操作系统上的进程 (最小 Linux; fork, execve 和 exit)
    借助reCAPTCHA实现JavaScript验证码功能
    网页保存为pdf神器(可自定义编辑)—Print Edit WE
    这份华为以太网接口配置命令太真香了
    使用Hybrid Flow并添加API访问控制
    强推Linux高性能服务器编程, 真的是后端开发技术提升, 沉淀自身不容错过的一本经典书籍
    程序人生——Java中字符串使用的建议
  • 原文地址:https://blog.csdn.net/weixin_45750972/article/details/127801011