消息入口
一个etcd节点运行以后,有3个通道接收外界消息,以kv数据的增删改查请求处理为例,介绍这3个通道的工作机制。
func StartEtcd(inCfg *Config) (e *Etcd, err error) {
// 校验配置inCfg,其实就是config结构体的embed.Config类型的ec成员
serving := false // 标识是否正在提供服务
e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})} // 创建Etcd
// 1. 初始化集群内部通信服务端通信所需的peerListener
if e.Peers, err = configurePeerListeners(cfg); err != nil { return e, err }
// buffer channel so goroutines on closed connections won't wait forever
e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
// 2. 启动集群通信服务端
if err = e.servePeers(); err != nil { return e, err }
初始化集群内部通信服务端通信所需的peerListener
configurePeerListeners函数定义在server/embed/etcd.go文件中,为配置Config中LPUrls每个url创建一个peerListener,如上图中的peerListener结构体所示,该函数为初始化peerListener结构体会调用transport.NewListenerWithOpts
函数创建net.Listener,将close函数初始化为func(context.Context) error { return peers[i].Listener.Close() }
,用于调用net.Listener.Close函数。configurePeerListeners函数并没有初始化peerListener结构体的serve函数。
func configurePeerListeners(cfg *Config) (peers []*peerListener, err error) {
if err = updateCipherSuites(&cfg.PeerTLSInfo, cfg.CipherSuites); err != nil { return nil, err }
if err = cfg.PeerSelfCert(); err != nil { cfg.logger.Fatal("failed to get peer self-signed certs", zap.Error(err)) }
if !cfg.PeerTLSInfo.Empty() { cfg.logger.Info( "starting with peer TLS", zap.String("tls-info", fmt.Sprintf("%+v", cfg.PeerTLSInfo)), zap.Strings("cipher-suites", cfg.CipherSuites),) }
peers = make([]*peerListener, len(cfg.LPUrls)) // 需要为配置Config中的LPUrls每个url申请一个peerListener
defer func() { // 退出该函数执行的清理函数
if err == nil { return }
for i := range peers {
if peers[i] != nil && peers[i].close != nil {
cfg.logger.Warn( "closing peer listener", zap.String("address", cfg.LPUrls[i].String()), zap.Error(err),)
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
peers[i].close(ctx)
cancel()
}
}
}()
for i, u := range cfg.LPUrls {
if u.Scheme == "http" { // http协议
if !cfg.PeerTLSInfo.Empty() { cfg.logger.Warn("scheme is HTTP while key and cert files are present; ignoring key and cert files", zap.String("peer-url", u.String())) }
if cfg.PeerTLSInfo.ClientCertAuth { cfg.logger.Warn("scheme is HTTP while --peer-client-cert-auth is enabled; ignoring client cert auth for this URL", zap.String("peer-url", u.String())) }
}
peers[i] = &peerListener{close: func(context.Context) error { return nil }} // 为配置Config中的LPUrls每个url创建一个peerListener
peers[i].Listener, err = transport.NewListenerWithOpts(u.Host, u.Scheme, transport.WithTLSInfo(&cfg.PeerTLSInfo), transport.WithSocketOpts(&cfg.SocketOpts), transport.WithTimeout(rafthttp.ConnReadTimeout, rafthttp.ConnWriteTimeout),)
if err != nil { return nil, err }
// once serve, overwrite with 'http.Server.Shutdown'
peers[i].close = func(context.Context) error { return peers[i].Listener.Close() }
}
return peers, nil
}
启动集群通信服务端
servePeers函数定义在server/embed/etcd.go文件中,首先NewPeerHandler会调用newPeerHandler(lg,s,s.RaftHandler()...)
,RaftHandler()会返回s.r.transport.Handler()
,最终transport.Handler函数返回了注册了pipelineHandler、streamHandler、snapHandler的mux。对于每个peerListener创建一个协程运行http server,使用pipelineHandler、streamHandler、snapHandler;更新peerListener的close函数,对每个peerListener创建协程运行cmux.Serve(),也就是运行注册configurePeerListeners的transport.NewListenerWithOpts创建Listener的服务。
// configure peer handlers after rafthttp.Transport started
func (e *Etcd) servePeers() (err error) {
ph := etcdhttp.NewPeerHandler(e.GetLogger(), e.Server)
for _, p := range e.Peers { // 对于每个peerListener创建一个协程运行http server,使用pipelineHandler、streamHandler、snapHandler
u := p.Listener.Addr().String()
m := cmux.New(p.Listener) // 使用configurePeerListeners函数初始化的Listener创建cmx
srv := &http.Server{ Handler: ph, ReadTimeout: 5 * time.Minute, ErrorLog: defaultLog.New(io.Discard, "", 0), } // do not log user error 创建http server
go srv.Serve(m.Match(cmux.Any())) // 创建协程运行http server
p.serve = func() error { // configurePeerListeners函数并没有初始化peerListener结构体的serve函数,这里进行初始化
e.cfg.logger.Info( "cmux::serve", zap.String("address", u),)
return m.Serve()
}
p.close = func(ctx context.Context) error { // gracefully shutdown http.Server. close open listeners, idle connections until context cancel or time-out 更新一下close函数,因为这里需要关闭的对象不一样了
e.cfg.logger.Info("stopping serving peer traffic", zap.String("address", u),)
srv.Shutdown(ctx)
e.cfg.logger.Info("stopped serving peer traffic", zap.String("address", u),)
m.Close()
return nil
}
}
for _, pl := range e.Peers { // start peer servers in a goroutine
go func(l *peerListener) { // 创建协程运行cmux.Serve()
u := l.Addr().String()
e.cfg.logger.Info("serving peer traffic",zap.String("address", u),
)
e.errHandler(l.serve())
}(pl)
}
return nil
}
在初始化EtcdServer流程中运行的etcd.NewServersrv(cfg)函数最终会进行传输层的初始化,这里描述的就是首图中的Transport框图,如下流程主要进行:创建rafthttp.Transport实例、启动rafthttp.Transport实例和向rafthttp.Transport实例中添加集群中各个节点对应的Peer实例和Remote实例操作。详细流程将在后续博客中讲解。
// TODO: move transport initialization near the definition of remote
tr := &rafthttp.Transport{ // 创建rafthttp.Transport实例
Logger: cfg.Logger, TLSInfo: cfg.PeerTLSInfo, DialTimeout: cfg.PeerDialTimeout(),
ID: b.cluster.nodeID, URLs: cfg.PeerURLs, ClusterID: b.cluster.cl.ID(),
Raft: srv, Snapshotter: b.ss, ServerStats: sstats, LeaderStats: lstats,
// 在这里传入的rafthttp.Raft接口实现是EtcdServer实例。EtcdServer对Raft接口的实现比较简单,它会直接将调用委托给底层的raftNode实例
ErrorC: srv.errorc,
}
if err = tr.Start(); err != nil { return nil, err } // 启动rafthttp.Transport实例
// 向rafthttp.Transport实例中添加集群中各个节点对应的Peer实例和Remote实例
for _, m := range b.cluster.remotes { // add all remotes into transport
if m.ID != b.cluster.nodeID { tr.AddRemote(m.ID, m.PeerURLs) }
}
for _, m := range b.cluster.cl.Members() {
if m.ID != b.cluster.nodeID { tr.AddPeer(m.ID, m.PeerURLs) }
}
srv.r.transport = tr