Merge pull request #32664 from m1093782566/m109-certificates-hot-loop

Automatic merge from submit-queue

[Controller Manager] Fix certificates controller hotloop and use utilruntime.HandleError to replace glog.Errorf

<!--  Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, read our contributor guidelines https://github.com/kubernetes/kubernetes/blob/master/CONTRIBUTING.md and developer guide https://github.com/kubernetes/kubernetes/blob/master/docs/devel/development.md
2. If you want *faster* PR reviews, read how: https://github.com/kubernetes/kubernetes/blob/master/docs/devel/faster_reviews.md
3. Follow the instructions for writing a release note: https://github.com/kubernetes/kubernetes/blob/master/docs/devel/pull-requests.md#release-notes
-->

**What this PR does / why we need it**:

Fix certificates controller hotloop on unexpected API server rejections.

**Which issue this PR fixes** 

Related issue is #30629

**Special notes for your reviewer**:

@deads2k @derekwaynecarr PTAL.

I find there is no unit test for certificates controller, and I will implement unit tests for it later.
This commit is contained in:
Kubernetes Submit Queue 2016-09-17 21:00:59 -07:00 committed by GitHub
commit 920581d964

View File

@ -57,7 +57,7 @@ type CertificateController struct {
signer *local.Signer
queue *workqueue.Type
queue workqueue.RateLimitingInterface
}
func NewCertificateController(kubeClient clientset.Interface, syncPeriod time.Duration, caCertFile, caKeyFile string, approveAllKubeletCSRsForGroup string) (*CertificateController, error) {
@ -78,7 +78,7 @@ func NewCertificateController(kubeClient clientset.Interface, syncPeriod time.Du
cc := &CertificateController{
kubeClient: kubeClient,
queue: workqueue.NewNamed("certificate"),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "certificate"),
signer: ca,
approveAllKubeletCSRsForGroup: approveAllKubeletCSRsForGroup,
}
@ -120,37 +120,47 @@ func NewCertificateController(kubeClient clientset.Interface, syncPeriod time.Du
// Run the main goroutine responsible for watching and syncing jobs.
func (cc *CertificateController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer cc.queue.ShutDown()
go cc.csrController.Run(stopCh)
glog.Infof("Starting certificate controller manager")
for i := 0; i < workers; i++ {
go wait.Until(cc.worker, time.Second, stopCh)
}
<-stopCh
glog.Infof("Shutting down certificate controller")
cc.queue.ShutDown()
}
// worker runs a thread that dequeues CSRs, handles them, and marks them done.
func (cc *CertificateController) worker() {
for {
func() {
key, quit := cc.queue.Get()
if quit {
return
}
defer cc.queue.Done(key)
err := cc.syncHandler(key.(string))
if err != nil {
glog.Errorf("Error syncing CSR: %v", err)
}
}()
for cc.processNextWorkItem() {
}
}
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
func (cc *CertificateController) processNextWorkItem() bool {
cKey, quit := cc.queue.Get()
if quit {
return false
}
defer cc.queue.Done(cKey)
err := cc.syncHandler(cKey.(string))
if err == nil {
cc.queue.Forget(cKey)
return true
}
cc.queue.AddRateLimited(cKey)
utilruntime.HandleError(fmt.Errorf("Sync %v failed with : %v", cKey, err))
return true
}
func (cc *CertificateController) enqueueCertificateRequest(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}
cc.queue.Add(key)
@ -179,7 +189,6 @@ func (cc *CertificateController) maybeSignCertificate(key string) error {
}()
obj, exists, err := cc.csrStore.Store.GetByKey(key)
if err != nil {
cc.queue.Add(key)
return err
}
if !exists {
@ -234,7 +243,7 @@ func (cc *CertificateController) maybeAutoApproveCSR(csr *certificates.Certifica
x509cr, err := utilcertificates.ParseCertificateRequestObject(csr)
if err != nil {
glog.Errorf("unable to parse csr %q: %v", csr.ObjectMeta.Name, err)
utilruntime.HandleError(fmt.Errorf("unable to parse csr %q: %v", csr.Name, err))
return csr, nil
}
if !reflect.DeepEqual([]string{"system:nodes"}, x509cr.Subject.Organization) {