对 net/rpc
而言,一个函数需要能够被远程调用,需要满足如下五个条件:
更直观一些:
func (t *T) MethodName(argType T1, replyType *T2) error
根据上述要求,首先我们封装了结构体 Call 来承载一次 RPC 调用所需要的信息。
// Call represents an active RPC.
type Call struct {
Seq uint64
ServiceMethod string // format "."
Args interface{} // arguments to the function
Reply interface{} // reply from the function
Error error // if error occurs, it will be set
Done chan *Call // Strobes when call is complete.
}
func (call *Call) done() {
call.Done <- call
}
接下来,我们将实现 GeeRPC 客户端最核心的部分 Client。
// Client represents an RPC Client.
// There may be multiple outstanding Calls associated
// with a single Client, and a Client may be used by
// multiple goroutines simultaneously.
type Client struct {
cc codec.Codec
opt *Option
sending sync.Mutex // protect following
header codec.Header
mu sync.Mutex // protect following
seq uint64
pending map[uint64]*Call
closing bool // user has called Close
shutdown bool // server has told us to stop
}
var _ io.Closer = (*Client)(nil)
var ErrShutdown = errors.New("connection is shut down")
// Close the connection
func (client *Client) Close() error {
client.mu.Lock()
defer client.mu.Unlock()
if client.closing {
return ErrShutdown
}
client.closing = true
return client.cc.Close()
}
// IsAvailable return true if the client does work
func (client *Client) IsAvailable() bool {
client.mu.Lock()
defer client.mu.Unlock()
return !client.shutdown && !client.closing
}
Close
方法,而 shutdown 置为 true 一般是有错误发生。在 main 函数中使用了 client.Call
并发了 5 个 RPC 同步调用,参数和返回值的类型均为 string。
func main() {
log.SetFlags(0)
addr := make(chan string)
go startServer(addr)
client, _ := geerpc.Dial("tcp", <-addr)
defer func() { _ = client.Close() }()
time.Sleep(time.Second)
// send request & receive response
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(i int) {
defer wg.Done()
args := fmt.Sprintf("geerpc req %d", i)
var reply string
if err := client.Call("Foo.Sum", args, &reply); err != nil {
log.Fatal("call Foo.Sum error:", err)
}
log.Println("reply:", reply)
}(i)
}
wg.Wait()
}
client, _ := geerpc.Dial("tcp", <-addr)
创建client。
调用client.Call
发送请求,func (client *Client) receive()
处理请求,与Day1类似。
那么首先实现接收功能,接收到的响应有三种情况:
func (client *Client) receive() {
var err error
for err == nil {
var h codec.Header
if err = client.cc.ReadHeader(&h); err != nil {
break
}
call := client.removeCall(h.Seq)
switch {
case call == nil:
// it usually means that Write partially failed
// and call was already removed.
err = client.cc.ReadBody(nil)
case h.Error != "":
call.Error = fmt.Errorf(h.Error)
err = client.cc.ReadBody(nil)
call.done()
default:
err = client.cc.ReadBody(call.Reply)
if err != nil {
call.Error = errors.New("reading body " + err.Error())
}
call.done()
}
}
// error occurs, so terminateCalls pending calls
client.terminateCalls(err)
}
请求处理完后,然后响应报文给client。
这里支持并发的原因是因为client拥有请求的一个队列map[uint64]*Call
,一个chan *Call
通道,可以支持发送多个请求,保存多个响应。
响应采用阻塞同步的形式。每次server这边响应后,然后client使用receive()函数接受对应call的reply,然后发送call.Done()表示成功接收响应。
与Day1不同的是,Day2将client封装起来了。
输出结果一样。