diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go index 9f3a246e5b7..581992fc100 100644 --- a/pkg/controller/disruption/disruption.go +++ b/pkg/controller/disruption/disruption.go @@ -51,21 +51,30 @@ import ( "k8s.io/client-go/util/workqueue" pdbhelper "k8s.io/component-helpers/apps/poddisruptionbudget" "k8s.io/klog/v2" - podutil "k8s.io/kubernetes/pkg/api/v1/pod" + apipod "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller" + utilpod "k8s.io/kubernetes/pkg/util/pod" + "k8s.io/utils/clock" ) -// DeletionTimeout sets maximum time from the moment a pod is added to DisruptedPods in PDB.Status -// to the time when the pod is expected to be seen by PDB controller as having been marked for deletion. -// If the pod was not marked for deletion during that time it is assumed that it won't be deleted at -// all and the corresponding entry can be removed from pdb.Status.DisruptedPods. It is assumed that -// pod/pdb apiserver to controller latency is relatively small (like 1-2sec) so the below value should -// be more than enough. -// If the controller is running on a different node it is important that the two nodes have synced -// clock (via ntp for example). Otherwise PodDisruptionBudget controller may not provide enough -// protection against unwanted pod disruptions. const ( - DeletionTimeout = 2 * 60 * time.Second + // DeletionTimeout sets maximum time from the moment a pod is added to DisruptedPods in PDB.Status + // to the time when the pod is expected to be seen by PDB controller as having been marked for deletion. + // If the pod was not marked for deletion during that time it is assumed that it won't be deleted at + // all and the corresponding entry can be removed from pdb.Status.DisruptedPods. It is assumed that + // pod/pdb apiserver to controller latency is relatively small (like 1-2sec) so the below value should + // be more than enough. + // If the controller is running on a different node it is important that the two nodes have synced + // clock (via ntp for example). Otherwise PodDisruptionBudget controller may not provide enough + // protection against unwanted pod disruptions. + DeletionTimeout = 2 * time.Minute + + // stalePodDisruptionTimeout sets the maximum time a pod can have a stale + // DisruptionTarget condition (the condition is present, but the Pod doesn't + // have a DeletionTimestamp). + // Once the timeout is reached, this controller attempts to set the status + // of the condition to False. + stalePodDisruptionTimeout = 2 * time.Minute ) type updater func(context.Context, *policy.PodDisruptionBudget) error @@ -99,10 +108,16 @@ type DisruptionController struct { queue workqueue.RateLimitingInterface recheckQueue workqueue.DelayingInterface + // pod keys that need to be synced due to a stale DisruptionTarget condition. 
+ stalePodDisruptionQueue workqueue.RateLimitingInterface + stalePodDisruptionTimeout time.Duration + broadcaster record.EventBroadcaster recorder record.EventRecorder getUpdater func() updater + + clock clock.Clock } // controllerAndScale is used to return (controller, scale) pairs from the @@ -127,12 +142,46 @@ func NewDisruptionController( restMapper apimeta.RESTMapper, scaleNamespacer scaleclient.ScalesGetter, discoveryClient discovery.DiscoveryInterface, +) *DisruptionController { + return NewDisruptionControllerInternal( + podInformer, + pdbInformer, + rcInformer, + rsInformer, + dInformer, + ssInformer, + kubeClient, + restMapper, + scaleNamespacer, + discoveryClient, + clock.RealClock{}, + stalePodDisruptionTimeout) +} + +// NewDisruptionControllerInternal allows to set a clock and +// stalePodDisruptionTimeout +// It is only supposed to be used by tests. +func NewDisruptionControllerInternal( + podInformer coreinformers.PodInformer, + pdbInformer policyinformers.PodDisruptionBudgetInformer, + rcInformer coreinformers.ReplicationControllerInformer, + rsInformer appsv1informers.ReplicaSetInformer, + dInformer appsv1informers.DeploymentInformer, + ssInformer appsv1informers.StatefulSetInformer, + kubeClient clientset.Interface, + restMapper apimeta.RESTMapper, + scaleNamespacer scaleclient.ScalesGetter, + discoveryClient discovery.DiscoveryInterface, + clock clock.WithTicker, + stalePodDisruptionTimeout time.Duration, ) *DisruptionController { dc := &DisruptionController{ - kubeClient: kubeClient, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "disruption"), - recheckQueue: workqueue.NewNamedDelayingQueue("disruption_recheck"), - broadcaster: record.NewBroadcaster(), + kubeClient: kubeClient, + queue: workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "disruption"), workqueue.DefaultControllerRateLimiter()), + recheckQueue: workqueue.NewDelayingQueueWithCustomClock(clock, "disruption_recheck"), + stalePodDisruptionQueue: workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "stale_pod_disruption"), workqueue.DefaultControllerRateLimiter()), + broadcaster: record.NewBroadcaster(), + stalePodDisruptionTimeout: stalePodDisruptionTimeout, } dc.recorder = dc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "controllermanager"}) @@ -172,6 +221,8 @@ func NewDisruptionController( dc.scaleNamespacer = scaleNamespacer dc.discoveryClient = discoveryClient + dc.clock = clock + return dc } @@ -376,6 +427,7 @@ func (dc *DisruptionController) Run(ctx context.Context) { defer dc.queue.ShutDown() defer dc.recheckQueue.ShutDown() + defer dc.stalePodDisruptionQueue.ShutDown() klog.Infof("Starting disruption controller") defer klog.Infof("Shutting down disruption controller") @@ -386,6 +438,7 @@ func (dc *DisruptionController) Run(ctx context.Context) { go wait.UntilWithContext(ctx, dc.worker, time.Second) go wait.Until(dc.recheckWorker, time.Second, ctx.Done()) + go wait.UntilWithContext(ctx, dc.stalePodDisruptionWorker, time.Second) <-ctx.Done() } @@ -427,22 +480,28 @@ func (dc *DisruptionController) addPod(obj interface{}) { pdb := dc.getPdbForPod(pod) if pdb == nil { klog.V(4).Infof("No matching pdb for pod %q", pod.Name) - return + } else { + klog.V(4).Infof("addPod %q -> PDB %q", pod.Name, pdb.Name) + dc.enqueuePdb(pdb) + } + if has, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod); has { + 
dc.enqueueStalePodDisruptionCleanup(pod, cleanAfter) } - klog.V(4).Infof("addPod %q -> PDB %q", pod.Name, pdb.Name) - dc.enqueuePdb(pdb) } -func (dc *DisruptionController) updatePod(old, cur interface{}) { +func (dc *DisruptionController) updatePod(_, cur interface{}) { pod := cur.(*v1.Pod) klog.V(4).Infof("updatePod called on pod %q", pod.Name) pdb := dc.getPdbForPod(pod) if pdb == nil { klog.V(4).Infof("No matching pdb for pod %q", pod.Name) - return + } else { + klog.V(4).Infof("updatePod %q -> PDB %q", pod.Name, pdb.Name) + dc.enqueuePdb(pdb) + } + if has, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod); has { + dc.enqueueStalePodDisruptionCleanup(pod, cleanAfter) } - klog.V(4).Infof("updatePod %q -> PDB %q", pod.Name, pdb.Name) - dc.enqueuePdb(pdb) } func (dc *DisruptionController) deletePod(obj interface{}) { @@ -492,6 +551,16 @@ func (dc *DisruptionController) enqueuePdbForRecheck(pdb *policy.PodDisruptionBu dc.recheckQueue.AddAfter(key, delay) } +func (dc *DisruptionController) enqueueStalePodDisruptionCleanup(pod *v1.Pod, d time.Duration) { + key, err := controller.KeyFunc(pod) + if err != nil { + klog.ErrorS(err, "Couldn't get key for Pod object", "pod", klog.KObj(pod)) + return + } + dc.stalePodDisruptionQueue.AddAfter(key, d) + klog.V(4).InfoS("Enqueued pod to cleanup stale DisruptionTarget condition", "pod", klog.KObj(pod)) +} + func (dc *DisruptionController) getPdbForPod(pod *v1.Pod) *policy.PodDisruptionBudget { // GetPodPodDisruptionBudgets returns an error only if no // PodDisruptionBudgets are found. We don't return that as an error to the @@ -563,10 +632,31 @@ func (dc *DisruptionController) processNextRecheckWorkItem() bool { return true } +func (dc *DisruptionController) stalePodDisruptionWorker(ctx context.Context) { + for dc.processNextStalePodDisruptionWorkItem(ctx) { + } +} + +func (dc *DisruptionController) processNextStalePodDisruptionWorkItem(ctx context.Context) bool { + key, quit := dc.stalePodDisruptionQueue.Get() + if quit { + return false + } + defer dc.stalePodDisruptionQueue.Done(key) + err := dc.syncStalePodDisruption(ctx, key.(string)) + if err == nil { + dc.stalePodDisruptionQueue.Forget(key) + return true + } + utilruntime.HandleError(fmt.Errorf("error syncing Pod %v to clear DisruptionTarget condition, requeueing: %v", key.(string), err)) + dc.stalePodDisruptionQueue.AddRateLimited(key) + return true +} + func (dc *DisruptionController) sync(ctx context.Context, key string) error { - startTime := time.Now() + startTime := dc.clock.Now() defer func() { - klog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, time.Since(startTime)) + klog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, dc.clock.Since(startTime)) }() namespace, name, err := cache.SplitMetaNamespaceKey(key) @@ -617,7 +707,7 @@ func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisr strings.Join(unmanagedPods, ",'")) } - currentTime := time.Now() + currentTime := dc.clock.Now() disruptedPods, recheckTime := dc.buildDisruptedPodMap(pods, pdb, currentTime) currentHealthy := countHealthyPods(pods, disruptedPods, currentTime) err = dc.updatePdbStatus(ctx, pdb, currentHealthy, desiredHealthy, expectedCount, disruptedPods) @@ -631,6 +721,48 @@ func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisr return err } +func (dc *DisruptionController) syncStalePodDisruption(ctx context.Context, key string) error { + startTime := dc.clock.Now() + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { +
return err + } + defer func() { + klog.V(4).InfoS("Finished syncing Pod to clear DisruptionTarget condition", "pod", klog.KRef(namespace, name), "duration", dc.clock.Since(startTime)) + }() + pod, err := dc.podLister.Pods(namespace).Get(name) + if errors.IsNotFound(err) { + klog.V(4).InfoS("Skipping clearing DisruptionTarget condition because pod was deleted", "pod", klog.KObj(pod)) + return nil + } + if err != nil { + return err + } + + hasCond, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod) + if !hasCond { + return nil + } + if cleanAfter > 0 { + dc.enqueueStalePodDisruptionCleanup(pod, cleanAfter) + return nil + } + + newStatus := pod.Status.DeepCopy() + updated := apipod.UpdatePodCondition(newStatus, &v1.PodCondition{ + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionFalse, + }) + if !updated { + return nil + } + if _, _, _, err := utilpod.PatchPodStatus(ctx, dc.kubeClient, namespace, name, pod.UID, pod.Status, *newStatus); err != nil { + return err + } + klog.V(2).InfoS("Reset stale DisruptionTarget condition to False", "pod", klog.KObj(pod)) + return nil +} + func (dc *DisruptionController) getExpectedPodCount(ctx context.Context, pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount, desiredHealthy int32, unmanagedPods []string, err error) { err = nil // TODO(davidopp): consider making the way expectedCount and rules about @@ -747,7 +879,7 @@ func countHealthyPods(pods []*v1.Pod, disruptedPods map[string]metav1.Time, curr if disruptionTime, found := disruptedPods[pod.Name]; found && disruptionTime.Time.Add(DeletionTimeout).After(currentTime) { continue } - if podutil.IsPodReady(pod) { + if apipod.IsPodReady(pod) { currentHealthy++ } } @@ -857,3 +989,18 @@ func (dc *DisruptionController) writePdbStatus(ctx context.Context, pdb *policy. 
_, err := dc.kubeClient.PolicyV1().PodDisruptionBudgets(pdb.Namespace).UpdateStatus(ctx, pdb, metav1.UpdateOptions{}) return err } + +func (dc *DisruptionController) nonTerminatingPodHasStaleDisruptionCondition(pod *v1.Pod) (bool, time.Duration) { + if pod.DeletionTimestamp != nil { + return false, 0 + } + _, cond := apipod.GetPodCondition(&pod.Status, v1.AlphaNoCompatGuaranteeDisruptionTarget) + if cond == nil || cond.Status != v1.ConditionTrue { + return false, 0 + } + waitFor := dc.stalePodDisruptionTimeout - dc.clock.Since(cond.LastTransitionTime.Time) + if waitFor < 0 { + waitFor = 0 + } + return true, waitFor +} diff --git a/pkg/controller/disruption/disruption_test.go b/pkg/controller/disruption/disruption_test.go index e292a9839a1..aad96541fcf 100644 --- a/pkg/controller/disruption/disruption_test.go +++ b/pkg/controller/disruption/disruption_test.go @@ -27,11 +27,12 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" apps "k8s.io/api/apps/v1" autoscalingapi "k8s.io/api/autoscaling/v1" v1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1" - apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" apimeta "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta/testrestmapper" @@ -52,6 +53,7 @@ import ( "k8s.io/klog/v2" _ "k8s.io/kubernetes/pkg/apis/core/install" "k8s.io/kubernetes/pkg/controller" + clocktesting "k8s.io/utils/clock/testing" utilpointer "k8s.io/utils/pointer" ) @@ -72,8 +74,8 @@ func (ps *pdbStates) Get(key string) policy.PodDisruptionBudget { return (*ps)[key] } -func (ps *pdbStates) VerifyPdbStatus(t *testing.T, key string, disruptionsAllowed, currentHealthy, desiredHealthy, expectedPods int32, - disruptedPodMap map[string]metav1.Time) { +func (ps *pdbStates) VerifyPdbStatus(t *testing.T, key string, disruptionsAllowed, currentHealthy, desiredHealthy, expectedPods int32, disruptedPodMap map[string]metav1.Time) { + t.Helper() actualPDB := ps.Get(key) actualConditions := actualPDB.Status.Conditions actualPDB.Status.Conditions = nil @@ -86,9 +88,8 @@ func (ps *pdbStates) VerifyPdbStatus(t *testing.T, key string, disruptionsAllowe ObservedGeneration: actualPDB.Generation, } actualStatus := actualPDB.Status - if !apiequality.Semantic.DeepEqual(actualStatus, expectedStatus) { - debug.PrintStack() - t.Fatalf("PDB %q status mismatch. 
Expected %+v but got %+v.", key, expectedStatus, actualStatus) + if diff := cmp.Diff(expectedStatus, actualStatus, cmpopts.EquateEmpty()); diff != "" { + t.Fatalf("PDB %q status mismatch (-want,+got):\n%s", key, diff) } cond := apimeta.FindStatusCondition(actualConditions, policy.DisruptionAllowedCondition) @@ -138,6 +139,7 @@ type disruptionController struct { coreClient *fake.Clientset scaleClient *scalefake.FakeScaleClient discoveryClient *discoveryfake.FakeDiscovery + informerFactory informers.SharedInformerFactory } var customGVK = schema.GroupVersionKind{ @@ -147,6 +149,10 @@ var customGVK = schema.GroupVersionKind{ } func newFakeDisruptionController() (*disruptionController, *pdbStates) { + return newFakeDisruptionControllerWithTime(context.TODO(), time.Now()) +} + +func newFakeDisruptionControllerWithTime(ctx context.Context, now time.Time) (*disruptionController, *pdbStates) { ps := &pdbStates{} coreClient := fake.NewSimpleClientset() @@ -158,8 +164,9 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) { fakeDiscovery := &discoveryfake.FakeDiscovery{ Fake: &core.Fake{}, } + fakeClock := clocktesting.NewFakeClock(now) - dc := NewDisruptionController( + dc := NewDisruptionControllerInternal( informerFactory.Core().V1().Pods(), informerFactory.Policy().V1().PodDisruptionBudgets(), informerFactory.Core().V1().ReplicationControllers(), @@ -170,6 +177,8 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) { testrestmapper.TestOnlyStaticRESTMapper(scheme), fakeScaleClient, fakeDiscovery, + fakeClock, + stalePodDisruptionTimeout, ) dc.getUpdater = func() updater { return ps.Set } dc.podListerSynced = alwaysReady @@ -178,9 +187,8 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) { dc.rsListerSynced = alwaysReady dc.dListerSynced = alwaysReady dc.ssListerSynced = alwaysReady - ctx := context.TODO() informerFactory.Start(ctx.Done()) - informerFactory.WaitForCacheSync(nil) + informerFactory.WaitForCacheSync(ctx.Done()) return &disruptionController{ dc, @@ -193,6 +201,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) { coreClient, fakeScaleClient, fakeDiscovery, + informerFactory, }, ps } @@ -990,17 +999,17 @@ func TestUpdateDisruptedPods(t *testing.T) { dc, ps := newFakeDisruptionController() dc.recheckQueue = workqueue.NewNamedDelayingQueue("pdb_queue") pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromInt(1)) - currentTime := time.Now() + currentTime := dc.clock.Now() pdb.Status.DisruptedPods = map[string]metav1.Time{ "p1": {Time: currentTime}, // Should be removed, pod deletion started. - "p2": {Time: currentTime.Add(-5 * time.Minute)}, // Should be removed, expired. - "p3": {Time: currentTime}, // Should remain, pod untouched. + "p2": {Time: currentTime.Add(-3 * time.Minute)}, // Should be removed, expired. + "p3": {Time: currentTime.Add(-time.Minute)}, // Should remain, pod untouched. "notthere": {Time: currentTime}, // Should be removed, pod deleted. 
} add(t, dc.pdbStore, pdb) pod1, _ := newPod(t, "p1") - pod1.DeletionTimestamp = &metav1.Time{Time: time.Now()} + pod1.DeletionTimestamp = &metav1.Time{Time: dc.clock.Now()} pod2, _ := newPod(t, "p2") pod3, _ := newPod(t, "p3") @@ -1010,7 +1019,7 @@ func TestUpdateDisruptedPods(t *testing.T) { dc.sync(context.TODO(), pdbName) - ps.VerifyPdbStatus(t, pdbName, 0, 1, 1, 3, map[string]metav1.Time{"p3": {Time: currentTime}}) + ps.VerifyPdbStatus(t, pdbName, 0, 1, 1, 3, map[string]metav1.Time{"p3": {Time: currentTime.Add(-time.Minute)}}) } func TestBasicFinderFunctions(t *testing.T) { @@ -1284,7 +1293,7 @@ func TestUpdatePDBStatusRetries(t *testing.T) { updatedPDB.Status.DisruptionsAllowed -= int32(len(podNames)) updatedPDB.Status.DisruptedPods = make(map[string]metav1.Time) for _, name := range podNames { - updatedPDB.Status.DisruptedPods[name] = metav1.NewTime(time.Now()) + updatedPDB.Status.DisruptedPods[name] = metav1.NewTime(dc.clock.Now()) } if err := dc.coreClient.Tracker().Update(poddisruptionbudgetsResource, updatedPDB, updatedPDB.Namespace); err != nil { t.Fatalf("Eviction (PDB update) failed: %v", err) @@ -1378,6 +1387,151 @@ func TestInvalidSelectors(t *testing.T) { } } +func TestStalePodDisruption(t *testing.T) { + now := time.Now() + cases := map[string]struct { + pod *v1.Pod + timePassed time.Duration + wantConditions []v1.PodCondition + }{ + "stale pod disruption": { + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: metav1.NamespaceDefault, + }, + Status: v1.PodStatus{ + Conditions: []v1.PodCondition{ + { + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + LastTransitionTime: metav1.Time{Time: now}, + }, + }, + }, + }, + timePassed: 2*time.Minute + time.Second, + wantConditions: []v1.PodCondition{ + { + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionFalse, + }, + }, + }, + "pod disruption in progress": { + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: metav1.NamespaceDefault, + }, + Status: v1.PodStatus{ + Conditions: []v1.PodCondition{ + { + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + LastTransitionTime: metav1.Time{Time: now}, + }, + }, + }, + }, + timePassed: 2*time.Minute - time.Second, + wantConditions: []v1.PodCondition{ + { + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + }, + }, + }, + "pod disruption actuated": { + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: metav1.NamespaceDefault, + DeletionTimestamp: &metav1.Time{Time: now}, + }, + Status: v1.PodStatus{ + Conditions: []v1.PodCondition{ + { + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + LastTransitionTime: metav1.Time{Time: now}, + }, + }, + }, + }, + timePassed: 2*time.Minute + time.Second, + wantConditions: []v1.PodCondition{ + { + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + }, + }, + }, + "no pod disruption": { + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: metav1.NamespaceDefault, + DeletionTimestamp: &metav1.Time{Time: now}, + }, + }, + timePassed: 2*time.Minute + time.Second, + }, + "pod disruption cleared": { + pod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: metav1.NamespaceDefault, + DeletionTimestamp: &metav1.Time{Time: now}, + }, + Status: v1.PodStatus{ + Conditions: []v1.PodCondition{ + { + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionFalse, + }, + }, + }, 
+ }, + timePassed: 2*time.Minute + time.Second, + wantConditions: []v1.PodCondition{ + { + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionFalse, + }, + }, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + dc, _ := newFakeDisruptionControllerWithTime(ctx, now) + go dc.Run(ctx) + if _, err := dc.coreClient.CoreV1().Pods(tc.pod.Namespace).Create(ctx, tc.pod, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create pod: %v", err) + } + if err := dc.informerFactory.Core().V1().Pods().Informer().GetIndexer().Add(tc.pod); err != nil { + t.Fatalf("Failed adding pod to indexer: %v", err) + } + dc.clock.Sleep(tc.timePassed) + if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + return dc.stalePodDisruptionQueue.Len() == 0, nil + }); err != nil { + t.Fatalf("Failed waiting for worker to sync: %v", err) + } + pod, err := dc.kubeClient.CoreV1().Pods(tc.pod.Namespace).Get(ctx, tc.pod.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Failed getting updated pod: %v", err) + } + diff := cmp.Diff(tc.wantConditions, pod.Status.Conditions, cmpopts.IgnoreFields(v1.PodCondition{}, "LastTransitionTime")) + if diff != "" { + t.Errorf("Obtained pod conditions (-want,+got):\n%s", diff) + } + }) + } +} + // waitForCacheCount blocks until the given cache store has the desired number // of items in it. This will return an error if the condition is not met after a // 10 second timeout. diff --git a/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue.go b/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue.go index 267f4ff4084..91cd33f193b 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue.go +++ b/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue.go @@ -50,6 +50,13 @@ func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitin } } +func NewRateLimitingQueueWithDelayingInterface(di DelayingInterface, rateLimiter RateLimiter) RateLimitingInterface { + return &rateLimitingType{ + DelayingInterface: di, + rateLimiter: rateLimiter, + } +} + // rateLimitingType wraps an Interface and provides rateLimited re-enquing type rateLimitingType struct { DelayingInterface diff --git a/test/integration/disruption/disruption_test.go b/test/integration/disruption/disruption_test.go index a810d45d5e6..3075b235c2f 100644 --- a/test/integration/disruption/disruption_test.go +++ b/test/integration/disruption/disruption_test.go @@ -24,7 +24,7 @@ import ( "time" "github.com/google/go-cmp/cmp" - + "github.com/google/go-cmp/cmp/cmpopts" v1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" "k8s.io/api/policy/v1beta1" @@ -49,9 +49,13 @@ import ( "k8s.io/kubernetes/pkg/controller/disruption" "k8s.io/kubernetes/test/integration/etcd" "k8s.io/kubernetes/test/integration/framework" + "k8s.io/kubernetes/test/integration/util" + "k8s.io/utils/clock" "k8s.io/utils/pointer" ) +const stalePodDisruptionTimeout = 3 * time.Second + func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.DisruptionController, informers.SharedInformerFactory, clientset.Interface, *apiextensionsclientset.Clientset, dynamic.Interface) { server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins", "ServiceAccount"}, framework.SharedEtcd()) @@ -83,7 +87,7 @@ func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.Disrupti t.Fatalf("Error 
creating dynamicClient: %v", err) } - pdbc := disruption.NewDisruptionController( + pdbc := disruption.NewDisruptionControllerInternal( informers.Core().V1().Pods(), informers.Policy().V1().PodDisruptionBudgets(), informers.Core().V1().ReplicationControllers(), @@ -94,6 +98,8 @@ func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.Disrupti mapper, scaleClient, client.Discovery(), + clock.RealClock{}, + stalePodDisruptionTimeout, ) return server, pdbc, informers, clientSet, apiExtensionClient, dynamicClient } @@ -410,7 +416,7 @@ func createPod(ctx context.Context, t *testing.T, name, namespace string, labels t.Error(err) } addPodConditionReady(pod) - if _, err := clientSet.CoreV1().Pods(namespace).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{}); err != nil { + if _, err := clientSet.CoreV1().Pods(namespace).UpdateStatus(ctx, pod, metav1.UpdateOptions{}); err != nil { t.Error(err) } } @@ -645,3 +651,91 @@ func TestPatchCompatibility(t *testing.T) { }) } } + +func TestStalePodDisruption(t *testing.T) { + s, pdbc, informers, clientSet, _, _ := setup(t) + defer s.TearDownFn() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + nsName := "pdb-stale-pod-disruption" + createNs(ctx, t, nsName, clientSet) + + informers.Start(ctx.Done()) + informers.WaitForCacheSync(ctx.Done()) + go pdbc.Run(ctx) + + cases := map[string]struct { + deletePod bool + wantConditions []v1.PodCondition + }{ + "stale-condition": { + wantConditions: []v1.PodCondition{ + { + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionFalse, + }, + }, + }, + "deleted-pod": { + deletePod: true, + wantConditions: []v1.PodCondition{ + { + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + }, + }, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + pod := util.InitPausePod(&util.PausePodConfig{ + Name: name, + Namespace: nsName, + NodeName: "foo", // mock pod as scheduled so that it's not immediately deleted when calling Delete. + }) + var err error + pod, err = util.CreatePausePod(clientSet, pod) + if err != nil { + t.Fatalf("Failed creating pod: %v", err) + } + + pod.Status.Phase = v1.PodRunning + pod.Status.Conditions = append(pod.Status.Conditions, v1.PodCondition{ + Type: v1.AlphaNoCompatGuaranteeDisruptionTarget, + Status: v1.ConditionTrue, + LastTransitionTime: metav1.Now(), + }) + pod, err = clientSet.CoreV1().Pods(nsName).UpdateStatus(ctx, pod, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed updating pod: %v", err) + } + + if tc.deletePod { + if err := clientSet.CoreV1().Pods(nsName).Delete(ctx, name, metav1.DeleteOptions{}); err != nil { + t.Fatalf("Failed to delete pod: %v", err) + } + } + time.Sleep(stalePodDisruptionTimeout) + diff := "" + if err := wait.PollImmediate(100*time.Millisecond, wait.ForeverTestTimeout, func() (done bool, err error) { + pod, err = clientSet.CoreV1().Pods(nsName).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return false, err + } + if tc.deletePod && pod.DeletionTimestamp == nil { + return false, nil + } + diff = cmp.Diff(tc.wantConditions, pod.Status.Conditions, cmpopts.IgnoreFields(v1.PodCondition{}, "LastTransitionTime")) + return diff == "", nil + }); err != nil { + t.Errorf("Failed waiting for status to change: %v", err) + if diff != "" { + t.Errorf("Pod has conditions (-want,+got):\n%s", diff) + } + } + }) + } +}
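For illustration only (not part of the patch): the cleanup decision added above boils down to the arithmetic in nonTerminatingPodHasStaleDisruptionCondition. The sketch below mirrors that logic in a hypothetical staleAfter helper; the pod, the 2-minute timeout, and the 90-second-old condition are made-up inputs.

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	apipod "k8s.io/kubernetes/pkg/api/v1/pod"
)

// staleAfter is a hypothetical stand-in for the controller's
// nonTerminatingPodHasStaleDisruptionCondition: it reports whether the pod
// carries DisruptionTarget=True with no deletion in progress, and how long
// to wait before the condition should be reset to False.
func staleAfter(pod *v1.Pod, timeout time.Duration, now time.Time) (bool, time.Duration) {
	if pod.DeletionTimestamp != nil {
		return false, 0 // the disruption is being actuated; nothing to clean up
	}
	_, cond := apipod.GetPodCondition(&pod.Status, v1.AlphaNoCompatGuaranteeDisruptionTarget)
	if cond == nil || cond.Status != v1.ConditionTrue {
		return false, 0
	}
	wait := timeout - now.Sub(cond.LastTransitionTime.Time)
	if wait < 0 {
		wait = 0
	}
	return true, wait
}

func main() {
	now := time.Now()
	pod := &v1.Pod{
		Status: v1.PodStatus{
			Conditions: []v1.PodCondition{{
				Type:               v1.AlphaNoCompatGuaranteeDisruptionTarget,
				Status:             v1.ConditionTrue,
				LastTransitionTime: metav1.Time{Time: now.Add(-90 * time.Second)},
			}},
		},
	}
	has, wait := staleAfter(pod, 2*time.Minute, now)
	fmt.Println(has, wait) // true 30s: requeue the pod and clear the condition 30 seconds later
}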
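The DisruptedPods bookkeeping exercised by the updated TestUpdateDisruptedPods follows one rule: an entry only shields a pod while its disruption timestamp plus DeletionTimeout is still in the future. A minimal sketch, assuming the same 2-minute DeletionTimeout as the patch; the timestamps mirror the "p2" and "p3" entries in the test.

package main

import (
	"fmt"
	"time"
)

func main() {
	const deletionTimeout = 2 * time.Minute // DeletionTimeout in the patch
	now := time.Now()

	// An entry in pdb.Status.DisruptedPods is honored only while
	// disruptionTime + DeletionTimeout lies in the future.
	stillDisrupted := func(disruptionTime time.Time) bool {
		return disruptionTime.Add(deletionTimeout).After(now)
	}

	fmt.Println(stillDisrupted(now.Add(-3 * time.Minute))) // false: expired, dropped from status ("p2")
	fmt.Println(stillDisrupted(now.Add(-time.Minute)))     // true: kept in status ("p3")
}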
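The new workqueue.NewRateLimitingQueueWithDelayingInterface helper exists so the controller can share one injectable clock across its queues. A usage sketch, assuming a test-style fake clock; the queue name "example" and the item "default/foo" are placeholders.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
	clocktesting "k8s.io/utils/clock/testing"
)

func main() {
	fakeClock := clocktesting.NewFakeClock(time.Now())

	// Compose the new constructor with a delaying queue driven by the fake
	// clock, the same way NewDisruptionControllerInternal wires its queues.
	q := workqueue.NewRateLimitingQueueWithDelayingInterface(
		workqueue.NewDelayingQueueWithCustomClock(fakeClock, "example"),
		workqueue.DefaultControllerRateLimiter(),
	)
	defer q.ShutDown()

	q.AddAfter("default/foo", 2*time.Minute) // delayed; not visible yet
	fmt.Println(q.Len())                     // 0

	fakeClock.Step(2 * time.Minute) // advance time instead of sleeping for real
	for q.Len() == 0 {              // the delaying goroutine hands the item over shortly after
		time.Sleep(10 * time.Millisecond)
	}
	item, _ := q.Get()
	fmt.Println(item) // default/foo
	q.Done(item)
}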
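When syncStalePodDisruption decides to reset the condition, utilpod.PatchPodStatus issues a patch against the pod's status subresource. The sketch below only approximates the patch body that ends up on the wire, assuming the helper builds a two-way strategic-merge patch (the real helper also includes the pod UID); it is illustrative, not the helper's implementation.

package main

import (
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

func main() {
	oldStatus := v1.PodStatus{Conditions: []v1.PodCondition{{
		Type:   v1.AlphaNoCompatGuaranteeDisruptionTarget,
		Status: v1.ConditionTrue,
	}}}

	// Flip the condition to False on a copy, as the controller does before
	// handing both statuses to utilpod.PatchPodStatus.
	newStatus := *oldStatus.DeepCopy()
	for i := range newStatus.Conditions {
		if newStatus.Conditions[i].Type == v1.AlphaNoCompatGuaranteeDisruptionTarget {
			newStatus.Conditions[i].Status = v1.ConditionFalse
		}
	}

	oldData, err := json.Marshal(v1.Pod{Status: oldStatus})
	if err != nil {
		panic(err)
	}
	newData, err := json.Marshal(v1.Pod{Status: newStatus})
	if err != nil {
		panic(err)
	}
	patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Pod{})
	if err != nil {
		panic(err)
	}
	// Prints a strategic-merge patch touching only the changed condition, roughly:
	// {"status":{"$setElementOrder/conditions":[{"type":"DisruptionTarget"}],"conditions":[{"status":"False","type":"DisruptionTarget",...}]}}
	fmt.Println(string(patch))
}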