cmd/nginx/main.go
func main() { klog.InitFlags(nil) rand.Seed(time.Now().UnixNano()) fmt.Println(version.String()) showVersion, conf, err := parseFlags() if showVersion { os.Exit(0) } if err != nil { klog.Fatal(err) } err = file.CreateRequiredDirectories() if err != nil { klog.Fatal(err) } kubeClient, err := createApiserverClient(conf.APIServerHost, conf.RootCAFile, conf.KubeConfigFile) if err != nil { handleFatalInitError(err) } if len(conf.DefaultService) > 0 { err := checkService(conf.DefaultService, kubeClient) if err != nil { klog.Fatal(err) } klog.InfoS("Valid default backend", "service", conf.DefaultService) } if len(conf.PublishService) > 0 { err := checkService(conf.PublishService, kubeClient) if err != nil { klog.Fatal(err) } } if conf.Namespace != "" { _, err = kubeClient.CoreV1().Namespaces().Get(context.TODO(), conf.Namespace, metav1.GetOptions{}) if err != nil { klog.Fatalf("No namespace with name %v found: %v", conf.Namespace, err) } } conf.FakeCertificate = ssl.GetFakeSSLCert() klog.InfoS("SSL fake certificate created", "file", conf.FakeCertificate.PemFileName) if !k8s.NetworkingIngressAvailable(kubeClient) { klog.Fatalf("ingress-nginx requires Kubernetes v1.19.0 or higher") } _, err = kubeClient.NetworkingV1().IngressClasses().List(context.TODO(), metav1.ListOptions{}) if err != nil { if !errors.IsNotFound(err) { if errors.IsForbidden(err) { klog.Warningf("No permissions to list and get Ingress Classes: %v, IngressClass feature will be disabled", err) conf.IngressClassConfiguration.IgnoreIngressClass = true } } } conf.Client = kubeClient err = k8s.GetIngressPod(kubeClient) if err != nil { klog.Fatalf("Unexpected error obtaining ingress-nginx pod: %v", err) } reg := prometheus.NewRegistry() reg.MustRegister(prometheus.NewGoCollector()) reg.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{ PidFn: func() (int, error) { return os.Getpid(), nil }, ReportErrors: true, })) mc := metric.NewDummyCollector() if conf.EnableMetrics { mc, err = 
metric.NewCollector(conf.MetricsPerHost, reg, conf.IngressClassConfiguration.Controller) if err != nil { klog.Fatalf("Error creating prometheus collector: %v", err) } } // Pass the ValidationWebhook status to determine if we need to start the collector // for the admissionWebhook mc.Start(conf.ValidationWebhook) if conf.EnableProfiling { go registerProfiler() } ngx := controller.NewNGINXController(conf, mc) mux := http.NewServeMux() registerHealthz(nginx.HealthPath, ngx, mux) registerMetrics(reg, mux) go startHTTPServer(conf.HealthCheckHost, conf.ListenPorts.Health, mux) go ngx.Start() handleSigterm(ngx, func(code int) { os.Exit(code) }) }
主要逻辑
Step1 初始化配置,获取kubeClient
kubeClient, err := createApiserverClient(conf.APIServerHost, conf.RootCAFile, conf.KubeConfigFile)
Step2 检查DefaultService、PublishService、Namespace等是否存在
Step3 检查IngressClasses权限
Step4 检查ingress controller pod存在
Step5 创建NGINXController, 开启健康检查,metrics
ngx := controller.NewNGINXController(conf, mc)
方法NewNGINXController
//internal/ingress/controller/nginx.go // NewNGINXController creates a new NGINX Ingress controller. func NewNGINXController(config *Configuration, mc metric.Collector) *NGINXController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{ Interface: config.Client.CoreV1().Events(config.Namespace), }) h, err := dns.GetSystemNameServers() if err != nil { klog.Warningf("Error reading system nameservers: %v", err) } n := &NGINXController{ isIPV6Enabled: ing_net.IsIPv6Enabled(), resolver: h, cfg: config, syncRateLimiter: flowcontrol.NewTokenBucketRateLimiter(config.SyncRateLimit, 1), recorder: eventBroadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{ Component: "nginx-ingress-controller", }), stopCh: make(chan struct{}), updateCh: channels.NewRingChannel(1024), ngxErrCh: make(chan error), stopLock: &sync.Mutex{}, runningConfig: new(ingress.Configuration), Proxy: &TCPProxy{}, metricCollector: mc, command: NewNginxCommand(), } if n.cfg.ValidationWebhook != "" { n.validationWebhookServer = &http.Server{ Addr: config.ValidationWebhook, Handler: adm_controller.NewAdmissionControllerServer(&adm_controller.IngressAdmission{Checker: n}), TLSConfig: ssl.NewTLSListener(n.cfg.ValidationWebhookCertPath, n.cfg.ValidationWebhookKeyPath).TLSConfig(), // disable http/2 // https://github.com/kubernetes/kubernetes/issues/80313 // https://github.com/kubernetes/ingress-nginx/issues/6323#issuecomment-737239159 TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)), } } n.store = store.New( config.Namespace, config.WatchNamespaceSelector, config.ConfigMapName, config.TCPConfigMapName, config.UDPConfigMapName, config.DefaultSSLCertificate, config.ResyncPeriod, config.Client, n.updateCh, config.DisableCatchAll, config.IngressClassConfiguration) n.syncQueue = task.NewTaskQueue(n.syncIngress) if config.UpdateStatus { n.syncStatus = status.NewStatusSyncer(status.Config{ Client: 
config.Client, PublishService: config.PublishService, PublishStatusAddress: config.PublishStatusAddress, IngressLister: n.store, UpdateStatusOnShutdown: config.UpdateStatusOnShutdown, UseNodeInternalIP: config.UseNodeInternalIP, }) } else { klog.Warning("Update of Ingress status is disabled (flag --update-status)") } onTemplateChange := func() { template, err := ngx_template.NewTemplate(nginx.TemplatePath) if err != nil { // this error is different from the rest because it must be clear why nginx is not working klog.ErrorS(err, "Error loading new template") return } n.t = template klog.InfoS("New NGINX configuration template loaded") n.syncQueue.EnqueueTask(task.GetDummyObject("template-change")) } ngxTpl, err := ngx_template.NewTemplate(nginx.TemplatePath) if err != nil { klog.Fatalf("Invalid NGINX configuration template: %v", err) } n.t = ngxTpl _, err = watch.NewFileWatcher(nginx.TemplatePath, onTemplateChange) if err != nil { klog.Fatalf("Error creating file watcher for %v: %v", nginx.TemplatePath, err) } filesToWatch := []string{} err = filepath.Walk("/etc/nginx/geoip/", func(path string, info os.FileInfo, err error) error { if err != nil { return err } if info.IsDir() { return nil } filesToWatch = append(filesToWatch, path) return nil }) if err != nil { klog.Fatalf("Error creating file watchers: %v", err) } for _, f := range filesToWatch { _, err = watch.NewFileWatcher(f, func() { klog.InfoS("File changed detected. Reloading NGINX", "path", f) n.syncQueue.EnqueueTask(task.GetDummyObject("file-change")) }) if err != nil { klog.Fatalf("Error creating file watcher for %v: %v", f, err) } } return n }
主要逻辑:
实例化NGINXController
创建validationWebhookServer
创建store缓存
n.store = store.New
监听模板文件以及/etc/nginx/geoip下的文件变化,如有变化,发送对应的文件变化事件
当ingress有变化时,执行syncIngress方法回调
n.syncQueue = task.NewTaskQueue(n.syncIngress)
Start方法,开启万能的死循环,处理各种Event
//internal/ingress/controller/nginx.go

// Event loop excerpt: waits on three channels and only returns when the
// controller is stopping or the NGINX error calls for it.
for {
	select {
	case err := <-n.ngxErrCh:
		// Errors reported while shutting down end the loop immediately.
		if n.isShuttingDown {
			return
		}

		// if the nginx master process dies, the workers continue to process requests
		// until the failure of the configured livenessProbe and restart of the pod.
		if process.IsRespawnIfRequired(err) {
			return
		}

	case event := <-n.updateCh.Out():
		// break only leaves the select statement, so the event is dropped
		// and the loop goes on to wait for the next one.
		if n.isShuttingDown {
			break
		}

		if evt, ok := event.(store.Event); ok {
			klog.V(3).InfoS("Event received", "type", evt.Type, "object", evt.Obj)
			if evt.Type == store.ConfigurationEvent {
				// TODO: is this necessary? Consider removing this special case
				n.syncQueue.EnqueueTask(task.GetDummyObject("configmap-change"))
				// continue restarts the for loop, skipping the skippable
				// enqueue below.
				continue
			}

			// All other store events are queued as skippable tasks.
			n.syncQueue.EnqueueSkippableTask(evt.Obj)
		} else {
			klog.Warningf("Unexpected event type received %T", event)
		}

	case <-n.stopCh:
		return
	}
}
当updateCh中有数据时,放到syncQueue中
//internal/task/queue.go // worker processes work in the queue through sync. func (t *Queue) worker() { for { key, quit := t.queue.Get() if quit { if !isClosed(t.workerDone) { close(t.workerDone) } return } ts := time.Now().UnixNano() item := key.(Element) if item.Timestamp != 0 && t.lastSync > item.Timestamp { klog.V(3).InfoS("skipping sync", "key", item.Key, "last", t.lastSync, "now", item.Timestamp) t.queue.Forget(key) t.queue.Done(key) continue } klog.V(3).InfoS("syncing", "key", item.Key) if err := t.sync(key); err != nil { klog.ErrorS(err, "requeuing", "key", item.Key) t.queue.AddRateLimited(Element{ Key: item.Key, Timestamp: 0, }) } else { t.queue.Forget(key) t.lastSync = ts } t.queue.Done(key) } }
当syncQueue中有数据,回调syncIngress方法 t.sync(key)
//internal/ingress/controller/store/store.go

// ingEventHandler reacts to Ingress add, delete and update notifications.
// Accepted adds and updates refresh the store (ingress, secret map, secrets)
// and then push an Event onto updateCh.
ingEventHandler := cache.ResourceEventHandlerFuncs{
	// AddFunc accepts a new Ingress only if it is in a watched namespace,
	// its ingress class validates, and it is not a catch-all rule while
	// disableCatchAll is set.
	AddFunc: func(obj interface{}) {
		// NOTE(review): the conversion error is discarded; presumably obj is
		// always an Ingress here - confirm.
		ing, _ := toIngress(obj)

		if !watchedNamespace(ing.Namespace) {
			return
		}

		ic, err := store.GetIngressClass(ing, icConfig)
		if err != nil {
			klog.InfoS("Ignoring ingress because of error while validating ingress class", "ingress", klog.KObj(ing), "error", err)
			return
		}

		klog.InfoS("Found valid IngressClass", "ingress", klog.KObj(ing), "ingressclass", ic)

		if hasCatchAllIngressRule(ing.Spec) && disableCatchAll {
			klog.InfoS("Ignoring add for catch-all ingress because of --disable-catch-all", "ingress", klog.KObj(ing))
			return
		}

		recorder.Eventf(ing, corev1.EventTypeNormal, "Sync", "Scheduled for sync")

		store.syncIngress(ing)
		store.updateSecretIngressMap(ing)
		store.syncSecrets(ing)

		updateCh.In() <- Event{
			Type: CreateEvent,
			Obj:  obj,
		}
	},
	DeleteFunc: ingDeleteHandler,
	// UpdateFunc compares the class validation result of the old and the
	// current object to decide between create, delete, update and no-op.
	UpdateFunc: func(old, cur interface{}) {
		// NOTE(review): conversion errors are discarded here as well.
		oldIng, _ := toIngress(old)
		curIng, _ := toIngress(cur)

		// The namespace filter is applied to the old object.
		if !watchedNamespace(oldIng.Namespace) {
			return
		}

		// Both errors stay nil when ingress classes are ignored, so only
		// the last two branches below can be reached in that case.
		var errOld, errCur error
		var classCur string
		if !icConfig.IgnoreIngressClass {
			_, errOld = store.GetIngressClass(oldIng, icConfig)
			classCur, errCur = store.GetIngressClass(curIng, icConfig)
		}
		if errOld != nil && errCur == nil {
			// Previously rejected, now valid: handled like a newly created
			// ingress (the event sent below is still an UpdateEvent).
			if hasCatchAllIngressRule(curIng.Spec) && disableCatchAll {
				klog.InfoS("ignoring update for catch-all ingress because of --disable-catch-all", "ingress", klog.KObj(curIng))
				return
			}

			klog.InfoS("creating ingress", "ingress", klog.KObj(curIng), "ingressclass", classCur)
			recorder.Eventf(curIng, corev1.EventTypeNormal, "Sync", "Scheduled for sync")
		} else if errOld == nil && errCur != nil {
			// Previously valid, now rejected: remove the old object.
			klog.InfoS("removing ingress because of unknown ingressclass", "ingress", klog.KObj(curIng))
			ingDeleteHandler(old)
			return
		} else if errCur == nil && !reflect.DeepEqual(old, cur) {
			// Valid and actually changed: regular update, unless it turned
			// into a disabled catch-all, in which case the old one is deleted.
			if hasCatchAllIngressRule(curIng.Spec) && disableCatchAll {
				klog.InfoS("ignoring update for catch-all ingress and delete old one because of --disable-catch-all", "ingress", klog.KObj(curIng))
				ingDeleteHandler(old)
				return
			}

			recorder.Eventf(curIng, corev1.EventTypeNormal, "Sync", "Scheduled for sync")
		} else {
			// Reached when nothing changed, and also when the current
			// object fails class validation together with the old one.
			klog.V(3).InfoS("No changes on ingress. Skipping update", "ingress", klog.KObj(curIng))
			return
		}

		store.syncIngress(curIng)
		store.updateSecretIngressMap(curIng)
		store.syncSecrets(curIng)

		updateCh.In() <- Event{
			Type: UpdateEvent,
			Obj:  cur,
		}
	},
}
internal/ingress/controller/store/store.go
New方法构造各种资源的处理EventHandler
Ingress举例:ResourceEventHandlerFuncs
处理Ingress的Add、Delete、Update
// Attach each event handler to its informer. The IngressClass handler is
// registered only when ingress classes are not being ignored.
store.informers.Ingress.AddEventHandler(ingEventHandler)
if !icConfig.IgnoreIngressClass {
	store.informers.IngressClass.AddEventHandler(ingressClassEventHandler)
}
store.informers.Endpoint.AddEventHandler(epEventHandler)
store.informers.Secret.AddEventHandler(secrEventHandler)
store.informers.ConfigMap.AddEventHandler(cmEventHandler)
store.informers.Service.AddEventHandler(serviceHandler)
将Ingress、IngressClass、Endpoint、Secret、ConfigMap、Service各自的EventHandler注册到k8s sdk(client-go)对应的Informer中;其中IngressClass的EventHandler仅在未设置IgnoreIngressClass时注册