需求:
创建的service annotaion中如果包含ingress/http: "true"的时候,会自动将该服务的ingress资源创建出来,当删除掉ingress/http: "true"的时候,自动删除ingress, 同时将service删除掉的时候也会自动删除ingress
代码目录结构
- tree ingress-expose
-
- ingress-expose
- ├── Dockerfile
- ├── go.mod
- ├── go.sum
- ├── main.go
- ├── manifests
- │ ├── ingress-manager-role-binding.yaml
- │ ├── ingress-manager-role.yaml
- │ ├── ingress-manager-sa.yaml
- │ ├── ingress-manager.yaml
- │ └── nginx.yaml
- └── pkg
- ├── controller.go
- ├── ingress.go
- └── service.go
-
- 3 directories, 12 files
main.go
- package main
-
- import (
- "ingress-expose/pkg"
- "k8s.io/client-go/informers"
- "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/rest"
- "k8s.io/client-go/tools/clientcmd"
- "log"
- )
-
- func main() {
- //1. config
-
- // 从集群外部创建一个cofnig
- config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
- if err != nil {
- // 从集群内部创建一个config
- inClusterConfig, err := rest.InClusterConfig()
- if err != nil {
- log.Fatalln("can't get config", err)
- }
- config = inClusterConfig
- }
-
- //2. create clientSet type client
- clientSet, err := kubernetes.NewForConfig(config)
-
- if err != nil {
- log.Fatalln("can't create clientSet")
- }
-
- //3. create informer
- factory := informers.NewSharedInformerFactory(clientSet, 0)
-
- //4. create service、ingress Informer
- serviceInformer := factory.Core().V1().Services()
- ingressInformer := factory.Networking().V1().Ingresses()
-
- //5. add event handler
- controller := pkg.NewController(clientSet, serviceInformer, ingressInformer)
-
- //6.informer.Start
- stopCh := make(chan struct{})
- factory.Start(stopCh)
- factory.WaitForCacheSync(stopCh)
- controller.Run(stopCh)
- }
controller.go
- package pkg
-
- import (
- "context"
- "fmt"
- "k8s.io/apimachinery/pkg/api/errors"
- v13 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/util/runtime"
- "k8s.io/apimachinery/pkg/util/wait"
- informer "k8s.io/client-go/informers/core/v1"
- netInformer "k8s.io/client-go/informers/networking/v1"
- "k8s.io/client-go/kubernetes"
- coreLister "k8s.io/client-go/listers/core/v1"
- v1 "k8s.io/client-go/listers/networking/v1"
- "k8s.io/client-go/tools/cache"
- "k8s.io/client-go/util/workqueue"
- "time"
- )
-
- const (
- workNum = 5
- maxRetry = 10
- )
-
- type Controller struct {
- client kubernetes.Interface
- serviceLister coreLister.ServiceLister
- ingressLister v1.IngressLister
- // 创建一个限速workQueue
- queue workqueue.RateLimitingInterface
- }
-
- // 定义一个通用的方法使用workQueue
- func (c *Controller) enqueue(obj interface{}) {
- // 获取key
- key, err := cache.MetaNamespaceKeyFunc(obj)
- if err != nil {
- runtime.HandleError(err)
- }
- // 只需要将key到队列里即可,将key添加到队列
- c.queue.Add(key)
- }
-
- func (c *Controller) worker() {
- for c.processNextItem() {
- }
- }
-
- func (c *Controller) processNextItem() bool {
- item, shutdown := c.queue.Get()
- if shutdown {
- return false
- }
-
- defer c.queue.Done(item)
- key := item.(string)
-
- err := c.syncService(key)
- if err != nil {
- c.handlerError(key, err)
- }
- return true
- }
-
- func (c *Controller) syncService(key string) error {
-
- namespaceKey, name, err := cache.SplitMetaNamespaceKey(key)
- if err != nil {
- return err
- }
- // 删除,从indexer中查询namespace key 判断service 是否被删除
- service, err := c.serviceLister.Services(namespaceKey).Get(name)
- if errors.IsNotFound(err) {
- // 如果service已经被删除掉了,返回nil
- return nil
- }
- if err != nil {
- return err
- }
-
- // 新增和删除, 删除service 获取不到annotaion ok变为false走删除逻辑
- _, ok := service.GetAnnotations()["ingress/http"]
- ingress, err := c.ingressLister.Ingresses(namespaceKey).Get(name)
- if err != nil && !errors.IsNotFound(err) {
- return err
- }
-
- // 如果service 是存在的并且ingress是不存在的就去创建ingress
- if ok && errors.IsNotFound(err) {
- // create ingress
- fmt.Println("开始创建ingerss")
- ig := c.ConstructIngress(service)
-
- _, err := c.client.NetworkingV1().Ingresses(namespaceKey).Create(context.TODO(), ig, v13.CreateOptions{})
- if err != nil {
- return err
- }
- // 如果service不存在,并且ingress 不为nil,说明没有get到ingerss信息,则去删除
- } else if !ok && ingress != nil {
- fmt.Println(ok, nil)
- // delete ingress 如果ingress 没有ingress/http annotaions的时候就删除ingress
- fmt.Println("开始删除ingress")
- err := c.client.NetworkingV1().Ingresses(namespaceKey).Delete(context.TODO(), name, v13.DeleteOptions{})
- if err != nil {
- return err
- }
- }
- return nil
- }
-
- func (c *Controller) handlerError(key string, err error) {
- if c.queue.NumRequeues(key) > maxRetry {
- c.queue.AddRateLimited(key)
- return
- }
- runtime.HandleError(err)
- c.queue.Forget(key)
- }
-
- func (c *Controller) Run(stopCh chan struct{}) {
- for i := 0; i < workNum; i++ {
- go wait.Until(c.worker, time.Minute, stopCh)
- }
- <-stopCh
- }
-
- func NewController(client kubernetes.Interface, serviceInformer informer.ServiceInformer, ingressInformer netInformer.IngressInformer) Controller {
- // 创建控制器,indexer 减少与apiServer 的交互
- c := Controller{
- client: client,
- ingressLister: ingressInformer.Lister(),
- serviceLister: serviceInformer.Lister(),
- // 创建一个workQueue
- queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ingressManager"),
- }
-
- // 创建事件,注册到Informer,当有事件来了会调用事件对应的方法
- serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- AddFunc: c.AddService,
- UpdateFunc: c.UpdateService,
- })
-
- ingressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
- DeleteFunc: c.DeleteIngress,
- })
-
- return c
- }
ingress.go
- package pkg
-
- import (
- v14 "k8s.io/api/core/v1"
- v12 "k8s.io/api/networking/v1"
- v13 "k8s.io/apimachinery/pkg/apis/meta/v1"
- )
-
- func (c *Controller) DeleteIngress(obj interface{}) {
- ingress := obj.(*v12.Ingress)
- ownerReference := v13.GetControllerOf(ingress)
- // service 如果不存在就不需要处理了
- if ownerReference == nil {
- return
- }
- // kind如果是service则跳过
- if ownerReference.Kind != "Service" {
- return
- }
- c.queue.Add(ingress.Namespace + "/" + ingress.Name)
- }
-
- func (c *Controller) ConstructIngress(service *v14.Service) *v12.Ingress {
- ingress := v12.Ingress{}
- ingress.ObjectMeta.OwnerReferences = []v13.OwnerReference{
- // 创建的时候将service 和 ingress进行关联,当删除service的时候利用垃圾回收机制自动将ingress删掉
- *v13.NewControllerRef(service, v14.SchemeGroupVersion.WithKind("Service")),
- }
- ingress.Name = service.Name
- ingress.Namespace = service.Namespace
- ingress.ClusterName = "nginx"
- pathType := v12.PathTypePrefix
- ingressClass := ingress.ClusterName
- ingress.Spec = v12.IngressSpec{
- IngressClassName: &ingressClass,
- Rules: []v12.IngressRule{
- {
- Host: service.Name + ".example.com",
- IngressRuleValue: v12.IngressRuleValue{
- HTTP: &v12.HTTPIngressRuleValue{
- Paths: []v12.HTTPIngressPath{
- {
- Path: "/",
- PathType: &pathType,
- Backend: v12.IngressBackend{
- Service: &v12.IngressServiceBackend{
- Name: service.Name,
- Port: v12.ServiceBackendPort{
- Number: 80,
- },
- },
- },
- },
- },
- },
- },
- },
- },
- }
- return &ingress
- }
service.go
- package pkg
-
- import "reflect"
-
- func (c *Controller) UpdateService(oldObj interface{}, newObj interface{}) {
- // todo 比较资源的annotation,如果内容一致,就不需要处理了
- if reflect.DeepEqual(oldObj, newObj) {
- return
- }
- c.enqueue(newObj)
-
- }
-
- func (c *Controller) AddService(obj interface{}) {
- c.enqueue(obj)
- }