ingress-nginx 工作原理(上):服务初始化

Table of Contents

ingress-nginx 是 K8s ingress controller 的一种实现。这段时间在看它的源代码,源代码虽然不算很长(核心代码一万五千行左右), 但是有很多的 K8s 依赖(除了依赖 K8s API server 和 client-go 以外,还依赖了 apimachinery 中的组件),如果对 K8s 的组件不是 很熟的话,读起来还是有很多的困扰。

本文只介绍 ingress-nginx 工作原理,依赖组件(queue、watch 等)会在后续的文章介绍。源码主要由两个模块组成:cmd 和 internal, cmd 是入口(main),internal 按依赖组件分别进行了二次封装(file、ingress、k8s 等)。

工作原理如下图(图片看不太清,可点击下载到本地查看):

ingress-nginx-1-1.svg

按照图片从上到下看已经把 ingress-nginx 大体流程显示清楚了,下面将站在源码的角度从服务的生命周期(服务初始化、服务启动、 服务运行、服务停止)简单说一下实现上的一些细节。篇幅较长,分为上、下两篇文章。

另外,文中引用的源代码是精简过后的,删掉无关的代码。代码和文字说明部分用序号关联: (1) 对应 1.


1. 服务初始化

服务初始化,可以拆分成 K8s 客户端创建、NGINXController、对外 HTTP 服务、启动服务几块。

