• milvus datacoord启动源码分析


    datacoord启动源码分析

    结构体

    // components.DataCoord
    // DataCoord implements grpc server of DataCoord server
    type DataCoord struct {
    	ctx context.Context
    	svr *grpcdatacoordclient.Server
    }
    
    // grpcdatacoord.Server
    // Server is the grpc server of datacoord
    type Server structgrpcdatacoord.Context
    	cancel context.CancelFunc
    
    	serverID atomic.Int64
    
    	wg        sync.WaitGroup
    	dataCoord types.DataCoordComponent
    
    	etcdCli *clientv3.Client
    	tikvCli *txnkv.Client
    
    	grpcErrChan chan error
    	grpcServer  *grpc.Server
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    dataCoord是一个接口,实现dataCoord api功能。

    func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool, wg *sync.WaitGroup) component {
    	wg.Add(1)
    	return runComponent(ctx, localMsg, wg, components.NewDataCoord, metrics.RegisterDataCoord)
    }
    
    // creator用NewDataCoord替换
    role, err = creator(ctx, factory)
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    components.NewDataCoord是一个函数。

    NewDataCoord()用来创建DataCoord结构体。

    // NewDataCoord creates a new DataCoord
    func NewDataCoord(ctx context.Context, factory dependency.Factory) (*DataCoord, error) {
    	s := grpcdatacoordclient.NewServer(ctx, factory)
    
    	return &DataCoord{
    		ctx: ctx,
    		svr: s,
    	}, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    grpcdatacoordclient.NewServer()产生的是本结构体Server。

    进入NewServer:

    // NewServer new data service grpc server
    func NewServer(ctx context.Context, factory dependency.Factory, opts ...datacoord.Option) *Server {
    	ctx1, cancel := context.WithCancel(ctx)
    
    	s := &Server{
    		ctx:         ctx1,
    		cancel:      cancel,
    		grpcErrChan: make(chan error),
    	}
    	s.dataCoord = datacoord.CreateServer(s.ctx, factory, opts...)
    	return s
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    datacoord.CreateServer()返回一个结构体datacoord.Server,是接口types.DataCoordComponent的实现。

    执行Run()

    Server结构体创建后,调用结构体的Run()方法。

    func runComponent[T component](ctx context.Context,
    	localMsg bool,
    	runWg *sync.WaitGroup,
    	creator func(context.Context, dependency.Factory) (T, error),
    	metricRegister func(*prometheus.Registry),
    ) component {
    	var role T
    
    	sign := make(chan struct{})
    	go func() {
    		factory := dependency.NewFactory(localMsg)
    		var err error
    		role, err = creator(ctx, factory)
    		if localMsg {
    			paramtable.SetRole(typeutil.StandaloneRole)
    		} else {
    			paramtable.SetRole(role.GetName())
    		}
    		if err != nil {
    			panic(err)
    		}
    		close(sign)
            // 在这里调用对应组件结构体的Run()方法,这里是components.DataCoord结构体
    		if err := role.Run(); err != nil {
    			panic(err)
    		}
    		runWg.Done()
    	}()
        ......
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30

    runComponent是一个包裹函数。

    // Run starts service
    func (s *DataCoord) Run() error {
    	if err := s.svr.Run(); err != nil {
    		log.Error("DataCoord starts error", zap.Error(err))
    		return err
    	}
    	log.Debug("DataCoord successfully started")
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    Run()方法调用s.svr.Run()方法。srv是datacoord.CreateServer()返回的结构体datacoord.Server。

    // grpcdatacoord
    // Run starts the Server. Need to call inner init and start method.
    func (s *Server) Run() error {
    	if err := s.init(); err != nil {
    		return err
    	}
    	log.Debug("DataCoord init done ...")
    
    	if err := s.start(); err != nil {
    		return err
    	}
    	log.Debug("DataCoord start done ...")
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    接下来分析s.init()和s.start()方法。

    s.init()

    func (s *Server) init() error {
    	params := paramtable.Get()
    	etcdConfig := &params.EtcdCfg
    
    	etcdCli, err := etcd.GetEtcdClient(
    		etcdConfig.UseEmbedEtcd.GetAsBool(),
    		etcdConfig.EtcdUseSSL.GetAsBool(),
    		etcdConfig.Endpoints.GetAsStrings(),
    		etcdConfig.EtcdTLSCert.GetValue(),
    		etcdConfig.EtcdTLSKey.GetValue(),
    		etcdConfig.EtcdTLSCACert.GetValue(),
    		etcdConfig.EtcdTLSMinVersion.GetValue())
    	if err != nil {
    		log.Debug("DataCoord connect to etcd failed", zap.Error(err))
    		return err
    	}
    	s.etcdCli = etcdCli
    	s.dataCoord.SetEtcdClient(etcdCli)
    	s.dataCoord.SetAddress(params.DataCoordGrpcServerCfg.GetAddress())
    
    	if params.MetaStoreCfg.MetaStoreType.GetValue() == util.MetaStoreTypeTiKV {
    		log.Info("Connecting to tikv metadata storage.")
    		tikvCli, err := getTiKVClient(&paramtable.Get().TiKVCfg)
    		if err != nil {
    			log.Warn("DataCoord failed to connect to tikv", zap.Error(err))
    			return err
    		}
    		s.dataCoord.SetTiKVClient(tikvCli)
    		log.Info("Connected to tikv. Using tikv as metadata storage.")
    	}
        // 启动grpc,默认为13333
    	err = s.startGrpc()
    	if err != nil {
    		log.Debug("DataCoord startGrpc failed", zap.Error(err))
    		return err
    	}
        // 执行真正的初始化
    	if err := s.dataCoord.Init(); err != nil {
    		log.Error("dataCoord init error", zap.Error(err))
    		return err
    	}
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43

    这段可以看出来,创建了etcdCli并赋予给了s.etcdCli。

    s.startGrpc()启动grpc端口服务。

    最终调用s.dataCoord.Init()进行初始化,代码位置:internal\datacoord\server.go

    s.queryCoord是接口类型types.DataCoordComponent,DataCoordComponent继承于Component。

    type DataCoordComponent interface {
        DataCoord
        SetAddress(address string)
        SetEtcdClient(etcdClient *clientv3.Client)
        SetTiKVClient(client *txnkv.Client)
        SetRootCoordClient(rootCoord RootCoordClient)
        SetDataNodeCreator(func(context.Context, string, int64) (DataNodeClient, error))
        SetIndexNodeCreator(func(context.Context, string, int64) (IndexNodeClient, error))
    }
    
    // DataCoord is the interface `datacoord` package implements
    type DataCoord interface {
    	Component
    	datapb.DataCoordServer
    }
    
    // Component is the interface all services implement
    type Component interface {
    	Init() error
    	Start() error
    	Stop() error
    	Register() error
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23

    接口套接口:

    RootCoordComponent -> RootCoord -> Component
    DataCoordComponent -> DataCoord -> Component
    QueryCoordComponent -> QueryCoord -> Component
    ProxyComponent -> Proxy -> Component
    QueryNodeComponent -> QueryNode -> Component
    IndexNodeComponent -> IndexNode -> Component
    DataNodeComponent -> DataNode -> Component
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    各组件最终的Init()初始化代码路径:

    internal\rootcoord\root_coord.go->Init()
    internal\datacoord\server.go->Init()
    internal\querycoordv2\server.go->Init()
    internal\datanode\data_node.go->Init()
    internal\indexnode\indexnode.go->Init()
    internal\querynodev2\server.go->Init()
    internal\proxy\proxy.go->Init()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    回过头来继续datacoord的init。

    // Init change server state to Initializing
    func (s *Server) Init() error {
    	var err error
    	s.factory.Init(Params)
    	if err = s.initSession(); err != nil {
    		return err
    	}
    	if s.enableActiveStandBy {
    		......
    	}
        // 执行真正的初始化
    	return s.initDataCoord()
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13

    继续进入c.initDataCoord():

    func (s *Server) initDataCoord() error {
    	s.stateCode.Store(commonpb.StateCode_Initializing)
    	var err error
    	if err = s.initRootCoordClient(); err != nil {
    		return err
    	}
    
    	s.broker = NewCoordinatorBroker(s.rootCoordClient)
    
    	storageCli, err := s.newChunkManagerFactory()
    	if err != nil {
    		return err
    	}
    
    	if err = s.initMeta(storageCli); err != nil {
    		return err
    	}
    
    	s.handler = newServerHandler(s)
    
    	if err = s.initCluster(); err != nil {
    		return err
    	}
    
    	s.allocator = newRootCoordAllocator(s.rootCoordClient)
    
    	s.initIndexNodeManager()
    
    	if err = s.initServiceDiscovery(); err != nil {
    		return err
    	}
    
    	if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
    		s.createCompactionHandler()
    		s.createCompactionTrigger()
    	}
    
    	if err = s.initSegmentManager(); err != nil {
    		return err
    	}
    
    	s.initGarbageCollection(storageCli)
    	s.initIndexBuilder(storageCli)
    
    	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
    
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    从代码可以看出初始化是在填充datacoord结构体。

    s.start()

    启动组件的逻辑。

    func (s *Server) start() error {
    	err := s.dataCoord.Register()
    	if err != nil {
    		log.Debug("DataCoord register service failed", zap.Error(err))
    		return err
    	}
    
    	err = s.dataCoord.Start()
    	if err != nil {
    		log.Error("DataCoord start failed", zap.Error(err))
    		return err
    	}
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14

    s.dataCoord是一个Component接口,实现了 方法Init()、 Start() 、 Stop() 、 Register() 。

    Register():向元数据etcd注册。

    Start():用来启动组件。

    进入s.dataCoord.Start():

    func (s *Server) Start() error {
    	if !s.enableActiveStandBy {
    		s.startDataCoord()
    		log.Info("DataCoord startup successfully")
    	}
    
    	return nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8

    真正执行启动逻辑在s.startDataCoord()。

    func (s *Server) startDataCoord() {
    	if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
    		s.compactionHandler.start()
    		s.compactionTrigger.start()
    	}
    	s.startServerLoop()
    	s.stateCode.Store(commonpb.StateCode_Healthy)
    	sessionutil.SaveServerInfo(typeutil.DataCoordRole, s.session.ServerID)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9

    要详细知道启动querycoord组件做了什么事情,研究这个函数。

  • 相关阅读:
    Tomcat配置敏感信息屏蔽
    「Verilog学习笔记」数据选择器实现逻辑电路
    JVM内存布局、类加载机制及垃圾回收机制详解
    Vue框架学习(第十三课)Vuex状态管理中的store和state属性
    【iOS】—— present和push再学习
    Python语言学习实战-内置函数reduce()的使用(附源码和实现效果)
    STM32时钟系统
    精品Python宠物领养网站系统失物招领
    使用信号分析器
    HBase原理深入
  • 原文地址:https://blog.csdn.net/shulu/article/details/138133172