• Istio资源监听原理


    Istio规则监听原理之K8s注册中心

    在使用过程中,注册中心一般都使用k8s,那么对于istio中的资源是如何进行管理的那?下面让我们一一探索

    原理

    基于K8s的云原生软件最大的特点就是采用CRD的方式将配置注入到K8s中,istio也不例外,比如VirtualService、DestinationRule等。istio对路由规则等实现的原理是,监听K8s中的CRD资源,将其转化为Envoy能够识别的配置信息,发送给envoy这样就能实现路由转发等一系列功能。

    源码

    接下来让我们进入源码阶段进行一一探索

    初始化informer资源

    该方法由NewServer()->initControllers()->initConfigController调用

    func (s *Server) initK8SConfigStore(args *PilotArgs) error {
       if s.kubeClient == nil {
          return nil
       }
       //为istio涉及的CRD资源创建informer并为其添加监听事件,
       //事件触发的方法存放在configController.handler中
       // 增删改事件 都调用同一个方法
       configController, err := s.makeKubeConfigController(args)
       if err != nil {
          return err
       }
      // 这里创建WorkloadEntry资源控制器,该资源是内部服务(POD)注册到isito时所需要的一种资源,下篇文章会细细讲解
       s.XDSServer.WorkloadEntryController = workloadentry.NewController(configController, args.PodName, args.KeepaliveOptions.MaxServerConnectionAge)
       return nil
    }
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16

    让我们进入makeKubeConfigController

    func (s *Server) makeKubeConfigController(args *PilotArgs) (model.ConfigStoreController, error) {
      // 通过这个名字我们应该就有了大概的猜测
       return crdclient.New(s.kubeClient, args.Revision, args.RegistryOptions.KubeOptions.DomainSuffix)
    }
    
    func New(client kube.Client, revision, domainSuffix string) (model.ConfigStoreController, error) {
      // 获取CRD资源的schemas,它实现了crd资源从gvk到gvr,gvr到gvk转换的功能
    	schemas := collections.Pilot
    	if features.EnableGatewayAPI {
    		schemas = collections.PilotGatewayAPI
    	}
    	return NewForSchemas(client, revision, domainSuffix, schemas)
    }
    
    // 让我们看一下collections.PilotGatewayAPI里有什么
    	PilotGatewayAPI = collection.NewSchemasBuilder().
    			MustAdd(IstioExtensionsV1Alpha1Wasmplugins).
    		MustAdd(IstioNetworkingV1Alpha3Destinationrules).
    			MustAdd(IstioNetworkingV1Alpha3Envoyfilters).
    			MustAdd(IstioNetworkingV1Alpha3Gateways).
    			MustAdd(IstioNetworkingV1Alpha3Serviceentries).
    			MustAdd(IstioNetworkingV1Alpha3Sidecars).
    			MustAdd(IstioNetworkingV1Alpha3Virtualservices).
    			MustAdd(IstioNetworkingV1Alpha3Workloadentries).
    			MustAdd(IstioNetworkingV1Alpha3Workloadgroups).
    			MustAdd(IstioNetworkingV1Beta1Proxyconfigs).
    		MustAdd(IstioSecurityV1Beta1Authorizationpolicies).
    		MustAdd(IstioSecurityV1Beta1Peerauthentications).
    		MustAdd(IstioSecurityV1Beta1Requestauthentications).
    			MustAdd(IstioTelemetryV1Alpha1Telemetries).
    			MustAdd(K8SGatewayApiV1Alpha2Gatewayclasses).
    			MustAdd(K8SGatewayApiV1Alpha2Gateways).
    			MustAdd(K8SGatewayApiV1Alpha2Httproutes).
    			MustAdd(K8SGatewayApiV1Alpha2Referencepolicies).
    			MustAdd(K8SGatewayApiV1Alpha2Tcproutes).
    			MustAdd(K8SGatewayApiV1Alpha2Tlsroutes).
    			Build()
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37

    进入 NewForSchemas

    func NewForSchemas(client kube.Client, revision, domainSuffix string, schemas collection.Schemas) (model.ConfigStoreController, error) {
       schemasByCRDName := map[string]collection.Schema{}
       for _, s := range schemas.All() {
          // From the spec: "Its name MUST be in the format <.spec.name>.<.spec.group>."
          name := fmt.Sprintf("%s.%s", s.Resource().Plural(), s.Resource().Group())
          schemasByCRDName[name] = s
       }
       out := &Client{
          domainSuffix:     domainSuffix,
          schemas:          schemas,
          schemasByCRDName: schemasByCRDName,
          revision:         revision,
          queue:            queue.NewQueue(1 * time.Second),
          kinds:            map[config.GroupVersionKind]*cacheHandler{},
         // 定义了informer事件处理方法,所有资源中的增删改事件都会遍历该方法调用
          handlers:         map[config.GroupVersionKind][]model.EventHandler{},
          client:           client,
          istioClient:      client.Istio(),
          gatewayAPIClient: client.GatewayAPI(),
          crdMetadataInformer: client.MetadataInformer().ForResource(collections.K8SApiextensionsK8SIoV1Customresourcedefinitions.Resource().
             GroupVersionResource()).Informer(),
          beginSync:   atomic.NewBool(false),
          initialSync: atomic.NewBool(false),
       }
    
      // 会获取当前集群中的所有CRD,用来判断当前资源是否在当前的集群中注册
       known, err := knownCRDs(client.Ext())
       for _, s := range schemas.All() {
          // From the spec: "Its name MUST be in the format <.spec.name>.<.spec.group>."
          name := fmt.Sprintf("%s.%s", s.Resource().Plural(), s.Resource().Group())
          crd := true
          if _, f := collections.Builtin.Find(s.Name().String()); f {
             crd = false
          }
         // 只要判断是CRD资源那么就为其创建informer
          if !crd {
             handleCRDAdd(out, name, nil)
          } else {
             if _, f := known[name]; f {
                handleCRDAdd(out, name, nil)
             } else {
                scope.Warnf("Skipping CRD %v as it is not present", s.Resource().GroupVersionKind())
             }
          }
       }
    
       return out, nil
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46
    • 47
    • 48

    接下来就到了我们本文的重点了informer的创建

    func handleCRDAdd(cl *Client, name string, stop <-chan struct{}) {
       // 获取当前资源的schemas
       s, f := cl.schemasByCRDName[name]
       resourceGVK := s.Resource().GroupVersionKind()
       gvr := s.Resource().GroupVersionResource()
    
       cl.kindsMu.Lock()
       var i informers.GenericInformer
       var ifactory starter
       var err error
      // 这里对当前资源进行类型判断
       switch s.Resource().Group() {
        // 如果是gateway资源
       case gvk.KubernetesGateway.Group:
          ifactory = cl.client.GatewayAPIInformer()
          i, err = cl.client.GatewayAPIInformer().ForResource(gvr)
         // 如果是pod 等资源
       case gvk.Pod.Group, gvk.Deployment.Group, gvk.MutatingWebhookConfiguration.Group:
          ifactory = cl.client.KubeInformer()
          i, err = cl.client.KubeInformer().ForResource(gvr)
       case gvk.CustomResourceDefinition.Group:
          ifactory = cl.client.ExtInformer()
          i, err = cl.client.ExtInformer().ForResource(gvr)
       default:
         // 如果不是上面以上资源那么就是isito资源
          ifactory = cl.client.IstioInformer()
          i, err = cl.client.IstioInformer().ForResource(gvr)
       }
    
      // 光创建了informer还不够,我们还需要添加监听事件,这里我就不展开了
      // 原理很简单使用informer的AddEventHandler方法,添加监听事件方法
      // 增删改 都实现同一个方法,该方法里面循环调用了上面的handler方法
       cl.kinds[resourceGVK] = createCacheHandler(cl, s, i)
       if w, f := crdWatches[resourceGVK]; f {
          scope.Infof("notifying watchers %v was created", resourceGVK)
          w.once.Do(func() {
             close(w.stop)
          })
       }
       if stop != nil {
          // Start informer factory, only if stop is defined. In startup case, we will not start here as
          // we will start all factories once we are ready to initialize.
          // For dynamically added CRDs, we need to start immediately though
          ifactory.Start(stop)
       }
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42
    • 43
    • 44
    • 45
    • 46

    添加handler方法

    既然最终调用的是handler方法,那我们就看看handler中都有什么

    有些资源的handler可能与大部队不太一样,因为实现原理与功能不一样

    Serviceentries、Workloadentries、Workloadgroups这三个资源主要目的是提供服务注册的功能,Serviceentries提供了外部服务的注册、Workloadentries提供了内部服务自动注册的功能。

    其余资源都使用下面方法

    configHandler := func(prev config.Config, curr config.Config, event model.Event) {
      // 判断事件类型 更新资源状态
       defer func() {
          if event != model.EventDelete {
             s.statusReporter.AddInProgressResource(curr)
          } else {
             s.statusReporter.DeleteInProgressResource(curr)
          }
       }()
       // 判断是否要发送更新请求
       if event == model.EventUpdate && !needsPush(prev, curr) {
          log.Debugf("skipping push for %s as spec has not changed", prev.Key())
          return
       }
      // 封装更新请求
       pushReq := &model.PushRequest{
          Full: true,
          ConfigsUpdated: map[model.ConfigKey]struct{}{{
             Kind:      curr.GroupVersionKind,
             Name:      curr.Name,
             Namespace: curr.Namespace,
          }: {}},
          Reason: []model.TriggerReason{model.ConfigUpdate},
       }
       s.XDSServer.ConfigUpdate(pushReq)
    }
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26

    XDSServer.ConfigUpdate主要目的是将当前资源转化为envoy识别的配置然后发送给每个envoy

  • 相关阅读:
    【文末福利】半导体封装率先国产化,400+封装厂商最新名单汇总
    TS:知识补充
    「喜迎华诞」手把手教你用微信小程序给头像带上小旗帜
    【笨~~~】在python中导入另一个模块的函数时,为什么会运行模块中剩下的部分??顶层?
    八大排序算法
    Vue项目下页面自适应pc端不同分辨率自适应
    【光学】Matlab模拟透射光条纹强度分布曲线仿真
    【数据处理必备Numpy~python】
    【Redis系列】在Centos7上安装Redis5.0保姆级教程!
    Gradle笔记 六 Gradle 中的Dependencies
  • 原文地址:https://blog.csdn.net/a1023934860/article/details/125604888