diff --git a/pkg/apis/policy/types.go b/pkg/apis/policy/types.go
index 97e58beec94..6104b2fce8c 100644
--- a/pkg/apis/policy/types.go
+++ b/pkg/apis/policy/types.go
@@ -51,6 +51,19 @@ type PodDisruptionBudgetStatus struct {
 
 	// total number of pods counted by this disruption budget
 	ExpectedPods int32 `json:"expectedPods"`
+
+	// DisruptedPods contains information about pods whose eviction was
+	// processed by the API server eviction subresource handler but has not
+	// yet been observed by the PodDisruptionBudget controller.
+	// A pod will be in this map from the time when the API server processed the
+	// eviction request to the time when the pod is seen by the PDB controller as
+	// having been marked for deletion (or after a timeout). The key in the map is
+	// the name of the pod and the value is the time when the API server processed
+	// the eviction request. If the deletion didn't occur and a pod is still there,
+	// it will be removed from the list automatically by the PodDisruptionBudget controller after some time.
+	// If everything goes smoothly, this map should be empty most of the time.
+	// A large number of entries in the map may indicate problems with pod deletions.
+	DisruptedPods map[string]unversioned.Time `json:"disruptedPods" protobuf:"bytes,5,rep,name=disruptedPods"`
 }
 
 // +genclient=true
diff --git a/pkg/apis/policy/v1beta1/types.go b/pkg/apis/policy/v1beta1/types.go
index 8a57baca419..46c7e7d1b3d 100644
--- a/pkg/apis/policy/v1beta1/types.go
+++ b/pkg/apis/policy/v1beta1/types.go
@@ -49,6 +49,19 @@ type PodDisruptionBudgetStatus struct {
 
 	// total number of pods counted by this disruption budget
 	ExpectedPods int32 `json:"expectedPods" protobuf:"varint,4,opt,name=expectedPods"`
+
+	// DisruptedPods contains information about pods whose eviction was
+	// processed by the API server eviction subresource handler but has not
+	// yet been observed by the PodDisruptionBudget controller.
+	// A pod will be in this map from the time when the API server processed the
+	// eviction request to the time when the pod is seen by the PDB controller as
+	// having been marked for deletion (or after a timeout). The key in the map is
+	// the name of the pod and the value is the time when the API server processed
+	// the eviction request. If the deletion didn't occur and a pod is still there,
+	// it will be removed from the list automatically by the PodDisruptionBudget controller after some time.
+	// If everything goes smoothly, this map should be empty most of the time.
+	// A large number of entries in the map may indicate problems with pod deletions.
+	DisruptedPods map[string]unversioned.Time `json:"disruptedPods" protobuf:"bytes,5,rep,name=disruptedPods"`
 }
 
 // +genclient=true
diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go
index 55ec5e7e940..1707a981d14 100644
--- a/pkg/controller/disruption/disruption.go
+++ b/pkg/controller/disruption/disruption.go
@@ -18,6 +18,7 @@ package disruption
 
 import (
 	"fmt"
+	"reflect"
 	"time"
 
 	"k8s.io/kubernetes/pkg/api"
@@ -43,6 +44,17 @@ import (
 
 const statusUpdateRetries = 2
 
+// DeletionTimeout sets the maximum time from the moment a pod is added to DisruptedPods in PDB.Status
+// to the time when the pod is expected to be seen by the PDB controller as having been marked for deletion.
+// If the pod was not marked for deletion during that time it is assumed that it won't be deleted at
+// all, and the corresponding entry can be removed from pdb.Status.DisruptedPods.
+// It is assumed that pod/pdb apiserver-to-controller latency is relatively small (1-2 seconds),
+// so the value below should be more than enough.
+// If the controller is running on a different node, it is important that the two nodes have synced
+// clocks (e.g. via NTP). Otherwise the PodDisruptionBudget controller may not provide enough
+// protection against unwanted pod disruptions.
+const DeletionTimeout = 2 * 60 * time.Second
+
 type updater func(*policy.PodDisruptionBudget) error
 
 type DisruptionController struct {
@@ -68,7 +80,8 @@ type DisruptionController struct {
 	dLister cache.StoreToDeploymentLister
 
 	// PodDisruptionBudget keys that need to be synced.
-	queue workqueue.RateLimitingInterface
+	queue        workqueue.RateLimitingInterface
+	recheckQueue workqueue.DelayingInterface
 
 	broadcaster record.EventBroadcaster
 	recorder    record.EventRecorder
@@ -92,6 +105,7 @@ func NewDisruptionController(podInformer cache.SharedIndexInformer, kubeClient i
 		kubeClient:    kubeClient,
 		podController: podInformer.GetController(),
 		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "disruption"),
+		recheckQueue:  workqueue.NewNamedDelayingQueue("disruption-recheck"),
 		broadcaster:   record.NewBroadcaster(),
 	}
 	dc.recorder = dc.broadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})
@@ -270,6 +284,8 @@ func (dc *DisruptionController) Run(stopCh <-chan struct{}) {
 	go dc.rsController.Run(stopCh)
 	go dc.dController.Run(stopCh)
 	go wait.Until(dc.worker, time.Second, stopCh)
+	go wait.Until(dc.recheckWorker, time.Second, stopCh)
+
 	<-stopCh
 	glog.V(0).Infof("Shutting down disruption controller")
 }
@@ -355,6 +371,15 @@ func (dc *DisruptionController) enqueuePdb(pdb *policy.PodDisruptionBudget) {
 	dc.queue.Add(key)
 }
 
+func (dc *DisruptionController) enqueuePdbForRecheck(pdb *policy.PodDisruptionBudget, delay time.Duration) {
+	key, err := controller.KeyFunc(pdb)
+	if err != nil {
+		glog.Errorf("Couldn't get key for PodDisruptionBudget object %+v: %v", pdb, err)
+		return
+	}
+	dc.recheckQueue.AddAfter(key, delay)
+}
+
 func (dc *DisruptionController) getPdbForPod(pod *api.Pod) *policy.PodDisruptionBudget {
 	// GetPodPodDisruptionBudgets returns an error only if no
 	// PodDisruptionBudgets are found.
 	// We don't return that as an error to the
@@ -417,6 +442,21 @@ func (dc *DisruptionController) processNextWorkItem() bool {
 	return true
 }
 
+func (dc *DisruptionController) recheckWorker() {
+	for dc.processNextRecheckWorkItem() {
+	}
+}
+
+func (dc *DisruptionController) processNextRecheckWorkItem() bool {
+	dKey, quit := dc.recheckQueue.Get()
+	if quit {
+		return false
+	}
+	defer dc.recheckQueue.Done(dKey)
+	dc.queue.AddRateLimited(dKey)
+	return true
+}
+
 func (dc *DisruptionController) sync(key string) error {
 	startTime := time.Now()
 	defer func() {
@@ -452,9 +492,17 @@ func (dc *DisruptionController) trySync(pdb *policy.PodDisruptionBudget) error {
 		return err
 	}
 
-	currentHealthy := countHealthyPods(pods)
-	err = dc.updatePdbSpec(pdb, currentHealthy, desiredHealthy, expectedCount)
+	currentTime := time.Now()
+	disruptedPods, recheckTime := buildDisruptedPodMap(pods, pdb, currentTime)
+	currentHealthy := countHealthyPods(pods, disruptedPods, currentTime)
+	err = dc.updatePdbStatus(pdb, currentHealthy, desiredHealthy, expectedCount, disruptedPods)
+	if err == nil && recheckTime != nil {
+		// There is always at most one PDB waiting with a particular name in the queue,
+		// and each PDB in the queue is associated with the lowest timestamp
+		// that was supplied when a PDB with that name was added.
+		dc.enqueuePdbForRecheck(pdb, recheckTime.Sub(currentTime))
+	}
 	return err
 }
 
@@ -527,20 +575,60 @@ func (dc *DisruptionController) getExpectedPodCount(pdb *policy.PodDisruptionBud
 	return
 }
 
-func countHealthyPods(pods []*api.Pod) (currentHealthy int32) {
+func countHealthyPods(pods []*api.Pod, disruptedPods map[string]unversioned.Time, currentTime time.Time) (currentHealthy int32) {
 Pod:
 	for _, pod := range pods {
-		for _, c := range pod.Status.Conditions {
-			if c.Type == api.PodReady && c.Status == api.ConditionTrue {
-				currentHealthy++
-				continue Pod
-			}
+		// Pod is being deleted.
+		if pod.DeletionTimestamp != nil {
+			continue
+		}
+		// Pod is expected to be deleted soon.
+		if disruptionTime, found := disruptedPods[pod.Name]; found && disruptionTime.Time.Add(DeletionTimeout).After(currentTime) {
+			continue
+		}
+		if api.IsPodReady(pod) {
+			currentHealthy++
+			continue Pod
 		}
 	}
 
 	return
}
 
+// buildDisruptedPodMap builds a new DisruptedPods map, dropping entries that refer to pods
+// which no longer exist, are already being deleted, or were not deleted within the expected
+// timeout. It also returns the time at which this check should be repeated.
+func buildDisruptedPodMap(pods []*api.Pod, pdb *policy.PodDisruptionBudget, currentTime time.Time) (map[string]unversioned.Time, *time.Time) {
+	disruptedPods := pdb.Status.DisruptedPods
+	result := make(map[string]unversioned.Time)
+	var recheckTime *time.Time
+
+	if disruptedPods == nil || len(disruptedPods) == 0 {
+		return result, recheckTime
+	}
+	for _, pod := range pods {
+		if pod.DeletionTimestamp != nil {
+			// Already being deleted.
+			continue
+		}
+		disruptionTime, found := disruptedPods[pod.Name]
+		if !found {
+			// Pod is not on the disrupted list.
+			continue
+		}
+		expectedDeletion := disruptionTime.Time.Add(DeletionTimeout)
+		if expectedDeletion.Before(currentTime) {
+			glog.V(1).Infof("Pod %s/%s was expected to be deleted at %s but it wasn't, updating pdb %s/%s",
+				pod.Namespace, pod.Name, disruptionTime.String(), pdb.Namespace, pdb.Name)
+		} else {
+			if recheckTime == nil || expectedDeletion.Before(*recheckTime) {
+				recheckTime = &expectedDeletion
+			}
+			result[pod.Name] = disruptionTime
+		}
+	}
+	return result, recheckTime
+}
+
 // failSafe is an attempt to at least update the PodDisruptionsAllowed field to
 // 0 if everything else has failed. This is one place we
 // implement the "fail open" part of the design since if we manage to update
@@ -557,7 +645,9 @@ func (dc *DisruptionController) failSafe(pdb *policy.PodDisruptionBudget) error
 	return dc.getUpdater()(&newPdb)
 }
 
-func (dc *DisruptionController) updatePdbSpec(pdb *policy.PodDisruptionBudget, currentHealthy, desiredHealthy, expectedCount int32) error {
+func (dc *DisruptionController) updatePdbStatus(pdb *policy.PodDisruptionBudget, currentHealthy, desiredHealthy, expectedCount int32,
+	disruptedPods map[string]unversioned.Time) error {
+
 	// We require expectedCount to be > 0 so that PDBs which currently match no
 	// pods are in a safe state when their first pods appear but this controller
 	// has not updated their status yet. This isn't the only race, but it's a
@@ -567,7 +657,11 @@ func (dc *DisruptionController) updatePdbSpec(pdb *policy.PodDisruptionBudget, c
 		disruptionsAllowed = 0
 	}
 
-	if pdb.Status.CurrentHealthy == currentHealthy && pdb.Status.DesiredHealthy == desiredHealthy && pdb.Status.ExpectedPods == expectedCount && pdb.Status.PodDisruptionsAllowed == disruptionsAllowed {
+	if pdb.Status.CurrentHealthy == currentHealthy &&
+		pdb.Status.DesiredHealthy == desiredHealthy &&
+		pdb.Status.ExpectedPods == expectedCount &&
+		pdb.Status.PodDisruptionsAllowed == disruptionsAllowed &&
+		reflect.DeepEqual(pdb.Status.DisruptedPods, disruptedPods) {
 		return nil
 	}
 
@@ -582,6 +676,7 @@ func (dc *DisruptionController) updatePdbSpec(pdb *policy.PodDisruptionBudget, c
 		DesiredHealthy:        desiredHealthy,
 		ExpectedPods:          expectedCount,
 		PodDisruptionsAllowed: disruptionsAllowed,
+		DisruptedPods:         disruptedPods,
 	}
 
 	return dc.getUpdater()(&newPdb)
diff --git a/pkg/controller/disruption/disruption_test.go b/pkg/controller/disruption/disruption_test.go
index 85c66622ce8..a02d26a21ba 100644
--- a/pkg/controller/disruption/disruption_test.go
+++ b/pkg/controller/disruption/disruption_test.go
@@ -21,6 +21,7 @@ import (
 	"reflect"
 	"runtime/debug"
 	"testing"
+	"time"
 
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/unversioned"
@@ -32,6 +33,7 @@ import (
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/util/intstr"
 	"k8s.io/kubernetes/pkg/util/uuid"
+	"k8s.io/kubernetes/pkg/util/workqueue"
 )
 
 type pdbStates map[string]policy.PodDisruptionBudget
@@ -54,12 +56,14 @@ func (ps *pdbStates) Get(key string) policy.PodDisruptionBudget {
 	return (*ps)[key]
 }
 
-func (ps *pdbStates) VerifyPdbStatus(t *testing.T, key string, disruptionsAllowed, currentHealthy, desiredHealthy, expectedPods int32) {
+func (ps *pdbStates) VerifyPdbStatus(t *testing.T, key string, disruptionsAllowed, currentHealthy, desiredHealthy, expectedPods int32,
+	disruptedPodMap map[string]unversioned.Time) {
 	expectedStatus := policy.PodDisruptionBudgetStatus{
 		PodDisruptionsAllowed: disruptionsAllowed,
 		CurrentHealthy:        currentHealthy,
 		DesiredHealthy:        desiredHealthy,
 		ExpectedPods:          expectedPods,
+		DisruptedPods:         disruptedPodMap,
 	}
 	actualStatus := ps.Get(key).Status
 	if !reflect.DeepEqual(actualStatus, expectedStatus) {
@@ -251,11 +255,11 @@ func TestNoSelector(t *testing.T) {
 	add(t, dc.pdbLister.Store, pdb)
 	dc.sync(pdbName)
-	ps.VerifyPdbStatus(t, pdbName, 0, 0, 3, 0)
+	ps.VerifyPdbStatus(t, pdbName, 0, 0, 3, 0, map[string]unversioned.Time{})
 
 	add(t, dc.podLister.Indexer, pod)
 	dc.sync(pdbName)
-	ps.VerifyPdbStatus(t, pdbName, 0, 0, 3, 0)
+	ps.VerifyPdbStatus(t, pdbName, 0, 0, 3, 0, map[string]unversioned.Time{})
 }
 
 // Verify that available/expected counts go up as we add pods, then verify that
@@ -270,13 +274,13 @@ func TestUnavailable(t *testing.T) {
 	// Add three pods, verifying that the counts go up at each step.
 	pods := []*api.Pod{}
 	for i := int32(0); i < 4; i++ {
-		ps.VerifyPdbStatus(t, pdbName, 0, i, 3, i)
+		ps.VerifyPdbStatus(t, pdbName, 0, i, 3, i, map[string]unversioned.Time{})
 		pod, _ := newPod(t, fmt.Sprintf("yo-yo-yo %d", i))
 		pods = append(pods, pod)
 		add(t, dc.podLister.Indexer, pod)
 		dc.sync(pdbName)
 	}
-	ps.VerifyPdbStatus(t, pdbName, 1, 4, 3, 4)
+	ps.VerifyPdbStatus(t, pdbName, 1, 4, 3, 4, map[string]unversioned.Time{})
 
 	// Now set one pod as unavailable
 	pods[0].Status.Conditions = []api.PodCondition{}
@@ -284,7 +288,7 @@ func TestUnavailable(t *testing.T) {
 	dc.sync(pdbName)
 
 	// Verify expected update
-	ps.VerifyPdbStatus(t, pdbName, 0, 3, 3, 4)
+	ps.VerifyPdbStatus(t, pdbName, 0, 3, 3, 4, map[string]unversioned.Time{})
 }
 
 // Create a pod with no controller, and verify that a PDB with a percentage
@@ -318,7 +322,7 @@ func TestReplicaSet(t *testing.T) {
 	pod, _ := newPod(t, "pod")
 	add(t, dc.podLister.Indexer, pod)
 	dc.sync(pdbName)
-	ps.VerifyPdbStatus(t, pdbName, 0, 1, 2, 10)
+	ps.VerifyPdbStatus(t, pdbName, 0, 1, 2, 10, map[string]unversioned.Time{})
 }
 
 // Verify that multiple controllers doesn't allow the PDB to be set true.
@@ -376,9 +380,10 @@ func TestReplicationController(t *testing.T) {
 	rc.Spec.Selector = labels
 	add(t, dc.rcLister.Indexer, rc)
 	dc.sync(pdbName)
+
 	// It starts out at 0 expected because, with no pods, the PDB doesn't know
 	// about the RC. This is a known bug.
 	// TODO(mml): file issue
-	ps.VerifyPdbStatus(t, pdbName, 0, 0, 0, 0)
+	ps.VerifyPdbStatus(t, pdbName, 0, 0, 0, 0, map[string]unversioned.Time{})
 
 	pods := []*api.Pod{}
 
@@ -389,9 +394,9 @@ func TestReplicationController(t *testing.T) {
 		add(t, dc.podLister.Indexer, pod)
 		dc.sync(pdbName)
 		if i < 2 {
-			ps.VerifyPdbStatus(t, pdbName, 0, i+1, 2, 3)
+			ps.VerifyPdbStatus(t, pdbName, 0, i+1, 2, 3, map[string]unversioned.Time{})
 		} else {
-			ps.VerifyPdbStatus(t, pdbName, 1, 3, 2, 3)
+			ps.VerifyPdbStatus(t, pdbName, 1, 3, 2, 3, map[string]unversioned.Time{})
 		}
 	}
 
@@ -430,7 +435,7 @@ func TestTwoControllers(t *testing.T) {
 	add(t, dc.rcLister.Indexer, rc)
 	dc.sync(pdbName)
-	ps.VerifyPdbStatus(t, pdbName, 0, 0, 0, 0)
+	ps.VerifyPdbStatus(t, pdbName, 0, 0, 0, 0, map[string]unversioned.Time{})
 
 	pods := []*api.Pod{}
 
@@ -445,11 +450,11 @@ func TestTwoControllers(t *testing.T) {
 		add(t, dc.podLister.Indexer, pod)
 		dc.sync(pdbName)
 		if i <= unavailablePods {
-			ps.VerifyPdbStatus(t, pdbName, 0, 0, minimumOne, collectionSize)
+			ps.VerifyPdbStatus(t, pdbName, 0, 0, minimumOne, collectionSize, map[string]unversioned.Time{})
 		} else if i-unavailablePods <= minimumOne {
-			ps.VerifyPdbStatus(t, pdbName, 0, i-unavailablePods, minimumOne, collectionSize)
+			ps.VerifyPdbStatus(t, pdbName, 0, i-unavailablePods, minimumOne, collectionSize, map[string]unversioned.Time{})
 		} else {
-			ps.VerifyPdbStatus(t, pdbName, 1, i-unavailablePods, minimumOne, collectionSize)
+			ps.VerifyPdbStatus(t, pdbName, 1, i-unavailablePods, minimumOne, collectionSize, map[string]unversioned.Time{})
 		}
 	}
 
@@ -457,14 +462,14 @@ func TestTwoControllers(t *testing.T) {
 	d.Spec.Selector = newSel(dLabels)
 	add(t, dc.dLister.Indexer, d)
 	dc.sync(pdbName)
-	ps.VerifyPdbStatus(t, pdbName, 1, minimumOne+1, minimumOne, collectionSize)
+	ps.VerifyPdbStatus(t, pdbName, 1, minimumOne+1, minimumOne, collectionSize, map[string]unversioned.Time{})
 
 	rs, _ := newReplicaSet(t, collectionSize)
 	rs.Spec.Selector = newSel(dLabels)
 	rs.Labels = dLabels
 	add(t, dc.rsLister.Indexer, rs)
 	dc.sync(pdbName)
-	ps.VerifyPdbStatus(t, pdbName, 1, minimumOne+1, minimumOne, collectionSize)
+	ps.VerifyPdbStatus(t, pdbName, 1, minimumOne+1, minimumOne, collectionSize, map[string]unversioned.Time{})
 
 	// By the end of this loop, the number of ready pods should be N+2 (hence minimumTwo+2).
 	unavailablePods = 2*collectionSize - (minimumTwo + 2) - unavailablePods
@@ -478,33 +483,33 @@ func TestTwoControllers(t *testing.T) {
 		add(t, dc.podLister.Indexer, pod)
 		dc.sync(pdbName)
 		if i <= unavailablePods {
-			ps.VerifyPdbStatus(t, pdbName, 0, minimumOne+1, minimumTwo, 2*collectionSize)
+			ps.VerifyPdbStatus(t, pdbName, 0, minimumOne+1, minimumTwo, 2*collectionSize, map[string]unversioned.Time{})
 		} else if i-unavailablePods <= minimumTwo-(minimumOne+1) {
-			ps.VerifyPdbStatus(t, pdbName, 0, (minimumOne+1)+(i-unavailablePods), minimumTwo, 2*collectionSize)
+			ps.VerifyPdbStatus(t, pdbName, 0, (minimumOne+1)+(i-unavailablePods), minimumTwo, 2*collectionSize, map[string]unversioned.Time{})
 		} else {
 			ps.VerifyPdbStatus(t, pdbName, i-unavailablePods-(minimumTwo-(minimumOne+1)),
-				(minimumOne+1)+(i-unavailablePods), minimumTwo, 2*collectionSize)
+				(minimumOne+1)+(i-unavailablePods), minimumTwo, 2*collectionSize, map[string]unversioned.Time{})
 		}
 	}
 
 	// Now we verify we can bring down 1 pod and a disruption is still permitted,
 	// but if we bring down two, it's not. Then we make the pod ready again and
 	// verify that a disruption is permitted again.
-	ps.VerifyPdbStatus(t, pdbName, 2, 2+minimumTwo, minimumTwo, 2*collectionSize)
+	ps.VerifyPdbStatus(t, pdbName, 2, 2+minimumTwo, minimumTwo, 2*collectionSize, map[string]unversioned.Time{})
 	pods[collectionSize-1].Status.Conditions = []api.PodCondition{}
 	update(t, dc.podLister.Indexer, pods[collectionSize-1])
 	dc.sync(pdbName)
-	ps.VerifyPdbStatus(t, pdbName, 1, 1+minimumTwo, minimumTwo, 2*collectionSize)
+	ps.VerifyPdbStatus(t, pdbName, 1, 1+minimumTwo, minimumTwo, 2*collectionSize, map[string]unversioned.Time{})
 
 	pods[collectionSize-2].Status.Conditions = []api.PodCondition{}
 	update(t, dc.podLister.Indexer, pods[collectionSize-2])
 	dc.sync(pdbName)
-	ps.VerifyPdbStatus(t, pdbName, 0, minimumTwo, minimumTwo, 2*collectionSize)
+	ps.VerifyPdbStatus(t, pdbName, 0, minimumTwo, minimumTwo, 2*collectionSize, map[string]unversioned.Time{})
 
 	pods[collectionSize-1].Status.Conditions = []api.PodCondition{{Type: api.PodReady, Status: api.ConditionTrue}}
 	update(t, dc.podLister.Indexer, pods[collectionSize-1])
 	dc.sync(pdbName)
-	ps.VerifyPdbStatus(t, pdbName, 1, 1+minimumTwo, minimumTwo, 2*collectionSize)
+	ps.VerifyPdbStatus(t, pdbName, 1, 1+minimumTwo, minimumTwo, 2*collectionSize, map[string]unversioned.Time{})
 }
 
 // Test pdb doesn't exist
@@ -516,3 +521,30 @@ func TestPDBNotExist(t *testing.T) {
 		t.Errorf("Unexpected error: %v, expect nil", err)
 	}
 }
+
+func TestUpdateDisruptedPods(t *testing.T) {
+	dc, ps := newFakeDisruptionController()
+	dc.recheckQueue = workqueue.NewNamedDelayingQueue("pdb-queue")
+	pdb, pdbName := newPodDisruptionBudget(t, intstr.FromInt(1))
+	currentTime := time.Now()
+	pdb.Status.DisruptedPods = map[string]unversioned.Time{
+		"p1":       {Time: currentTime},                       // Should be removed, pod deletion started.
+		"p2":       {Time: currentTime.Add(-5 * time.Minute)}, // Should be removed, expired.
+		"p3":       {Time: currentTime},                       // Should remain, pod untouched.
+		"notthere": {Time: currentTime},                       // Should be removed, pod deleted.
+	}
+	add(t, dc.pdbLister.Store, pdb)
+
+	pod1, _ := newPod(t, "p1")
+	pod1.DeletionTimestamp = &unversioned.Time{Time: time.Now()}
+	pod2, _ := newPod(t, "p2")
+	pod3, _ := newPod(t, "p3")
+
+	add(t, dc.podLister.Indexer, pod1)
+	add(t, dc.podLister.Indexer, pod2)
+	add(t, dc.podLister.Indexer, pod3)
+
+	dc.sync(pdbName)
+
+	ps.VerifyPdbStatus(t, pdbName, 0, 1, 1, 3, map[string]unversioned.Time{"p3": {Time: currentTime}})
+}
diff --git a/pkg/registry/core/pod/etcd/eviction.go b/pkg/registry/core/pod/etcd/eviction.go
index f7b3042ccc0..dbe7aed9dc8 100644
--- a/pkg/registry/core/pod/etcd/eviction.go
+++ b/pkg/registry/core/pod/etcd/eviction.go
@@ -18,6 +18,7 @@ package etcd
 
 import (
 	"fmt"
+	"time"
 
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/rest"
@@ -29,6 +30,16 @@ import (
 	"k8s.io/kubernetes/pkg/runtime"
 )
 
+const (
+	// MaxDisruptedPodSize is the maximum size of PodDisruptionBudgetStatus.DisruptedPods. The API
+	// server eviction subresource handler will refuse to evict pods covered by the corresponding
+	// PDB if the size of the map exceeds this value. A map that large means a lot of evictions
+	// have been approved by the API server but not yet observed by the PDB controller.
+	// This situation should self-correct because the PDB controller removes entries from the map
+	// automatically after the PDB DeletionTimeout, regardless of whether the pods were actually deleted.
+	MaxDisruptedPodSize = 2000
+)
+
 func newEvictionStorage(store *registry.Store, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) *EvictionREST {
 	return &EvictionREST{store: store, podDisruptionBudgetClient: podDisruptionBudgetClient}
 }
@@ -72,7 +83,7 @@ func (r *EvictionREST) Create(ctx api.Context, obj runtime.Object) (runtime.Obje
 
 	// If it was false already, or if it becomes false during the course of our retries,
 	// raise an error marked as a 429.
-	ok, err := r.checkAndDecrement(pod.Namespace, pdb)
+	ok, err := r.checkAndDecrement(pod.Namespace, pod.Name, pdb)
 	if err != nil {
 		return nil, err
 	}
@@ -104,14 +115,25 @@ func (r *EvictionREST) Create(ctx api.Context, obj runtime.Object) (runtime.Obje
 	return &unversioned.Status{Status: unversioned.StatusSuccess}, nil
 }
 
-func (r *EvictionREST) checkAndDecrement(namespace string, pdb policy.PodDisruptionBudget) (ok bool, err error) {
+func (r *EvictionREST) checkAndDecrement(namespace string, podName string, pdb policy.PodDisruptionBudget) (ok bool, err error) {
 	if pdb.Status.PodDisruptionsAllowed < 0 {
 		return false, fmt.Errorf("pdb disruptions allowed is negative")
 	}
+	if len(pdb.Status.DisruptedPods) > MaxDisruptedPodSize {
+		return false, fmt.Errorf("DisruptedPods map too big - too many evictions not confirmed by PDB controller")
+	}
 	if pdb.Status.PodDisruptionsAllowed == 0 {
 		return false, nil
 	}
 	pdb.Status.PodDisruptionsAllowed--
+	if pdb.Status.DisruptedPods == nil {
+		pdb.Status.DisruptedPods = make(map[string]unversioned.Time)
+	}
+	// The eviction handler needs to inform the PDB controller that it is about to delete a pod,
+	// so that the controller does not consider the pod available when updating PodDisruptionsAllowed.
+	// If the pod is not deleted within a reasonable time limit, the PDB controller will assume
+	// that it won't be deleted at all and will remove it from the DisruptedPods map.
+	pdb.Status.DisruptedPods[podName] = unversioned.Time{Time: time.Now()}
 	if _, err := r.podDisruptionBudgetClient.PodDisruptionBudgets(namespace).UpdateStatus(&pdb); err != nil {
 		return false, err
 	}
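
Reviewer note (not part of the patch): the timeout/recheck bookkeeping above is easier to see in isolation. Below is a simplified, self-contained Go sketch of the behaviour that buildDisruptedPodMap and countHealthyPods implement; the pod type and the helper names pruneDisruptedPods/countHealthy are stand-ins invented for illustration and do not exist in the tree, and deletionTimeout mirrors the DeletionTimeout constant.

// Simplified sketch of the DisruptedPods bookkeeping; only the expiry and
// recheck-time logic is meant to mirror the controller code above.
package main

import (
	"fmt"
	"time"
)

const deletionTimeout = 2 * time.Minute // stand-in for DeletionTimeout

// pod is a minimal stand-in for api.Pod.
type pod struct {
	name     string
	deleting bool // true if DeletionTimestamp is set
	ready    bool
}

// pruneDisruptedPods drops entries for pods that are already being deleted, no
// longer exist, or were not deleted within deletionTimeout, and returns the
// earliest time at which the remaining entries should be re-checked.
func pruneDisruptedPods(pods []pod, disrupted map[string]time.Time, now time.Time) (map[string]time.Time, *time.Time) {
	result := map[string]time.Time{}
	var recheck *time.Time
	for _, p := range pods {
		evictedAt, found := disrupted[p.name]
		if !found || p.deleting {
			continue // not marked as disrupted, or deletion already started
		}
		expected := evictedAt.Add(deletionTimeout)
		if expected.Before(now) {
			continue // deletion never happened within the timeout; forget the entry
		}
		if recheck == nil || expected.Before(*recheck) {
			recheck = &expected
		}
		result[p.name] = evictedAt
	}
	return result, recheck
}

// countHealthy counts ready pods, treating pods that are being deleted or are
// still present in the pruned disrupted map as unavailable.
func countHealthy(pods []pod, disrupted map[string]time.Time) int {
	healthy := 0
	for _, p := range pods {
		if p.deleting {
			continue
		}
		if _, expectedGone := disrupted[p.name]; expectedGone {
			continue
		}
		if p.ready {
			healthy++
		}
	}
	return healthy
}

func main() {
	now := time.Now()
	pods := []pod{
		{name: "p1", deleting: true, ready: true}, // eviction in progress
		{name: "p2", ready: true},                 // eviction approved 5m ago, never happened
		{name: "p3", ready: true},                 // eviction just approved
		{name: "p4", ready: true},                 // untouched
	}
	disrupted := map[string]time.Time{
		"p1": now,
		"p2": now.Add(-5 * time.Minute),
		"p3": now,
	}
	pruned, recheck := pruneDisruptedPods(pods, disrupted, now)
	// Only p3 stays in the map; p2 counts as healthy again; recheck is ~2m out.
	fmt.Println(len(pruned), countHealthy(pods, pruned), recheck != nil) // 1 2 true
}

The sketch intentionally skips the status update and queue plumbing; it only demonstrates why an expired entry makes the pod count as available again while a fresh entry keeps it excluded until the recheck fires.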