• ETCD数据库源码分析——集群间网络层服务端接口


    从上一篇文章ETCD数据库源码分析——集群通信初始化我们知道:

    • 集群通信服务端会调用configurePeerListeners函数为配置Config中LPUrls每个url创建一个peerListener,该函数为初始化peerListener结构体会调用transport.NewListenerWithOpts函数创建net.Listener。configurePeerListeners函数并没有初始化peerListener结构体的serve函数。
    • 集群通信服务端会调用servePeers函数启动服务。servePeers函数会创建协程运行cmux.New(p.Listener).Serve()函数。
    • 集群通信服务端会调用servePeers函数启动服务。servePeers函数首先NewPeerHandler会调用newPeerHandler(lg,s,s.RaftHandler()...),RaftHandler()会返回s.r.transport.Handler(),最终transport.Handler函数返回了注册了pipelineHandler、streamHandler、snapHandler的mux来获取相应的Handler。servePeers函数会创建协程运行&http.Server{Handler: ph, }.Serve(m.Match(cmux.Any()))

    这篇文章我们来学习一下transport.NewListenerWithOpts函数和pipelineHandler、streamHandler、snapHandler。

    NewListenerWithOpts函数

    Listener的初始化代码如下所示,transport.NewListenerWithOpts函数代码位于client/pkg/transport/listener.go文件中,作为transport包内export到外部的函数。

    		peers[i] = &peerListener{close: func(context.Context) error { return nil }}
    		peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme,
    			transport.WithTLSInfo(&cfg.PeerTLSInfo),
    			transport.WithSocketOpts(&cfg.SocketOpts),
    			transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout),
    		)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6

    client/pkg/transport/listener.go文件export两个函数给其他代码调用。NewListener函数用于不带选项时使用,从调用看两个函数并没有本质的不同,最终都是会调用client/pkg/transport/listener.go文件中的newListener函数。

    // NewListener creates a new listner.
    func NewListener(addr, scheme string, tlsinfo *TLSInfo) (l net.Listener, err error) {
    	return newListener(addr, scheme, WithTLSInfo(tlsinfo))
    }
    // NewListenerWithOpts creates a new listener which accepts listener options.
    func NewListenerWithOpts(addr, scheme string, opts ...ListenerOption) (net.Listener, error) {
    	return newListener(addr, scheme, opts...)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    newListener函数比较负责,会根据不同的协议和选项调用不同的函数产生不同的Listener。针对unix或unixs协议调用NewUnixListener函数(定义在client/pkg/transport/unix_listener.go文件);通过listen选项(相关代码位于client/pkg/transport/listener_opts.go文件)来设定监听配置;rwTimeoutListener定义在client/pkg/transport/timeout_listener.go文件中;NewTLSListener定义在client/pkg/transport/listener_tls.go中。其实万变不离其宗net.Listener GO包。

    func newListener(addr, scheme string, opts ...ListenerOption) (net.Listener, error) {
    	if scheme == "unix" || scheme == "unixs" { // unix sockets via unix://laddr		
    		return NewUnixListener(addr)
    	}
    
    	lnOpts := newListenOpts(opts...)  // listen选项相关代码位于client/pkg/transport/listener_opts.go文件
    	switch {
    	case lnOpts.IsSocketOpts():	
    		config, err := newListenConfig(lnOpts.socketOpts) // new ListenConfig with socket options.
    		if err != nil { return nil, err }
    		lnOpts.ListenConfig = config
    		// check for timeout
    		fallthrough
    	case lnOpts.IsTimeout(), lnOpts.IsSocketOpts():	
    		ln, err := lnOpts.ListenConfig.Listen(context.TODO(), "tcp", addr) // timeout listener with socket options.
    		if err != nil { return nil, err }
    		lnOpts.Listener = &rwTimeoutListener{
    			Listener:     ln,
    			readTimeout:  lnOpts.readTimeout,
    			writeTimeout: lnOpts.writeTimeout,
    		}
    	case lnOpts.IsTimeout():
    		ln, err := net.Listen("tcp", addr)
    		if err != nil { return nil, err }
    		lnOpts.Listener = &rwTimeoutListener{
    			Listener:     ln,
    			readTimeout:  lnOpts.readTimeout,
    			writeTimeout: lnOpts.writeTimeout,
    		}
    	default:
    		ln, err := net.Listen("tcp", addr)
    		if err != nil { return nil, err }
    		lnOpts.Listener = ln
    	}
    
    	//  only skip if not passing TLSInfo
    	if lnOpts.skipTLSInfoCheck && !lnOpts.IsTLS() { return lnOpts.Listener, nil }
    	return wrapTLS(scheme, lnOpts.tlsInfo, lnOpts.Listener)
    }
    func wrapTLS(scheme string, tlsinfo *TLSInfo, l net.Listener) (net.Listener, error) {
    	if scheme != "https" && scheme != "unixs" { return l, nil }
    	if tlsinfo != nil && tlsinfo.SkipClientSANVerify { return NewTLSListener(l, tlsinfo) }
    	return newTLSListener(l, tlsinfo, checkSAN)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44

    pipelineHandler、streamHandler、snapHandler

    servePeers函数首先NewPeerHandler会调用newPeerHandler(lg,s,s.RaftHandler()...),RaftHandler()会返回s.r.transport.Handler(),最终transport.Handler函数返回了注册了pipelineHandler、streamHandler、snapHandler的mux来获取相应的Handler。首先我们先看etcdserver.ServerPeerV2接口,其包含了ServerPeer,而ServerPeer又包含了ServerV2接口,最终可以发现ServerV2接口包含了Server接口。而EtcdServer(server/etcdserver/server.go)是Server接口的实现,因此servePeers函数代码ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server)中形参etcdserver.ServerPeerV2传入EtcdServer实参是没有任何问题的。

    type ServerPeerV2 interface {
    	ServerPeer
    	HashKVHandler() http.Handler
    	DowngradeEnabledHandler() http.Handler
    }
    type ServerPeer interface {
    	ServerV2
    	RaftHandler() http.Handler
    	LeaseHandler() http.Handler
    }
    type ServerV2 interface {
    	Server
    	Leader() types.ID
    	// Do takes a V2 request and attempts to fulfill it, returning a Response.
    	Do(ctx context.Context, r pb.Request) (Response, error)
    	ClientCertAuthEnabled() bool
    }
    type Server interface {
    	AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
    	RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)
    	UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error)
    	PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error)
    	ClusterVersion() *semver.Version
    	StorageVersion() *semver.Version
    	Cluster() api.Cluster
    	Alarms() []*pb.AlarmMember
    	LeaderChangedNotify() <-chan struct{}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28

    NewPeerHandler函数调用newPeerHandler函数注册raftHandler、leaseHandler、hashKVHandler、downgradeEnabledHandler和versionHandler、peerMembersHandler、peerMemberPromoteHandler到mux中。这里罗列一下这些handler所对应的路径。

    handlerpath
    raftHandler/raft 或 /raft/
    leaseHandler/leases 或 /leases/internal
    hashKVHandler/members/hashkv
    downgradeEnabledHandler/downgrade/enabled
    versionHandler/version
    peerMembersHandler/members
    peerMemberPromoteHandler/members/promote/
    // NewPeerHandler generates an http.Handler to handle etcd peer requests.
    func NewPeerHandler(lg *zap.Logger, s etcdserver.ServerPeerV2) http.Handler {
    	return newPeerHandler(lg, s, s.RaftHandler(), s.LeaseHandler(), s.HashKVHandler(), s.DowngradeEnabledHandler())
    }
    func newPeerHandler(lg *zap.Logger,s etcdserver.Server,raftHandler http.Handler,leaseHandler http.Handler,hashKVHandler http.Handler,downgradeEnabledHandler http.Handler,) http.Handler {
    	if lg == nil { lg = zap.NewNop() }
    	peerMembersHandler := newPeerMembersHandler(lg, s.Cluster())
    	peerMemberPromoteHandler := newPeerMemberPromoteHandler(lg, s)
    	mux := http.NewServeMux()
    	mux.HandleFunc("/", http.NotFound)
    	mux.Handle(rafthttp.RaftPrefix, raftHandler)
    	mux.Handle(rafthttp.RaftPrefix+"/", raftHandler)
    	mux.Handle(peerMembersPath, peerMembersHandler)
    	mux.Handle(peerMemberPromotePrefix, peerMemberPromoteHandler)
    	if leaseHandler != nil {
    		mux.Handle(leasehttp.LeasePrefix, leaseHandler)
    		mux.Handle(leasehttp.LeaseInternalPrefix, leaseHandler)
    	}
    	if downgradeEnabledHandler != nil {
    		mux.Handle(etcdserver.DowngradeEnabledPath, downgradeEnabledHandler)
    	}
    	if hashKVHandler != nil {
    		mux.Handle(etcdserver.PeerHashKVPath, hashKVHandler)
    	}
    	mux.HandleFunc(versionPath, versionHandler(s, serveVersion))
    	return mux
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    RaftHandler()
    RaftHandler()函数定义在server/etcdserver/server.go文件中,其会调用transport的Handler函数。

    func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }
    
    • 1

    transport的Handler函数定义在server/etcdserver/api/rafthttp/transport.go文件中,如下代码所示,创建pipelineHandler、streamHandler、snapHandler并将其注册到mux中。

    func (t *Transport) Handler() http.Handler {
    	pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID)
    	streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
    	snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID)
    	mux := http.NewServeMux()
    	mux.Handle(RaftPrefix, pipelineHandler)
    	mux.Handle(RaftStreamPrefix+"/", streamHandler)
    	mux.Handle(RaftSnapshotPrefix, snapHandler)
    	mux.Handle(ProbingPrefix, probing.NewHandler())
    	return mux
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11

    这里罗列一下这些handler所对应的路径。

    handlerpath
    raftHandler/raft 或 /raft/
    pipelineHandler/raft
    streamHandler/raft/stream
    snapHandler/raft/snapshot
    probing.NewHandler()/raft/probing

    LeaseHandler()
    LeaseHandler()函数定义在server/etcdserver/server.go文件中,其会调用leasehttp的NewHandler函数。

    func (s *EtcdServer) LeaseHandler() http.Handler {
    	if s.lessor == nil { return nil }
    	return leasehttp.NewHandler(s.lessor, s.ApplyWait)
    }
    
    • 1
    • 2
    • 3
    • 4

    HashKVHandler()
    HashKVHandler()函数定义在server/etcdserver/corrupt.go文件中,其会返回hashKVHandler结构体。

    func (s *EtcdServer) HashKVHandler() http.Handler {
    	return &hashKVHandler{lg: s.Logger(), server: s}
    }
    type hashKVHandler struct {
    	lg     *zap.Logger
    	server *EtcdServer
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    DowngradeEnabledHandler()
    DowngradeEnabledHandler()函数定义在server/etcdserver/server.go文件中,其会返回downgradeEnabledHandler结构体。

    type downgradeEnabledHandler struct {
    	lg      *zap.Logger
    	cluster api.Cluster
    	server  *EtcdServer
    }
    func (s *EtcdServer) DowngradeEnabledHandler() http.Handler {
    	return &downgradeEnabledHandler{
    		lg:      s.Logger(),
    		cluster: s.cluster,
    		server:  s,
    	}
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    versionHandler(s, serveVersion)
    versionHandler()函数定义在server/etcdserver/api/etcdhttp/version.go文件中,其会向其函数体中的函数传入ResponseWriter对象和http.Request,其内部函数会调用server.ClusterVersion()和server.StorageVersion()获取集群版本和存储版本,然后调用serveVersion函数将版本信息封装成Versions结构体,设置http相应头信息,序列化后写出。

    func versionHandler(server etcdserver.Server, fn func(http.ResponseWriter, *http.Request, string, string)) http.HandlerFunc {
    	return func(w http.ResponseWriter, r *http.Request) {
    		clusterVersion := server.ClusterVersion()
    		storageVersion := server.StorageVersion()
    		clusterVersionStr, storageVersionStr := "not_decided", "unknown"
    		if clusterVersion != nil {
    			clusterVersionStr = clusterVersion.String()
    		}
    		if storageVersion != nil {
    			storageVersionStr = storageVersion.String()
    		}
    		fn(w, r, clusterVersionStr, storageVersionStr)
    	}
    }
    func serveVersion(w http.ResponseWriter, r *http.Request, clusterV, storageV string) {
    	if !allowMethod(w, r, "GET") { return }
    	vs := version.Versions{
    		Server:  version.Version,
    		Cluster: clusterV,
    		Storage: storageV,
    	}
    
    	w.Header().Set("Content-Type", "application/json")
    	b, err := json.Marshal(&vs)
    	if err != nil { panic(fmt.Sprintf("cannot marshal versions to json (%v)", err)) }
    	w.Write(b)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27

    newPeerMembersHandler(lg, s.Cluster())
    newPeerMembersHandler()函数定义在server/etcdserver/api/etcdhttp/peer.go文件中,其会返回peerMembersHandler结构体。

    func newPeerMembersHandler(lg *zap.Logger, cluster api.Cluster) http.Handler {
    	return &peerMembersHandler{
    		lg:      lg,
    		cluster: cluster,
    	}
    }
    type peerMembersHandler struct {
    	lg      *zap.Logger
    	cluster api.Cluster
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    newPeerMemberPromoteHandler(lg, s)
    newPeerMemberPromoteHandler()函数定义在server/etcdserver/api/etcdhttp/peer.go文件中,其会返回peerMemberPromoteHandler结构体。

    func newPeerMemberPromoteHandler(lg *zap.Logger, s etcdserver.Server) http.Handler {
    	return &peerMemberPromoteHandler{
    		lg:      lg,
    		cluster: s.Cluster(),
    		server:  s,
    	}
    }
    type peerMemberPromoteHandler struct {
    	lg      *zap.Logger
    	cluster api.Cluster
    	server  etcdserver.Server
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
  • 相关阅读:
    Vue2源码学习笔记 - 13.响应式原理—Watcher 类详解
    Junit单元测试之Maven项目集成Jacoco,查看覆盖率报告
    “==”和equals的区别
    深度学习和图形学渲染的结合和应用
    基于php的物流系统设计与实现
    【Overload游戏引擎细节分析】鼠标键盘控制摄像机原理
    容易记混的方法slice、splice、split
    字节前端监控培训精华笔记
    【C语言数据结构】栈-顺序存储(顺序栈)
    涨知识!Python 的异常信息还能这样展现
  • 原文地址:https://blog.csdn.net/asmartkiller/article/details/125460092