• client-go 实现一个自动创建ingress资源的controller


    需求:

    当创建的 service 的 annotation 中包含 ingress/http: "true" 时,会自动为该服务创建对应的 ingress 资源;当去掉 ingress/http: "true" 时,自动删除该 ingress;当 service 被删除时,对应的 ingress 也会被自动删除。

    代码目录结构

    1. tree ingress-expose
    2. ingress-expose
    3. ├── Dockerfile
    4. ├── go.mod
    5. ├── go.sum
    6. ├── main.go
    7. ├── manifests
    8. │   ├── ingress-manager-role-binding.yaml
    9. │   ├── ingress-manager-role.yaml
    10. │   ├── ingress-manager-sa.yaml
    11. │   ├── ingress-manager.yaml
    12. │   └── nginx.yaml
    13. └── pkg
    14.     ├── controller.go
    15.     ├── ingress.go
    16.     └── service.go
    17. 3 directories, 12 files

    main.go

    1. package main
    2. import (
    3. "ingress-expose/pkg"
    4. "k8s.io/client-go/informers"
    5. "k8s.io/client-go/kubernetes"
    6. "k8s.io/client-go/rest"
    7. "k8s.io/client-go/tools/clientcmd"
    8. "log"
    9. )
    10. func main() {
    11. //1. config
    12. // 从集群外部创建一个cofnig
    13. config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    14. if err != nil {
    15. // 从集群内部创建一个config
    16. inClusterConfig, err := rest.InClusterConfig()
    17. if err != nil {
    18. log.Fatalln("can't get config", err)
    19. }
    20. config = inClusterConfig
    21. }
    22. //2. create clientSet type client
    23. clientSet, err := kubernetes.NewForConfig(config)
    24. if err != nil {
    25. log.Fatalln("can't create clientSet")
    26. }
    27. //3. create informer
    28. factory := informers.NewSharedInformerFactory(clientSet, 0)
    29. //4. create service、ingress Informer
    30. serviceInformer := factory.Core().V1().Services()
    31. ingressInformer := factory.Networking().V1().Ingresses()
    32. //5. add event handler
    33. controller := pkg.NewController(clientSet, serviceInformer, ingressInformer)
    34. //6.informer.Start
    35. stopCh := make(chan struct{})
    36. factory.Start(stopCh)
    37. factory.WaitForCacheSync(stopCh)
    38. controller.Run(stopCh)
    39. }

    controller.go

    1. package pkg
    2. import (
    3. "context"
    4. "fmt"
    5. "k8s.io/apimachinery/pkg/api/errors"
    6. v13 "k8s.io/apimachinery/pkg/apis/meta/v1"
    7. "k8s.io/apimachinery/pkg/util/runtime"
    8. "k8s.io/apimachinery/pkg/util/wait"
    9. informer "k8s.io/client-go/informers/core/v1"
    10. netInformer "k8s.io/client-go/informers/networking/v1"
    11. "k8s.io/client-go/kubernetes"
    12. coreLister "k8s.io/client-go/listers/core/v1"
    13. v1 "k8s.io/client-go/listers/networking/v1"
    14. "k8s.io/client-go/tools/cache"
    15. "k8s.io/client-go/util/workqueue"
    16. "time"
    17. )
    // Tuning values for the controller's queue processing.
    const (
    	// workNum is the number of worker goroutines started by Run.
    	workNum = 5
    	// maxRetry is the requeue-count threshold that handlerError compares
    	// the queue's NumRequeues value against.
    	maxRetry = 10
    )
    22. type Controller struct {
    23. client kubernetes.Interface
    24. serviceLister coreLister.ServiceLister
    25. ingressLister v1.IngressLister
    26. // 创建一个限速workQueue
    27. queue workqueue.RateLimitingInterface
    28. }
    29. // 定义一个通用的方法使用workQueue
    30. func (c *Controller) enqueue(obj interface{}) {
    31. // 获取key
    32. key, err := cache.MetaNamespaceKeyFunc(obj)
    33. if err != nil {
    34. runtime.HandleError(err)
    35. }
    36. // 只需要将key到队列里即可,将key添加到队列
    37. c.queue.Add(key)
    38. }
    39. func (c *Controller) worker() {
    40. for c.processNextItem() {
    41. }
    42. }
    43. func (c *Controller) processNextItem() bool {
    44. item, shutdown := c.queue.Get()
    45. if shutdown {
    46. return false
    47. }
    48. defer c.queue.Done(item)
    49. key := item.(string)
    50. err := c.syncService(key)
    51. if err != nil {
    52. c.handlerError(key, err)
    53. }
    54. return true
    55. }
    56. func (c *Controller) syncService(key string) error {
    57. namespaceKey, name, err := cache.SplitMetaNamespaceKey(key)
    58. if err != nil {
    59. return err
    60. }
    61. // 删除,从indexer中查询namespace key 判断service 是否被删除
    62. service, err := c.serviceLister.Services(namespaceKey).Get(name)
    63. if errors.IsNotFound(err) {
    64. // 如果service已经被删除掉了,返回nil
    65. return nil
    66. }
    67. if err != nil {
    68. return err
    69. }
    70. // 新增和删除, 删除service 获取不到annotaion ok变为false走删除逻辑
    71. _, ok := service.GetAnnotations()["ingress/http"]
    72. ingress, err := c.ingressLister.Ingresses(namespaceKey).Get(name)
    73. if err != nil && !errors.IsNotFound(err) {
    74. return err
    75. }
    76. // 如果service 是存在的并且ingress是不存在的就去创建ingress
    77. if ok && errors.IsNotFound(err) {
    78. // create ingress
    79. fmt.Println("开始创建ingerss")
    80. ig := c.ConstructIngress(service)
    81. _, err := c.client.NetworkingV1().Ingresses(namespaceKey).Create(context.TODO(), ig, v13.CreateOptions{})
    82. if err != nil {
    83. return err
    84. }
    85. // 如果service不存在,并且ingress 不为nil,说明没有get到ingerss信息,则去删除
    86. } else if !ok && ingress != nil {
    87. fmt.Println(ok, nil)
    88. // delete ingress 如果ingress 没有ingress/http annotaions的时候就删除ingress
    89. fmt.Println("开始删除ingress")
    90. err := c.client.NetworkingV1().Ingresses(namespaceKey).Delete(context.TODO(), name, v13.DeleteOptions{})
    91. if err != nil {
    92. return err
    93. }
    94. }
    95. return nil
    96. }
    97. func (c *Controller) handlerError(key string, err error) {
    98. if c.queue.NumRequeues(key) > maxRetry {
    99. c.queue.AddRateLimited(key)
    100. return
    101. }
    102. runtime.HandleError(err)
    103. c.queue.Forget(key)
    104. }
    105. func (c *Controller) Run(stopCh chan struct{}) {
    106. for i := 0; i < workNum; i++ {
    107. go wait.Until(c.worker, time.Minute, stopCh)
    108. }
    109. <-stopCh
    110. }
    111. func NewController(client kubernetes.Interface, serviceInformer informer.ServiceInformer, ingressInformer netInformer.IngressInformer) Controller {
    112. // 创建控制器,indexer 减少与apiServer 的交互
    113. c := Controller{
    114. client: client,
    115. ingressLister: ingressInformer.Lister(),
    116. serviceLister: serviceInformer.Lister(),
    117. // 创建一个workQueue
    118. queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ingressManager"),
    119. }
    120. // 创建事件,注册到Informer,当有事件来了会调用事件对应的方法
    121. serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    122. AddFunc: c.AddService,
    123. UpdateFunc: c.UpdateService,
    124. })
    125. ingressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
    126. DeleteFunc: c.DeleteIngress,
    127. })
    128. return c
    129. }

    ingress.go

    1. package pkg
    import (
    	v14 "k8s.io/api/core/v1"
    	v12 "k8s.io/api/networking/v1"
    	v13 "k8s.io/apimachinery/pkg/apis/meta/v1"
    	"k8s.io/client-go/tools/cache"
    )
    7. func (c *Controller) DeleteIngress(obj interface{}) {
    8. ingress := obj.(*v12.Ingress)
    9. ownerReference := v13.GetControllerOf(ingress)
    10. // service 如果不存在就不需要处理了
    11. if ownerReference == nil {
    12. return
    13. }
    14. // kind如果是service则跳过
    15. if ownerReference.Kind != "Service" {
    16. return
    17. }
    18. c.queue.Add(ingress.Namespace + "/" + ingress.Name)
    19. }
    20. func (c *Controller) ConstructIngress(service *v14.Service) *v12.Ingress {
    21. ingress := v12.Ingress{}
    22. ingress.ObjectMeta.OwnerReferences = []v13.OwnerReference{
    23. // 创建的时候将service 和 ingress进行关联,当删除service的时候利用垃圾回收机制自动将ingress删掉
    24. *v13.NewControllerRef(service, v14.SchemeGroupVersion.WithKind("Service")),
    25. }
    26. ingress.Name = service.Name
    27. ingress.Namespace = service.Namespace
    28. ingress.ClusterName = "nginx"
    29. pathType := v12.PathTypePrefix
    30. ingressClass := ingress.ClusterName
    31. ingress.Spec = v12.IngressSpec{
    32. IngressClassName: &ingressClass,
    33. Rules: []v12.IngressRule{
    34. {
    35. Host: service.Name + ".example.com",
    36. IngressRuleValue: v12.IngressRuleValue{
    37. HTTP: &v12.HTTPIngressRuleValue{
    38. Paths: []v12.HTTPIngressPath{
    39. {
    40. Path: "/",
    41. PathType: &pathType,
    42. Backend: v12.IngressBackend{
    43. Service: &v12.IngressServiceBackend{
    44. Name: service.Name,
    45. Port: v12.ServiceBackendPort{
    46. Number: 80,
    47. },
    48. },
    49. },
    50. },
    51. },
    52. },
    53. },
    54. },
    55. },
    56. }
    57. return &ingress
    58. }

    service.go

    1. package pkg
    2. import "reflect"
    3. func (c *Controller) UpdateService(oldObj interface{}, newObj interface{}) {
    4. // todo 比较资源的annotation,如果内容一致,就不需要处理了
    5. if reflect.DeepEqual(oldObj, newObj) {
    6. return
    7. }
    8. c.enqueue(newObj)
    9. }
    10. func (c *Controller) AddService(obj interface{}) {
    11. c.enqueue(obj)
    12. }

  • 相关阅读:
    vue源码分析-事件机制
    security CSRF漏洞保护
    柱状图:带误差棒
    centos pip失效
    spring security(二)--授权
    网络安全基础(一)网安考证必备知识:防火墙隧道的类型,防火墙隧道技术,密码学,常见的对称加密算法和非对称加密算法
    过滤器、拦截器、AOP、ControllerAdvcie的使用对比、执行顺序及代码教程
    docker 搭建 redis 集群
    《微服务实战》 第二十九章 分布式事务框架seata AT模式
    PA7 The Traveling Salesman Problem
  • 原文地址:https://blog.csdn.net/weixin_43798031/article/details/134006228