// cmd/nginx/main.go
func main() {
    // (1)
    showVersion, conf, err := parseFlags()
    if showVersion {
        os.Exit(0)
    }

    // (2)
    kubeClient, err := createApiserverClient(conf.APIServerHost, conf.KubeConfigFile)
    if err != nil {
        handleFatalInitError(err)
    }

    // (3)
    if conf.Namespace != "" {
        _, err = kubeClient.CoreV1().Namespaces().Get(conf.Namespace, metav1.GetOptions{})
        if err != nil {
            glog.Fatalf("no namespace with name %v found: %v", conf.Namespace, err)
        }
    }

    conf.Client = kubeClient

    // (4)
    ngx := controller.NewNGINXController(conf, fs)

    // (7)
    go handleSigterm(ngx, func(code int) {
        os.Exit(code)
    })

    // (5)
    mux := http.NewServeMux()
    go registerHandlers(conf.EnableProfiling, conf.ListenPorts.Health, ngx, mux)

    // (6)
    ngx.Start()
}
  1. 解析命令行参数,得到 controller.Configuration 对象,Configuration 涵盖了所有 Ingress controller 所有需要的配置,比如 K8s APIServer相关配置参数
  2. 创建 ApiserverClient,根据 Configuration 的配置创建 K8s Client,创建之后调用 ServerVersion() API 以判断 API Server 是否可以正常连接。因为在一些环境中第一次可能连不上 API Server,所以这里有重试策略。

    // cmd/nginx/main.go
    func createApiserverClient(apiserverHost string, kubeConfig string) (*kubernetes.Clientset, error) {
        var v *discovery.Info
    
        // https://github.com/kubernetes/ingress-nginx/issues/1968
        // 在一些环境中可能第一次连不上 API server
        defaultRetry := wait.Backoff{
            Steps:    10,              // 尝试次数
            Duration: 1 * time.Second, // 基准的尝试时间
            Factor:   1.5,             // 持续时间 = 迭代次数 * 因子
            Jitter:   0.1,             // 每次迭代应用的抖动量
        }
    
        var lastErr error
        retries := 0
        glog.V(2).Info("trying to discover Kubernetes version")
        err = wait.ExponentialBackoff(defaultRetry, func() (bool, error) {
            v, err = client.Discovery().ServerVersion() // 查看版本号
    
            if err == nil {
                return true, nil
            }
    
            lastErr = err
    
            retries++
            return false, nil
        })
    }
    
    

    waitk8s.io/apimachinery 的组件, ExponentialBackoff 会根据 Backoff 提供的参数(将决定回调函数执行之后的 Sleep 间隔)去执行回调函数, 是否继续执行由回调函数的返回值决定。代码不长,感兴趣可以自行查看 apimachinery/pkg/util/wait/wait.go#L175 源代码。

  3. ingress-nginx 监听的 namespace 由参数中的 watch-namespace 来决定,如果为空则表示默认监听所有的 namespace
  4. 创建 NGINXController,根据名字就知道了,NGINXController 是 ingress-controller 的 controller。

    // internal/ingress/controller/nginx.go
    func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXController {
        // 环境变量查看 NGINX_BINARY,如果未设置则使用 nginxBinary = "/usr/sbin/nginx"
        ngx := os.Getenv("NGINX_BINARY")
        if ngx == "" {
            ngx = nginxBinary
        }
    
        n := &NGINXController{
            binary: ngx,
            resolver:        h,
            cfg:             config,
            // (4.1)
            stopCh:   make(chan struct{}),
            // (4.2)
            updateCh: channels.NewRingChannel(1024),
            // (4.3)
            fileSystem: fs,
            // (4.4)
            runningConfig: &ingress.Configuration{},
        }
    
        // (4.5)
        n.store = store.New(
            fs,
            n.updateCh)
    
        // (4.6)
        n.syncQueue = task.NewTaskQueue(n.syncIngress)
    
        // (4.7)
        onTemplateChange := func() {
            template, err := ngx_template.NewTemplate(tmplPath, fs)
            if err != nil {
                // this error is different from the rest because it must be clear why nginx is not working
                glog.Errorf(`
    -------------------------------------------------------------------------------
    Error loading new template : %v
    -------------------------------------------------------------------------------
    `, err)
                return
            }
    
            n.t = template
            glog.Info("new NGINX template loaded")
            n.SetForceReload(true)
        }
    
        ngxTpl, err := ngx_template.NewTemplate(tmplPath, fs)
        if err != nil {
            glog.Fatalf("invalid NGINX template: %v", err)
        }
    
        n.t = ngxTpl
    
        _, err = watch.NewFileWatcher(tmplPath, onTemplateChange)
        if err != nil {
            glog.Fatalf("unexpected error creating file watcher: %v", err)
        }
    
        filesToWatch := []string{}
        err := filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
    
            if info.IsDir() {
                return nil
            }
    
            filesToWatch = append(filesToWatch, path)
            return nil
        })
    
        if err != nil {
            glog.Fatalf("unexpected error creating file watcher: %v", err)
        }
    
        for _, f := range filesToWatch {
            _, err = watch.NewFileWatcher(f, func() {
                glog.Info("file %v changed. Reloading NGINX", f)
                n.SetForceReload(true)
            })
            if err != nil {
                glog.Fatalf("unexpected error creating file watcher: %v", err)
            }
        }
    }
    
    
    1. stopCh 服务是否需要终止管道,终止服务时只需要向管道中写入数据即可。
    2. updateCh 是一个 RingChannel -> 环形管道,buffer 长度为 1024,环形的意思是 不会阻塞管道写入,当空间用完时,新的元素将覆盖最早的元素 , 使用了 eapache/channels 的实现方案,它对标准的 Channel 进行了扩展,常规的数据结构,后面我会专门写篇文章分析其数据结构。
    3. fileSystem 封装了 K8s 的 filesystem(它也封装了标准库的一些文件操作),ingress-nginx 需要对本机的目录、文件进行操作, 比如写 nginx 配置文件更新之后写入。比较简单,不展开讲了。
    4. runningConfig 保存了一份当前 nginx 运行配置对应的 ingress.Configuration ,其主要作用在于当 ingress 需要更新时, 判断新的配置与旧的配置是否相同,如果完全相同的话,就不需要 reload nginx 了
    5. store 对应的是 K8sStore 的实现,封装了 K8s 的 Informer ,注意这里传入的 updateCh 和 NGINXController 中的 updateCh 是同一个(指针)。创建 Store 做了很多事情,需要注意(代码比较长,我进行了精简,只看思路):

            // ingress/controller/store/store.go
      // 参数省略了
      func New() Storer {
          store := &k8sStore{
              informers: &Informer{},
              listers:   &Lister{},
              updateCh:  updateCh,
          }
      
          // (4.5.1)
          eventBroadcaster := record.NewBroadcaster()
          eventBroadcaster.StartLogging(glog.Infof)
          eventBroadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{
              Interface: client.CoreV1().Events(namespace),
          })
          recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{
              Component: "nginx-ingress-controller",
          })
      
          // (4.5.2)
          // create informers factory, enable and assign required informers
          infFactory := informers.NewFilteredSharedInformerFactory(client, resyncPeriod, namespace, func(*metav1.ListOptions) {})
          store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer()
          store.listers.Ingress.Store = store.informers.Ingress.GetStore()
      
          // Endpoint, Secret, ConfigMap, Service 和 Ingress 差不多,代码省略了
      
          ingEventHandler := cache.ResourceEventHandlerFuncs{
              AddFunc: func(obj interface{}) {
                  ing := obj.(*extensions.Ingress)
                  if !class.IsValid(ing) {
                      a, _ := parser.GetStringAnnotation(class.IngressKey, ing)
                      return
                  }
                  recorder.Eventf(ing, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))
      
                  // (4.5.3)
                  updateCh.In() <- Event{
                      Type: CreateEvent,
                      Obj:  obj,
                  }
              },
              DeleteFunc: func(obj interface{}) {
                  ing, ok := obj.(*extensions.Ingress)
                  recorder.Eventf(ing, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name))
      
                  // (4.5.3)
                  updateCh.In() <- Event{
                      Type: DeleteEvent,
                      Obj:  obj,
                  }
              },
              UpdateFunc: func(old, cur interface{}) {
                  oldIng := old.(*extensions.Ingress)
                  curIng := cur.(*extensions.Ingress)
                  validOld := class.IsValid(oldIng)
                  validCur := class.IsValid(curIng)
                  if !validOld && validCur {
                      recorder.Eventf(curIng, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
                  } else if validOld && !validCur {
                      recorder.Eventf(curIng, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
                  } else if validCur && !reflect.DeepEqual(old, cur) {
                      recorder.Eventf(curIng, corev1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name))
                  }
      
                  // (4.5.3)
                  updateCh.In() <- Event{
                      Type: UpdateEvent,
                      Obj:  cur,
                  }
              },
          }
          store.informers.Ingress.AddEventHandler(ingEventHandler)
          return store
      }
      
      1. 使用了 K8s 的 record 和 watch 组件,简单来讲就是创建一个广播器,一方输入数据(recorder),输入的是事件(对象, 类型,其它附加信息)-> recorder.Eventf ,另外一方是 watcher,watcher 可以是 Logger、Sink 等,广播器负责将输入 的数据广播给 watcher,后面我会写篇文章分析
      2. 创建了 K8s 的 informers,informers 是 Client-go 的非常关键的的组件,内部实现极其复杂。当需要 List/Get K8s 中的 Objects 时,可以直接用 Informer 实例的 Lister() 方法。它最重要的功能是 监听事件并触发回调函数 。它可以监听 resource 的创建、更新和删除三种事件类型,回调函数只需要实现 AddFunc DeleteFunc UpdateFunc 三个函数即可。 上面的 ingEventHandler 对应的是 ingress 的 EventHandler。这篇文章 对 Informer 实现进行了一定的说明,内部实现细节有时间我研究研究。
      3. 当收到增删改的事件时,解析出事件对应的类型和 Objects,然后添加到 updateCh
  5. 创建 HTTP Server,添加 URL 及其 handler。相比其他地方,这里的代码很简单,看上面的大图即可。这里的 registerHandlers 居然是在协程里做的,如果是纯粹的 HTTP Server 的话,还会这么做吗?
  6. 启动服务,下篇文章详解。
  7. 处理 term 信号,下篇文章详解。

下篇文章

First created: 2018-06-13 10:52:40
Last updated: 2022-12-11 Sun 12:49
Power by Emacs 27.1 (Org mode 9.4.4)