ingress-nginx 工作原理(下):服务启动、运行和停止
Table of Contents
1. 服务启动
在 main
中 ngx.Start()
将启动服务。
// internal/ingress/controller/nginx.go func (n *NGINXController) Start() { // (1) n.store.Run(n.stopCh) // (2) cmd := exec.Command(n.binary, "-c", cfgPath) // put nginx in another process group to prevent it // to receive signals meant for the controller // 把 nginx 扔进另外一个进程组,防止他接受控制器的信号 cmd.SysProcAttr = &syscall.SysProcAttr{ Setpgid: true, Pgid: 0, } n.start(cmd) // (3) go n.syncQueue.Run(time.Second, n.stopCh) // (4) // force initial sync n.syncQueue.Enqueue(&extensions.Ingress{}) // (5) for { select { case err := <-n.ngxErrCh: if n.isShuttingDown { break } if process.IsRespawnIfRequired(err) { process.WaitUntilPortIsAvailable(n.cfg.ListenPorts.HTTP) // release command resources cmd.Process.Release() // start a new nginx master process if the controller is not being stopped cmd = exec.Command(n.binary, "-c", cfgPath) cmd.SysProcAttr = &syscall.SysProcAttr{ Setpgid: true, Pgid: 0, } n.start(cmd) } case event := <-n.updateCh.Out(): if n.isShuttingDown { break } if evt, ok := event.(store.Event); ok { glog.V(3).Infof("Event %v received - object %v", evt.Type, evt.Obj) if evt.Type == store.ConfigurationEvent { n.SetForceReload(true) } n.syncQueue.Enqueue(evt.Obj) } else { glog.Warningf("unexpected event type received %T", event) } case <-n.stopCh: break } } }
store.Run
会调用informers.Run
进而触发 Endpoint、Service、ingress等 informer 的Run
, 总之是触发 Informer 开始监听各种 Add、Delete、Update 事件,当接收到事件时,写入updateCh
中- 启动 nginx 进程(配置路径:
"/etc/nginx/nginx.conf"
),nginx 运行出错时会将错误写入到ngxErrCh
中 - 启动
syncQueue=,=syncQueue
是个task.Queue
,任务队列。它所做的事情是不断的从队列中取出元素,回调某个函数,然后把该元素删除。 这里的回调函数是syncIngress
在初始化 NGINXController 时设置的。syncIngress
完成了将传入的 Ingress Event 处理之后生成新的 nginx 配置文件,然后 reload nginx,在文章后面会详细介绍。至于task.Queue
我会单独写一篇文章说明内部实现 - 向
syncQueue
压入一个空的 Ingress 元素(syncQueue
刚启动,这是第一个队列中的元素),将触发全量的同步 Ingress 。 注意:虽然store.Run
之后,在这个时间段内有可能会有资源发生变更,但是变更之后的资源会记录在updateCh
中,而不是syncQueue
, 并不会触发 nginx reload - 服务运行
2. 服务运行
服务正常运行是不断从 K8s 接受资源变更事件,然后转换成 nginx 新配置,进而 reload nginx 的过程。
也就是上面 for ... select
中 case event := <-n.updateCh.Out()
中所做的事情:从 updateCh
中获取事件,然后添加到 syncQueue
,
因为 syncQueue
已经在运行状态了,新添加的事件对象(资源)很快会被 syncIngress
处理。
让我们回顾一下整个过程(这是 ingress-nginx 的核心):
- 以
syncIngress
函数为参数初始化syncQueue
&& 创建一个大小为 1024 的环形 ChannelupdateCh
- 初始时向
syncQueue
添加一个空的 Ingress - informer 监听 K8s 资源变化触发
AddFunc
DeleteFunc
UpdateFunc
将变更的资源添加到updateCh
for ... select
不断的从updateCh
中取出变更资源,然后添加到syncQueue
中syncQueue
不断从队列中弹出元素,然后调用syncIngress
处理
3. 服务终止
上面的 for ... select
中的 case <-n.stopCh
,等 stopCh
有数据时,for 循环 break,服务正常结束。
在 main
函数中有这么一段代码:
go handleSigterm(ngx, func(code int) { os.Exit(code) })
handleSigterm
的实现为:
func handleSigterm(ngx *controller.NGINXController, exit exiter) { signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, syscall.SIGTERM) <-signalChan glog.Infof("Received SIGTERM, shutting down") exitCode := 0 if err := ngx.Stop(); err != nil { glog.Infof("Error during shutdown %v", err) exitCode = 1 } glog.Infof("Handled quit, awaiting pod deletion") time.Sleep(10 * time.Second) glog.Infof("Exiting with %v", exitCode) exit(exitCode) }
接受任意的 SIGTERM
信号,然后调用 ngx.Stop()
:
func (n *NGINXController) Stop() error { n.isShuttingDown = true // 关闭 stopCh 管道 close(n.stopCh) // 关闭 syncQueue go n.syncQueue.Shutdown() // 退出 nginx 进程 cmd := exec.Command(n.binary, "-c", cfgPath, "-s", "quit") cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr err := cmd.Run() return nil }
发送 SIGTERM
信号可能是终端(kill pid),也可能是调用 /stop
:
mux.HandleFunc("/stop", func(w http.ResponseWriter, r *http.Request) { err := syscall.Kill(syscall.Getpid(), syscall.SIGTERM) // <- 这里 })
服务的启动、运行、停止就介绍完了。下面介绍一下 syncQueue
的处理函数 syncIngress
。
4. syncIngress
// ingress/controller/controller.go func (n *NGINXController) syncIngress(interface{}) error { n.syncRateLimiter.Accept() if n.syncQueue.IsShuttingDown() { return nil } // (1) ings := n.store.ListIngresses() sort.SliceStable(ings, func(i, j int) bool { ir := ings[i].ResourceVersion jr := ings[j].ResourceVersion return ir < jr }) // (2) upstreams, servers := n.getBackendServers(ings) var passUpstreams []*ingress.SSLPassthroughBackend for _, server := range servers { if !server.SSLPassthrough { continue } for _, loc := range server.Locations { if loc.Path != rootLocation { glog.Warningf("ignoring path %v of ssl passthrough host %v", loc.Path, server.Hostname) continue } passUpstreams = append(passUpstreams, &ingress.SSLPassthroughBackend{ Backend: loc.Backend, Hostname: server.Hostname, Service: loc.Service, Port: loc.Port, }) break } } // (3) pcfg := ingress.Configuration{ Backends: upstreams, Servers: servers, TCPEndpoints: n.getStreamServices(n.cfg.TCPConfigMapName, apiv1.ProtocolTCP), UDPEndpoints: n.getStreamServices(n.cfg.UDPConfigMapName, apiv1.ProtocolUDP), PassthroughBackends: passUpstreams, } // (4) if !n.isForceReload() && n.runningConfig.Equal(&pcfg) { glog.V(3).Infof("skipping backend reload (no changes detected)") return nil } // (5) err := n.OnUpdate(pcfg) // (6) n.runningConfig = &pcfg return nil }
syncIngress
的参数虽然为interface{}
,但是并没有使用参数,数据是从store.ListIngress()
中获取的。 在 Informers 的事件回调处理函数AddFunc
中有下面三行代码:store.extractAnnotations(ing) store.updateSecretIngressMap(ing) store.syncSecrets(ing)
估计是在 store 单独存储了一份 K8s ingress cache,在资源变更时同步更新缓存。所以在
syncIngress
每次都是拿到全部的 Ingresses。 也就是说syncQueue
中添加元素只是为了触发syncQueue
,而Enqueue
的传入的数据是什么是无意义的。- 根据 ingress 列表生成 backend 列表和 server 列表。backend 对应的是 NGINX 的 upstreams,而 server 对应的也是 NGINX 的 server
ingress.Configuration
对应最终的 nginx 配置文件所需的数据- 不是强制重启条件并且 nginx 当前的配置和新配置的相同时,跳过本次同步
onUpdate
更新配置文件,更新流程为:- 将 configmap 形式的 configuration 转换成普通的 configuration 对象(更贴合 nginx 配置形式的)
- 将配置写入自定义的配置模板(内存中)
- 先写入一个临时文件,测试配置是否存在问题:
nginx -t -c tmpfile
- 写入配置文件
"/etc/nginx/nginx.conf"
- reload nginx:
nginx -s reload -c /etc/nginx/nginx.conf
- 将新的配置文件作为替换正在运行的配置文件(以便下次更新对比)
这两篇文章简单讲解了 ingress-nginx 的生命周期,不够深入也不够全面。ingress-nginx 细节很多,有很多代码都是与 SSL 相关的,没费心思看。
当初看 ingress-nginx 的初衷是研究如何监听 K8s 资源变更事件的,看了源代码之后才知道它使用了 K8s client 的 informers 组件来实现的,
而 store 模块 cache 也是封装了 k8s.io/client-go/tools/cache
,它自己只是实现了自己的逻辑(ingress->nginx config),感兴趣大家可以去看看相关资料。