tinyrpc项目是github上的一个开源项目,自定义了一个基于protocol-buffer的编码器。除此之外还自定义了数据的压缩方式(提高传输速度),自定义了请求头内容(提高了安全性),关于项目的讲解可查看原作者知乎文章用Go轻松实现一个高性能RPC
本篇文章主要从代码阅读角度出发,学习go的编程思想
下载项目后,项目根目录下的client.go和server.go文件是建立客户端和服务端时,分别需要用到的两个文件,我们先查看client.go
//调用该方法可生成一个客户端rpc,是本文将的和新方法
//第一个参数是一个连接服务端的返回值
//第二个参数是变长参数,用于指定客户度的编码和压缩方式,指定则使用方法中的默认值
func NewClient(conn io.ReadWriteCloser, opts ...Option) *Client {
options := options{
compressType: compressor.Raw,
serializer: serializer.Proto,
}
for _, option := range opts {
option(&options)
}
return &Client{rpc.NewClientWithCodec(
//创建一个指定压缩方式和编码方式的编码器
codec.NewClientCodec(conn, options.compressType, options.serializer))}
}
查看创建客户端编码器的文件,codec/client.go
客户端编码器的核心就是实现 下面三个方法
WriteRequest(r *rpc.Request, param interface{})
ReadResponseHeader(r *rpc.Response)
ReadResponseBody(param interface{})
//定义一个客户端编译器类型
type clientCodec struct {
r io.Reader //读取器
w io.Writer //写入器
c io.Closer //关闭器
compressor compressor.CompressType // 压缩器
serializer serializer.Serializer // 编译器
response header.ResponseHeader // 服务端发送的信息
mutex sync.Mutex // 锁
pending map[uint64]string //待处理任务
}
// NewClientCodec Create a new client codec
func NewClientCodec(conn io.ReadWriteCloser,
compressType compressor.CompressType, serializer serializer.Serializer) rpc.ClientCodec {
return &clientCodec{
//写入、读取、关闭其实就是对tcp连接的操作
//写入和读取时,如果分多次,使用缓冲区可提高效率,所以r、w使用conn建立了缓冲器
r: bufio.NewReader(conn),
w: bufio.NewWriter(conn),
c: conn,
compressor: compressType,
serializer: serializer,
//当客户端在一次连接中,多次发送请求,会把请求放入待处理任务中,循环执行
pending: make(map[uint64]string),
}
}
// 向tcp连接中写入编译、压缩后的信息
func (c *clientCodec) WriteRequest(r *rpc.Request, param interface{}) error {
c.mutex.Lock()
//向编码器待处理任务中插入任务
//因为map并发处理同一key时会报错,所以需要加锁,避免并发
//r.Seq是客户端rpc发起请求时生成的序列号。rpc服务端会自动接收该序列号
c.pending[r.Seq] = r.ServiceMethod
c.mutex.Unlock()
//检查压缩器是否存在
if _, ok := compressor.Compressors[c.compressor]; !ok {
return NotFoundCompressorError
}
//使用编译器加密传入的参数
reqBody, err := c.serializer.Marshal(param)
if err != nil {
return err
}
//使用压缩器压缩编译后的参数
compressedReqBody, err := compressor.Compressors[c.compressor].Zip(reqBody)
if err != nil {
return err
}
//从header池中取一个header对象
//这个header其实就是一个存储客户端要发送的信息内容和自定义结构的对象
h := header.RequestPool.Get().(*header.RequestHeader)
//使用完之后,把header初始化并放回header池
defer func() {
h.ResetHeader()
header.RequestPool.Put(h)
}()
//给header赋值
h.ID = r.Seq
h.Method = r.ServiceMethod
h.RequestLen = uint32(len(compressedReqBody))
h.CompressType = header.CompressType(c.compressor)
h.Checksum = crc32.ChecksumIEEE(compressedReqBody)
//把header加密后的字节流,写入tcp链接
if err := sendFrame(c.w, h.Marshal()); err != nil {
return err
}
//把压缩编译后的参数也写入tcp链接
if err := write(c.w, compressedReqBody); err != nil {
return err
}
//从缓冲器把数据push到 缓冲区对应的write中,即调用了conn的write方法,把数据写入了tcp链接
c.w.(*bufio.Writer).Flush()
return nil
}
// 读取rpc返回的信息,并进行初步的基于header的解析。处理待处理任务和响应信息
// 若读取或解析失败,表示通讯未通过安全验证
func (c *clientCodec) ReadResponseHeader(r *rpc.Response) error {
c.response.ResetHeader()
//从编码器的读取器中,把返回字节流读取到data
//recvFrame方法只会取读取器中header的加密数据
data, err := recvFrame(c.r)
if err != nil {
return err
}
//使用hedaer解析自定义结构的字节流
err = c.response.Unmarshal(data)
if err != nil {
return err
}
//加锁,依次从待执行任务map中执行任务,避免冲突
c.mutex.Lock()
r.Seq = c.response.ID
r.Error = c.response.Error
//把修改当前请求对应的方法名
r.ServiceMethod = c.pending[r.Seq]
//执行完后删除当前任务
delete(c.pending, r.Seq)
c.mutex.Unlock()
return nil
}
//读取并解析服务端rpc返回的处理结果
//方法中的参数会传入一个指针类型的值,用于保存返回结果
func (c *clientCodec) ReadResponseBody(param interface{}) error {
if param == nil {
if c.response.ResponseLen != 0 {
if err := read(c.r, make([]byte, c.response.ResponseLen)); err != nil {
return err
}
}
return nil
}
//上面方法从读取器取走了header信息,只剩下响应信息,直接读取即可
respBody := make([]byte, c.response.ResponseLen)
err := read(c.r, respBody)
if err != nil {
return err
}
//该方法会在上个方法后执行,所以不需要重复进行header解码
//比较冗余校验值,验证数据是否被篡改
if c.response.Checksum != 0 {
if crc32.ChecksumIEEE(respBody) != c.response.Checksum {
return UnexpectedChecksumError
}
}
if _, ok := compressor.Compressors[c.response.GetCompressType()]; !ok {
return NotFoundCompressorError
}
//解压数据
resp, err := compressor.Compressors[c.response.GetCompressType()].Unzip(respBody)
if err != nil {
return err
}
//解码数据并赋值
return c.serializer.Unmarshal(resp, param)
}
func (c *clientCodec) Close() error {
return c.c.Close()
}
通过以上代码我们看到,除了压缩和编码外,还有一个基于header包的编码和解码过程。这个header其实就是该项目设计的自定义请求头,用于校验数据是否被篡改,同时也加了一层编译,提高了安全性。
下面我们看下header/header.go代码
请求和响应因为内容不同,所以设置了不同的结构体,其实为了简洁也可以使用一个
我们接下来主要分析下请求头的结构和加密解密
const (
// MaxHeaderSize = 2 + 10 + 10 + 10 + 4 (10 refer to binary.MaxVarintLen64)
MaxHeaderSize = 36
Uint32Size = 4
Uint16Size = 2
)
var UnmarshalError = errors.New("an error occurred in Unmarshal")
// CompressType type of compressions supported by rpc
type CompressType uint16
// RequestHeader request header structure looks like:
// +--------------+----------------+----------+------------+----------+
// | CompressType | Method | ID | RequestLen | Checksum |
// +--------------+----------------+----------+------------+----------+
// | uint16 | uvarint+string | uvarint | uvarint | uint32 |
// +--------------+----------------+----------+------------+----------+
type RequestHeader struct {
sync.RWMutex
CompressType CompressType
Method string
ID uint64
RequestLen uint32
Checksum uint32
}
// 加密方式其实就是把结构体中的字段+method长度,依次插入一个字节切片中
func (r *RequestHeader) Marshal() []byte {
r.RLock()
defer r.RUnlock()
idx := 0
// MaxHeaderSize = 2 + 10 + len(string) + 10 + 10 + 4
header := make([]byte, MaxHeaderSize+len(r.Method))
binary.LittleEndian.PutUint16(header[idx:], uint16(r.CompressType))
idx += Uint16Size
idx += writeString(header[idx:], r.Method)
idx += binary.PutUvarint(header[idx:], r.ID)
idx += binary.PutUvarint(header[idx:], uint64(r.RequestLen))
binary.LittleEndian.PutUint32(header[idx:], r.Checksum)
idx += Uint32Size
return header[:idx]
}
// 解码就是按照插入的顺序,依次从切片中读取相关字段
func (r *RequestHeader) Unmarshal(data []byte) (err error) {
r.Lock()
defer r.Unlock()
if len(data) == 0 {
return UnmarshalError
}
defer func() {
if r := recover(); r != nil {
err = UnmarshalError
}
}()
idx, size := 0, 0
r.CompressType = CompressType(binary.LittleEndian.Uint16(data[idx:]))
idx += Uint16Size
r.Method, size = readString(data[idx:])
idx += size
r.ID, size = binary.Uvarint(data[idx:])
idx += size
length, size := binary.Uvarint(data[idx:])
r.RequestLen = uint32(length)
idx += size
r.Checksum = binary.LittleEndian.Uint32(data[idx:])
return
}
以上分析了该项目客户端相关的代码,服务端代码与客户端略有不同,但流程都是相同的