prevent namespace cleanup hotloop

This commit is contained in:
deads2k 2016-05-16 08:26:48 -04:00
parent 8d90427c34
commit 7a7fe3abb8

View File

@ -47,7 +47,7 @@ type NamespaceController struct {
// controller that observes the namespaces
controller *framework.Controller
// namespaces that have been queued up for processing by workers
queue *workqueue.Type
queue workqueue.RateLimitingInterface
// list of preferred group versions and their corresponding resource set for namespace deletion
groupVersionResources []unversioned.GroupVersionResource
// opCache is a cache to remember if a particular operation is not supported to aid dynamic client.
@ -67,7 +67,7 @@ func NewNamespaceController(
namespaceController := &NamespaceController{
kubeClient: kubeClient,
clientPool: clientPool,
queue: workqueue.New(),
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
groupVersionResources: groupVersionResources,
opCache: operationNotSupportedCache{},
finalizerToken: finalizerToken,
@ -122,29 +122,40 @@ func (nm *NamespaceController) enqueueNamespace(obj interface{}) {
// The system ensures that no two workers can process
// the same namespace at the same time.
func (nm *NamespaceController) worker() {
for {
func() {
workFunc := func() bool {
key, quit := nm.queue.Get()
if quit {
return true
}
defer nm.queue.Done(key)
err := nm.syncNamespaceFromKey(key.(string))
if err == nil {
// no error, forget this entry and return
nm.queue.Forget(key)
return false
}
if estimate, ok := err.(*contentRemainingError); ok {
t := estimate.Estimate/2 + 1
glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", key, t)
nm.queue.AddAfter(key, time.Duration(t)*time.Second)
} else {
// rather than wait for a full resync, re-add the namespace to the queue to be processed
nm.queue.AddRateLimited(key)
utilruntime.HandleError(err)
}
return false
}
for {
quit := workFunc()
if quit {
return
}
defer nm.queue.Done(key)
if err := nm.syncNamespaceFromKey(key.(string)); err != nil {
if estimate, ok := err.(*contentRemainingError); ok {
go func() {
defer utilruntime.HandleCrash()
t := estimate.Estimate/2 + 1
glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", key, t)
time.Sleep(time.Duration(t) * time.Second)
nm.queue.Add(key)
}()
} else {
// rather than wait for a full resync, re-add the namespace to the queue to be processed
nm.queue.Add(key)
utilruntime.HandleError(err)
}
}
}()
}
}