mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-09 12:07:47 +00:00
fix disruption hot loop
Change-Id: Ib8eb56cb87f688fe9b2016f574f3fb9b685ce796
This commit is contained in:
parent
5975535daa
commit
2a117798b6
@ -31,6 +31,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
"k8s.io/kubernetes/pkg/types"
|
"k8s.io/kubernetes/pkg/types"
|
||||||
"k8s.io/kubernetes/pkg/util/intstr"
|
"k8s.io/kubernetes/pkg/util/intstr"
|
||||||
|
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
|
||||||
"k8s.io/kubernetes/pkg/util/wait"
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
"k8s.io/kubernetes/pkg/util/workqueue"
|
"k8s.io/kubernetes/pkg/util/workqueue"
|
||||||
"k8s.io/kubernetes/pkg/watch"
|
"k8s.io/kubernetes/pkg/watch"
|
||||||
@ -64,7 +65,8 @@ type DisruptionController struct {
|
|||||||
dController *cache.Controller
|
dController *cache.Controller
|
||||||
dLister cache.StoreToDeploymentLister
|
dLister cache.StoreToDeploymentLister
|
||||||
|
|
||||||
queue *workqueue.Type
|
// PodDisruptionBudget keys that need to be synced.
|
||||||
|
queue workqueue.RateLimitingInterface
|
||||||
|
|
||||||
broadcaster record.EventBroadcaster
|
broadcaster record.EventBroadcaster
|
||||||
recorder record.EventRecorder
|
recorder record.EventRecorder
|
||||||
@ -87,7 +89,7 @@ func NewDisruptionController(podInformer cache.SharedIndexInformer, kubeClient *
|
|||||||
dc := &DisruptionController{
|
dc := &DisruptionController{
|
||||||
kubeClient: kubeClient,
|
kubeClient: kubeClient,
|
||||||
podController: podInformer.GetController(),
|
podController: podInformer.GetController(),
|
||||||
queue: workqueue.NewNamed("disruption"),
|
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "disruption"),
|
||||||
broadcaster: record.NewBroadcaster(),
|
broadcaster: record.NewBroadcaster(),
|
||||||
}
|
}
|
||||||
dc.recorder = dc.broadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})
|
dc.recorder = dc.broadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})
|
||||||
@ -390,33 +392,27 @@ func (dc *DisruptionController) getPodsForPdb(pdb *policy.PodDisruptionBudget) (
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (dc *DisruptionController) worker() {
|
func (dc *DisruptionController) worker() {
|
||||||
work := func() bool {
|
for dc.processNextWorkItem() {
|
||||||
key, quit := dc.queue.Get()
|
}
|
||||||
if quit {
|
}
|
||||||
return quit
|
|
||||||
}
|
|
||||||
defer dc.queue.Done(key)
|
|
||||||
glog.V(4).Infof("Syncing PodDisruptionBudget %q", key.(string))
|
|
||||||
if err := dc.sync(key.(string)); err != nil {
|
|
||||||
glog.Errorf("Error syncing PodDisruptionBudget %v, requeuing: %v", key.(string), err)
|
|
||||||
// TODO(mml): In order to be safe in the face of a total inability to write state
|
|
||||||
// changes, we should write an expiration timestamp here and consumers
|
|
||||||
// of the PDB state (the /evict subresource handler) should check that
|
|
||||||
// any 'true' state is relatively fresh.
|
|
||||||
|
|
||||||
// TODO(mml): file an issue to that effect
|
func (dc *DisruptionController) processNextWorkItem() bool {
|
||||||
|
dKey, quit := dc.queue.Get()
|
||||||
// TODO(mml): If we used a workqueue.RateLimitingInterface, we could
|
if quit {
|
||||||
// improve our behavior (be a better citizen) when we need to retry.
|
|
||||||
dc.queue.Add(key)
|
|
||||||
}
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
for {
|
defer dc.queue.Done(dKey)
|
||||||
if quit := work(); quit {
|
|
||||||
return
|
err := dc.sync(dKey.(string))
|
||||||
}
|
if err == nil {
|
||||||
|
dc.queue.Forget(dKey)
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
utilruntime.HandleError(fmt.Errorf("Error syncing PodDisruptionBudget %v, requeuing: %v", dKey.(string), err))
|
||||||
|
dc.queue.AddRateLimited(dKey)
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dc *DisruptionController) sync(key string) error {
|
func (dc *DisruptionController) sync(key string) error {
|
||||||
@ -427,10 +423,10 @@ func (dc *DisruptionController) sync(key string) error {
|
|||||||
|
|
||||||
obj, exists, err := dc.pdbLister.Store.GetByKey(key)
|
obj, exists, err := dc.pdbLister.Store.GetByKey(key)
|
||||||
if !exists {
|
if !exists {
|
||||||
return err
|
glog.V(4).Infof("PodDisruptionBudget %q has been deleted", key)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.Errorf("unable to retrieve PodDisruptionBudget %v from store: %v", key, err)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -490,3 +490,13 @@ func TestTwoControllers(t *testing.T) {
|
|||||||
dc.sync(pdbName)
|
dc.sync(pdbName)
|
||||||
ps.VerifyPdbStatus(t, pdbName, true, 7, 7, 22)
|
ps.VerifyPdbStatus(t, pdbName, true, 7, 7, 22)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Test pdb doesn't exist
|
||||||
|
func TestPDBNotExist(t *testing.T) {
|
||||||
|
dc, _ := newFakeDisruptionController()
|
||||||
|
pdb, _ := newPodDisruptionBudget(t, intstr.FromString("67%"))
|
||||||
|
add(t, dc.pdbLister.Store, pdb)
|
||||||
|
if err := dc.sync("notExist"); err != nil {
|
||||||
|
t.Errorf("Unexpected error: %v, expect nil", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user