diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go index 9f3a246e5b7..eb39b972349 100644 --- a/pkg/controller/disruption/disruption.go +++ b/pkg/controller/disruption/disruption.go @@ -53,6 +53,7 @@ import ( "k8s.io/klog/v2" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller" + "k8s.io/utils/clock" ) // DeletionTimeout sets maximum time from the moment a pod is added to DisruptedPods in PDB.Status @@ -65,7 +66,7 @@ import ( // clock (via ntp for example). Otherwise PodDisruptionBudget controller may not provide enough // protection against unwanted pod disruptions. const ( - DeletionTimeout = 2 * 60 * time.Second + DeletionTimeout = 2 * time.Minute ) type updater func(context.Context, *policy.PodDisruptionBudget) error @@ -103,6 +104,8 @@ type DisruptionController struct { recorder record.EventRecorder getUpdater func() updater + + clock clock.Clock } // controllerAndScale is used to return (controller, scale) pairs from the @@ -127,11 +130,41 @@ func NewDisruptionController( restMapper apimeta.RESTMapper, scaleNamespacer scaleclient.ScalesGetter, discoveryClient discovery.DiscoveryInterface, +) *DisruptionController { + return NewDisruptionControllerInternal( + podInformer, + pdbInformer, + rcInformer, + rsInformer, + dInformer, + ssInformer, + kubeClient, + restMapper, + scaleNamespacer, + discoveryClient, + clock.RealClock{}) +} + +// NewDisruptionControllerInternal allows to set a clock and +// stalePodDisruptionTimeout +// It is only supposed to be used by tests. +func NewDisruptionControllerInternal( + podInformer coreinformers.PodInformer, + pdbInformer policyinformers.PodDisruptionBudgetInformer, + rcInformer coreinformers.ReplicationControllerInformer, + rsInformer appsv1informers.ReplicaSetInformer, + dInformer appsv1informers.DeploymentInformer, + ssInformer appsv1informers.StatefulSetInformer, + kubeClient clientset.Interface, + restMapper apimeta.RESTMapper, + scaleNamespacer scaleclient.ScalesGetter, + discoveryClient discovery.DiscoveryInterface, + clock clock.WithTicker, ) *DisruptionController { dc := &DisruptionController{ kubeClient: kubeClient, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "disruption"), - recheckQueue: workqueue.NewNamedDelayingQueue("disruption_recheck"), + queue: workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "disruption"), workqueue.DefaultControllerRateLimiter()), + recheckQueue: workqueue.NewDelayingQueueWithCustomClock(clock, "disruption_recheck"), broadcaster: record.NewBroadcaster(), } dc.recorder = dc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "controllermanager"}) @@ -172,6 +205,8 @@ func NewDisruptionController( dc.scaleNamespacer = scaleNamespacer dc.discoveryClient = discoveryClient + dc.clock = clock + return dc } @@ -564,9 +599,9 @@ func (dc *DisruptionController) processNextRecheckWorkItem() bool { } func (dc *DisruptionController) sync(ctx context.Context, key string) error { - startTime := time.Now() + startTime := dc.clock.Now() defer func() { - klog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, time.Since(startTime)) + klog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, dc.clock.Since(startTime)) }() namespace, name, err := cache.SplitMetaNamespaceKey(key) @@ -617,7 +652,7 @@ func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisr strings.Join(unmanagedPods, ",'")) } - currentTime := time.Now() + currentTime := dc.clock.Now() disruptedPods, recheckTime := dc.buildDisruptedPodMap(pods, pdb, currentTime) currentHealthy := countHealthyPods(pods, disruptedPods, currentTime) err = dc.updatePdbStatus(ctx, pdb, currentHealthy, desiredHealthy, expectedCount, disruptedPods) diff --git a/pkg/controller/disruption/disruption_test.go b/pkg/controller/disruption/disruption_test.go index e292a9839a1..0e1f698d4a8 100644 --- a/pkg/controller/disruption/disruption_test.go +++ b/pkg/controller/disruption/disruption_test.go @@ -27,11 +27,12 @@ import ( "testing" "time" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" apps "k8s.io/api/apps/v1" autoscalingapi "k8s.io/api/autoscaling/v1" v1 "k8s.io/api/core/v1" policy "k8s.io/api/policy/v1" - apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" apimeta "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta/testrestmapper" @@ -52,6 +53,7 @@ import ( "k8s.io/klog/v2" _ "k8s.io/kubernetes/pkg/apis/core/install" "k8s.io/kubernetes/pkg/controller" + clocktesting "k8s.io/utils/clock/testing" utilpointer "k8s.io/utils/pointer" ) @@ -72,8 +74,8 @@ func (ps *pdbStates) Get(key string) policy.PodDisruptionBudget { return (*ps)[key] } -func (ps *pdbStates) VerifyPdbStatus(t *testing.T, key string, disruptionsAllowed, currentHealthy, desiredHealthy, expectedPods int32, - disruptedPodMap map[string]metav1.Time) { +func (ps *pdbStates) VerifyPdbStatus(t *testing.T, key string, disruptionsAllowed, currentHealthy, desiredHealthy, expectedPods int32, disruptedPodMap map[string]metav1.Time) { + t.Helper() actualPDB := ps.Get(key) actualConditions := actualPDB.Status.Conditions actualPDB.Status.Conditions = nil @@ -86,9 +88,8 @@ func (ps *pdbStates) VerifyPdbStatus(t *testing.T, key string, disruptionsAllowe ObservedGeneration: actualPDB.Generation, } actualStatus := actualPDB.Status - if !apiequality.Semantic.DeepEqual(actualStatus, expectedStatus) { - debug.PrintStack() - t.Fatalf("PDB %q status mismatch. Expected %+v but got %+v.", key, expectedStatus, actualStatus) + if diff := cmp.Diff(expectedStatus, actualStatus, cmpopts.EquateEmpty()); diff != "" { + t.Fatalf("PDB %q status mismatch (-want,+got):\n%s", key, diff) } cond := apimeta.FindStatusCondition(actualConditions, policy.DisruptionAllowedCondition) @@ -158,8 +159,9 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) { fakeDiscovery := &discoveryfake.FakeDiscovery{ Fake: &core.Fake{}, } + fakeClock := clocktesting.NewFakeClock(time.Now()) - dc := NewDisruptionController( + dc := NewDisruptionControllerInternal( informerFactory.Core().V1().Pods(), informerFactory.Policy().V1().PodDisruptionBudgets(), informerFactory.Core().V1().ReplicationControllers(), @@ -170,6 +172,7 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) { testrestmapper.TestOnlyStaticRESTMapper(scheme), fakeScaleClient, fakeDiscovery, + fakeClock, ) dc.getUpdater = func() updater { return ps.Set } dc.podListerSynced = alwaysReady @@ -990,17 +993,17 @@ func TestUpdateDisruptedPods(t *testing.T) { dc, ps := newFakeDisruptionController() dc.recheckQueue = workqueue.NewNamedDelayingQueue("pdb_queue") pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromInt(1)) - currentTime := time.Now() + currentTime := dc.clock.Now() pdb.Status.DisruptedPods = map[string]metav1.Time{ "p1": {Time: currentTime}, // Should be removed, pod deletion started. - "p2": {Time: currentTime.Add(-5 * time.Minute)}, // Should be removed, expired. - "p3": {Time: currentTime}, // Should remain, pod untouched. + "p2": {Time: currentTime.Add(-3 * time.Minute)}, // Should be removed, expired. + "p3": {Time: currentTime.Add(-time.Minute)}, // Should remain, pod untouched. "notthere": {Time: currentTime}, // Should be removed, pod deleted. } add(t, dc.pdbStore, pdb) pod1, _ := newPod(t, "p1") - pod1.DeletionTimestamp = &metav1.Time{Time: time.Now()} + pod1.DeletionTimestamp = &metav1.Time{Time: dc.clock.Now()} pod2, _ := newPod(t, "p2") pod3, _ := newPod(t, "p3") @@ -1010,7 +1013,7 @@ func TestUpdateDisruptedPods(t *testing.T) { dc.sync(context.TODO(), pdbName) - ps.VerifyPdbStatus(t, pdbName, 0, 1, 1, 3, map[string]metav1.Time{"p3": {Time: currentTime}}) + ps.VerifyPdbStatus(t, pdbName, 0, 1, 1, 3, map[string]metav1.Time{"p3": {Time: currentTime.Add(-time.Minute)}}) } func TestBasicFinderFunctions(t *testing.T) { @@ -1284,7 +1287,7 @@ func TestUpdatePDBStatusRetries(t *testing.T) { updatedPDB.Status.DisruptionsAllowed -= int32(len(podNames)) updatedPDB.Status.DisruptedPods = make(map[string]metav1.Time) for _, name := range podNames { - updatedPDB.Status.DisruptedPods[name] = metav1.NewTime(time.Now()) + updatedPDB.Status.DisruptedPods[name] = metav1.NewTime(dc.clock.Now()) } if err := dc.coreClient.Tracker().Update(poddisruptionbudgetsResource, updatedPDB, updatedPDB.Namespace); err != nil { t.Fatalf("Eviction (PDB update) failed: %v", err) diff --git a/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue.go b/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue.go index 267f4ff4084..91cd33f193b 100644 --- a/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue.go +++ b/staging/src/k8s.io/client-go/util/workqueue/rate_limiting_queue.go @@ -50,6 +50,13 @@ func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitin } } +func NewRateLimitingQueueWithDelayingInterface(di DelayingInterface, rateLimiter RateLimiter) RateLimitingInterface { + return &rateLimitingType{ + DelayingInterface: di, + rateLimiter: rateLimiter, + } +} + // rateLimitingType wraps an Interface and provides rateLimited re-enquing type rateLimitingType struct { DelayingInterface diff --git a/test/integration/disruption/disruption_test.go b/test/integration/disruption/disruption_test.go index a810d45d5e6..7a0b4cefcbb 100644 --- a/test/integration/disruption/disruption_test.go +++ b/test/integration/disruption/disruption_test.go @@ -24,7 +24,6 @@ import ( "time" "github.com/google/go-cmp/cmp" - v1 "k8s.io/api/core/v1" policyv1 "k8s.io/api/policy/v1" "k8s.io/api/policy/v1beta1"