Merge pull request #32850 from m1093782566/m109-disruption

Automatic merge from submit-queue

fix disruption controller hotloop

Fix the disruption controller hotloop on unexpected API server rejections: instead of immediately re-adding a PodDisruptionBudget key to the queue when sync fails, the controller now uses a rate-limited workqueue and requeues the key with backoff.

**Which issue this PR fixes**: related issue #30629
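
For reference, the hotloop comes from the old worker re-adding a failing key to a plain `workqueue.Type` with no delay, so it retries as fast as the apiserver can reject it. The snippet below is only a sketch of the rate-limited behaviour this PR switches to, not code from the PR: it uses the current `k8s.io/client-go/util/workqueue` import path (the PR itself uses the in-tree `k8s.io/kubernetes/pkg/util/workqueue` package with the same API), and the key name is made up.

```go
package main

import (
	"fmt"
	"time"

	// Assumed import path (current client-go); this PR uses the in-tree
	// package k8s.io/kubernetes/pkg/util/workqueue, which has the same API.
	"k8s.io/client-go/util/workqueue"
)

func main() {
	q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo")
	defer q.ShutDown()

	q.Add("default/my-pdb") // hypothetical PDB key
	last := time.Now()

	for i := 0; i < 5; i++ {
		key, quit := q.Get() // blocks until the item's backoff delay has elapsed
		if quit {
			return
		}
		fmt.Printf("attempt %d came %v after the previous one\n", i, time.Since(last))
		last = time.Now()

		// Pretend the sync failed: AddRateLimited delays the next attempt with
		// per-item exponential backoff, where a plain Add would retry at once.
		q.AddRateLimited(key)
		q.Done(key)
	}
}
```

With the default limiter the attempts arrive roughly 5ms, 10ms, 20ms, ... apart instead of back-to-back, which is the "better citizen" behaviour the old TODO in `worker()` asked for.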

**Special notes for your reviewer**:

@deads2k @derekwaynecarr PTAL.
Kubernetes Submit Queue authored 2016-09-29 07:10:15 -07:00, committed by GitHub
commit 10239c983d
2 changed files with 33 additions and 27 deletions

pkg/controller/disruption/disruption.go

@@ -33,6 +33,7 @@ import (
     "k8s.io/kubernetes/pkg/runtime"
     "k8s.io/kubernetes/pkg/types"
     "k8s.io/kubernetes/pkg/util/intstr"
+    utilruntime "k8s.io/kubernetes/pkg/util/runtime"
     "k8s.io/kubernetes/pkg/util/wait"
     "k8s.io/kubernetes/pkg/util/workqueue"
     "k8s.io/kubernetes/pkg/watch"
@@ -66,7 +67,8 @@ type DisruptionController struct {
     dController *cache.Controller
     dLister     cache.StoreToDeploymentLister
 
-    queue *workqueue.Type
+    // PodDisruptionBudget keys that need to be synced.
+    queue workqueue.RateLimitingInterface
 
     broadcaster record.EventBroadcaster
     recorder    record.EventRecorder
@@ -89,7 +91,7 @@ func NewDisruptionController(podInformer cache.SharedIndexInformer, kubeClient i
     dc := &DisruptionController{
         kubeClient:    kubeClient,
         podController: podInformer.GetController(),
-        queue:         workqueue.NewNamed("disruption"),
+        queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "disruption"),
         broadcaster:   record.NewBroadcaster(),
     }
     dc.recorder = dc.broadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})
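
`DefaultControllerRateLimiter()` is, assuming the defaults have not changed since this PR, the maximum of a per-item exponential backoff (5ms base, capped at 1000s) and an overall token bucket (10 qps with a burst of 100). Below is a small sketch of how the per-item delay grows and how `Forget` resets it, again using the client-go import path rather than the in-tree one.

```go
package main

import (
	"fmt"

	// Assumed import path (current client-go); the in-tree package used by
	// this PR exposes the same constructor and interface.
	"k8s.io/client-go/util/workqueue"
)

func main() {
	rl := workqueue.DefaultControllerRateLimiter()
	key := "default/my-pdb" // hypothetical PDB key

	// Every call to When records another failure for the key and returns the
	// next delay, which roughly doubles until it hits the cap.
	for i := 0; i < 5; i++ {
		fmt.Printf("failure %d: retry in %v (requeues: %d)\n", i+1, rl.When(key), rl.NumRequeues(key))
	}

	// Forget is what a successful sync triggers: it clears the failure history
	// so a later failure starts again from the base delay.
	rl.Forget(key)
	fmt.Printf("after Forget: retry in %v\n", rl.When(key))
}
```

In the new `processNextWorkItem`, `AddRateLimited` feeds failures into this limiter and `Forget` is called after every successful sync, so a key that eventually syncs cleanly does not keep paying for old failures.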
@@ -392,33 +394,27 @@ func (dc *DisruptionController) getPodsForPdb(pdb *policy.PodDisruptionBudget) (
 }
 
 func (dc *DisruptionController) worker() {
-    work := func() bool {
-        key, quit := dc.queue.Get()
-        if quit {
-            return quit
-        }
-        defer dc.queue.Done(key)
-        glog.V(4).Infof("Syncing PodDisruptionBudget %q", key.(string))
-        if err := dc.sync(key.(string)); err != nil {
-            glog.Errorf("Error syncing PodDisruptionBudget %v, requeuing: %v", key.(string), err)
-            // TODO(mml): In order to be safe in the face of a total inability to write state
-            // changes, we should write an expiration timestamp here and consumers
-            // of the PDB state (the /evict subresource handler) should check that
-            // any 'true' state is relatively fresh.
-
-            // TODO(mml): file an issue to that effect
-
-            // TODO(mml): If we used a workqueue.RateLimitingInterface, we could
-            // improve our behavior (be a better citizen) when we need to retry.
-            dc.queue.Add(key)
-        }
-        return false
-    }
-    for {
-        if quit := work(); quit {
-            return
-        }
+    for dc.processNextWorkItem() {
     }
 }
 
+func (dc *DisruptionController) processNextWorkItem() bool {
+    dKey, quit := dc.queue.Get()
+    if quit {
+        return false
+    }
+    defer dc.queue.Done(dKey)
+
+    err := dc.sync(dKey.(string))
+    if err == nil {
+        dc.queue.Forget(dKey)
+        return true
+    }
+
+    utilruntime.HandleError(fmt.Errorf("Error syncing PodDisruptionBudget %v, requeuing: %v", dKey.(string), err))
+    dc.queue.AddRateLimited(dKey)
+
+    return true
+}
+
 func (dc *DisruptionController) sync(key string) error {
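
Note that the tight `for dc.processNextWorkItem() {}` loop is not itself a busy loop: `queue.Get` blocks while the queue is empty and only reports `quit` once the queue has been shut down. The sketch below shows one generic way such a worker is started and stopped; the names, the one-second restart interval, and the import paths are assumptions for illustration, not this controller's actual `Run` method.

```go
// Package sketch shows one generic way a worker like this is wired up; it is
// not this controller's actual Run method.
package sketch

import (
	"time"

	// Assumed current import paths; the in-tree equivalents at the time of
	// this PR live under k8s.io/kubernetes/pkg/util/.
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

// Run starts one worker goroutine and blocks until stopCh is closed.
func Run(queue workqueue.RateLimitingInterface, worker func(), stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	// ShutDown makes queue.Get report quit == true, which ends the worker's
	// "for processNextWorkItem()" loop.
	defer queue.ShutDown()

	// wait.Until restarts the worker every second if it ever returns and stops
	// once stopCh is closed; while the queue is empty the worker simply blocks
	// inside Get, so the tight loop does not busy-spin.
	go wait.Until(worker, time.Second, stopCh)

	<-stopCh
}
```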
@@ -429,10 +425,10 @@ func (dc *DisruptionController) sync(key string) error {
     obj, exists, err := dc.pdbLister.Store.GetByKey(key)
     if !exists {
-        return err
+        glog.V(4).Infof("PodDisruptionBudget %q has been deleted", key)
+        return nil
     }
     if err != nil {
+        glog.Errorf("unable to retrieve PodDisruptionBudget %v from store: %v", key, err)
         return err
     }

pkg/controller/disruption/disruption_test.go

@@ -502,3 +502,13 @@ func TestTwoControllers(t *testing.T) {
     dc.sync(pdbName)
     ps.VerifyPdbStatus(t, pdbName, true, 1+minimumTwo, minimumTwo, 2*collectionSize)
 }
+
+// Test pdb doesn't exist
+func TestPDBNotExist(t *testing.T) {
+    dc, _ := newFakeDisruptionController()
+    pdb, _ := newPodDisruptionBudget(t, intstr.FromString("67%"))
+    add(t, dc.pdbLister.Store, pdb)
+    if err := dc.sync("notExist"); err != nil {
+        t.Errorf("Unexpected error: %v, expect nil", err)
+    }
+}