From fea0c79054c38bb99d8930878a047f2e12cabb3b Mon Sep 17 00:00:00 2001
From: d00369826
Date: Wed, 14 Sep 2016 20:19:19 +0800
Subject: [PATCH] fix certificates controller hotloop on unexpected API server rejections

Change-Id: Ib7d2e18bcaa498bddfc785f3ff12958dfaaecbc3
---
 pkg/controller/certificates/controller.go | 45 ++++++++++++++---------
 1 file changed, 27 insertions(+), 18 deletions(-)

diff --git a/pkg/controller/certificates/controller.go b/pkg/controller/certificates/controller.go
index 430d50e04cb..64911d63478 100644
--- a/pkg/controller/certificates/controller.go
+++ b/pkg/controller/certificates/controller.go
@@ -58,7 +58,7 @@ type CertificateController struct {
 
 	signer *local.Signer
 
-	queue *workqueue.Type
+	queue workqueue.RateLimitingInterface
 }
 
 func NewCertificateController(kubeClient clientset.Interface, syncPeriod time.Duration, caCertFile, caKeyFile string, approveAllKubeletCSRsForGroup string) (*CertificateController, error) {
@@ -79,7 +79,7 @@ func NewCertificateController(kubeClient clientset.Interface, syncPeriod time.Du
 
 	cc := &CertificateController{
 		kubeClient: kubeClient,
-		queue:      workqueue.NewNamed("certificate"),
+		queue:      workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "certificate"),
 		signer:     ca,
 		approveAllKubeletCSRsForGroup: approveAllKubeletCSRsForGroup,
 	}
@@ -121,37 +121,47 @@ func NewCertificateController(kubeClient clientset.Interface, syncPeriod time.Du
 // Run the main goroutine responsible for watching and syncing jobs.
 func (cc *CertificateController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
+	defer cc.queue.ShutDown()
+
 	go cc.csrController.Run(stopCh)
+
 	glog.Infof("Starting certificate controller manager")
 	for i := 0; i < workers; i++ {
 		go wait.Until(cc.worker, time.Second, stopCh)
 	}
 	<-stopCh
 	glog.Infof("Shutting down certificate controller")
-	cc.queue.ShutDown()
 }
 
 // worker runs a thread that dequeues CSRs, handles them, and marks them done.
 func (cc *CertificateController) worker() {
-	for {
-		func() {
-			key, quit := cc.queue.Get()
-			if quit {
-				return
-			}
-			defer cc.queue.Done(key)
-			err := cc.syncHandler(key.(string))
-			if err != nil {
-				glog.Errorf("Error syncing CSR: %v", err)
-			}
-		}()
+	for cc.processNextWorkItem() {
 	}
 }
 
+// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
+func (cc *CertificateController) processNextWorkItem() bool {
+	cKey, quit := cc.queue.Get()
+	if quit {
+		return false
+	}
+	defer cc.queue.Done(cKey)
+
+	err := cc.syncHandler(cKey.(string))
+	if err == nil {
+		cc.queue.Forget(cKey)
+		return true
+	}
+
+	cc.queue.AddRateLimited(cKey)
+	utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", cKey, err))
+	return true
+}
+
 func (cc *CertificateController) enqueueCertificateRequest(obj interface{}) {
 	key, err := controller.KeyFunc(obj)
 	if err != nil {
-		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
+		utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
 		return
 	}
 	cc.queue.Add(key)
@@ -180,7 +190,6 @@ func (cc *CertificateController) maybeSignCertificate(key string) error {
 	}()
 	obj, exists, err := cc.csrStore.Store.GetByKey(key)
 	if err != nil {
-		cc.queue.Add(key)
 		return err
 	}
 	if !exists {
@@ -235,7 +244,7 @@ func (cc *CertificateController) maybeAutoApproveCSR(csr *certificates.Certifica
 
 	x509cr, err := utilcertificates.ParseCertificateRequestObject(csr)
 	if err != nil {
-		glog.Errorf("unable to parse csr %q: %v", csr.ObjectMeta.Name, err)
+		utilruntime.HandleError(fmt.Errorf("unable to parse csr %q: %v", csr.Name, err))
 		return csr, nil
 	}
 	if !reflect.DeepEqual([]string{"system:nodes"}, x509cr.Subject.Organization) {