ingress-nginx 工作原理(上):服务初始化
Table of Contents
ingress-nginx 是 K8s ingress controller 的一种实现。这段时间在看它的源代码,源代码虽然不算很长(核心代码一万五千行左右), 但是有很多的 K8s 依赖(除了依赖 K8s API server 和 client-go 以外,还依赖了 apimachinery 中的组件),如果对 K8s 的组件不是 很熟的话,读起来还是有很多的困扰。
本文只介绍 ingress-nginx 工作原理,依赖组件(queue、watch 等)会在后续的文章介绍。源码主要由两个模块组成:cmd 和 internal, cmd 是入口(main),internal 按依赖组件分别进行了二次封装(file、ingress、k8s 等)。
工作原理如下图(图片看不太清,可点击下载到本地查看):
按照图片从上到下看已经把 ingress-nginx 大体流程显示清楚了,下面将站在源码的角度从服务的生命周期(服务初始化、服务启动、 服务运行、服务停止)简单说一下实现上的一些细节。篇幅较长,分为上、下两篇文章。
另外,文中引用的源代码是精简过后的,删掉无关的代码。代码和文字说明部分用序号关联: (1)
对应 1.
。
1. 服务初始化
服务初始化,可以拆分成 K8s 客户端创建、NGINXController、对外 HTTP 服务、启动服务几块。
// cmd/nginx/main.go func main() { // (1) showVersion, conf, err := parseFlags() if showVersion { os.Exit(0) } // (2) kubeClient, err := createApiserverClient(conf.APIServerHost, conf.KubeConfigFile) if err != nil { handleFatalInitError(err) } // (3) if conf.Namespace != "" { _, err = kubeClient.CoreV1().Namespaces().Get(conf.Namespace, metav1.GetOptions{}) if err != nil { glog.Fatalf("no namespace with name %v found: %v", conf.Namespace, err) } } conf.Client = kubeClient // (4) ngx := controller.NewNGINXController(conf, fs) // (7) go handleSigterm(ngx, func(code int) { os.Exit(code) }) // (5) mux := http.NewServeMux() go registerHandlers(conf.EnableProfiling, conf.ListenPorts.Health, ngx, mux) // (6) ngx.Start() }
- 解析命令行参数,得到
controller.Configuration
对象,Configuration 涵盖了所有 Ingress controller 所有需要的配置,比如 K8s APIServer相关配置参数 创建 ApiserverClient,根据 Configuration 的配置创建 K8s Client,创建之后调用
ServerVersion()
API 以判断 API Server 是否可以正常连接。因为在一些环境中第一次可能连不上 API Server,所以这里有重试策略。// cmd/nginx/main.go func createApiserverClient(apiserverHost string, kubeConfig string) (*kubernetes.Clientset, error) { var v *discovery.Info // https://github.com/kubernetes/ingress-nginx/issues/1968 // 在一些环境中可能第一次连不上 API server defaultRetry := wait.Backoff{ Steps: 10, // 尝试次数 Duration: 1 * time.Second, // 基准的尝试时间 Factor: 1.5, // 持续时间 = 迭代次数 * 因子 Jitter: 0.1, // 每次迭代应用的抖动量 } var lastErr error retries := 0 glog.V(2).Info("trying to discover Kubernetes version") err = wait.ExponentialBackoff(defaultRetry, func() (bool, error) { v, err = client.Discovery().ServerVersion() // 查看版本号 if err == nil { return true, nil } lastErr = err retries++ return false, nil }) }
wait
是k8s.io/apimachinery
的组件,ExponentialBackoff
会根据Backoff
提供的参数(将决定回调函数执行之后的 Sleep 间隔)去执行回调函数, 是否继续执行由回调函数的返回值决定。代码不长,感兴趣可以自行查看apimachinery/pkg/util/wait/wait.go#L175
源代码。- ingress-nginx 监听的 namespace 由参数中的
watch-namespace
来决定,如果为空则表示默认监听所有的 namespace 创建 NGINXController,根据名字就知道了,NGINXController 是 ingress-controller 的 controller。
// internal/ingress/controller/nginx.go func NewNGINXController(config *Configuration, fs file.Filesystem) *NGINXController { // 环境变量查看 NGINX_BINARY,如果未设置则使用 nginxBinary = "/usr/sbin/nginx" ngx := os.Getenv("NGINX_BINARY") if ngx == "" { ngx = nginxBinary } n := &NGINXController{ binary: ngx, resolver: h, cfg: config, // (4.1) stopCh: make(chan struct{}), // (4.2) updateCh: channels.NewRingChannel(1024), // (4.3) fileSystem: fs, // (4.4) runningConfig: &ingress.Configuration{}, } // (4.5) n.store = store.New( fs, n.updateCh) // (4.6) n.syncQueue = task.NewTaskQueue(n.syncIngress) // (4.7) onTemplateChange := func() { template, err := ngx_template.NewTemplate(tmplPath, fs) if err != nil { // this error is different from the rest because it must be clear why nginx is not working glog.Errorf(` ------------------------------------------------------------------------------- Error loading new template : %v ------------------------------------------------------------------------------- `, err) return } n.t = template glog.Info("new NGINX template loaded") n.SetForceReload(true) } ngxTpl, err := ngx_template.NewTemplate(tmplPath, fs) if err != nil { glog.Fatalf("invalid NGINX template: %v", err) } n.t = ngxTpl _, err = watch.NewFileWatcher(tmplPath, onTemplateChange) if err != nil { glog.Fatalf("unexpected error creating file watcher: %v", err) } filesToWatch := []string{} err := filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error { if err != nil { return err } if info.IsDir() { return nil } filesToWatch = append(filesToWatch, path) return nil }) if err != nil { glog.Fatalf("unexpected error creating file watcher: %v", err) } for _, f := range filesToWatch { _, err = watch.NewFileWatcher(f, func() { glog.Info("file %v changed. Reloading NGINX", f) n.SetForceReload(true) }) if err != nil { glog.Fatalf("unexpected error creating file watcher: %v", err) } } }
stopCh
服务是否需要终止管道,终止服务时只需要向管道中写入数据即可。updateCh
是一个 RingChannel -> 环形管道,buffer 长度为 1024,环形的意思是 不会阻塞管道写入,当空间用完时,新的元素将覆盖最早的元素 , 使用了 eapache/channels 的实现方案,它对标准的 Channel 进行了扩展,常规的数据结构,后面我会专门写篇文章分析其数据结构。fileSystem
封装了 K8s 的 filesystem(它也封装了标准库的一些文件操作),ingress-nginx 需要对本机的目录、文件进行操作, 比如写 nginx 配置文件更新之后写入。比较简单,不展开讲了。runningConfig
保存了一份当前 nginx 运行配置对应的ingress.Configuration
,其主要作用在于当 ingress 需要更新时, 判断新的配置与旧的配置是否相同,如果完全相同的话,就不需要 reload nginx 了store 对应的是 K8sStore 的实现,封装了 K8s 的
Informer
,注意这里传入的updateCh
和 NGINXController 中的updateCh
是同一个(指针)。创建 Store 做了很多事情,需要注意(代码比较长,我进行了精简,只看思路):// ingress/controller/store/store.go // 参数省略了 func New() Storer { store := &k8sStore{ informers: &Informer{}, listers: &Lister{}, updateCh: updateCh, } // (4.5.1) eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{ Interface: client.CoreV1().Events(namespace), }) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{ Component: "nginx-ingress-controller", }) // (4.5.2) // create informers factory, enable and assign required informers infFactory := informers.NewFilteredSharedInformerFactory(client, resyncPeriod, namespace, func(*metav1.ListOptions) {}) store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer() store.listers.Ingress.Store = store.informers.Ingress.GetStore() // Endpoint, Secret, ConfigMap, Service 和 Ingress 差不多,代码省略了 ingEventHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { ing := obj.(*extensions.Ingress) if !class.IsValid(ing) { a, _ := parser.GetStringAnnotation(class.IngressKey, ing) return } recorder.Eventf(ing, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name)) // (4.5.3) updateCh.In() <- Event{ Type: CreateEvent, Obj: obj, } }, DeleteFunc: func(obj interface{}) { ing, ok := obj.(*extensions.Ingress) recorder.Eventf(ing, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", ing.Namespace, ing.Name)) // (4.5.3) updateCh.In() <- Event{ Type: DeleteEvent, Obj: obj, } }, UpdateFunc: func(old, cur interface{}) { oldIng := old.(*extensions.Ingress) curIng := cur.(*extensions.Ingress) validOld := class.IsValid(oldIng) validCur := class.IsValid(curIng) if !validOld && validCur { recorder.Eventf(curIng, corev1.EventTypeNormal, "CREATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) } else if validOld && !validCur { recorder.Eventf(curIng, corev1.EventTypeNormal, "DELETE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) } else if validCur && !reflect.DeepEqual(old, cur) { recorder.Eventf(curIng, corev1.EventTypeNormal, "UPDATE", fmt.Sprintf("Ingress %s/%s", curIng.Namespace, curIng.Name)) } // (4.5.3) updateCh.In() <- Event{ Type: UpdateEvent, Obj: cur, } }, } store.informers.Ingress.AddEventHandler(ingEventHandler) return store }
- 使用了 K8s 的 record 和 watch 组件,简单来讲就是创建一个广播器,一方输入数据(recorder),输入的是事件(对象,
类型,其它附加信息)->
recorder.Eventf
,另外一方是 watcher,watcher 可以是 Logger、Sink 等,广播器负责将输入 的数据广播给 watcher,后面我会写篇文章分析 - 创建了 K8s 的 informers,informers 是 Client-go 的非常关键的的组件,内部实现极其复杂。当需要 List/Get K8s 中的
Objects 时,可以直接用 Informer 实例的 Lister() 方法。它最重要的功能是 监听事件并触发回调函数 。它可以监听
resource 的创建、更新和删除三种事件类型,回调函数只需要实现
AddFunc
DeleteFunc
UpdateFunc
三个函数即可。 上面的ingEventHandler
对应的是 ingress 的 EventHandler。这篇文章 对 Informer 实现进行了一定的说明,内部实现细节有时间我研究研究。 - 当收到增删改的事件时,解析出事件对应的类型和 Objects,然后添加到
updateCh
中
- 使用了 K8s 的 record 和 watch 组件,简单来讲就是创建一个广播器,一方输入数据(recorder),输入的是事件(对象,
类型,其它附加信息)->
- 创建 HTTP Server,添加 URL 及其 handler。相比其他地方,这里的代码很简单,看上面的大图即可。这里的
registerHandlers
居然是在协程里做的,如果是纯粹的 HTTP Server 的话,还会这么做吗? - 启动服务,下篇文章详解。
- 处理 term 信号,下篇文章详解。
下篇文章。