From 7a7fe3abb87310f851b70c3bf24ce762f9dbf40e Mon Sep 17 00:00:00 2001 From: deads2k Date: Mon, 16 May 2016 08:26:48 -0400 Subject: [PATCH] prevent namespace cleanup hotloop --- .../namespace/namespace_controller.go | 59 +++++++++++-------- 1 file changed, 35 insertions(+), 24 deletions(-) diff --git a/pkg/controller/namespace/namespace_controller.go b/pkg/controller/namespace/namespace_controller.go index 23d7fefeb4d..0583313ab86 100644 --- a/pkg/controller/namespace/namespace_controller.go +++ b/pkg/controller/namespace/namespace_controller.go @@ -47,7 +47,7 @@ type NamespaceController struct { // controller that observes the namespaces controller *framework.Controller // namespaces that have been queued up for processing by workers - queue *workqueue.Type + queue workqueue.RateLimitingInterface // list of preferred group versions and their corresponding resource set for namespace deletion groupVersionResources []unversioned.GroupVersionResource // opCache is a cache to remember if a particular operation is not supported to aid dynamic client. @@ -67,7 +67,7 @@ func NewNamespaceController( namespaceController := &NamespaceController{ kubeClient: kubeClient, clientPool: clientPool, - queue: workqueue.New(), + queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), groupVersionResources: groupVersionResources, opCache: operationNotSupportedCache{}, finalizerToken: finalizerToken, @@ -122,29 +122,40 @@ func (nm *NamespaceController) enqueueNamespace(obj interface{}) { // The system ensures that no two workers can process // the same namespace at the same time. func (nm *NamespaceController) worker() { + workFunc := func() bool { + key, quit := nm.queue.Get() + if quit { + return true + } + defer nm.queue.Done(key) + + err := nm.syncNamespaceFromKey(key.(string)) + if err == nil { + // no error, forget this entry and return + nm.queue.Forget(key) + return false + } + + if estimate, ok := err.(*contentRemainingError); ok { + t := estimate.Estimate/2 + 1 + glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", key, t) + nm.queue.AddAfter(key, time.Duration(t)*time.Second) + + } else { + // rather than wait for a full resync, re-add the namespace to the queue to be processed + nm.queue.AddRateLimited(key) + utilruntime.HandleError(err) + } + return false + + } + for { - func() { - key, quit := nm.queue.Get() - if quit { - return - } - defer nm.queue.Done(key) - if err := nm.syncNamespaceFromKey(key.(string)); err != nil { - if estimate, ok := err.(*contentRemainingError); ok { - go func() { - defer utilruntime.HandleCrash() - t := estimate.Estimate/2 + 1 - glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", key, t) - time.Sleep(time.Duration(t) * time.Second) - nm.queue.Add(key) - }() - } else { - // rather than wait for a full resync, re-add the namespace to the queue to be processed - nm.queue.Add(key) - utilruntime.HandleError(err) - } - } - }() + quit := workFunc() + + if quit { + return + } } }