• EtcdServer初始化


    EtcdServer初始化

    NewServer

    etcdserver.NewServer (server.go:305) go.etcd.io/etcd/server/v3/etcdserver
    embed.StartEtcd (etcd.go:247) go.etcd.io/etcd/server/v3/embed
    etcdmain.startEtcd (etcd.go:203) go.etcd.io/etcd/server/v3/etcdmain
    etcdmain.startEtcdOrProxyV2 (etcd.go:113) go.etcd.io/etcd/server/v3/etcdmain
    etcdmain.Main (main.go:40) go.etcd.io/etcd/server/v3/etcdmain
    main.main (main.go:32) main
    runtime.main (proc.go:255) runtime
    runtime.goexit (asm_amd64.s:1581) runtime
     - Async Stack Trace
    :2
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10

    startEtcdOrProxyV2

    startEtcdOrProxyV2会解析命令行参数,完成初始化并启动etcd服务器。其执行流程大致如下:

    • 解析参数
    • 初始化日志
    • 初始化host
    • 初始化数据存储目录(data-dir,为空时默认使用"节点名称.etcd")
    • 启动服务器
    • 然后等待服务器停止
    // startEtcdOrProxyV2 parses the command-line arguments, sets up logging,
    // starts the etcd server for the configured data directory and then blocks
    // until the server stops or a listener error is received.
    //
    // args is the full argument vector; args[0] is skipped when parsing.
    // Nothing is returned: failures are logged and handled with os.Exit(1),
    // lg.Fatal or lg.Panic, and the normal path ends with osutil.Exit(0).
    func startEtcdOrProxyV2(args []string) {
    	grpc.EnableTracing = false
    	// Parse the command-line arguments.
    	cfg := newConfig()
    	// Remember the initial-cluster value from before parsing; it is passed
    	// to UpdateDefaultClusterFromName further down.
    	defaultInitialCluster := cfg.ec.InitialCluster
    
    	err := cfg.parse(args[1:])
    	// Initialize the logger.
    	lg := cfg.ec.GetLogger()
    	// If we failed to parse the whole configuration, print the error using
    	// preferably the resolved logger from the config,
    	// but if does not exists, create a new temporary logger.
    	if lg == nil {
    		var zapError error
    		// use this logger
    		lg, zapError = logutil.CreateDefaultZapLogger(zap.InfoLevel)
    		if zapError != nil {
    			fmt.Printf("error creating zap logger %v", zapError)
    			os.Exit(1)
    		}
    	}
    	lg.Info("Running: ", zap.Strings("args", args))
    	// Any parse error is fatal: log it (with an extra hint for the unset
    	// advertise-client-URLs case) and exit with status 1.
    	if err != nil {
    		lg.Warn("failed to verify flags", zap.Error(err))
    		switch err {
    		case embed.ErrUnsetAdvertiseClientURLsFlag:
    			lg.Warn("advertise client URLs are not set", zap.Error(err))
    		}
    		os.Exit(1)
    	}
    
    	cfg.ec.SetupGlobalLoggers()
    
    	// Sync the configured logger (if any) when this function returns.
    	defer func() {
    		logger := cfg.ec.GetLogger()
    		if logger != nil {
    			logger.Sync()
    		}
    	}()
    	// Initialize the default host.
    	defaultHost, dhErr := (&cfg.ec).UpdateDefaultClusterFromName(defaultInitialCluster)
    	if defaultHost != "" {
    		lg.Info(
    			"detected default host for advertise",
    			zap.String("host", defaultHost),
    		)
    	}
    	// A detection failure is only logged; startup continues.
    	if dhErr != nil {
    		lg.Info("failed to detect default host", zap.Error(dhErr))
    	}
    
    	// Initialize the data directory: default to "<name>.etcd" when unset.
    	if cfg.ec.Dir == "" {
    		cfg.ec.Dir = fmt.Sprintf("%v.etcd", cfg.ec.Name)
    		lg.Warn(
    			"'data-dir' was empty; using default",
    			zap.String("data-dir", cfg.ec.Dir),
    		)
    	}
    
    	var stopped <-chan struct{}
    	var errc <-chan error
    	// Start the etcd server, depending on what the data directory holds.
    	which := identifyDataDirOrDie(cfg.ec.GetLogger(), cfg.ec.Dir)
    	if which != dirEmpty {
    		lg.Info(
    			"server has already been initialized",
    			zap.String("data-dir", cfg.ec.Dir),
    			zap.String("dir-type", string(which)),
    		)
    		switch which {
    		case dirMember:
    			stopped, errc, err = startEtcd(&cfg.ec)
    		case dirProxy:
    			lg.Panic("v2 http proxy has already been deprecated in 3.6", zap.String("dir-type", string(which)))
    		default:
    			lg.Panic(
    				"unknown directory type",
    				zap.String("dir-type", string(which)),
    			)
    		}
    	} else {
    		// Empty data directory: start etcd; a failure is logged here and
    		// then handled by the common error path below.
    		stopped, errc, err = startEtcd(&cfg.ec)
    		if err != nil {
    			lg.Warn("failed to start etcd", zap.Error(err))
    		}
    	}
    
    	// err now holds the result of startEtcd. Every branch below terminates
    	// the process.
    	if err != nil {
    		// Discovery errors get targeted hints depending on the wrapped error.
    		if derr, ok := err.(*errors.DiscoveryError); ok {
    			switch derr.Err {
    			case v2discovery.ErrDuplicateID:
    				lg.Warn(
    					"member has been registered with discovery service",
    					zap.String("name", cfg.ec.Name),
    					zap.String("discovery-token", cfg.ec.Durl),
    					zap.Error(derr.Err),
    				)
    				lg.Warn(
    					"but could not find valid cluster configuration",
    					zap.String("data-dir", cfg.ec.Dir),
    				)
    				lg.Warn("check data dir if previous bootstrap succeeded")
    				lg.Warn("or use a new discovery token if previous bootstrap failed")
    
    			case v2discovery.ErrDuplicateName:
    				lg.Warn(
    					"member with duplicated name has already been registered",
    					zap.String("discovery-token", cfg.ec.Durl),
    					zap.Error(derr.Err),
    				)
    				lg.Warn("cURL the discovery token URL for details")
    				lg.Warn("do not reuse discovery token; generate a new one to bootstrap a cluster")
    
    			default:
    				lg.Warn(
    					"failed to bootstrap; discovery token was already used",
    					zap.String("discovery-token", cfg.ec.Durl),
    					zap.Error(err),
    				)
    				lg.Warn("do not reuse discovery token; generate a new one to bootstrap a cluster")
    			}
    			os.Exit(1)
    		}
    
    		// The error text mentions both "include" and "--initial-cluster":
    		// print hints about flags that still hold their default values.
    		if strings.Contains(err.Error(), "include") && strings.Contains(err.Error(), "--initial-cluster") {
    			lg.Warn("failed to start", zap.Error(err))
    			if cfg.ec.InitialCluster == cfg.ec.InitialClusterFromName(cfg.ec.Name) {
    				lg.Warn("forgot to set --initial-cluster?")
    			}
    			if types.URLs(cfg.ec.APUrls).String() == embed.DefaultInitialAdvertisePeerURLs {
    				lg.Warn("forgot to set --initial-advertise-peer-urls?")
    			}
    			if cfg.ec.InitialCluster == cfg.ec.InitialClusterFromName(cfg.ec.Name) && len(cfg.ec.Durl) == 0 && len(cfg.ec.DiscoveryCfg.Endpoints) == 0 {
    				lg.Warn("V2 discovery settings (i.e., --discovery) or v3 discovery settings (i.e., --discovery-token, --discovery-endpoints) are not set")
    			}
    			os.Exit(1)
    		}
    		lg.Fatal("discovery failed", zap.Error(err))
    	}
    
    	osutil.HandleInterrupts(lg)
    
    	// At this point, the initialization of etcd is done.
    	// The listeners are listening on the TCP ports and ready
    	// for accepting connections. The etcd instance should be
    	// joined with the cluster and ready to serve incoming
    	// connections.
    	notifySystemd(lg)
    
    	// Block until a listener error arrives or the server stops.
    	select {
    	case lerr := <-errc:
    		// fatal out on listener errors
    		lg.Fatal("listener failed", zap.Error(lerr))
    	case <-stopped:
    	}
    
    	osutil.Exit(0)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159

    StartEtcd

    StartEtcd启动etcd服务器和HTTP服务,用于客户端/服务器通信。

    返回已启动的Etcd实例。

    返回的Etcd.Server不保证已经加入集群,调用方需要等待Etcd.Server.ReadyNotify()通道,才能确认服务器已就绪可用。

    // StartEtcd validates inCfg, configures the peer and client listeners,
    // creates the server with etcdserver.NewServer, starts it and then begins
    // serving peers, clients and metrics.
    //
    // On success it returns the started Etcd and a nil error. If validation
    // fails it returns (nil, err) immediately. On any later error the deferred
    // cleanup closes the partially started instance and resets the named
    // result e to nil, so the caller also receives (nil, err).
    func StartEtcd(inCfg *Config) (e *Etcd, err error) {
    	if err = inCfg.Validate(); err != nil {
    		return nil, err
    	}
    	// serving is flipped to true only at the very end, after all serve*
    	// calls succeeded; the deferred cleanup uses it.
    	serving := false
    	// The config is copied by value into e; cfg points at that copy.
    	e = &Etcd{cfg: *inCfg, stopc: make(chan struct{})}
    	cfg := &e.cfg
    	// Cleanup that runs only when an error is being returned.
    	defer func() {
    		if e == nil || err == nil {
    			return
    		}
    		if !serving {
    			// errored before starting gRPC server for serveCtx.serversC
    			for _, sctx := range e.sctxs {
    				close(sctx.serversC)
    			}
    		}
    		e.Close()
    		e = nil
    	}()
    
    	if !cfg.SocketOpts.Empty() {
    		cfg.logger.Info(
    			"configuring socket options",
    			zap.Bool("reuse-address", cfg.SocketOpts.ReuseAddress),
    			zap.Bool("reuse-port", cfg.SocketOpts.ReusePort),
    		)
    	}
    	e.cfg.logger.Info(
    		"configuring peer listeners",
    		zap.Strings("listen-peer-urls", e.cfg.getLPURLs()),
    	)
    	if e.Peers, err = configurePeerListeners(cfg); err != nil {
    		return e, err
    	}
    
    	e.cfg.logger.Info(
    		"configuring client listeners",
    		zap.Strings("listen-client-urls", e.cfg.getLCURLs()),
    	)
    	if e.sctxs, err = configureClientListeners(cfg); err != nil {
    		return e, err
    	}
    
    	// Collect the listener of every client serve context.
    	for _, sctx := range e.sctxs {
    		e.Clients = append(e.Clients, sctx.l)
    	}
    
    	var (
    		urlsmap types.URLsMap
    		token   string
    	)
    	// The initial peer URL map and cluster token are only computed for a
    	// member that has not been initialized yet.
    	memberInitialized := true
    	if !isMemberInitialized(cfg) {
    		memberInitialized = false
    		urlsmap, token, err = cfg.PeerURLsMapAndToken("etcd")
    		if err != nil {
    			return e, fmt.Errorf("error setting up initial cluster: %v", err)
    		}
    	}
    
    	// Auto-compaction retention: falls back to "0" when it is not set.
    	if len(cfg.AutoCompactionRetention) == 0 {
    		cfg.AutoCompactionRetention = "0"
    	}
    	autoCompactionRetention, err := parseCompactionRetention(cfg.AutoCompactionMode, cfg.AutoCompactionRetention)
    	if err != nil {
    		return e, err
    	}
    
    	backendFreelistType := parseBackendFreelistType(cfg.BackendFreelistType)
    
    	// Translate the embed configuration into the server configuration.
    	srvcfg := config.ServerConfig{
    		Name:                                     cfg.Name,
    		ClientURLs:                               cfg.ACUrls,
    		PeerURLs:                                 cfg.APUrls,
    		DataDir:                                  cfg.Dir,
    		DedicatedWALDir:                          cfg.WalDir,
    		SnapshotCount:                            cfg.SnapshotCount,
    		SnapshotCatchUpEntries:                   cfg.SnapshotCatchUpEntries,
    		MaxSnapFiles:                             cfg.MaxSnapFiles,
    		MaxWALFiles:                              cfg.MaxWalFiles,
    		InitialPeerURLsMap:                       urlsmap,
    		InitialClusterToken:                      token,
    		DiscoveryURL:                             cfg.Durl,
    		DiscoveryProxy:                           cfg.Dproxy,
    		DiscoveryCfg:                             cfg.DiscoveryCfg,
    		NewCluster:                               cfg.IsNewCluster(),
    		PeerTLSInfo:                              cfg.PeerTLSInfo,
    		TickMs:                                   cfg.TickMs,
    		ElectionTicks:                            cfg.ElectionTicks(),
    		WaitClusterReadyTimeout:                  cfg.ExperimentalWaitClusterReadyTimeout,
    		InitialElectionTickAdvance:               cfg.InitialElectionTickAdvance,
    		AutoCompactionRetention:                  autoCompactionRetention,
    		AutoCompactionMode:                       cfg.AutoCompactionMode,
    		QuotaBackendBytes:                        cfg.QuotaBackendBytes,
    		BackendBatchLimit:                        cfg.BackendBatchLimit,
    		BackendFreelistType:                      backendFreelistType,
    		BackendBatchInterval:                     cfg.BackendBatchInterval,
    		MaxTxnOps:                                cfg.MaxTxnOps,
    		MaxRequestBytes:                          cfg.MaxRequestBytes,
    		MaxConcurrentStreams:                     cfg.MaxConcurrentStreams,
    		SocketOpts:                               cfg.SocketOpts,
    		StrictReconfigCheck:                      cfg.StrictReconfigCheck,
    		ClientCertAuthEnabled:                    cfg.ClientTLSInfo.ClientCertAuth,
    		AuthToken:                                cfg.AuthToken,
    		BcryptCost:                               cfg.BcryptCost,
    		TokenTTL:                                 cfg.AuthTokenTTL,
    		CORS:                                     cfg.CORS,
    		HostWhitelist:                            cfg.HostWhitelist,
    		InitialCorruptCheck:                      cfg.ExperimentalInitialCorruptCheck,
    		CorruptCheckTime:                         cfg.ExperimentalCorruptCheckTime,
    		CompactHashCheckEnabled:                  cfg.ExperimentalCompactHashCheckEnabled,
    		CompactHashCheckTime:                     cfg.ExperimentalCompactHashCheckTime,
    		PreVote:                                  cfg.PreVote,
    		Logger:                                   cfg.logger,
    		ForceNewCluster:                          cfg.ForceNewCluster,
    		EnableGRPCGateway:                        cfg.EnableGRPCGateway,
    		ExperimentalEnableDistributedTracing:     cfg.ExperimentalEnableDistributedTracing,
    		UnsafeNoFsync:                            cfg.UnsafeNoFsync,
    		EnableLeaseCheckpoint:                    cfg.ExperimentalEnableLeaseCheckpoint,
    		LeaseCheckpointPersist:                   cfg.ExperimentalEnableLeaseCheckpointPersist,
    		CompactionBatchLimit:                     cfg.ExperimentalCompactionBatchLimit,
    		CompactionSleepInterval:                  cfg.ExperimentalCompactionSleepInterval,
    		WatchProgressNotifyInterval:              cfg.ExperimentalWatchProgressNotifyInterval,
    		DowngradeCheckTime:                       cfg.ExperimentalDowngradeCheckTime,
    		WarningApplyDuration:                     cfg.ExperimentalWarningApplyDuration,
    		WarningUnaryRequestDuration:              cfg.ExperimentalWarningUnaryRequestDuration,
    		ExperimentalMemoryMlock:                  cfg.ExperimentalMemoryMlock,
    		ExperimentalTxnModeWriteWithSharedBuffer: cfg.ExperimentalTxnModeWriteWithSharedBuffer,
    		ExperimentalBootstrapDefragThresholdMegabytes: cfg.ExperimentalBootstrapDefragThresholdMegabytes,
    		ExperimentalMaxLearners:                       cfg.ExperimentalMaxLearners,
    		V2Deprecation:                                 cfg.V2DeprecationEffective(),
    	}
    
    	if srvcfg.ExperimentalEnableDistributedTracing {
    		tctx := context.Background()
    		// Note: this err is local to the if-block; the return statements
    		// below copy it into the named result before the defer runs.
    		tracingExporter, opts, err := setupTracingExporter(tctx, cfg)
    		if err != nil {
    			return e, err
    		}
    		if tracingExporter == nil || len(opts) == 0 {
    			return e, fmt.Errorf("error setting up distributed tracing")
    		}
    		e.tracingExporterShutdown = func() { tracingExporter.Shutdown(tctx) }
    		srvcfg.ExperimentalTracerOptions = opts
    
    		e.cfg.logger.Info(
    			"distributed tracing setup enabled",
    		)
    	}
    
    	print(e.cfg.logger, *cfg, srvcfg, memberInitialized)
    
    	if e.Server, err = etcdserver.NewServer(srvcfg); err != nil {
    		return e, err
    	}
    
    	// buffer channel so goroutines on closed connections won't wait forever
    	e.errc = make(chan error, len(e.Peers)+len(e.Clients)+2*len(e.sctxs))
    
    	// newly started member ("memberInitialized==false")
    	// does not need corruption check
    	if memberInitialized && srvcfg.InitialCorruptCheck {
    		if err = e.Server.CorruptionChecker().InitialCheck(); err != nil {
    			// set "EtcdServer" to nil, so that it does not block on "EtcdServer.Close()"
    			// (nothing to close since rafthttp transports have not been started)
    
    			e.cfg.logger.Error("checkInitialHashKV failed", zap.Error(err))
    			e.Server.Cleanup()
    			e.Server = nil
    			return e, err
    		}
    	}
    	e.Server.Start()
    
    	// Start serving in order: peers, then clients, then metrics.
    	if err = e.servePeers(); err != nil {
    		return e, err
    	}
    	if err = e.serveClients(); err != nil {
    		return e, err
    	}
    	if err = e.serveMetrics(); err != nil {
    		return e, err
    	}
    
    	e.cfg.logger.Info(
    		"now serving peer/client/metrics",
    		zap.String("local-member-id", e.Server.MemberId().String()),
    		zap.Strings("initial-advertise-peer-urls", e.cfg.getAPURLs()),
    		zap.Strings("listen-peer-urls", e.cfg.getLPURLs()),
    		zap.Strings("advertise-client-urls", e.cfg.getACURLs()),
    		zap.Strings("listen-client-urls", e.cfg.getLCURLs()),
    		zap.Strings("listen-metrics-urls", e.cfg.getMetricsURLs()),
    	)
    	serving = true
    	return e, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139
    • 140
    • 141
    • 142
    • 143
    • 144
    • 145
    • 146
    • 147
    • 148
    • 149
    • 150
    • 151
    • 152
    • 153
    • 154
    • 155
    • 156
    • 157
    • 158
    • 159
    • 160
    • 161
    • 162
    • 163
    • 164
    • 165
    • 166
    • 167
    • 168
    • 169
    • 170
    • 171
    • 172
    • 173
    • 174
    • 175
    • 176
    • 177
    • 178
    • 179
    • 180
    • 181
    • 182
    • 183
    • 184
    • 185
    • 186
    • 187
    • 188
    • 189
    • 190
    • 191
    • 192
    • 193
    • 194
    • 195
    • 196
    • 197
    • 198

    NewServer

    创建一个etcd服务器

    // NewServer creates a new EtcdServer from cfg.
    //
    // It runs bootstrap, builds the EtcdServer value together with its lessor,
    // KV store, auth store and (when AutoCompactionRetention is non-zero)
    // compactor, restores alarms, and finally starts the rafthttp transport
    // and registers the known remotes and cluster members with it.
    //
    // On error it returns (nil, err). Because err is a named result, the
    // deferred functions observe it: they close the bootstrapped resources
    // and, once the KV store has been created, the KV store as well.
    func NewServer(cfg config.ServerConfig) (srv *EtcdServer, err error) {
    	b, err := bootstrap(cfg)
    	if err != nil {
    		return nil, err
    	}
    
    	// Release everything bootstrap opened if a later step fails.
    	defer func() {
    		if err != nil {
    			b.Close()
    		}
    	}()
    
    	sstats := stats.NewServerStats(cfg.Name, b.cluster.cl.String())
    	lstats := stats.NewLeaderStats(cfg.Logger, b.cluster.nodeID.String())
    
    	// Duration of one tick, derived from TickMs.
    	heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
    	srv = &EtcdServer{
    		readych:               make(chan struct{}),
    		Cfg:                   cfg,
    		lgMu:                  new(sync.RWMutex),
    		lg:                    cfg.Logger,
    		errorc:                make(chan error, 1),
    		v2store:               b.storage.st,
    		snapshotter:           b.ss,
    		r:                     *b.raft.newRaftNode(b.ss, b.storage.wal.w, b.cluster.cl),
    		memberId:              b.cluster.nodeID,
    		attributes:            membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
    		cluster:               b.cluster.cl,
    		stats:                 sstats,
    		lstats:                lstats,
    		SyncTicker:            time.NewTicker(500 * time.Millisecond),
    		peerRt:                b.prt,
    		reqIDGen:              idutil.NewGenerator(uint16(b.cluster.nodeID), time.Now()),
    		AccessController:      &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
    		consistIndex:          b.storage.backend.ci,
    		firstCommitInTerm:     notify.NewNotifier(),
    		clusterVersionChanged: notify.NewNotifier(),
    	}
    	serverID.With(prometheus.Labels{"server_id": b.cluster.nodeID.String()}).Set(1)
    	srv.cluster.SetVersionChangedNotifier(srv.clusterVersionChanged)
    	srv.applyV2 = NewApplierV2(cfg.Logger, srv.v2store, srv.cluster)
    
    	srv.be = b.storage.backend.be
    	srv.beHooks = b.storage.backend.beHooks
    	// Minimum lease TTL: 3/2 of the election ticks (integer division),
    	// expressed in heartbeat durations.
    	minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
    
    	// Always recover the lessor before the KV store. When the mvcc KV is
    	// recovered it re-attaches keys to their leases; if the KV were
    	// recovered first, it would attach the keys to the wrong lessor
    	// before that lessor is recovered.
    	srv.lessor = lease.NewLessor(srv.Logger(), srv.be, srv.cluster, lease.LessorConfig{
    		MinLeaseTTL:                int64(math.Ceil(minTTL.Seconds())),
    		CheckpointInterval:         cfg.LeaseCheckpointInterval,
    		CheckpointPersist:          cfg.LeaseCheckpointPersist,
    		ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
    	})
    
    	// The token provider waits on srv.applyWait for a given index.
    	tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
    		func(index uint64) <-chan struct{} {
    			return srv.applyWait.Wait(index)
    		},
    		time.Duration(cfg.TokenTTL)*time.Second,
    	)
    	if err != nil {
    		cfg.Logger.Warn("failed to create token provider", zap.Error(err))
    		return nil, err
    	}
    
    	mvccStoreConfig := mvcc.StoreConfig{
    		CompactionBatchLimit:    cfg.CompactionBatchLimit,
    		CompactionSleepInterval: cfg.CompactionSleepInterval,
    	}
    	srv.kv = mvcc.New(srv.Logger(), srv.be, srv.lessor, mvccStoreConfig)
    	srv.corruptionChecker = newCorruptionChecker(cfg.Logger, srv, srv.kv.HashStorage())
    
    	srv.authStore = auth.NewAuthStore(srv.Logger(), schema.NewAuthBackend(srv.Logger(), srv.be), tp, int(cfg.BcryptCost))
    
    	newSrv := srv // since srv == nil in defer if srv is returned as nil
    	defer func() {
    		// closing backend without first closing kv can cause
    		// resumed compactions to fail with closed tx errors
    		if err != nil {
    			newSrv.kv.Close()
    		}
    	}()
    	// A compactor is created and started only for a non-zero retention.
    	if num := cfg.AutoCompactionRetention; num != 0 {
    		srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
    		if err != nil {
    			return nil, err
    		}
    		srv.compactor.Run()
    	}
    
    	if err = srv.restoreAlarms(); err != nil {
    		return nil, err
    	}
    	srv.uberApply = srv.NewUberApplier()
    
    	if srv.Cfg.EnableLeaseCheckpoint {
    		// setting checkpointer enables lease checkpoint feature.
    		srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
    			srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp})
    		})
    	}
    
    	// Set the hook after EtcdServer finishes the initialization to avoid
    	// the hook being called during the initialization process.
    	srv.be.SetTxPostLockInsideApplyHook(srv.getTxPostLockInsideApplyHook())
    
    	// TODO: move transport initialization near the definition of remote
    	tr := &rafthttp.Transport{
    		Logger:      cfg.Logger,
    		TLSInfo:     cfg.PeerTLSInfo,
    		DialTimeout: cfg.PeerDialTimeout(),
    		ID:          b.cluster.nodeID,
    		URLs:        cfg.PeerURLs,
    		ClusterID:   b.cluster.cl.ID(),
    		Raft:        srv,
    		Snapshotter: b.ss,
    		ServerStats: sstats,
    		LeaderStats: lstats,
    		ErrorC:      srv.errorc,
    	}
    	if err = tr.Start(); err != nil {
    		return nil, err
    	}
    	// add all remotes into transport
    	for _, m := range b.cluster.remotes {
    		if m.ID != b.cluster.nodeID {
    			tr.AddRemote(m.ID, m.PeerURLs)
    		}
    	}
    	// Add every other cluster member as a peer (the local node is skipped).
    	for _, m := range b.cluster.cl.Members() {
    		if m.ID != b.cluster.nodeID {
    			tr.AddPeer(m.ID, m.PeerURLs)
    		}
    	}
    	srv.r.transport = tr
    
    	return srv, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74
    • 75
    • 76
    • 77
    • 78
    • 79
    • 80
    • 81
    • 82
    • 83
    • 84
    • 85
    • 86
    • 87
    • 88
    • 89
    • 90
    • 91
    • 92
    • 93
    • 94
    • 95
    • 96
    • 97
    • 98
    • 99
    • 100
    • 101
    • 102
    • 103
    • 104
    • 105
    • 106
    • 107
    • 108
    • 109
    • 110
    • 111
    • 112
    • 113
    • 114
    • 115
    • 116
    • 117
    • 118
    • 119
    • 120
    • 121
    • 122
    • 123
    • 124
    • 125
    • 126
    • 127
    • 128
    • 129
    • 130
    • 131
    • 132
    • 133
    • 134
    • 135
    • 136
    • 137
    • 138
    • 139

    bootstrap

    bootstrap进行主要的初始化:

    // bootstrap performs the main initialization needed before an EtcdServer
    // can be constructed. In order, it ensures the data and member directories
    // are accessible, then sets up the snapshotter, the peer round tripper,
    // the v2 store, the backend, the WAL (only when WAL files already exist),
    // the cluster, the storage and finally the raft bootstrap state.
    //
    // It returns the assembled bootstrappedServer, or (nil, err) on failure.
    // Once the backend has been opened, every error path closes it before
    // returning so that it is not leaked.
    func bootstrap(cfg config.ServerConfig) (b *bootstrappedServer, err error) {
    
    	// Only a warning: a request limit above the recommendation is allowed.
    	if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
    		cfg.Logger.Warn(
    			"exceeded recommended request limit",
    			zap.Uint("max-request-bytes", cfg.MaxRequestBytes),
    			zap.String("max-request-size", humanize.Bytes(uint64(cfg.MaxRequestBytes))),
    			zap.Int("recommended-request-bytes", recommendedMaxRequestBytes),
    			zap.String("recommended-request-size", recommendedMaxRequestBytesString),
    		)
    	}
    	// Every etcd node keeps its data under the "<node name>.etcd/member"
    	// directory; unless stated otherwise, the directories mentioned below
    	// are sub-directories of it. First check that the directory exists and
    	// create it if it does not.
    	if terr := fileutil.TouchDirAll(cfg.Logger, cfg.DataDir); terr != nil {
    		return nil, fmt.Errorf("cannot access data directory: %w", terr)
    	}
    
    	if terr := fileutil.TouchDirAll(cfg.Logger, cfg.MemberDir()); terr != nil {
    		return nil, fmt.Errorf("cannot access member directory: %w", terr)
    	}
    	// Initialize the snapshotter.
    	ss := bootstrapSnapshot(cfg)
    	prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.PeerDialTimeout())
    	if err != nil {
    		return nil, err
    	}
    
    	// Check whether WAL files already exist in the WAL directory.
    	haveWAL := wal.Exist(cfg.WALDir())
    	// Create the v2 store.
    	st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
    	// Initialize the backend.
    	backend, err := bootstrapBackend(cfg, haveWAL, st, ss)
    	if err != nil {
    		return nil, err
    	}
    	var (
    		bwal *bootstrappedWAL
    	)
    
    	if haveWAL {
    		if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
    			// The backend is already open at this point; close it like
    			// the other error paths below do.
    			backend.Close()
    			return nil, fmt.Errorf("cannot write to WAL directory: %w", err)
    		}
    		bwal = bootstrapWALFromSnapshot(cfg, backend.snapshot)
    	}
    	// Initialize the cluster. bwal is nil when no WAL exists yet.
    	cluster, err := bootstrapCluster(cfg, bwal, prt)
    	if err != nil {
    		backend.Close()
    		return nil, err
    	}
    	// Initialize the storage.
    	s, err := bootstrapStorage(cfg, st, backend, bwal, cluster)
    	if err != nil {
    		backend.Close()
    		return nil, err
    	}
    
    	err = cluster.Finalize(cfg, s)
    	if err != nil {
    		backend.Close()
    		return nil, err
    	}
    	// Initialize the raft module.
    	raft := bootstrapRaft(cfg, cluster, s.wal)
    	return &bootstrappedServer{
    		prt:     prt,
    		ss:      ss,
    		storage: s,
    		cluster: cluster,
    		raft:    raft,
    	}, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48
    • 49
    • 50
    • 51
    • 52
    • 53
    • 54
    • 55
    • 56
    • 57
    • 58
    • 59
    • 60
    • 61
    • 62
    • 63
    • 64
    • 65
    • 66
    • 67
    • 68
    • 69
    • 70
    • 71
    • 72
    • 73
    • 74

    推荐一个零声学院免费教程,个人觉得老师讲得不错,
    分享给大家:Linux,Nginx,ZeroMQ,MySQL,Redis,
    fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,
    TCP/IP,协程,DPDK等技术内容,点击立即学习

  • 相关阅读:
    【监控】grafana图表使用快速上手
    asp.net core MVC 模型验证
    贷款借钱平台 小额贷款系统开发小额贷款源码 贷款平台开发搭建
    Request和Response
    初始网络原理
    论文阅读:Denoising Diffusion Probabilistic Models
    基于UDP协议的网络服务器的模拟实现
    (高阶) Redis 7 第16讲 预热/雪崩/击穿/穿透 缓存篇
    控制力士乐油研阿托斯比例阀信号转换器放大器
    单行、多行文本超出显示省略号
  • 原文地址:https://blog.csdn.net/hzb869168467/article/details/126576436