• go 实现 rpc


    go 实现 rpc

    简介

    • 远程过程调用(Remote Procedure Call, RPC) 是一个通信协议

    • go 中实现rpc 非常简单,官方提供了封装好的包,并且还有一些第三方的包

    • go 官方的 net/rpc 库使用 encoding/gob 进行编码, 支持 tcp 和 http数据传输方式,由于gob 是go 独有的编码方式,所以go 的 RPC 只支持 go 开发的服务端和客户端之间的交互

    • 官方 另外还提供了 net/rpc/jsonrpc 包实现RPC 方法,jsonrpc采用JSON进行数据编解码, 因而支持跨语言调用,

    • go 的 rpc 必须符合4个规范

      • 结构体首字符大写,跨域访问需要大写
      • 函数名必须首字母大写
      • 函数第一个参数是接收参数, 第二个参数是返回给客户端参数,必须是指针类型
      • 函数必须有一个返回值 error
    • 微服务架构下数据交互一般是对内RPC 对外 REST

    数据传输格式

    成熟的RPC 框架会有自定义传输协议,网络传输格式定义如下, 前面是固定长度的消息头,后面是变长消息体

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rIAZY6lv-1664092672416)(go%20%E5%AE%9E%E7%8E%B0%20rpc.assets/image-20220924210612262.png)]

    实现RPC

    设计

    1. 服务端接收的数据
      • 调用的函数名、参数列表
      • 通常约定函数的第二个返回值类型是 error 类型
    2. 服务端需要解决的问题
      • Client 调用时只传过来函数名,需要维护函数名到函数之间的map映射
      • 数据的动态获取需要通过 反射实现
    3. 服务端的核心功能
      • 维护函数名到函数反射值的map
      • client 端传函数名、参数列表后,服务端要解析为反射值,执行调用
      • 函数返回值的打包,并通过网络返回给客户端
    4. 客户端
      • 客户端只有函数原型,使用reflect.MakeFunc() 可以完成原型到函数的调用
      • reflect.MakeFunc() 是 Client 从函数原型到网络调用的关键

    代码

    codec.go

    package main
    
    import (
    	"bytes"
    	"encoding/gob"
    	"reflect"
    )
    
    // 定义编码解码
    type RPCData struct {
    	// 访问的函数
    	Name string
    	// 访问时传的参数
    	Args []interface{}
    }
    
    // 编码
    func encode(data RPCData) ([]byte,error)  {
    	var buf bytes.Buffer
    	// 得到字节数组的编码器
    	bufEnc := gob.NewEncoder(&buf)
    	// 注册
    	gob.Register(&reflect.Value{})
    	// 对数据编码
    	if err:=bufEnc.Encode(data);err!= nil{
    		return nil, err
    	}
    	return buf.Bytes(),nil
    }
    
    // 解码
    func decode(b []byte) (RPCData, error)  {
    	buf := bytes.NewBuffer(b)
    	// 返回字节数组解码器
    	bufDec := gob.NewDecoder(buf)
    	var data RPCData
    	if err := bufDec.Decode(&data);err!=nil{
    		return data, err
    	}
    	return data,nil
    }
    

    session.go

    package main
    
    import (
    	"encoding/binary"
    	"io"
    	"net"
    )
    
    // 编写会话中的数据读写
    
    // 会话连接的结构体
    type Session struct {
    	conn net.Conn
    }
    
    // 创建新连接
    func NewSession(conn net.Conn) *Session  {
    	return &Session{conn: conn}
    }
    
    // 向连接中写数据
    func (s *Session) Write(data []byte) error  {
    	// 4 字节头 + 数据长度的切片
    	buf :=make([]byte, 4+len(data))
    	// 写入头部数据, 记录数据长度
    	// binary 只认固定长度类型, 因此使用了 uint32
    	binary.BigEndian.PutUint32(buf[:4],uint32(len(data)))
    	// 写入数据
    	copy(buf[4:], data)
    	// 连接写数据
    	_, err := s.conn.Write(buf)
    	if err != nil {
    		return err
    	}
    	return nil
    }
    
    // 从连接中读数据
    func (s *Session) Read() ([]byte,error)  {
    	// 读取头部长度
    	header := make([]byte,4)
    	// 按头部长度, 读取头部数据
    	_, err := io.ReadFull(s.conn, header)
    	if err != nil {
    		return nil,err
    	}
    	// 读取数据长度
    	dataLen := binary.BigEndian.Uint32(header)
    	// 按照数据长度去读取数据
    	data := make([]byte,dataLen)
    	_,err = io.ReadFull(s.conn,data)
    	if err != nil {
    		return nil,err
    	}
    	return data,nil
    }
    

    server.go

    package main
    
    import (
    	"fmt"
    	"net"
    	"reflect"
    )
    
    // 申明服务端
    type Server struct {
    	// 地址
    	addr string
    	// 服务端维护的函数名到函数反射值map
    	funcs map[string]reflect.Value
    }
    
    // 创建服务端对象
    func NewServer(addr string) *Server {
    	return &Server{addr: addr, funcs: make(map[string]reflect.Value)}
    }
    
    // 服务端绑定注册方法
    // 将函数名与函数真正实现对应起来
    // 第一个参数为函数名,第二个传入真正的函数
    func (s *Server) Register(rpcName string, f interface{}) {
    	if _, ok := s.funcs[rpcName]; ok {
    		return
    	}
    	// map 中没有值, 则将映射添加进map,便于调用
    	fVal := reflect.ValueOf(f)
    	s.funcs[rpcName] = fVal
    }
    
    // 服务端等待调用
    func (s *Server) Run() {
    	// 监听
    	lis, err := net.Listen("tcp", s.addr)
    	if err != nil {
    		fmt.Printf("监听 %s , err: %v", s.addr, err)
    	}
    	for {
    		// 拿到链接
    		conn, err := lis.Accept()
    		if err != nil {
    			fmt.Printf("accept err: %v", err)
    		}
    		// 创建会话
    		srvSession := NewSession(conn)
    		// RPC 读取数据
    		b, err := srvSession.Read()
    		if err != nil {
    			fmt.Printf("Read err: %v", err)
    		}
    		// 对数据解码
    		rpcData, err := decode(b)
    		if err != nil {
    			fmt.Printf("decode err: %v", err)
    			return
    		}
    		// 根据读取到的数据的Name, 得到调用的函数名
    		f, ok := s.funcs[rpcData.Name]
    		if !ok {
    			fmt.Printf("函数 %s 不存在", rpcData.Name)
    		}
    		// 解析遍历客户端出来的参数,放到一个数组中
    		inArgs := make([]reflect.Value, 0, len(rpcData.Args))
    		for _, arg := range rpcData.Args {
    			inArgs = append(inArgs, reflect.ValueOf(arg))
    		}
    		// 反射调用方法, 传入参数
    		out := f.Call(inArgs)
    		// 解析遍历执行结果,放到一个数组中
    		outArgs := make([]interface{}, 0, len(out))
    		for _, o := range out {
    			outArgs = append(outArgs, o)
    		}
    		// 包装数据返回给客户端
    		resRPCdata := RPCData{rpcData.Name, outArgs}
    		// 编码
    		respBytes, err := encode(resRPCdata)
    		if err != nil {
    			fmt.Printf("encode err:%v", err)
    			return
    		}
    		// 使用rpc写出数据
    		err = srvSession.Write(respBytes)
    		if err != nil {
    			fmt.Printf("encode err:%v", err)
    			return
    		}
    
    	}
    }
    

    client.go

    package main
    
    import (
    	"net"
    	"reflect"
    )
    
    // 声明客户端
    type Client struct {
    	conn net.Conn
    }
    
    // 创建客户端对象
    func NewClient(conn net.Conn) *Client {
    	return &Client{conn: conn}
    }
    
    // 实现通用的RPC 客户端
    // 绑定RPC 访问的方法
    // 传入访问的函数名
    
    // 函数的具体实现在Server端,Client 只有函数原型
    // 使用MakeFunc() 完成原型到函数的调用
    
    
    func (c *Client) callRPC(rpcName string,fPtr interface{})  {
    	// 通过反射, 获取fPt 未初始化的函数原型
    	fn := reflect.ValueOf(fPtr).Elem()
    	// 另外一个函数, 作用是对第一个函数参数操作
    	// 完成与Serve 端的交互
    	f := func(args []reflect.Value)[]reflect.Value {
    		// 处理输入的参数
    		inArgs := make([]interface{},0 , len(args))
    		for _,arg := range args{
    			inArgs = append(inArgs,arg.Interface())
    		}
    		// 创建连接
    		cliSession := NewSession(c.conn)
    		// 编码数据
    		reqRpc := RPCData{Name: rpcName,Args: inArgs}
    		b, err := encode(reqRpc)
    		if err != nil {
    			panic(err)
    		}
    		// 写出数据
    		err = cliSession.Write(b)
    		if err != nil {
    			panic(err)
    		}
    		// 读取响应数据
    		respBytes, err := cliSession.Read()
    		if err != nil {
    			panic(err)
    		}
    		// 解码数据
    		respRPC, err := decode(respBytes)
    		if err != nil {
    			panic(err)
    		}
    		// 处理服务端返回的数据
    		outArgs := make([]reflect.Value, 0, len(respRPC.Args))
    		for i, arg := range respRPC.Args{
    			// 必须进行 nil 转换,不然会报错
    			if arg == nil {
    				outArgs = append(outArgs, reflect.Zero(fn.Type().Out(i)))
    				continue
    			}
    			outArgs = append(outArgs,reflect.ValueOf(arg))
    		}
    		return outArgs
    	}
    	// 参数1: 一个未初始化函数的方法值,类型是reflect.Type
    	// 参数2: 另一个函数, 作用是对第一个函数参数操作
    	// 返回 reflect.Value 类型
    	// MakeFunc 使用传入函数原型,创建一个绑定 参数2 的新值
    	v := reflect.MakeFunc(fn.Type(), f)
    	// 为函数fPtr 赋值
    	fn.Set(v)
    }
    
  • 相关阅读:
    基于深度学习的KBQA方法(pipline)的一些思考(一)
    OMS1664_ OMS1654_ OMS1644_ OMS1634_马可尼 2.5G-10Gb/s
    风控特征的优化分箱,看看这样教科书的操作
    虚拟机初始化脚本, 虚拟机相互免秘钥
    【K8S专栏】Kubernetes工作负载管理
    第三阶段学习beiqi3
    【神印王座】陈樱儿假扮魔神皇,皓晨想杀人灭口,采儿施展禁制,月夜成功自保
    基于Springboot的代驾管理系统(有报告)。Javaee项目,springboot项目。
    NPM 包开发与优化全面指南
    Windows系统利用cpolar内网穿透搭建Zblog博客网站并实现公网访问内网!
  • 原文地址:https://blog.csdn.net/qq_55752792/article/details/127039051