• Golang net/http 标准库源码学习


    CS架构

    http协议下,交互框架是由客户端Client和服务端Server两块组成的cs架构

    request
    response
    client
    Server

    net/http标准库启动http服务

    // 注册对应的请求地址 /ping 的 HandleFunc
    http.HandleFunc("/ping",func{})
    // 启动端口8091的http服务,监听请求
    http.ListAndServe(":8091",nil)
    
    • 1
    • 2
    • 3
    • 4

    http标准库发送请求

    reqBody, _ := json.Marshal(map[string]string{"key1": "val1", "key2": "val2"})
    resp, _ := http.Post(":8091", "application/json", bytes.NewReader(reqBody))
    defer resp.Body.Close()
    respBody, _ := io.ReadAll(resp.Body)
    
    • 1
    • 2
    • 3
    • 4

    根据以上的两段代码作为切入点,研究一下net/http标准库。

    源码位置

    模块文件
    服务端net/http/server.go
    客户端-主流程net/http/client.go
    客户端-构造请求net/http/request.go
    客户端-网络交互net/http/transport.go

    服务端 Server.go 解析

    整个 http 服务端模块封装在 Server 中,Handler 是最核心的成员字段,实现了 path 到处理函数 handleFunc 的映射。如果构造器没有显式的声明这个,则由 DefaultServeMux 默认来实现。

    Server 结构体

    type Server struct {
        // server 的地址
        Addr string
        // 路由处理器.
        Handler Handler // handler to invoke, http.DefaultServeMux if nil
        // ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    Handler

    Handler,是一个 interface,定义了方法ServeHTTP。方法的作用是根据http请求request中的请求路径path,映射到对应的handler处理函数,对请求进行处理和响应。

    type Handler interface {
        ServeHTTP(ResponseWriter, *Request)
    }
    
    • 1
    • 2
    • 3
    DafultServeMux

    类型是ServeMux,是对Handler的一个实现,内部通过map维护了从path到handler的映射关系。

    type ServeMux struct {
    	// 锁?
        mu sync.RWMutex
        // 存储映射的 map
        m map[string]muxEntry
        es []muxEntry // slice of entries sorted from longest to shortest.
        hosts bool // whether any patterns contain hostnames
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    muxEntry 为一个handler单元,即 path + handler两部分
    type muxEntry struct {
    	// 处理方法
        h Handler
        // 请求地址
        pattern string 
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    由此可以得出,映射在Server中保存的数据结构是
    保存
    handler = muxEntry 地址方法映射关系
    ServerMux = map 类型保存 地址和handler = DafultServeMux = Handler
    Handler = 请求处理方法
    pattern = 请求地址
    注册handler

    在 net/http 包下声明了一个单例 ServeMux,当用户直接通过公开方法 http.HandleFunc 注册 handler 时,则会将其注册到 DefaultServeMux 当中。

    // 默认map,存储handler映射信息
    var DefaultServeMux = &defaultServeMux
    var defaultServeMux ServeMux
    // handlerFunc 注册方法,传入path跟handlerfunc
    func HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
    	// 内部调用的是默认map的handleFunc
        DefaultServeMux.HandleFunc(pattern, handler)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    在 ServeMux.HandleFunc 内部会将处理函数 handler 转为实现了 ServeHTTP 方法的 HandlerFunc 类型,将其作为 Handler interface 的实现类注册到 ServeMux 的路由 map 当中。

    // 高阶函数 方法类型
    type HandlerFunc func(ResponseWriter, *Request)
    
    // HandlerFunc 方法类型也实现了 ServeHTTP 即 Handler
    func (f HandlerFunc) ServeHTTP(w ResponseWriter, r *Request) {
        f(w, r)
    }
    
    // ServeMux 的 HandleFunc , http.HandleFunc 中调用
    func (mux *ServeMux) HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {
        // ...
        // 强转为HandleFunc,也是一个Handler了
        mux.Handle(pattern, HandlerFunc(handler))
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    实现路由注册的核心逻辑位于 ServeMux.Handle 方法中,两个核心逻辑值得一提:

    • 将 path 和 handler 包装成一个 muxEntry,以 path 为 key 注册到路由 map ServeMux.m 中
    • 响应模糊匹配机制. 对于以 ‘/’ 结尾的 path,根据 path 长度将 muxEntry 有序插入到数组 ServeMux.es 中.
    func (mux *ServeMux) Handle(pattern string, handler Handler) {
    	// map 上锁
        mux.mu.Lock()
        // 默认解锁
        defer mux.mu.Unlock()
        // ...
    	// 创建 muxEntry 保存
        e := muxEntry{h: handler, pattern: pattern}
        // 将pattern作为key存到serveMux中
        mux.m[pattern] = e
        // 结尾为 / 的判断
        if pattern[len(pattern)-1] == '/' {
            mux.es = appendSorted(mux.es, e)
        }
        // ...
    }
    
    // 结尾为/的操作方法
    func appendSorted(es []muxEntry, e muxEntry) []muxEntry {
    	// 最长到最短数组长度
        n := len(es)
        // 根据path的长度排序
        i := sort.Search(n, func(i int) bool {
            return len(es[i].pattern) < len(e.pattern)
        })
        // 长度最小的情况
        if i == n {
            return append(es, e)
        }
        // 将muxEntry根据长度插入最长到最短的数组中去
        es = append(es, muxEntry{}) // try to grow the slice in place, any entry works.
        copy(es[i+1:], es[i:])      // Move shorter entries down
        es[i] = e
        return es
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    总结插入逻辑处理
    1. http.HandleFunc() 调用的是默认实现类的DefaultServeMux.HandleFunc()
    2. DefaultServeMux.HandleFunc(),将handle转换为实现了 ServeHTTP 的 HandleFunc 类型
    3. 通过serveMux的Handle()方法进行插入,key为path
    4. 如果结尾为 / 的地址则通过长度进行排序
    服务启动 http.ListenAndServe()

    调用 net/http 包下的公开方法 ListenAndServe,可以实现对服务端的一键启动. 内部会声明一个新的 Server 对象,嵌套执行 Server.ListenAndServe 方法

    func ListenAndServe(addr string, handler Handler) error {
    	// 创建Server
        server := &Server{Addr: addr, Handler: handler}
        // 执行Server的ListenAndServe
        return server.ListenAndServe()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    Server.ListenAndServe 方法中,根据用户传入的端口,申请到一个监听器 listener,继而调用 Server.Serve 方法

    func (srv *Server) ListenAndServe() error {
        // ...
        addr := srv.Addr
        if addr == "" {
            addr = ":http"
        }
        // 申请监听器
        ln, err := net.Listen("tcp", addr)
        // ...    
        // 调用Server.Serve方法
        return srv.Serve(ln)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    Server.Serve 方法很核心,体现了 http 服务端的运行架构:for + listener.accept 模式

    • 将 server 封装成一组 kv 对,添加到 context 当中
    • 开启 for 循环,每轮循环调用 Listener.Accept 方法阻塞等待新连接到达
    • 每有一个连接到达,创建一个 goroutine 异步执行 conn.serve 方法负责处理
    var ServerContextKey = &contextKey{"http-server"}
    
    type contextKey struct {
        name string
    }
    
    func (srv *Server) Serve(l net.Listener) error {
       // ...
       // 写入kv
       ctx := context.WithValue(baseCtx, ServerContextKey, srv)
        for {
        	// 监听请求
            rw, err := l.Accept()
            // ...
            connCtx := ctx
            // ...
            // handler处理器
            c := srv.newConn(rw)
            // ...
            go c.serve(connCtx)
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22

    conn.serve 是响应客户端连接的核心方法:

    • 从 conn 中读取到封装到 response 结构体,以及请求参数 http.Request
    • 调用 serveHandler.ServeHTTP 方法,根据请求的 path 为其分配 handler
    • 通过特定 handler 处理并响应请求
     func (c *conn) serve(ctx context.Context) {
        // ...
        c.r = &connReader{conn: c}
        c.bufr = newBufioReader(c.r)
        c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)
        for {
            w, err := c.readRequest(ctx)
            // ...
            // 根据request查找
            serverHandler{c.server}.ServeHTTP(w, w.req)
            w.cancelCtx()
            // ...
        }
    } 
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    在 serveHandler.ServeHTTP 方法中,会对 Handler 作判断,倘若其未声明,则取全局单例 DefaultServeMux 进行路由匹配

    func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {
        handler := sh.srv.Handler
        if handler == nil {
            handler = DefaultServeMux
        }
        // ...
        // 调用自己
        handler.ServeHTTP(rw, req)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    接下来,兜兜转转依次调用 ServeMux.ServeHTTP、ServeMux.Handler、ServeMux.handler 等方法,最终在 ServeMux.match 方法中,以 Request 中的 path 为 pattern,在路由字典 Server.m 中匹配 handler,最后调用 handler.ServeHTTP 方法进行请求的处理和响应.

    func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {
        // ...
        h, _ := mux.Handler(r)
        h.ServeHTTP(w, r)
    }
    
    func (mux *ServeMux) Handler(r *Request) (h Handler, pattern string) {
        // ...
        return mux.handler(host, r.URL.Path)
    }
    
    func (mux *ServeMux) handler(host, path string) (h Handler, pattern string) {
    	// 加锁
        mux.mu.RLock()
        // 默认释放
        defer mux.mu.RUnlock()
        
        // ...
        // 查找,根据地址key
        h, pattern = mux.match(path)
        // ...
        return
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    值得一提的是,当通过路由字典 Server.m 未命中 handler 时,此时会启动模糊匹配模式,两个核心规则如下:

    • 以 ‘/’ 结尾的 pattern 才能被添加到 Server.es 数组中,才有资格参与模糊匹配
    • 模糊匹配时,会找到一个与请求路径 path 前缀完全匹配且长度最长的 pattern,其对应的handler 会作为本次请求的处理函数.
    func (mux *ServeMux) match(path string) (h Handler, pattern string) {
        v, ok := mux.m[path]
        if ok {
            return v.h, v.pattern
        }
    
        // ServeMux.es 本身是按照 pattern 的长度由大到小排列的
        for _, e := range mux.es {
            if strings.HasPrefix(path, e.pattern) {
                return e.h, e.pattern
            }
        }
        return nil, ""
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    客户端 Client.go 解析

    使用Client封装了整个模块,结构体如下

    • Transport:负责 http 通信的核心部分,也是接下来的讨论重点
    • Jar:cookie 管理
    • Timeout:超时设置
    type Client struct {
        // ...
        Transport RoundTripper
        // ...
        Jar CookieJar
        // ...
        Timeout time.Duration
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    RoundTripper

    RoundTripper 是通信模块的 interface,需要实现方法 Roundtrip,即通过传入请求 Request,与服务端交互后获得响应 Response.

    type RoundTripper interface {
        RoundTrip(*Request) (*Response, error)
    }
    
    • 1
    • 2
    • 3
    Transport

    Tranport 是 RoundTripper 的实现类,核心字段包括:

    • idleConn:空闲连接 map,实现复用
    • DialContext:新连接生成器
    type Transport struct {
        idleConn     map[connectMethodKey][]*persistConn // most recently used at end
        // ...
        DialContext func(ctx context.Context, network, addr string) (net.Conn, error)
        // ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    Request

    http 请求参数结构体.

    type Request struct {
        // 方法
        Method string
        // 请求路径
        URL *url.URL
        // 请求头
        Header Header
        // 请求参数内容
        Body io.ReadCloser
        // 服务器主机
        Host string
        // query 请求参数
        Form url.Values
        // 响应参数 struct
        Response *Response
        // 请求链路的上下文
        ctx context.Context
        // ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    Response

    http 响应参数结构体.

    type Response struct {
        // 请求状态,200 为 请求成功
        StatusCode int    // e.g. 200
        // http 协议,如:HTTP/1.0
        Proto      string // e.g. "HTTP/1.0"
        // 请求头
        Header Header
        // 响应参数内容  
        Body io.ReadCloser
        // 指向请求参数
        Request *Request
        // ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    客户端发起请求

    reqBody, _ := json.Marshal(map[string]string{"key1": "val1", "key2": "val2"})
    resp, _ := http.Post(":8091", "application/json", bytes.NewReader(reqBody))
    defer resp.Body.Close()
    respBody, _ := io.ReadAll(resp.Body)
    
    • 1
    • 2
    • 3
    • 4

    客户端发起一次 http 请求大致分为几个步骤:

    • 构造 http 请求参数
    • 获取用于与服务端交互的 tcp 连接
    • 通过 tcp 连接发送请求参数
    • 通过 tcp 连接接收响应结果
    POST

    调用 net/http 包下的公开方法 Post 时,需要传入服务端地址 url,请求参数格式 contentType 以及请求参数的 io reader.
    方法中会使用包下的单例客户端 DefaultClient 处理这次请求.

    // 单例客户端,处理请求的
    var DefaultClient = &Client{}
    // 实际的post方法,需要传入端口号。请求参数等信息
    func Post(url, contentType string, body io.Reader) (resp *Response, err error) {
    	// 调用的是默认Client的post实现
        return DefaultClient.Post(url, contentType, body)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    在 Client.Post 方法中,首先会结合用户的入参,构造出完整的请求参数 Request;继而通过 Client.Do 方法,处理这笔请求.

    func (c *Client) Post(url, contentType string, body io.Reader) (resp *Response, err error) {
    	// 构造请求参数
        req, err := NewRequest("POST", url, body)
        // ...
        // 设置请求头
        req.Header.Set("Content-Type", contentType)
        // 处理请求
        return c.Do(req)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    构造请求参数 NewRequest

    NewRequestWithContext 方法中,根据用户传入的 url、method等信息,构造了 Request 实例.

    func NewRequestWithContext(ctx context.Context, method, url string, body io.Reader) (*Request, error) {
        // ...
        u, err := urlpkg.Parse(url)
        // ...
        rc, ok := body.(io.ReadCloser)
        // ...
        req := &Request{
            ctx:        ctx,
            Method:     method,
            URL:        u,
            // ...
            Header:     make(Header),
            Body:       rc,
            Host:       u.Host,
        }
        // ...
        return req, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    Client.Do

    流程解析:简易理解为方法的封装,并使用的公有方法调用私有方法

    Client.Do
    Client.do
    Client.send
    send

    如Client.Do方法其实是调用私有的do方法

    func (c *Client) Do(req *Request) (*Response, error) {
        return c.do(req)
    }
    
    func (c *Client) do(req *Request) (retres *Response, reterr error) {
    	// 设置生命周期
        var (
            deadline      = c.deadline()
            resp          *Response
            // ...
        )    
        for {
            // ...
            var err error       
            if resp, didTimeout, err = c.send(req, deadline); err != nil {
                // ...
            }
            // ...
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    在 Client.send 方法中,会在通过 send 方法发送请求之前和之后,分别对 cookie 进行更新。

    func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
        // 设置 cookie 到请求头中,此处的Cookies是自己创建的
        if c.Jar != nil {
            for _, cookie := range c.Jar.Cookies(req.URL) {
                req.AddCookie(cookie)
            }
        }
        // 发送请求
        resp, didTimeout, err = send(req, c.transport(), deadline)
        if err != nil {
            return nil, didTimeout, err
        }
        // 更新 resp 的 cookie 到请求头中,此处的cokkie是响应带的。
        if c.Jar != nil {
            if rc := resp.Cookies(); len(rc) > 0 {
                c.Jar.SetCookies(req.URL, rc)
            }
        }
        return resp, nil, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20

    在调用 send 方法时,需要注入 RoundTripper 模块,默认会使用全局单例 DefaultTransport 进行注入,核心逻辑位于 Transport.RoundTrip 方法中,其中分为两个步骤:

    • 获取/构造 tcp 连接
    • 通过 tcp 连接完成与服务端的交互
    var DefaultTransport RoundTripper = &Transport{
        // ...
        // 创建连接
        DialContext: defaultTransportDialContext(&net.Dialer{
            Timeout:   30 * time.Second,
            KeepAlive: 30 * time.Second,
        }),
        // ...
    }
    
    // 如果没有设置Transport,则会使用默认的DefaultTransport
    func (c *Client) transport() RoundTripper {
        if c.Transport != nil {
            return c.Transport
        }
        return DefaultTransport
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17

    send源码

    // 最终调用send(req, c.transport(), deadline)
    // 合并的request,发送请求的transport,生命周期deadline
    func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
        // ...
        // 使用transport的RoundTrip方法,实际调用的是私有roundTrip
        resp, err = rt.RoundTrip(req)
        // ...
        return resp, nil, nil
    }
    
    func (t *Transport) RoundTrip(req *Request) (*Response, error) {
    	// 公有方法封装私有方法,大写封装小写
        return t.roundTrip(req)
    }
    
    // 实际发送请求的方法
    func (t *Transport) roundTrip(req *Request) (*Response, error) {
        // ...
        for {          
            // ...    
            // 对请求数据进行封装
            treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey}      
            // ...
            // 获取TCP连接
            pconn, err := t.getConn(treq, cm)        
            // ...
            // 发送请求并接收响应,此处创建了两个线程,一个是发送客户端请求的数据,还有一个是读取服务器返回的数据
            resp, err = pconn.roundTrip(treq)          
            // ...
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    Transport.getConn

    获取 tcp 连接的策略分为两步:

    • 通过 queueForIdleConn 方法,尝试复用采用相同协议、访问相同服务端的空闲连接
    • 倘若无可用连接,则通过 queueForDial 方法,异步创建一个新的连接,并通过接收 ready channel 信号的方式,确认构造连接的工作已经完成.
    func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
        // 获取连接的请求参数体
        w := &wantConn{
            cm:         cm,
            // key 由 http 协议、服务端地址等信息组成
            key:        cm.key(),
            ctx:        ctx,
            // 标识连接构造成功的信号发射器,创建读channel,用来读取数据
            ready:      make(chan struct{}, 1),
        }
        // 倘若连接获取失败,在 wantConn.cancel 方法中,会尝试将 tcp 连接放回队列中以供后续复用
        defer func() {
            if err != nil {
                w.cancel(t, err)
            }
        }()
        // 尝试复用指向相同服务端地址的空闲连接,复用底层逻辑后续会说到
        if delivered := t.queueForIdleConn(w); delivered {
            pc := w.pc
            // ...
            return pc, nil
        }
        // 异步构造新的连接,复用失败,创建新的连接
        t.queueForDial(w)
        select {
        // 通过阻塞等待信号的方式,等待连接获取完成
        case <-w.ready:
            // ...
            // 后续tryDliver会将构造成功数据放回到这里
            return w.pc, w.err
        // ...
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    1. 复用连接
      • 尝试从 Transport.idleConn 中获取指向同一服务端的空闲连接 persisConn
      • 获取到连接后会调用 wantConn.tryDeliver 方法将连接绑定到 wantConn 请求参数上
      • 绑定成功后,会关闭 wantConn.ready channel,以唤醒阻塞读取该 channel 的 goroutine
    func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
        // ...
        // 获取连接池中的空闲连接
        if list, ok := t.idleConn[w.key]; ok {
            // ...
            // 轮询读取
            for len(list) > 0 && !stop {
            	// 获取连接池中最后一个连接
                pconn := list[len(list)-1]
                // ...
                // 此处绑定空闲连接到wantConn
                delivered = w.tryDeliver(pconn, nil)
                // 返回值成功,将连接池最后的连接从连接池中删除
                if delivered {
                    // ...
                    list = list[:len(list)-1]               
                }
                // 停止轮询
                stop = true
            }
            // ...
            if stop {
            	// 返回构造成功
                return delivered
            }
        }
       
        // ...    
        return false
    }
    
    // 绑定空闲连接
    func (w *wantConn) tryDeliver(pc *persistConn, err error) bool {
        w.mu.Lock()
        defer w.mu.Unlock()
        // ...
        w.pc = pc
        w.err = err
        // ...
        // 关闭构造成功发射器,调用函数获取到构造成功消息
        close(w.ready)
        return true
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    1. 创建连接
      在 queueForDial 方法会异步调用 Transport.dialConnFor 方法,创建新的 tcp 连接. 由于是异步操作,所以在上游会通过读 channel 的方式,等待创建操作完成。
      这里之所以采用异步操作进行连接创建,有两部分原因:
    • 一个 tcp 连接并不是一个静态的数据结构,它是有生命周期的,创建过程中会为其创建负责读写的两个守护协程,伴随而生
    • 在上游 Transport.queueForIdleConn 方法中,当通过 select 多路复用的方式,接收到其他终止信号时,可以提前调用 wantConn.cancel 方法打断创建连接的 goroutine. 相比于串行化执行而言,这种异步交互的模式,具有更高的灵活度
    func (t *Transport) queueForDial(w *wantConn) {
        // ...
        // 异步调用创建连接的方法
        go t.dialConnFor(w) 
        // ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    Transport.dialConnFor 方法中,首先调用 Transport.dialConn 创建 tcp 连接 persisConn,接着执行 wantConn.tryDeliver 方法,将连接绑定到 wantConn 上,然后通过关闭 ready channel 操作唤醒上游读 ready channel 的 goroutine。(跟连接复用的方式相似)

    func (t *Transport) dialConnFor(w *wantConn) {
        // ...
        pc, err := t.dialConn(w.ctx, w.cm)
        delivered := w.tryDeliver(pc, err)
        // ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    Transport.dialConn 方法包含了创建连接的核心逻辑:

    • 调用 Transport.dial 方法,最终通过 Tranport.DialContext 成员函数,创建好 tcp 连接,封装到 persistConn 当中
    • 异步启动连接的伴生读写协程 readLoop 和 writeLoop 方法,组成提交请求、接收响应的循环
    func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
    	// 创建两个伴生线程,分别掌管读写
        pconn = &persistConn{
            t:             t,
            reqch:         make(chan requestAndChan, 1),
            writech:       make(chan writeRequest, 1),
            // ...
        }
        
        // 添加地址等信息
        conn, err := t.dial(ctx, "tcp", cm.addr())
        // ...
        pconn.conn = conn      
        // ...
        // 异步运行两个线程,监听处理信息
        go pconn.readLoop()
        go pconn.writeLoop()
        return pconn, nil
    }
    
    func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, error) {
        // ...
        // 创建好 tcp 连接,封装到 persistConn 当中
        return t.DialContext(ctx, network, addr)
        // ...
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 在伴生读协程 persisConn.readLoop 方法中,会读取来自服务端的响应,并添加到 persistConn.reqCh 中,供上游 persistConn.roundTrip 方法接收。
    • 在伴生协协程 persisConn.writeLoop方法中,会通过 persistConn.writech 读取到客户端提交的请求,然后将其发送到服务端。
    func (pc *persistConn) readLoop() { 
        // ...
        alive := true
        // 连接存在
        for alive {
            // ...
            // 读取服务器响应
            rc := <-pc.reqch
            // ...
            var resp *Response
            // ...
            // 将响应写入
            resp, err = pc.readResponse(rc, trace)
            // ...
            select{
            	// 传送给客户端
                rc.ch <- responseAndError{res: resp}:
                // ...
            }
            // ...        
        }
    }
    
    func (pc *persistConn) writeLoop() {
    // 轮询读取    
        for {
            select {
            case wr := <-pc.writech:
                // ...
                // 发送请求
                err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))
                // ...       
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    1. 归还连接
      有复用连接的能力,就必然存在归还连接的机制.
      首先,在构造新连接中途,倘若被打断,则可能会将连接放回队列以供复用:
    // getConn 中的方法,前面有看过
    func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
        // ...
        // 倘若连接获取失败,在 wantConn.cancel 方法中,会尝试将 tcp 连接放回队列中以供后续复用
        defer func() {
            if err != nil {
                w.cancel(t, err)
            }
        }()
        // ...
    }
    
    // 判断persistConn存在,执行存放方法
    func (w *wantConn) cancel(t *Transport, err error) {
       // ...
        if pc != nil {
            t.putOrCloseIdleConn(pc)
        }
    }
    
    func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
        // ...
        // 获取连接在内存中的地址?
        key := pconn.cacheKey
        // ...
        // 将连接地址当key存入连接池中
        t.idleConn[key] = append(idles, pconn)
        // ...
        return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    其次,倘若与服务端的一轮交互流程结束,也会将连接放回队列以供复用.

    func (pc *persistConn) readLoop() {
    	// 服务器断开连接后
        tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
            if err := pc.t.tryPutIdleConn(pc); err != nil {
                // ...
            }
            // ...
        }
        
        // ...
        // 复用选项为 true
        alive := true
        for alive {
            // ...
            select {
            case bodyEOF := <-waitForBodyRead:
                // ...
                tryPutIdleConn(trace)
                // ...
            }           
        }
        
    }
    
    // 过期选项,长时间没有使用连接后调用
    func (t *Transport) putOrCloseIdleConn(pconn *persistConn) {
        if err := t.tryPutIdleConn(pconn); err != nil {
            pconn.close(err)
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    persistConn.roundTrip

    一个连接 persistConn 是一个具有生命特征的角色. 它本身伴有 readLoop 和 writeLoop 两个守护协程,与上游应用者之间通过 channel 进行读写交互。而其中扮演应用者这一角色的persistConn.roundTrip:

    • 首先将 http 请求通过 persistConn.writech 发送给连接的守护协程 writeLoop,并进一步传送到服务端
    • 其次通过读取 resc channel,接收由守护协程 readLoop 代理转发的客户端响应数据.
    func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
        // ...
        // 写数据,是发送数据。将请求写入writech。写线程读取到数据后会发送请求给服务器
        pc.writech <- writeRequest{req, writeErrCh, continueCh}
        resc := make(chan responseAndError)
        // 获取服务器返回数据
        pc.reqch <- requestAndChan{
            req:        req.Request,
            cancelKey:  req.cancelKey,
            ch:         resc,
            // ...
        }
        // ...
        // 阻塞等待,获取res
        for {       
            select {
            // ...
            case re := <-resc:
                // ...
                return re.res, nil
            // ...
            }
        }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24

    在这里总结一下客户端Client.go的设计

    1. 封装数据
      1. Transport,负责http的通信。其中主要的功能是连接池,连接复用和send方法
      2. CookieJar
      3. Transport 的类型是RoundTripper 是通信模块的接口,需要实现方法RoundTrip(实际是向写channel写入数据,通过写接口去向服务器发送数据。还有读取readchannel的数据,传递给client)
    2. Request 请求参数体,封装了Method,路径,请求头,请求参数Body,主机,响应resp和上下文等
    3. Response 响应参数,封装了响应Code,请求头,响应参数Body还有请求参数Request的指针
    客户端发送请求的流程
    • 构造请求参数(NewRequest)
    • 获取服务器的TCP连接(Transport.getConn)
    • 通过tcp发送请求(persistConn.roundTrip)
    • tcp获取响应结果(persistConn.roundTrip)
    客户端发送请求
    Default.Post\Client.Post
    Send http request
    post
    NewRequest创建请求参数
    Client.Do发送信息开始流程
    Transport.RoundTrip获取tcp连接
    Transport.dialConn创建伴生线程read和write
    persistConn.roundTrip
    Client.do
    Client.Send设置Cookie
    send
  • 相关阅读:
    多尺度retinex图像去雾算法matlab仿真
    移动互联网时代,如何撬开流量的密码?
    在中国可以使用 HubSpot 吗?
    【线性代数】P4 行列式相乘+范德蒙德行列式+克莱姆法则 cramer
    MySQL 视图(详解)
    【论文阅读 CIKM‘2021】Learning Multiple Intent Representations for Search Queries
    Linux执行脚本报错:-bash: ./bin/start.sh: /bin/bash^M: 坏的解释器: 没有那个文件或目录
    洋码头API接口
    Vue项目发布到linux
    Linux基础知识笔记
  • 原文地址:https://blog.csdn.net/HideonHacker/article/details/137980518