• RPC 实战与核心原理分析


    RPC 实战与核心原理分析

    • RPCX是一个分布式的Go语言的 RPC 框架,支持Zookepper、etcd、consul多种服务发现方式,多种服务路由方式,

    例子

    服务端

    package main
    
    import (
    "flag"
    
    example "github.com/rpcxio/rpcx-examples"
    "github.com/smallnest/rpcx/server"
    )
    
    var addr = flag.String("addr", "localhost:8972", "server address")
    
    func main() {
    flag.Parse()
    
    s := server.NewServer()
    // s.RegisterName("Arith", new(example.Arith), "")
    s.Register(new(example.Arith), "")
    s.Serve("tcp", *addr)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19

    客户端

    package main
    
    import (
    	"context"
    	"flag"
    	"log"
    	"time"
    
    	example "github.com/rpcxio/rpcx-examples"
    	"github.com/smallnest/rpcx/client"
    )
    
    var (
    	addr = flag.String("addr", "localhost:8972", "server address")
    )
    
    func main() {
    	flag.Parse()
    
    	d, _ := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
    	xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
    	defer xclient.Close()
    
    	args := &example.Args{
    		A: 10,
    		B: 20,
    	}
    
    	for {
    		reply := &example.Reply{}
    		err := xclient.Call(context.Background(), "Mul", args, reply)
    		if err != nil {
    			log.Fatalf("failed to call: %v", err)
    		}
    
    		log.Printf("%d * %d = %d", args.A, args.B, reply.C)
    		time.Sleep(1e9)
    	}
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40

    这是一个简单的例子,使用RPCX实现了服务端和客户端。

    RPCX实例分析

    如何发布自己的服务

    在上面的例子中,我们只需要用service.NewServer()→ RegisterName → Serve 就可以开始一个rpc服务端。我们也可以使用rpcx的插件功能,将服务端的地址注册到持久化存储当中,这样client 就可以使用服务发现的功能来对服务端进行rpc调用。

    使用RegistryPlugin的方式进行服务注册时,rpcx 会在服务启动前将服务注册到注册中心中。

    // RegisterPlugin is .
    // 注册中心接口定义,如果你要实现自己的注册中心,就需要实现这个RegisterPlugin接口
    RegisterPlugin interface {
    	Register(name string, rcvr interface{}, metadata string) error
    	Unregister(name string) error
    }
    
    // RegisterName is like Register but uses the provided name for the type
    // instead of the receiver's concrete type.
    func (s *Server) RegisterName(name string, rcvr interface{}, metadata string) error {
    	_, err := s.register(rcvr, name, true)
    	if err != nil {
    		return err
    	}
    	if s.Plugins == nil {
    		s.Plugins = &pluginContainer{}
    	}
        // RegistryPlugin 执行,进行服务注册
    	return s.Plugins.DoRegister(name, rcvr, metadata)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • rpcx 支持 ZooKeeper,Etcd,mDNS,Consul,Redis,DNS等方式进行服务注册。例子可以参考:https://github.com/rpcxio/rpcx-examples/tree/master/registry。

    如何调用他人的远程服务

    我们知道,一个RPC的调用会涉及到两端,一个是Client,一个是Server端。两端的基本工作如下

    Server 端工作

    • 监听端口
    • 响应连接请求
    • 接收数据包
    • 解析数据包
    • 调用相应方法
    • 组装请求处理结果数据包
    • 发送结果数据包
    • 序列化协议
    服务端如何接收客户端请求
    serveListener
    • 服务端在监听端口之后,会有一个serveListener 函数,
    • 这个函数一个for循环 Accept 客户端过来的连接,
    • 最后会调用 serveConn 来处理这个请求
    // serveListener accepts incoming connections on the Listener ln,
    // creating a new service goroutine for each.
    // The service goroutines read requests and then call services to reply to them.
    func (s *Server) serveListener(ln net.Listener) error {
    	var tempDelay time.Duration
    
    	s.mu.Lock()
    	s.ln = ln
    	s.mu.Unlock()
    
    	for {
            // 接受客户端过来的连接
    		conn, e := ln.Accept()         
            // 省略代码........   
            if tc, ok := conn.(*net.TCPConn); ok {
    			period := s.options["TCPKeepAlivePeriod"]
    			if period != nil {
    				tc.SetKeepAlive(true)
    				tc.SetKeepAlivePeriod(period.(time.Duration))
    				tc.SetLinger(10)
    			}
    		}
            // 处理请求之前的操作
    		conn, ok := s.Plugins.DoPostConnAccept(conn)
    		if !ok {
    			conn.Close()
    			continue
    		}
            // 保存到这个请求
    		s.mu.Lock()
    		s.activeConn[conn] = struct{}{}
    		s.mu.Unlock()
    
    		if share.Trace {
    			log.Debugf("server accepted an conn: %v", conn.RemoteAddr().String())
    		}
            // 处理连接请求
    		go s.serveConn(conn)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    serveConn

    serveConn 是读取和处理用户过来的请求,如果开启了异步写,服务端会异步response给client,
    其中 readRequest 从网络连接中读取到客户端过来的数据,这里会包含解码解压缩操作
    processOneRequest 就是真正处理客户端请求的函数了,这里会通过反射调用服务的方法

    func (s *Server) serveConn(conn net.Conn) {
    
        // 省略代码........
    
    	r := bufio.NewReaderSize(conn, ReaderBuffsize)
        // 开启异步response给client
    	var writeCh chan *[]byte
    	if s.AsyncWrite {
    		writeCh = make(chan *[]byte, 1)
    		defer close(writeCh)
    		go s.serveAsyncWrite(conn, writeCh)
    	}
    
    	// read requests and handle it
        // 读取和处理请求
    	for {
            // 省略代码......
            // 读取请求
    		req, err := s.readRequest(ctx, r)
    	    // 处理请求
    		if s.pool != nil {
    			s.pool.Submit(func() {
    				s.processOneRequest(ctx, req, conn, writeCh)
    			})
    		} else {
    			go s.processOneRequest(ctx, req, conn, writeCh)
    		}
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    服务端解析客户端请求
    • 协议详解见:RPCX协议详解
    • 在Decode中会进行协议解析
    func (s *Server) readRequest(ctx context.Context, r io.Reader) (req *protocol.Message, err error) {
        // 处理request之前的操作
    	err = s.Plugins.DoPreReadRequest(ctx)
    	if err != nil {
    		return nil, err
    	}
    	// pool req?
    	req = protocol.GetPooledMsg()
        // 解码解压缩
    	err = req.Decode(r)
    	if err == io.EOF {
    		return req, err
    	}
        // 处理request之前的操作
    	perr := s.Plugins.DoPostReadRequest(ctx, req, err)
    	if err == nil {
    		err = perr
    	}
    	return req, err
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    如果配置了压缩方式,也会对服务的数据进行解压缩

    func (m *Message) Decode(r io.Reader) error {
    
        // 前面都是协议的解析
    
        // 如果配置了压缩方式,对服务的数据进行解压缩到 Payload 中
    	if m.CompressType() != None {
    		compressor := Compressors[m.CompressType()]
    		if compressor == nil {
    			return ErrUnsupportedCompressor
    		}
    		m.Payload, err = compressor.Unzip(m.Payload)
    		if err != nil {
    			return err
    		}
    	}
    
    	return err
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    服务端调用相应方法和返回结果给客户端端

    接受到用户的请求之后,就是调用服务端对应的方法了,在用户消息Decode 中,服务端已经把ServicePath、ServiceMethod 解析出来了,剩下的就是只要找到对应的方法就可以执行

    	m.ServicePath = util.SliceByteToString(data[n:nEnd])
    	m.ServiceMethod = util.SliceByteToString(data[n:nEnd])
    
    • 1
    • 2

    前面在 servConn 中的函数 processOneRequest 会处理Client的请求,我们来看下,这里是怎么处理的,

    • 先解析出 Payload 中的 请求参数,
    • 然后用参数去调用 service 这个函数service := s.serviceMap[serviceName] 就是 服务注册过去的业务函数。
    • 然后用sendResponse 发送responseclient
    func (s *Server) handleRequest(ctx context.Context, req *protocol.Message) (res *protocol.Message, err error) {
    	serviceName := req.ServicePath
    	methodName := req.ServiceMethod
    
        // 代码省略....
    
    	// get a argv object from object pool
    	argv := reflectTypePools.Get(mtype.ArgType)
    
    	codec := share.Codecs[req.SerializeType()]
    	if codec == nil {
    		err = fmt.Errorf("can not find codec for %d", req.SerializeType())
    		return s.handleError(res, err)
    	}
        // 解析用户的请求参数
    	err = codec.Decode(req.Payload, argv)
    	if err != nil {
    		return s.handleError(res, err)
    	}
    
    	argv, err = s.Plugins.DoPreCall(ctx, serviceName, methodName, argv)
    	if err != nil {
    		// return reply to object pool
    		reflectTypePools.Put(mtype.ReplyType, replyv)
    		return s.handleError(res, err)
    	}
        // 调用service中注册的业务函数,处理业务方式
    	if mtype.ArgType.Kind() != reflect.Ptr {
    		err = service.call(ctx, mtype, reflect.ValueOf(argv).Elem(), reflect.ValueOf(replyv))
    	} else {
    		err = service.call(ctx, mtype, reflect.ValueOf(argv), reflect.ValueOf(replyv))
    	}
    
       // 代码省略....
    	return res, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36

    Client 端工作

    • 建立与Server的连接
    • 组装数据
    • 发送数据包
    • 接收处理结果数据包
    • 解析返回数据包
    客户端和服务端的连接是怎样建立的

    client 在调用 server的时候,只需要,NewXClient,然后调用Call 就可以请求对应服务的方法。

    NewXClient
    • 通过服务发现的方式获取到服务器列表
    • 选择一种负载均衡的方式
    • 监听服务列表的变化
    func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient {
    	client := &xClient{
    		failMode:     failMode,
    		selectMode:   selectMode,
    		discovery:    discovery,
    		servicePath:  servicePath,
    		cachedClient: make(map[string]RPCClient),
    		option:       option,
    	}
        // 通过服务发现的方式获取到服务器列表
    	pairs := discovery.GetServices()
    	sort.Slice(pairs, func(i, j int) bool {
    		return strings.Compare(pairs[i].Key, pairs[j].Key) <= 0
    	})
    	servers := make(map[string]string, len(pairs))
    	for _, p := range pairs {
    		servers[p.Key] = p.Value
    	}
    	filterByStateAndGroup(client.option.Group, servers)
    
        // 选择一种负载均衡的方式
    	client.servers = servers
    	if selectMode != Closest && selectMode != SelectByUser {
    		client.selector = newSelector(selectMode, servers)
    	}
    
    	client.Plugins = &pluginContainer{}
    
       // 监听服务列表的变化
    	ch := client.discovery.WatchService()
    	if ch != nil {
    		client.ch = ch
    		go client.watch(ch)
    	}
    
    	return client
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    Call
    call 方法做了两件事
    • selectClient:根据负载均衡的算法选出一个服务器
    • 与服务端建立连接。
    func (c *xClient) getCachedClient(k string, servicePath, serviceMethod string, args interface{}) (RPCClient, error) {
    
        // 先通过缓存拿到RPCClient,如果能拿到,就不需要再新建连接了
    	c.mu.Lock()
    	client = c.findCachedClient(k, servicePath, serviceMethod)
    	if client != nil {
    		if !client.IsClosing() && !client.IsShutdown() {
    			c.mu.Unlock()
    			return client, nil
    		}
    		c.deleteCachedClient(client, k, servicePath, serviceMethod)
    	}
        // 为什么这里会再查一次缓存,有可能在第一次findCachedClient的时候还没有缓存,
        // 但是之前后有连接建立了,新建了缓存,这里做一个Double Check
    	client = c.findCachedClient(k, servicePath, serviceMethod)
    	c.mu.Unlock()
    
    	if client == nil || client.IsShutdown() {
    		c.mu.Lock()
    		generatedClient, err, _ := c.slGroup.Do(k, func() (interface{}, error) {
                // 去建立连接
    			return c.generateClient(k, servicePath, serviceMethod)
    		})
    		c.mu.Unlock()
    
    		c.slGroup.Forget(k)
    		if err != nil {
    			return nil, err
    		}
    
    		client = generatedClient.(RPCClient)
    		if c.Plugins != nil {
    			needCallPlugin = true
    		}
    
    		client.RegisterServerMessageChan(c.serverMessageChan)
    
    		c.mu.Lock()
    		c.setCachedClient(client, k, servicePath, serviceMethod)
    		c.mu.Unlock()
    	}
    
    	return client, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    generateClient 函数最底层又调用了 RPCClient.Connect,Connect 实现了各种网络类型的连接方式,http,ws,wss,或者自定义,也在这里会有一个client.input() 读取返回的数据, client.heartbeat 发送心跳请求

    // Connect connects the server via specified network.
    func (client *Client) Connect(network, address string) error {
    	var conn net.Conn
    	var err error
        // 实现了各种网络类型的连接方式,http,ws,wss,或者自定义
        // 默认是tcp
    	switch network {
    	case "http":
    		conn, err = newDirectHTTPConn(client, network, address)
    	case "ws", "wss":
    		conn, err = newDirectWSConn(client, network, address)
    	default:
    		fn := ConnFactories[network]
    		if fn != nil {
                // 自定义的连接方式
    			conn, err = fn(client, network, address)
    		} else {
                // tcp
    			conn, err = newDirectConn(client, network, address)
    		}
    	}
    
    	if err == nil && conn != nil {
    		if tc, ok := conn.(*net.TCPConn); ok && client.option.TCPKeepAlivePeriod > 0 {
    			_ = tc.SetKeepAlive(true)
    			_ = tc.SetKeepAlivePeriod(client.option.TCPKeepAlivePeriod)
    		}
    
    		if client.option.IdleTimeout != 0 {
    			_ = conn.SetDeadline(time.Now().Add(client.option.IdleTimeout))
    		}
    
    		if client.Plugins != nil {
    			conn, err = client.Plugins.DoConnCreated(conn)
    			if err != nil {
    				return err
    			}
    		}
    
    		client.Conn = conn
    		client.r = bufio.NewReaderSize(conn, ReaderBuffsize)
    		// c.w = bufio.NewWriterSize(conn, WriterBuffsize)
    
    		// start reading and writing since connected
            // 读取返回的数据
    		go client.input()
    
    		if client.option.Heartbeat && client.option.HeartbeatInterval > 0 {
    			go client.heartbeat()
    		}
    
    	}
    
    	if err != nil && client.Plugins != nil {
    		client.Plugins.DoConnCreateFailed(network, address)
    	}
    
    	return err
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    客户端是怎样给服务端发送数据的
    • wrapCall:建立连接之后,发送请求给服务端,最底层其实是调用了 client.send(ctx, call)

    • 首先会利用pending 一个map 结构 记录当前未结束的请求

    • 然后将当前的序列号+1

    • 对请求进行编码

    • 对服务方请求,把请求给发送到服务端

    func (client *Client) send(ctx context.Context, call *Call) {
    
        // .......
    
        // 代码省略......
        // 记录当前未结束的请求
    	if client.pending == nil {
    		client.pending = make(map[uint64]*Call)
    	}
    
    	seq := client.seq
        // 序列号+1
    	client.seq++
    	client.pending[seq] = call
    	client.mutex.Unlock()
    
    	if cseq, ok := ctx.Value(seqKey{}).(*uint64); ok {
    		*cseq = seq
    	}
    
        // 对请求进行编码 .....
    	data, err := codec.Encode(call.Args)
        // 代码省略......
        // 对服务方请求,这个时候就是把请求给发送过去了
    	_, err = client.Conn.Write(*allData)
    
        // 代码省略......
    
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    客户端是怎样获取服务端的返回数据的

    前面我们在阅读代码的时候可以看到,在 generateClient 的时候,会起一个协程执行 client.input(),这个client.input() 其实就是读取服务端返回数据的协程。

    • input 对不同的几类请求都有处理:

      • 如果call == nil 则说明不是一个reqeust 的 response,是一个从服务端发过来的数据
        是一个request 返回的错误请求。则需要处理错误
      • 默认是一个reqeust 的 response
    func (client *Client) input() {
    	var err error
    
    	for err == nil {
            // 代码省略......
    
            // 解码
    		err = res.Decode(client.r)
    		if err != nil {
    			break
    		}
    		if client.Plugins != nil {
    			_ = client.Plugins.DoClientAfterDecode(res)
    		}
    
    		 // 代码省略......
    
            seq := res.Seq()
    		var call *Call
    		isServerMessage := (res.MessageType() == protocol.Request && !res.IsHeartbeat() && res.IsOneway())
    		if !isServerMessage {
    			client.mutex.Lock()
    			call = client.pending[seq]
    			delete(client.pending, seq)
    			client.mutex.Unlock()
    		}
    
             // 针对不同的请求进行处理
    		switch {
    		case call == nil:
             // 如果call == nil 则说明不是一个reqeust 的 response,是一个从服务端发过来的数据
    			if isServerMessage {
    				if client.ServerMessageChan != nil {
    					client.handleServerRequest(res)
    				}
    				continue
    			}
    		case res.MessageStatusType() == protocol.Error:
                // 是一个request 返回的错误请求。则需要处理错误
    			// We've got an error response. Give this to the request
    			if len(res.Metadata) > 0 {
    				call.ResMetadata = res.Metadata
    
    				// convert server error to a customized error, which implements ServerError interface
    				if ClientErrorFunc != nil {
    					call.Error = ClientErrorFunc(res.Metadata[protocol.ServiceError])
    				} else {
    					call.Error = strErr(res.Metadata[protocol.ServiceError])
    				}
    
    			}
    
    			if call.Raw {
    				call.Metadata, call.Reply, _ = convertRes2Raw(res)
    				call.Metadata[XErrorMessage] = call.Error.Error()
    			} else if len(res.Payload) > 0 {
    				data := res.Payload
    				codec := share.Codecs[res.SerializeType()]
    				if codec != nil {
    					_ = codec.Decode(data, call.Reply)
    				}
    			}
    			call.done()
    		default:
                // 默认就是一个request 的 response 请求
    			if call.Raw {
    				call.Metadata, call.Reply, _ = convertRes2Raw(res)
    			} else {
    				data := res.Payload
    				if len(data) > 0 {
    					codec := share.Codecs[res.SerializeType()]
    					if codec == nil {
    						call.Error = strErr(ErrUnsupportedCodec.Error())
    					} else {
    						err = codec.Decode(data, call.Reply)
    						if err != nil {
    							call.Error = strErr(err.Error())
    						}
    					}
    				}
    				if len(res.Metadata) > 0 {
    					call.ResMetadata = res.Metadata
    				}
    
    			}
    
    			call.done()
    		}
    	}
        // 代码省略......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91

    其他

    消息里为什么要有SeqID

    RPCX里面对服务端的调用其实是异步的,即对于当前线程来说,将请求发送出来后,协程就可以往后执行了,至于服务端的结果,是服务端处理完成后,再以消息的形式发送给客户端的。于是这里出现以下两个问题:

    • 怎么让当前协程“暂停”,等结果回来后,再向后执行?
    • 如果有多个协程同时进行远程方法调用,这时建立在client server之间的socket连接上会有很多双方发送的消息传递,前后顺序也可能是随机的,server处理完结果后,将结果消息发送给client,client收到很多消息,怎么知道哪个消息结果是原先哪个协程调用的?
    怎么解决呢?
    • client线程每次通过socket调用一次远程接口前,生成一个唯一的ID,即SeqID(SeqID必需保证在一个Socket连接里面是唯一的),一般常常使用uint64从0开始累计数字生成唯一ID;
    • 存放到client的pending里面(requestID, call);
    • 当协程异步发送消息后,紧接着执行call.Done()的方法试图获取远程返回的结果。在Done()内部,会等待 chan *Call
    • 服务端接收到请求并处理后,将response结果(此结果中包含了前面的SeqID)发送给客户端,客户端socket连接上专门监听消息的线程收到消息,分析结果,取到SeqID,再从前面的pending里面get(SeqID),从而找到Call 对象,再将结果发送到chan *Call 中
    • 客户端从Done()取到Call 对象,处理返回的结果

    RPCX 核心功能

    连接管理

    • 保持与服务提供方长连接,用于传输请求数据也返回结果。

    RPCX 的连接管理比较简单,其实就是用一个map将连接对存储起来。

    client 端
    type xClient struct {
        // key: 是根据:servicePath, serviceMethod,args 由不同的负载均衡算法算出来的,
        // RPCClient 就是真正的持有连接的Client
    	cachedClient map[string]RPCClient
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5

    负载均衡

    • 确保多个服务提供方节点流量均匀/合理,支持节点扩容与灰度发布。RPCX 提供了多种负载均衡的方式。轮训,随机,带权重,一致性Hash,网络质量,地理位置。你也可以通过自己实现 Select 接口来实现自己的负载均衡方式,
    • 自己实现的负载均衡的方式的话,你就可以自己实现请求路由,达到应用隔离,读写分离,灰度发布中的作用
    type Selector interface {
    	Select(ctx context.Context, servicePath, serviceMethod string, args interface{}) string // SelectFunc
    	UpdateServer(servers map[string]string)
    }
    
    // 负载均衡的实现
    func newSelector(selectMode SelectMode, servers map[string]string) Selector {
    	switch selectMode {
        // 随机     
    	case RandomSelect:
    		return newRandomSelector(servers)
        // 轮训
    	case RoundRobin:
    		return newRoundRobinSelector(servers)
        // 带权重    
    	case WeightedRoundRobin:
    		return newWeightedRoundRobinSelector(servers)
        // 网络质量
    	case WeightedICMP:
    		return newWeightedICMPSelector(servers)
        // 一致性Hash
    	case ConsistentHash:
    		return newConsistentHashSelector(servers)
        // 
    	case SelectByUser:
    		return nil
    	default:
        // 随机
    		return newRandomSelector(servers)
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    超时处理

    • 对于长时间没有返回的请求,需要作出异常处理,及时释放资源。RPCX的超市处理其实是从两个方面来做的
      • 在底层网络连接上,通过SetDeadline 对读写请求设置读写超时时间
      • 在RPCX框架层面通过select + ctx 实现超时处理

    服务保护

    • 服务提供方为保证正常运行,主动丢弃超出处理能力外的请求。常见的有限流、熔断、流量丢弃这几种方式
      • 限流:对他人调用自己服务的限制,常见的限流算法有:固定窗口, 滑动窗口,令牌桶算法,漏桶限流算法。
      • 熔断: 当调用他人服务出现问题的时候,主动放弃调用其他服务。Rpcx 提供了一个简单的断路器 ConsecCircuitBreaker, 它在连续失败一定次数后就会断开,再经过一段时间后打开。 你可以将你的断路器设置到 Option.Breaker中。
      • 流量丢弃:框架实现,当RPC Server 端挤压的请求较多时,RPC框架直接将这个请求丢弃。在Go中,可以用一个带缓冲的channel 实现,当缓冲区满了时,直接将流量丢弃。

    失败重试

    • rpcx支持四种调用失败模式,用来处理服务调用失败后的处理逻辑, 你可以在创建XClient的时候设置它。

      • Failfast: 一旦调用一个节点失败, rpcx立即会返回错误。 注意这个错误不是业务上的 Error, 业务上服务端返回的Error应该正常返回给客户端,这里的错误可能是网络错误或者服务异常。默认策略就是.
        Failfast
      • Failtry:rpcx如果调用一个节点的服务出现错误, 它也会尝试,但是还是选择这个节点进行重试, 直到节点正常返回数据或者达到最大重试次数。
      • Failover: rpcx如果遇到错误,它会尝试调用另外一个节点, 直到服务节点能正常返回信息,或者达到最大的重试次数。 重试测试Retries在参数Option中设置, 默认设置为3。
      • Failbackup:如果服务节点在一定的时间内不返回结果, rpcx客户端会发送相同的请求到另外一个节点, 只要这两个节点有一个返回, rpcx就算调用成功。这种通过资源换取延迟的方式可以参看 Jeff Dean的文章 Achieving Rapid Response Times in Large Online Services,这种实现非常重要,我们重点来看这是怎么实现的
    Failbackup 重试模式
    • 生成两个Call,用来接受可能的两次rpc请求的结果
    • 第一个请求开始
    • 设置第一个请求多久没有返回才开启第二个请求的时间
    • select 监听 ctx是否退出,第一个请求, 是否达到设置的请求时间第一个请求还没有返回
      • 如果ctx Done。则直接返回
      • 如果第一个请求的结果返回,则处理结果
      • 如果达到设置的请求时间第一个请求还没有返回,则开始第二个请求
    • 开始发送第二个请求
    • select 就需要 ctx是否退出,第一个请求,第二个请求。看哪一个数据先准备好
      • 如果ctx Done。则直接返回
      • 如果第一个请求的结果返回,则处理结果
      • 如果第二个请求的结果返回,则处理结果
    // Failbackup 重试模式
    case Failbackup:
    		ctx, cancelFn := context.WithCancel(ctx)
    		defer cancelFn()
            // 生成两个Call,用来接受可能的两次rpc请求的结果
    		call1 := make(chan *Call, 10)
    		call2 := make(chan *Call, 10)
    		var reply1, reply2 interface{}
    
    		if reply != nil {
    			reply1 = reflect.New(reflect.ValueOf(reply).Elem().Type()).Interface()
    			reply2 = reflect.New(reflect.ValueOf(reply).Elem().Type()).Interface()
    		}
            // 第一个请求开始
    		_, err1 := c.Go(ctx, serviceMethod, args, reply1, call1)
    
            // 第一个请求多久没有返回才开启第二个请求
    		t := time.NewTimer(c.option.BackupLatency)
            // select 监听 ctx,call,t 看谁先返回数据
    		select {
            // 如果ctx Done。则直接返回
    		case <-ctx.Done(): // cancel by context
    			err = ctx.Err()
    			return err
            // 第一个请求的结果
    		case call := <-call1:
    			err = call.Error
                // 有请求数据返回
    			if err == nil && reply != nil {
                    // 设置给reply
    				reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(reply1).Elem())
    			}
                // 结束
    			return err
             // 如果t的时间先到,则这里什么也不做,而直接走到下面的代码   
    		case <-t.C:
    
    		}
            // 这里开始发送第二个请求
    		_, err2 := c.Go(ctx, serviceMethod, args, reply2, call2)
    		if err2 != nil {
    			if uncoverError(err2) {
    				c.removeClient(k, c.servicePath, serviceMethod, client)
    			}
    			err = err1
    			return err
    		}
            // 这里select 就需要监听 ctx,第一个请求call1,第二个请求call2。看哪一个数据先准备好
    		select {
    		case <-ctx.Done(): // cancel by context
    			err = ctx.Err()
    		case call := <-call1:
    			err = call.Error
    			if err == nil && reply != nil && reply1 != nil {
    				reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(reply1).Elem())
    			}
    		case call := <-call2:
    			err = call.Error
    			if err == nil && reply != nil && reply2 != nil {
    				reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(reply2).Elem())
    			}
    		}
    		return err
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63

    总结

    • 通过分析了RPCX框架的实现原理,我们大概知道了一次RPC请求到底发生了什么事情。我们也可以知道

    • RPC 主要用于公司内部的服务调用,性能消耗低,传输效率高,实现复杂。

    • HTTP 主要用于对外的异构环境,浏览器接口调用,App 接口调用,第三方接口调用等。

    • RPC 使用场景(大型的网站,内部子系统较多、接口非常多的情况下适合使用 RPC):

      • 长链接。不必每次通信都要像 HTTP 一样去 3 次握手,减少了网络开销。
      • 注册发布机制。RPC 框架一般都有注册中心,有丰富的监控管理;发布、下线接口、动态扩展等,对调用方来说是无感知、统一化的操作。
      • 安全性,没有暴露资源操作。
      • 微服务支持。就是最近流行的服务化架构、服务化治理,RPC 框架是一个强力的支撑。
  • 相关阅读:
    多智能体强化学习之值函数分解:VDN、QMIX、QTRAN系列优缺点分析(转载)
    C&C++指针实训(国防科大)
    【JVM故障问题排查心得】「内存诊断系列」JVM内存与Kubernetes中pod的内存、容器的内存不一致所引发的OOMKilled问题总结(上)
    Web前端:渐进式Web应用程序有哪些主要功能
    上位机模块之halcon绘制ROI与获取ROI,在hsmartwindow实现
    JavaScript判断是否为空对象的几种方法
    Python常见的开发工具合集对比
    puzzle(102.1)不规则数独、变种数独
    (个人杂记)第六章 跑马灯实验
    前端面试题目(三十)
  • 原文地址:https://blog.csdn.net/baidu_32452525/article/details/126975916