• ETCD数据库源码分析——集群通信初始化


    在这里插入图片描述
    消息入口
    一个etcd节点运行以后,有3个通道接收外界消息,以kv数据的增删改查请求处理为例,介绍这3个通道的工作机制。

    1. client的http调用:会通过注册到http模块的keysHandler的ServeHTTP方法处理。解析好的消息调用EtcdServer的Do()方法处理。(图中2)
    2. client的grpc调用:启动时会向grpc server注册quotaKVServer对象,quotaKVServer是以组合的方式增强了kvServer这个数据结构。grpc消息解析完以后会调用kvServer的Range、Put、DeleteRange、Txn、Compact等方法。kvServer中包含有一个RaftKV的接口,由EtcdServer这个结构实现。所以最后就是调用到EtcdServer的Range、Put、DeleteRange、Txn、Compact等方法。(图中1 ETCD数据库源码分析——etcd gRPC 服务 API)
    3. 节点之间的grpc消息:每个EtcdServer中包含有Transport结构,Transport中会有一个peers的map,每个peer封装了节点到其他某个节点的通信方式。包括streamReader、streamWriter等,用于消息的发送和接收。streamReader中有recvc和propc队列,streamReader处理完接收到的消息会将消息推到这个队列中。由peer去处理,peer调用raftNode的Process方法处理消息。(图中3、4 本文将介绍的)

    集群通信服务端

    func StartEtcd(inCfg *Config) (e *Etcd, err error) {
    	// 校验配置inCfg,其实就是config结构体的embed.Config类型的ec成员
    	serving := false // 标识是否正在提供服务
    	e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})} // 创建Etcd
    	// 1. 初始化集群内部通信服务端通信所需的peerListener
    	if e.Peers, err = configurePeerListeners(cfg); err != nil { return e, err }	
    	// buffer channel so goroutines on closed connections won't wait forever
    	e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
    	// 2. 启动集群通信服务端
    	if err = e.servePeers(); err != nil { return e, err }	
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    在这里插入图片描述
    初始化集群内部通信服务端通信所需的peerListener
    configurePeerListeners函数定义在server/embed/etcd.go文件中,为配置Config中LPUrls每个url创建一个peerListener,如上图中的peerListener结构体所示,该函数为初始化peerListener结构体会调用transport.NewListenerWithOpts函数创建net.Listener,将close函数初始化为func(context.Context) error { return peers[i].Listener.Close() },用于调用net.Listener.Close函数。configurePeerListeners函数并没有初始化peerListener结构体的serve函数。

    func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) {
    	if err = updateCipherSuites(&cfg.PeerTLSInfo, cfg.CipherSuites); err != nil { return nil, err }
    	if err = cfg.PeerSelfCert(); err != nil { cfg.logger.Fatal("failed to get peer self-signed certs", zap.Error(err)) }
    	if !cfg.PeerTLSInfo.Empty() { cfg.logger.Info( "starting with peer TLS", zap.String("tls-info", fmt.Sprintf("%+v", cfg.PeerTLSInfo)), zap.Strings("cipher-suites", cfg.CipherSuites),) }
    
    	peers = make([]*peerListener, len(cfg.LPUrls)) // 需要为配置Config中的LPUrls每个url申请一个peerListener
    	defer func() {  // 退出该函数执行的清理函数
    		if err == nil { return }
    		for i := range peers {
    			if peers[i] != nil && peers[i].close != nil {
    				cfg.logger.Warn( "closing peer listener", zap.String("address", cfg.LPUrls[i].String()), zap.Error(err),)
    				ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    				peers[i].close(ctx)
    				cancel()
    			}
    		}
    	}()
    
    	for i, u := range cfg.LPUrls {
    		if u.Scheme == "http" {   // http协议
    			if !cfg.PeerTLSInfo.Empty() { cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("peer-url", u.String())) }
    			if cfg.PeerTLSInfo.ClientCertAuth { cfg.logger.Warn("scheme is HTTP while --peer-client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("peer-url", u.String())) }
    		}
    		peers[i] = &peerListener{close: func(context.Context) error { return nil }} // 为配置Config中的LPUrls每个url创建一个peerListener
    		peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme, transport.WithTLSInfo(&cfg.PeerTLSInfo), transport.WithSocketOpts(&cfg.SocketOpts), transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout),)
    		if err != nil { return nil, err }
    		// once serve, overwrite with 'http.Server.Shutdown'
    		peers[i].close = func(context.Context) error { return peers[i].Listener.Close() }
    	}
    	return peers, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31

    启动集群通信服务端
    servePeers函数定义在server/embed/etcd.go文件中,首先NewPeerHandler会调用newPeerHandler(lg,s,s.RaftHandler()...),RaftHandler()会返回s.r.transport.Handler(),最终transport.Handler函数返回了注册了pipelineHandler、streamHandler、snapHandler的mux。对于每个peerListener创建一个协程运行http server,使用pipelineHandler、streamHandler、snapHandler;更新peerListener的close函数,对每个peerListener创建协程运行cmux.Serve(),也就是运行注册configurePeerListeners的transport.NewListenerWithOpts创建Listener的服务。

    // configure peer handlers after rafthttp.Transport started
    func (e *Etcd) servePeers() (err error) {
    	ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server)
    
    	for _, p := range e.Peers {  // 对于每个peerListener创建一个协程运行http server,使用pipelineHandler、streamHandler、snapHandler
    		u := p.Listener.Addr().String()
    		m := cmux.New(p.Listener)  // 使用configurePeerListeners函数初始化的Listener创建cmx
    		srv := &http.Server{ Handler:     ph, ReadTimeout: 5 * time.Minute, ErrorLog:    defaultLog.New(io.Discard, "", 0),  } // do not log user error 创建http server
    		go srv.Serve(m.Match(cmux.Any())) // 创建协程运行http server
    		p.serve = func() error {  // configurePeerListeners函数并没有初始化peerListener结构体的serve函数,这里进行初始化
    			e.cfg.logger.Info( "cmux::serve", zap.String("address", u),)
    			return m.Serve()
    		}
    		p.close = func(ctx context.Context) error { // gracefully shutdown http.Server. close open listeners, idle connections until context cancel or time-out 更新一下close函数,因为这里需要关闭的对象不一样了
    			e.cfg.logger.Info("stopping serving peer traffic", zap.String("address", u),)
    			srv.Shutdown(ctx)
    			e.cfg.logger.Info("stopped serving peer traffic", zap.String("address", u),)
    			m.Close()
    			return nil
    		}
    	}
    	
    	for _, pl := range e.Peers { // start peer servers in a goroutine
    		go func(l *peerListener) {  // 创建协程运行cmux.Serve()
    			u := l.Addr().String()
    			e.cfg.logger.Info("serving peer traffic",zap.String("address", u),
    			)
    			e.errHandler(l.serve())
    		}(pl)
    	}
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32

    集群通信客户端

    在初始化EtcdServer流程中运行的etcd.NewServersrv(cfg)函数最终会进行传输层的初始化,这里描述的就是首图中的Transport框图,如下流程主要进行:创建rafthttp.Transport实例、启动rafthttp.Transport实例和向rafthttp.Transport实例中添加集群中各个节点对应的Peer实例和Remote实例操作。详细流程将在后续博客中讲解。

    	// TODO: move transport initialization near the definition of remote
    	tr := &rafthttp.Transport{  // 创建rafthttp.Transport实例
    		Logger:      cfg.Logger, TLSInfo:     cfg.PeerTLSInfo, DialTimeout: cfg.PeerDialTimeout(),
    		ID:          b.cluster.nodeID, URLs:        cfg.PeerURLs, ClusterID:   b.cluster.cl.ID(),
    		Raft:        srv, Snapshotter: b.ss, ServerStats: sstats, LeaderStats: lstats,
    		// 在这里传入的rafthttp.Raft接口实现是EtcdServer实例。EtcdServer对Raft接口的实现比较简单,它会直接将调用委托给底层的raftNode实例
    		ErrorC:      srv.errorc,
    	}
    	if err = tr.Start(); err != nil { return nil, err }	 // 启动rafthttp.Transport实例
    	// 向rafthttp.Transport实例中添加集群中各个节点对应的Peer实例和Remote实例
    	for _, m := range b.cluster.remotes {  // add all remotes into transport
    		if m.ID != b.cluster.nodeID { tr.AddRemote(m.ID, m.PeerURLs) }
    	}
    	for _, m := range b.cluster.cl.Members() {
    		if m.ID != b.cluster.nodeID { tr.AddPeer(m.ID, m.PeerURLs) }
    	}
    	srv.r.transport = tr
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
  • 相关阅读:
    【web-攻击会话管理】(4.3.1)会话令牌处理中的薄弱:在网络上泄露令牌、在日志中泄漏令牌
    idea 导入项目
    电脑小技巧45个
    【洛谷 P8682】[蓝桥杯 2019 省 B] 等差数列 题解(数学+排序+差分)
    五分钟带你了解Python基础知识【精华】
    网页表单文本框的自动填写(四种方法)
    找不到d3dx9_43.dll如何修复?d3dx9_43.dll丢失的解决办法分享
    撞上元宇宙冷门新职业 我变年入百万打工人
    Part4_场景_第55章 瑞士&第56章 苏黎世
    计算机网络(一)网络体系结构
  • 原文地址:https://blog.csdn.net/asmartkiller/article/details/125458626