• Apisix-Ingress服务发现详解


    apisix

    Apache APISIX 是一个基于微服务 API 网关,其不仅可以处理南北向的流量,也可以处理东西向的流量即服务之间的流量。Apache APISIX 集成了控制面板和数据面,与其他 API 网关相比,Apache APISIX 的上游、路由、插件全是动态的,修改这些东西时都不用重启。并且 Apache APISIX 的插件也是热加载,可以随时插拔、修改插件。

    Apache APISIX 其设计理念是基于API 网关的数据平面和控制平面分离。控制平面不仅仅能够控制 Apache APISIX ,同时其还能够控制其他组件;数据平面不仅仅能够被自身的控制平面控制,还能被其他组件所控制。由于其基于ETCD 来存储和分发路由数据,默认具备高可用,无单点故障风险。除此之外,其能够友好地支持 Prometheus、SkyWalking 动态追踪、流量复制、故障注入等相关功能。

    apisix ingress

    在 K8s 生态中,Ingress 作为表示 K8s 流量入口的一种资源,想要让其生效,就需要有一个 Ingress Controller 去监听 K8s 中的 Ingress 资源,并对这些资源进行相应规则的解析和实际承载流量。在当下趋势中,像 Kubernetes Ingress Nginx 就是使用最广泛的 Ingress Controller 实现。

    而 APISIX Ingress 则是另一种 Ingress Controller 的实现。跟 Kubernetes Ingress Nginx 的区别主要在于 APISIX Ingress 是以 Apache APISIX 作为实际承载业务流量的数据面。

    Apache APISIX Ingress Controller 除了覆盖 NGINX Ingress Controller 已有的能力外,还解决了一些 Nginx Ingress Controller 的痛点。具体如下:

    • 1、配置的动态化加载
      通常情况下,作为接入层的 Ingress Controller ,其承载着服务的入口流量引入,在生产环境中,我们的业务对系统的可靠性有着更高的要求,然而,基于 Apache APISIX Ingress Controller 其能够支持动态配置,即时生效,降低生产事故的意外及风险,有助于提高运维可维护性。

    • 2、较强的灰度能力
      在实际的业务场景中,有的时候,往往会依据某些特定的需求进行权重调整,结合业务需求按比例进行流量控制,Apache APISIX Ingress Controller 可以支持 Service和 Pod 级别的权重调整,配置清晰而且可读性更强。
      除此,相对于NGINX Ingress Controller 中通过 Annotation 的方式提供 Canary 灰度方案,Apache APISIX Ingress Controller 能够解决其缺陷,从而能够更好的提供灰度策略。

    • 3、较好的扩展能力
      基于 Apache APISIX 强大的插件能力,Apache APISIX Ingress Controller 通过动态绑定插件来增强功能。Apache APISIX 通过插件封装逻辑,易于管理;完善的文档,易于使用和理解。Apache APISIX Ingress Controller 通过配置即可绑定和解绑插件,无需操作脚本。

    APISIX Ingress 目前已经支持的自定义资源主要是以下 5 类,涉及到路由、上游、消费者、证书相关和集群公共配置的相关类别。

    内置服务发现

    APISIX 内置了下面这些服务发现机制:

    • 基于 Eureka 的服务发现

    • 基于 Nacos 的服务发现

    • 基于 Consul 的服务发现

    • 基于 Consul KV 的服务发现

    • 基于 DNS 的服务发现

    • 基于 APISIX-Seed 架构的控制面服务发现

    • 基于 Kubernetes 的服务发现

    上面介绍的这些都是基于数据面apisix配置信息手动变更操作的方案集成。其实,在基于k8s的云原生场景下,apisix还提供了一个控制面组件来对apisix的服务发现进行自动管理,那就是apisix ingress

    下面就从源码的角度来看看apisix ingress是怎么做到自动的服务发现的

    apisix ingress启动

    main.go启动入口一路跟踪,进入providers/controller.gorun方法:

    1. func (c *Controller) run(ctx context.Context) {
    2.    log.Infow("controller tries to leading ...",
    3.       zap.String("namespace", c.namespace),
    4.       zap.String("pod", c.name),
    5.    )
    6.    var cancelFunc context.CancelFunc
    7.    ctx, cancelFunc = context.WithCancel(ctx)
    8.    defer cancelFunc()
    9.    // give up leader
    10.    defer c.leaderContextCancelFunc()
    11.    clusterOpts := &apisix.ClusterOptions{
    12.       AdminAPIVersion: c.cfg.APISIX.AdminAPIVersion,
    13.       Name: c.cfg.APISIX.DefaultClusterName,
    14.       AdminKey: c.cfg.APISIX.DefaultClusterAdminKey,
    15.       BaseURL: c.cfg.APISIX.DefaultClusterBaseURL,
    16.       MetricsCollector: c.MetricsCollector,
    17.    }
    18.    err := c.apisix.AddCluster(ctx, clusterOpts)
    19.    if err != nil && err != apisix.ErrDuplicatedCluster {
    20.       // TODO give up the leader role
    21.       log.Errorf("failed to add default cluster: %s", err)
    22.       return
    23.    }
    24.    if err := c.apisix.Cluster(c.cfg.APISIX.DefaultClusterName).HasSynced(ctx); err != nil {
    25.       // TODO give up the leader role
    26.       log.Errorf("failed to wait the default cluster to be ready: %s", err)
    27.       // re-create apisix cluster, used in next c.run
    28.       if err = c.apisix.UpdateCluster(ctx, clusterOpts); err != nil {
    29.          log.Errorf("failed to update default cluster: %s", err)
    30.          return
    31.       }
    32.       return
    33.    }
    34.    // Creation Phase
    35.    c.informers = c.initSharedInformers()
    36.    common := &providertypes.Common{
    37.       ControllerNamespace: c.namespace,
    38.       ListerInformer: c.informers,
    39.       Config: c.cfg,
    40.       APISIX: c.apisix,
    41.       KubeClient: c.kubeClient,
    42.       MetricsCollector: c.MetricsCollector,
    43.       Recorder: c.recorder,
    44.    }
    45.    c.namespaceProvider, err = namespace.NewWatchingNamespaceProvider(ctx, c.kubeClient, c.cfg)
    46.    if err != nil {
    47.       ctx.Done()
    48.       return
    49.    }
    50.    c.podProvider, err = pod.NewProvider(common, c.namespaceProvider)
    51.    if err != nil {
    52.       ctx.Done()
    53.       return
    54.    }
    55.    c.translator = translation.NewTranslator(&translation.TranslatorOptions{
    56.       APIVersion: c.cfg.Kubernetes.APIVersion,
    57.       EndpointLister: c.informers.EpLister,
    58.       ServiceLister: c.informers.SvcLister,
    59.       SecretLister: c.informers.SecretLister,
    60.       PodLister: c.informers.PodLister,
    61.       ApisixUpstreamLister: c.informers.ApisixUpstreamLister,
    62.       PodProvider: c.podProvider,
    63.    })
    64.    c.apisixProvider, c.apisixTranslator, err = apisixprovider.NewProvider(common, c.namespaceProvider, c.translator)
    65.    if err != nil {
    66.       ctx.Done()
    67.       return
    68.    }
    69.    c.ingressProvider, err = ingressprovider.NewProvider(common, c.namespaceProvider, c.translator, c.apisixTranslator)
    70.    if err != nil {
    71.       ctx.Done()
    72.       return
    73.    }
    74.    c.kubeProvider, err = k8s.NewProvider(common, c.translator, c.namespaceProvider, c.apisixProvider, c.ingressProvider)
    75.    if err != nil {
    76.       ctx.Done()
    77.       return
    78.    }
    79.    if c.cfg.Kubernetes.EnableGatewayAPI {
    80.       c.gatewayProvider, err = gateway.NewGatewayProvider(&gateway.ProviderOptions{
    81.          Cfg: c.cfg,
    82.          APISIX: c.apisix,
    83.          APISIXClusterName: c.cfg.APISIX.DefaultClusterName,
    84.          KubeTranslator: c.translator,
    85.          RestConfig: nil,
    86.          KubeClient: c.kubeClient.Client,
    87.          MetricsCollector: c.MetricsCollector,
    88.          NamespaceProvider: c.namespaceProvider,
    89.       })
    90.       if err != nil {
    91.          ctx.Done()
    92.          return
    93.       }
    94.    }
    95.    // Init Phase
    96.    if err = c.namespaceProvider.Init(ctx); err != nil {
    97.       ctx.Done()
    98.       return
    99.    }
    100.    if err = c.apisixProvider.Init(ctx); err != nil {
    101.       ctx.Done()
    102.       return
    103.    }
    104.    // Run Phase
    105.    e := utils.ParallelExecutor{}
    106.    e.Add(func() {
    107.       c.checkClusterHealth(ctx, cancelFunc)
    108.    })
    109.    e.Add(func() {
    110.       c.informers.Run(ctx)
    111.    })
    112.    e.Add(func() {
    113.       c.namespaceProvider.Run(ctx)
    114.    })
    115.    e.Add(func() {
    116.       c.kubeProvider.Run(ctx)
    117.    })
    118.    e.Add(func() {
    119.       c.apisixProvider.Run(ctx)
    120.    })
    121.    e.Add(func() {
    122.       c.ingressProvider.Run(ctx)
    123.    })
    124.    if c.cfg.Kubernetes.EnableGatewayAPI {
    125.       e.Add(func() {
    126.          c.gatewayProvider.Run(ctx)
    127.       })
    128.    }
    129.    e.Add(func() {
    130.       c.resourceSyncLoop(ctx, c.cfg.ApisixResourceSyncInterval.Duration)
    131.    })
    132.    c.MetricsCollector.ResetLeader(true)
    133.    log.Infow("controller now is running as leader",
    134.       zap.String("namespace", c.namespace),
    135.       zap.String("pod", c.name),
    136.    )
    137.    <-ctx.Done()
    138.    e.Wait()
    139.    for _, execErr := range e.Errors() {
    140.       log.Error(execErr.Error())
    141.    }
    142.    if len(e.Errors()) > 0 {
    143.       log.Error("Start failed, abort...")
    144.       cancelFunc()
    145.    }
    146. }

    上面代码逻辑大致如下:

    • 初始化apisix集群配置信息:用于跟数据面服务apisix通信进行相关的配置操作

    • 初始化k8s资源inforrmers信息:用户对k8s各个资源进行监听及获取资源信息

    • 监听k8s集群namespace资源并处理

    • 监听k8s集群pod资源并处理

    • 监听k8s集群ingress资源并处理

    • 监听k8s集群中apisix自定义资源并处理,比如:apisixRoute等

    • 监听k8s集群endpoint资源并处理

    • 监听k8s集群secret资源并处理

    • 监听k8s集群configmap资源并处理

    • 监听k8s集群gateway资源并处理

    下面以处理endpoint资源为例进行说明,其他资源的监听处理类似,就不一一讲解了。

    服务发现

    进入到k8s/endpoint/provider.go中,我们先来看看实例初始化方法:

    1. func NewProvider(common *providertypes.Common, translator translation.Translator, namespaceProvider namespace.WatchingNamespaceProvider) (Provider, error) {
    2.    p := &endpointProvider{
    3.       cfg: common.Config,
    4.    }
    5.    base := &baseEndpointController{
    6.       Common: common,
    7.       translator: translator,
    8.       svcLister: common.SvcLister,
    9.       apisixUpstreamLister: common.ApisixUpstreamLister,
    10.    }
    11.    if common.Kubernetes.WatchEndpointSlices {
    12.       p.endpointSliceController = newEndpointSliceController(base, namespaceProvider)
    13.    } else {
    14.       p.endpointsController = newEndpointsController(base, namespaceProvider)
    15.    }
    16.    return p, nil
    17. }
    18. func newEndpointsController(base *baseEndpointController, namespaceProvider namespace.WatchingNamespaceProvider) *endpointsController {
    19.    ctl := &endpointsController{
    20.       baseEndpointController: base,
    21.       workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemFastSlowRateLimiter(1*time.Second, 60*time.Second, 5), "endpoints"),
    22.       workers: 1,
    23.       namespaceProvider: namespaceProvider,
    24.       epLister: base.EpLister,
    25.       epInformer: base.EpInformer,
    26.    }
    27.    ctl.epInformer.AddEventHandler(
    28.       cache.ResourceEventHandlerFuncs{
    29.          AddFunc: ctl.onAdd,
    30.          UpdateFunc: ctl.onUpdate,
    31.          DeleteFunc: ctl.onDelete,
    32.       },
    33.    )
    34.    return ctl
    35. }

    注意到最后代码的AddEventHandler,这里就是我们经常见到的informer的处理回调方法设置的地方。

    我们看到针对endpoint资源的增删改,设置了对应的回调方法。这里来看看onAdd方法:

    1. func (c *endpointsController) onAdd(obj interface{}) {
    2.    key, err := cache.MetaNamespaceKeyFunc(obj)
    3.    if err != nil {
    4.       log.Errorf("found endpoints object with bad namespace/name: %s, ignore it", err)
    5.       return
    6.    }
    7.    if !c.namespaceProvider.IsWatchingNamespace(key) {
    8.       return
    9.    }
    10.    log.Debugw("endpoints add event arrived",
    11.       zap.String("object-key", key))
    12.    c.workqueue.Add(&types.Event{
    13.       Type: types.EventAdd,
    14.       // TODO pass key.
    15.       Object: kube.NewEndpoint(obj.(*corev1.Endpoints)),
    16.    })
    17.    c.MetricsCollector.IncrEvents("endpoints", "add")
    18. }

    该方法的参数表示增加的endpoint资源对象信息。该方法主要是向endpoint的队列workqueue中增加一个事件对象:包含事件类型、增加的endpoint对象

    在最开始main我们介绍provider的启动方法中提到:执行了每个provider的run方法,下面我们来看下endpoint的provider的run方法:

    1. func (c *endpointsController) run(ctx context.Context) {
    2.    log.Info("endpoints controller started")
    3.    defer log.Info("endpoints controller exited")
    4.    defer c.workqueue.ShutDown()
    5.    if ok := cache.WaitForCacheSync(ctx.Done(), c.epInformer.HasSynced); !ok {
    6.       log.Error("informers sync failed")
    7.       return
    8.    }
    9.    handler := func() {
    10.       for {
    11.          obj, shutdown := c.workqueue.Get()
    12.          if shutdown {
    13.             return
    14.          }
    15.          err := c.sync(ctx, obj.(*types.Event))
    16.          c.workqueue.Done(obj)
    17.          c.handleSyncErr(obj, err)
    18.       }
    19.    }
    20.    for i := 0; i < c.workers; i++ {
    21.       go handler()
    22.    }
    23.    <-ctx.Done()
    24. }
    25. func (c *endpointsController) sync(ctx context.Context, ev *types.Event) error {
    26.    ep := ev.Object.(kube.Endpoint)
    27.    ns, err := ep.Namespace()
    28.    if err != nil {
    29.       return err
    30.    }
    31.    newestEp, err := c.epLister.GetEndpoint(ns, ep.ServiceName())
    32.    if err != nil {
    33.       if errors.IsNotFound(err) {
    34.          return c.syncEmptyEndpoint(ctx, ep)
    35.       }
    36.       return err
    37.    }
    38.    return c.syncEndpoint(ctx, newestEp)
    39. }
    40. func (c *baseEndpointController) syncEndpoint(ctx context.Context, ep kube.Endpoint) error {
    41.    log.Debugw("endpoint controller syncing endpoint",
    42.       zap.Any("endpoint", ep),
    43.    )
    44.    namespace, err := ep.Namespace()
    45.    if err != nil {
    46.       return err
    47.    }
    48.    svcName := ep.ServiceName()
    49.    svc, err := c.svcLister.Services(namespace).Get(svcName)
    50.    if err != nil {
    51.       if k8serrors.IsNotFound(err) {
    52.          return c.syncEmptyEndpoint(ctx, ep)
    53.       }
    54.       log.Errorf("failed to get service %s/%s: %s", namespace, svcName, err)
    55.       return err
    56.    }
    57.    switch c.Kubernetes.APIVersion {
    58.    case config.ApisixV2beta3:
    59.       var subsets []configv2beta3.ApisixUpstreamSubset
    60.       subsets = append(subsets, configv2beta3.ApisixUpstreamSubset{})
    61.       auKube, err := c.apisixUpstreamLister.V2beta3(namespace, svcName)
    62.       if err != nil {
    63.          if !k8serrors.IsNotFound(err) {
    64.             log.Errorf("failed to get ApisixUpstream %s/%s: %s", namespace, svcName, err)
    65.             return err
    66.          }
    67.       } else if auKube.V2beta3().Spec != nil && len(auKube.V2beta3().Spec.Subsets) > 0 {
    68.          subsets = append(subsets, auKube.V2beta3().Spec.Subsets...)
    69.       }
    70.       clusters := c.APISIX.ListClusters()
    71.       for _, port := range svc.Spec.Ports {
    72.          for _, subset := range subsets {
    73.             nodes, err := c.translator.TranslateEndpoint(ep, port.Port, subset.Labels)
    74.             if err != nil {
    75.                log.Errorw("failed to translate upstream nodes",
    76.                   zap.Error(err),
    77.                   zap.Any("endpoints", ep),
    78.                   zap.Int32("port", port.Port),
    79.                )
    80.             }
    81.             name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port, types.ResolveGranularity.Endpoint)
    82.             for _, cluster := range clusters {
    83.                if err := c.SyncUpstreamNodesChangeToCluster(ctx, cluster, nodes, name); err != nil {
    84.                   return err
    85.                }
    86.             }
    87.          }
    88.       }
    89.    case config.ApisixV2:
    90.       var subsets []configv2.ApisixUpstreamSubset
    91.       subsets = append(subsets, configv2.ApisixUpstreamSubset{})
    92.       auKube, err := c.apisixUpstreamLister.V2(namespace, svcName)
    93.       if err != nil {
    94.          if !k8serrors.IsNotFound(err) {
    95.             log.Errorf("failed to get ApisixUpstream %s/%s: %s", namespace, svcName, err)
    96.             return err
    97.          }
    98.       } else if auKube.V2().Spec != nil && len(auKube.V2().Spec.Subsets) > 0 {
    99.          subsets = append(subsets, auKube.V2().Spec.Subsets...)
    100.       }
    101.       clusters := c.APISIX.ListClusters()
    102.       for _, port := range svc.Spec.Ports {
    103.          for _, subset := range subsets {
    104.             nodes, err := c.translator.TranslateEndpoint(ep, port.Port, subset.Labels)
    105.             if err != nil {
    106.                log.Errorw("failed to translate upstream nodes",
    107.                   zap.Error(err),
    108.                   zap.Any("endpoints", ep),
    109.                   zap.Int32("port", port.Port),
    110.                )
    111.             }
    112.             name := apisixv1.ComposeUpstreamName(namespace, svcName, subset.Name, port.Port, types.ResolveGranularity.Endpoint)
    113.             for _, cluster := range clusters {
    114.                if err := c.SyncUpstreamNodesChangeToCluster(ctx, cluster, nodes, name); err != nil {
    115.                   return err
    116.                }
    117.             }
    118.          }
    119.       }
    120.    default:
    121.       panic(fmt.Errorf("unsupported ApisixUpstream version %v", c.Kubernetes.APIVersion))
    122.    }
    123.    return nil
    124. }

    上面代码主要逻辑就是:

    • 从endpoint的队列workqueue中获取事件对象

    • 根据endpoint信息从k8s集群中获取最新的namespace和service等信息

    • 根据namespace和servicename从k8s集群中获取apisix upstream资源信息

    • 对每一个service端口,向数据面服务apisix发送配置更新请求

    https://xiaorui.cc/archives/7369

  • 相关阅读:
    Java实现SQL分页
    合成孔径雷达地面运动目标检测技术研究——基于概率图(Matlab代码实现)
    Java SE 19 新增特性
    Request和Response介绍 [Tomcat][Servlet]
    计算机毕业设计之java+ssm植物养护管理系统
    conda创建环境、安装包到环境迁移
    命名空间和作用域
    Vmware 静态网络配置
    OpenSIPS 防扫描处理
    java中的IO流之序列化与反序列化(对象数据和文件的读写交互)
  • 原文地址:https://blog.csdn.net/m0_47495420/article/details/133781847