diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go index c297e09db5d..803d7896f24 100644 --- a/pkg/controller/disruption/disruption.go +++ b/pkg/controller/disruption/disruption.go @@ -31,6 +31,7 @@ import ( "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/intstr" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/watch" @@ -64,7 +65,8 @@ type DisruptionController struct { dController *cache.Controller dLister cache.StoreToDeploymentLister - queue *workqueue.Type + // PodDisruptionBudget keys that need to be synced. + queue workqueue.RateLimitingInterface broadcaster record.EventBroadcaster recorder record.EventRecorder @@ -87,7 +89,7 @@ func NewDisruptionController(podInformer cache.SharedIndexInformer, kubeClient * dc := &DisruptionController{ kubeClient: kubeClient, podController: podInformer.GetController(), - queue: workqueue.NewNamed("disruption"), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "disruption"), broadcaster: record.NewBroadcaster(), } dc.recorder = dc.broadcaster.NewRecorder(api.EventSource{Component: "controllermanager"}) @@ -390,33 +392,27 @@ func (dc *DisruptionController) getPodsForPdb(pdb *policy.PodDisruptionBudget) ( } func (dc *DisruptionController) worker() { - work := func() bool { - key, quit := dc.queue.Get() - if quit { - return quit - } - defer dc.queue.Done(key) - glog.V(4).Infof("Syncing PodDisruptionBudget %q", key.(string)) - if err := dc.sync(key.(string)); err != nil { - glog.Errorf("Error syncing PodDisruptionBudget %v, requeuing: %v", key.(string), err) - // TODO(mml): In order to be safe in the face of a total inability to write state - // changes, we should write an expiration timestamp here and consumers - // of the PDB state (the /evict subresource handler) should check that - // any 'true' state is relatively fresh. + for dc.processNextWorkItem() { + } +} - // TODO(mml): file an issue to that effect - - // TODO(mml): If we used a workqueue.RateLimitingInterface, we could - // improve our behavior (be a better citizen) when we need to retry. - dc.queue.Add(key) - } +func (dc *DisruptionController) processNextWorkItem() bool { + dKey, quit := dc.queue.Get() + if quit { return false } - for { - if quit := work(); quit { - return - } + defer dc.queue.Done(dKey) + + err := dc.sync(dKey.(string)) + if err == nil { + dc.queue.Forget(dKey) + return true } + + utilruntime.HandleError(fmt.Errorf("Error syncing PodDisruptionBudget %v, requeuing: %v", dKey.(string), err)) + dc.queue.AddRateLimited(dKey) + + return true } func (dc *DisruptionController) sync(key string) error { @@ -427,10 +423,10 @@ func (dc *DisruptionController) sync(key string) error { obj, exists, err := dc.pdbLister.Store.GetByKey(key) if !exists { - return err + glog.V(4).Infof("PodDisruptionBudget %q has been deleted", key) + return nil } if err != nil { - glog.Errorf("unable to retrieve PodDisruptionBudget %v from store: %v", key, err) return err } diff --git a/pkg/controller/disruption/disruption_test.go b/pkg/controller/disruption/disruption_test.go index 7446753b25e..944b101962f 100644 --- a/pkg/controller/disruption/disruption_test.go +++ b/pkg/controller/disruption/disruption_test.go @@ -490,3 +490,13 @@ func TestTwoControllers(t *testing.T) { dc.sync(pdbName) ps.VerifyPdbStatus(t, pdbName, true, 7, 7, 22) } + +// Test pdb doesn't exist +func TestPDBNotExist(t *testing.T) { + dc, _ := newFakeDisruptionController() + pdb, _ := newPodDisruptionBudget(t, intstr.FromString("67%")) + add(t, dc.pdbLister.Store, pdb) + if err := dc.sync("notExist"); err != nil { + t.Errorf("Unexpected error: %v, expect nil", err) + } +}