diff --git a/cmd/kube-controller-manager/app/policy.go b/cmd/kube-controller-manager/app/policy.go index 8c66cbf2a2f..001086cb8d8 100644 --- a/cmd/kube-controller-manager/app/policy.go +++ b/cmd/kube-controller-manager/app/policy.go @@ -38,6 +38,7 @@ func startDisruptionController(ctx context.Context, controllerContext Controller } go disruption.NewDisruptionController( + ctx, controllerContext.InformerFactory.Core().V1().Pods(), controllerContext.InformerFactory.Policy().V1().PodDisruptionBudgets(), controllerContext.InformerFactory.Core().V1().ReplicationControllers(), diff --git a/hack/logcheck.conf b/hack/logcheck.conf index d06f62968a2..4a397f268fd 100644 --- a/hack/logcheck.conf +++ b/hack/logcheck.conf @@ -38,7 +38,6 @@ contextual k8s.io/kubernetes/test/e2e/dra/.* # this point it is easier to list the exceptions. -contextual k8s.io/kubernetes/pkg/controller/controller_ref_manager.go -contextual k8s.io/kubernetes/pkg/controller/controller_utils.go --contextual k8s.io/kubernetes/pkg/controller/disruption/.* -contextual k8s.io/kubernetes/pkg/controller/endpoint/.* -contextual k8s.io/kubernetes/pkg/controller/endpointslicemirroring/.* -contextual k8s.io/kubernetes/pkg/controller/garbagecollector/.* diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go index 27a9179b202..da13ebb23e3 100644 --- a/pkg/controller/disruption/disruption.go +++ b/pkg/controller/disruption/disruption.go @@ -134,6 +134,7 @@ type controllerAndScale struct { type podControllerFinder func(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) func NewDisruptionController( + ctx context.Context, podInformer coreinformers.PodInformer, pdbInformer policyinformers.PodDisruptionBudgetInformer, rcInformer coreinformers.ReplicationControllerInformer, @@ -146,6 +147,7 @@ func NewDisruptionController( discoveryClient discovery.DiscoveryInterface, ) *DisruptionController { return NewDisruptionControllerInternal( + ctx, podInformer, pdbInformer, rcInformer, @@ -163,7 +165,7 @@ func NewDisruptionController( // NewDisruptionControllerInternal allows to set a clock and // stalePodDisruptionTimeout // It is only supposed to be used by tests. -func NewDisruptionControllerInternal( +func NewDisruptionControllerInternal(ctx context.Context, podInformer coreinformers.PodInformer, pdbInformer policyinformers.PodDisruptionBudgetInformer, rcInformer coreinformers.ReplicationControllerInformer, @@ -177,6 +179,7 @@ func NewDisruptionControllerInternal( clock clock.WithTicker, stalePodDisruptionTimeout time.Duration, ) *DisruptionController { + logger := klog.FromContext(ctx) dc := &DisruptionController{ kubeClient: kubeClient, queue: workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "disruption"), workqueue.DefaultControllerRateLimiter()), @@ -190,20 +193,30 @@ func NewDisruptionControllerInternal( dc.getUpdater = func() updater { return dc.writePdbStatus } podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: dc.addPod, - UpdateFunc: dc.updatePod, - DeleteFunc: dc.deletePod, + AddFunc: func(obj interface{}) { + dc.addPod(logger, obj) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + dc.updatePod(logger, oldObj, newObj) + }, + DeleteFunc: func(obj interface{}) { + dc.deletePod(logger, obj) + }, }) dc.podLister = podInformer.Lister() dc.podListerSynced = podInformer.Informer().HasSynced - pdbInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: dc.addDb, - UpdateFunc: dc.updateDb, - DeleteFunc: dc.removeDb, + pdbInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + dc.addDB(logger, obj) }, - ) + UpdateFunc: func(oldObj, newObj interface{}) { + dc.updateDB(logger, oldObj, newObj) + }, + DeleteFunc: func(obj interface{}) { + dc.removeDB(logger, obj) + }, + }) dc.pdbLister = pdbInformer.Lister() dc.pdbListerSynced = pdbInformer.Informer().HasSynced @@ -418,12 +431,13 @@ func verifyGroupKind(controllerRef *metav1.OwnerReference, expectedKind string, func (dc *DisruptionController) Run(ctx context.Context) { defer utilruntime.HandleCrash() + logger := klog.FromContext(ctx) // Start events processing pipeline. if dc.kubeClient != nil { - klog.Infof("Sending events to api server.") + logger.Info("Sending events to api server.") dc.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: dc.kubeClient.CoreV1().Events("")}) } else { - klog.Infof("No api server defined - no events will be sent to API server.") + logger.Info("No api server defined - no events will be sent to API server.") } defer dc.broadcaster.Shutdown() @@ -431,8 +445,8 @@ func (dc *DisruptionController) Run(ctx context.Context) { defer dc.recheckQueue.ShutDown() defer dc.stalePodDisruptionQueue.ShutDown() - klog.Infof("Starting disruption controller") - defer klog.Infof("Shutting down disruption controller") + logger.Info("Starting disruption controller") + defer logger.Info("Shutting down disruption controller") if !cache.WaitForNamedCacheSync("disruption", ctx.Done(), dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) { return @@ -445,68 +459,68 @@ func (dc *DisruptionController) Run(ctx context.Context) { <-ctx.Done() } -func (dc *DisruptionController) addDb(obj interface{}) { +func (dc *DisruptionController) addDB(logger klog.Logger, obj interface{}) { pdb := obj.(*policy.PodDisruptionBudget) - klog.V(4).Infof("add DB %q", pdb.Name) - dc.enqueuePdb(pdb) + logger.V(4).Info("Add DB", "podDisruptionBudget", klog.KObj(pdb)) + dc.enqueuePdb(logger, pdb) } -func (dc *DisruptionController) updateDb(old, cur interface{}) { +func (dc *DisruptionController) updateDB(logger klog.Logger, old, cur interface{}) { // TODO(mml) ignore updates where 'old' is equivalent to 'cur'. pdb := cur.(*policy.PodDisruptionBudget) - klog.V(4).Infof("update DB %q", pdb.Name) - dc.enqueuePdb(pdb) + logger.V(4).Info("Update DB", "podDisruptionBudget", klog.KObj(pdb)) + dc.enqueuePdb(logger, pdb) } -func (dc *DisruptionController) removeDb(obj interface{}) { +func (dc *DisruptionController) removeDB(logger klog.Logger, obj interface{}) { pdb, ok := obj.(*policy.PodDisruptionBudget) if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - klog.Errorf("Couldn't get object from tombstone %+v", obj) + logger.Error(nil, "Couldn't get object from tombstone", "obj", obj) return } pdb, ok = tombstone.Obj.(*policy.PodDisruptionBudget) if !ok { - klog.Errorf("Tombstone contained object that is not a pdb %+v", obj) + logger.Error(nil, "Tombstone contained object that is not a PDB", "obj", obj) return } } - klog.V(4).Infof("remove DB %q", pdb.Name) - dc.enqueuePdb(pdb) + logger.V(4).Info("Remove DB", "podDisruptionBudget", klog.KObj(pdb)) + dc.enqueuePdb(logger, pdb) } -func (dc *DisruptionController) addPod(obj interface{}) { +func (dc *DisruptionController) addPod(logger klog.Logger, obj interface{}) { pod := obj.(*v1.Pod) - klog.V(4).Infof("addPod called on pod %q", pod.Name) - pdb := dc.getPdbForPod(pod) + logger.V(4).Info("AddPod called on pod", "pod", klog.KObj(pod)) + pdb := dc.getPdbForPod(logger, pod) if pdb == nil { - klog.V(4).Infof("No matching pdb for pod %q", pod.Name) + logger.V(4).Info("No matching PDB for pod", "pod", klog.KObj(pod)) } else { - klog.V(4).Infof("addPod %q -> PDB %q", pod.Name, pdb.Name) - dc.enqueuePdb(pdb) + logger.V(4).Info("addPod -> PDB", "pod", klog.KObj(pod), "podDisruptionBudget", klog.KObj(pdb)) + dc.enqueuePdb(logger, pdb) } if has, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod); has { - dc.enqueueStalePodDisruptionCleanup(pod, cleanAfter) + dc.enqueueStalePodDisruptionCleanup(logger, pod, cleanAfter) } } -func (dc *DisruptionController) updatePod(_, cur interface{}) { +func (dc *DisruptionController) updatePod(logger klog.Logger, _, cur interface{}) { pod := cur.(*v1.Pod) - klog.V(4).Infof("updatePod called on pod %q", pod.Name) - pdb := dc.getPdbForPod(pod) + logger.V(4).Info("UpdatePod called on pod", "pod", klog.KObj(pod)) + pdb := dc.getPdbForPod(logger, pod) if pdb == nil { - klog.V(4).Infof("No matching pdb for pod %q", pod.Name) + logger.V(4).Info("No matching PDB for pod", "pod", klog.KObj(pod)) } else { - klog.V(4).Infof("updatePod %q -> PDB %q", pod.Name, pdb.Name) - dc.enqueuePdb(pdb) + logger.V(4).Info("updatePod -> PDB", "pod", klog.KObj(pod), "podDisruptionBudget", klog.KObj(pdb)) + dc.enqueuePdb(logger, pdb) } if has, cleanAfter := dc.nonTerminatingPodHasStaleDisruptionCondition(pod); has { - dc.enqueueStalePodDisruptionCleanup(pod, cleanAfter) + dc.enqueueStalePodDisruptionCleanup(logger, pod, cleanAfter) } } -func (dc *DisruptionController) deletePod(obj interface{}) { +func (dc *DisruptionController) deletePod(logger klog.Logger, obj interface{}) { pod, ok := obj.(*v1.Pod) // When a delete is dropped, the relist will notice a pod in the store not // in the list, leading to the insertion of a tombstone object which contains @@ -516,66 +530,66 @@ func (dc *DisruptionController) deletePod(obj interface{}) { if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) if !ok { - klog.Errorf("Couldn't get object from tombstone %+v", obj) + logger.Error(nil, "Couldn't get object from tombstone", "obj", obj) return } pod, ok = tombstone.Obj.(*v1.Pod) if !ok { - klog.Errorf("Tombstone contained object that is not a pod %+v", obj) + logger.Error(nil, "Tombstone contained object that is not a pod", "obj", obj) return } } - klog.V(4).Infof("deletePod called on pod %q", pod.Name) - pdb := dc.getPdbForPod(pod) + logger.V(4).Info("DeletePod called on pod", "pod", klog.KObj(pod)) + pdb := dc.getPdbForPod(logger, pod) if pdb == nil { - klog.V(4).Infof("No matching pdb for pod %q", pod.Name) + logger.V(4).Info("No matching PDB for pod", "pod", klog.KObj(pod)) return } - klog.V(4).Infof("deletePod %q -> PDB %q", pod.Name, pdb.Name) - dc.enqueuePdb(pdb) + logger.V(4).Info("DeletePod -> PDB", "pod", klog.KObj(pod), "podDisruptionBudget", klog.KObj(pdb)) + dc.enqueuePdb(logger, pdb) } -func (dc *DisruptionController) enqueuePdb(pdb *policy.PodDisruptionBudget) { +func (dc *DisruptionController) enqueuePdb(logger klog.Logger, pdb *policy.PodDisruptionBudget) { key, err := controller.KeyFunc(pdb) if err != nil { - klog.Errorf("Couldn't get key for PodDisruptionBudget object %+v: %v", pdb, err) + logger.Error(err, "Couldn't get key for PodDisruptionBudget", "podDisruptionBudget", klog.KObj(pdb)) return } dc.queue.Add(key) } -func (dc *DisruptionController) enqueuePdbForRecheck(pdb *policy.PodDisruptionBudget, delay time.Duration) { +func (dc *DisruptionController) enqueuePdbForRecheck(logger klog.Logger, pdb *policy.PodDisruptionBudget, delay time.Duration) { key, err := controller.KeyFunc(pdb) if err != nil { - klog.Errorf("Couldn't get key for PodDisruptionBudget object %+v: %v", pdb, err) + logger.Error(err, "Couldn't get key for PodDisruptionBudget", "podDisruptionBudget", klog.KObj(pdb)) return } dc.recheckQueue.AddAfter(key, delay) } -func (dc *DisruptionController) enqueueStalePodDisruptionCleanup(pod *v1.Pod, d time.Duration) { +func (dc *DisruptionController) enqueueStalePodDisruptionCleanup(logger klog.Logger, pod *v1.Pod, d time.Duration) { key, err := controller.KeyFunc(pod) if err != nil { - klog.ErrorS(err, "Couldn't get key for Pod object", "pod", klog.KObj(pod)) + logger.Error(err, "Couldn't get key for Pod object", "pod", klog.KObj(pod)) return } dc.stalePodDisruptionQueue.AddAfter(key, d) - klog.V(4).InfoS("Enqueued pod to cleanup stale DisruptionTarget condition", "pod", klog.KObj(pod)) + logger.V(4).Info("Enqueued pod to cleanup stale DisruptionTarget condition", "pod", klog.KObj(pod)) } -func (dc *DisruptionController) getPdbForPod(pod *v1.Pod) *policy.PodDisruptionBudget { +func (dc *DisruptionController) getPdbForPod(logger klog.Logger, pod *v1.Pod) *policy.PodDisruptionBudget { // GetPodPodDisruptionBudgets returns an error only if no // PodDisruptionBudgets are found. We don't return that as an error to the // caller. pdbs, err := dc.pdbLister.GetPodPodDisruptionBudgets(pod) if err != nil { - klog.V(4).Infof("No PodDisruptionBudgets found for pod %v, PodDisruptionBudget controller will avoid syncing.", pod.Name) + logger.V(4).Info("No PodDisruptionBudgets found for pod, PodDisruptionBudget controller will avoid syncing.", "pod", klog.KObj(pod)) return nil } if len(pdbs) > 1 { msg := fmt.Sprintf("Pod %q/%q matches multiple PodDisruptionBudgets. Chose %q arbitrarily.", pod.Namespace, pod.Name, pdbs[0].Name) - klog.Warning(msg) + logger.Info(msg) dc.recorder.Event(pod, v1.EventTypeWarning, "MultiplePodDisruptionBudgets", msg) } return pdbs[0] @@ -656,9 +670,10 @@ func (dc *DisruptionController) processNextStalePodDisruptionWorkItem(ctx contex } func (dc *DisruptionController) sync(ctx context.Context, key string) error { + logger := klog.FromContext(ctx) startTime := dc.clock.Now() defer func() { - klog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, dc.clock.Since(startTime)) + logger.V(4).Info("Finished syncing PodDisruptionBudget", "key", key, "duration", dc.clock.Since(startTime)) }() namespace, name, err := cache.SplitMetaNamespaceKey(key) @@ -667,7 +682,7 @@ func (dc *DisruptionController) sync(ctx context.Context, key string) error { } pdb, err := dc.pdbLister.PodDisruptionBudgets(namespace).Get(name) if errors.IsNotFound(err) { - klog.V(4).Infof("PodDisruptionBudget %q has been deleted", key) + logger.V(4).Info("podDisruptionBudget has been deleted", "key", key) return nil } if err != nil { @@ -681,7 +696,7 @@ func (dc *DisruptionController) sync(ctx context.Context, key string) error { return err } if err != nil { - klog.Errorf("Failed to sync pdb %s/%s: %v", pdb.Namespace, pdb.Name, err) + logger.Error(err, "Failed to sync PDB", "podDisruptionBudget", klog.KRef(namespace, name)) return dc.failSafe(ctx, pdb, err) } @@ -689,6 +704,7 @@ func (dc *DisruptionController) sync(ctx context.Context, key string) error { } func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisruptionBudget) error { + logger := klog.FromContext(ctx) pods, err := dc.getPodsForPdb(pdb) if err != nil { dc.recorder.Eventf(pdb, v1.EventTypeWarning, "NoPods", "Failed to get pods: %v", err) @@ -705,7 +721,7 @@ func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisr } // We have unmamanged pods, instead of erroring and hotlooping in disruption controller, log and continue. if len(unmanagedPods) > 0 { - klog.Warningf("found unmanaged pods associated with this PDB: %v", unmanagedPods) + logger.V(4).Info("Found unmanaged pods associated with this PDB", "pods", unmanagedPods) dc.recorder.Eventf(pdb, v1.EventTypeWarning, "UnmanagedPods", "Pods selected by this PodDisruptionBudget (selector: %v) were found "+ "to be unmanaged. As a result, the status of the PDB cannot be calculated correctly, which may result in undefined behavior. "+ "To account for these pods please set \".spec.minAvailable\" "+ @@ -713,7 +729,7 @@ func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisr } currentTime := dc.clock.Now() - disruptedPods, recheckTime := dc.buildDisruptedPodMap(pods, pdb, currentTime) + disruptedPods, recheckTime := dc.buildDisruptedPodMap(logger, pods, pdb, currentTime) currentHealthy := countHealthyPods(pods, disruptedPods, currentTime) err = dc.updatePdbStatus(ctx, pdb, currentHealthy, desiredHealthy, expectedCount, disruptedPods) @@ -721,23 +737,24 @@ func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisr // There is always at most one PDB waiting with a particular name in the queue, // and each PDB in the queue is associated with the lowest timestamp // that was supplied when a PDB with that name was added. - dc.enqueuePdbForRecheck(pdb, recheckTime.Sub(currentTime)) + dc.enqueuePdbForRecheck(logger, pdb, recheckTime.Sub(currentTime)) } return err } func (dc *DisruptionController) syncStalePodDisruption(ctx context.Context, key string) error { + logger := klog.FromContext(ctx) startTime := dc.clock.Now() namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err } defer func() { - klog.V(4).InfoS("Finished syncing Pod to clear DisruptionTarget condition", "pod", klog.KRef(namespace, name), "duration", dc.clock.Since(startTime)) + logger.V(4).Info("Finished syncing Pod to clear DisruptionTarget condition", "pod", klog.KRef(namespace, name), "duration", dc.clock.Since(startTime)) }() pod, err := dc.podLister.Pods(namespace).Get(name) if errors.IsNotFound(err) { - klog.V(4).InfoS("Skipping clearing DisruptionTarget condition because pod was deleted", "pod", klog.KObj(pod)) + logger.V(4).Info("Skipping clearing DisruptionTarget condition because pod was deleted", "pod", klog.KObj(pod)) return nil } if err != nil { @@ -749,7 +766,7 @@ func (dc *DisruptionController) syncStalePodDisruption(ctx context.Context, key return nil } if cleanAfter > 0 { - dc.enqueueStalePodDisruptionCleanup(pod, cleanAfter) + dc.enqueueStalePodDisruptionCleanup(logger, pod, cleanAfter) return nil } @@ -765,7 +782,7 @@ func (dc *DisruptionController) syncStalePodDisruption(ctx context.Context, key if _, err := dc.kubeClient.CoreV1().Pods(pod.Namespace).ApplyStatus(ctx, podApply, metav1.ApplyOptions{FieldManager: fieldManager, Force: true}); err != nil { return err } - klog.V(2).InfoS("Reset stale DisruptionTarget condition to False", "pod", klog.KObj(pod)) + logger.V(2).Info("Reset stale DisruptionTarget condition to False", "pod", klog.KObj(pod)) return nil } @@ -895,7 +912,7 @@ func countHealthyPods(pods []*v1.Pod, disruptedPods map[string]metav1.Time, curr // Builds new PodDisruption map, possibly removing items that refer to non-existing, already deleted // or not-deleted at all items. Also returns an information when this check should be repeated. -func (dc *DisruptionController) buildDisruptedPodMap(pods []*v1.Pod, pdb *policy.PodDisruptionBudget, currentTime time.Time) (map[string]metav1.Time, *time.Time) { +func (dc *DisruptionController) buildDisruptedPodMap(logger klog.Logger, pods []*v1.Pod, pdb *policy.PodDisruptionBudget, currentTime time.Time) (map[string]metav1.Time, *time.Time) { disruptedPods := pdb.Status.DisruptedPods result := make(map[string]metav1.Time) var recheckTime *time.Time @@ -915,8 +932,8 @@ func (dc *DisruptionController) buildDisruptedPodMap(pods []*v1.Pod, pdb *policy } expectedDeletion := disruptionTime.Time.Add(DeletionTimeout) if expectedDeletion.Before(currentTime) { - klog.V(1).Infof("Pod %s/%s was expected to be deleted at %s but it wasn't, updating pdb %s/%s", - pod.Namespace, pod.Name, disruptionTime.String(), pdb.Namespace, pdb.Name) + logger.V(1).Info("pod was expected to be deleted but it wasn't, updating PDB", + "pod", klog.KObj(pod), "deletionTime", disruptionTime, "podDisruptionBudget", klog.KObj(pdb)) dc.recorder.Eventf(pod, v1.EventTypeWarning, "NotDeleted", "Pod was expected by PDB %s/%s to be deleted but it wasn't", pdb.Namespace, pdb.Namespace) } else { diff --git a/pkg/controller/disruption/disruption_test.go b/pkg/controller/disruption/disruption_test.go index aedfb63e01a..3e9fb4e4fd3 100644 --- a/pkg/controller/disruption/disruption_test.go +++ b/pkg/controller/disruption/disruption_test.go @@ -18,9 +18,7 @@ package disruption import ( "context" - "flag" "fmt" - "os" "runtime/debug" "strings" "sync" @@ -51,9 +49,9 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" - "k8s.io/klog/v2" _ "k8s.io/kubernetes/pkg/apis/core/install" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/test/utils/ktesting" clocktesting "k8s.io/utils/clock/testing" "k8s.io/utils/pointer" ) @@ -149,8 +147,8 @@ var customGVK = schema.GroupVersionKind{ Kind: "customresource", } -func newFakeDisruptionController() (*disruptionController, *pdbStates) { - return newFakeDisruptionControllerWithTime(context.TODO(), time.Now()) +func newFakeDisruptionController(ctx context.Context) (*disruptionController, *pdbStates) { + return newFakeDisruptionControllerWithTime(ctx, time.Now()) } func newFakeDisruptionControllerWithTime(ctx context.Context, now time.Time) (*disruptionController, *pdbStates) { @@ -168,6 +166,7 @@ func newFakeDisruptionControllerWithTime(ctx context.Context, now time.Time) (*d fakeClock := clocktesting.NewFakeClock(now) dc := NewDisruptionControllerInternal( + ctx, informerFactory.Core().V1().Pods(), informerFactory.Policy().V1().PodDisruptionBudgets(), informerFactory.Core().V1().ReplicationControllers(), @@ -424,14 +423,14 @@ func add(t *testing.T, store cache.Store, obj interface{}) { // Create one with no selector. Verify it matches all pods func TestNoSelector(t *testing.T) { - dc, ps := newFakeDisruptionController() + _, ctx := ktesting.NewTestContext(t) + dc, ps := newFakeDisruptionController(ctx) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromInt32(3)) pdb.Spec.Selector = &metav1.LabelSelector{} pod, _ := newPod(t, "yo-yo-yo") add(t, dc.pdbStore, pdb) - ctx := context.TODO() dc.sync(ctx, pdbName) ps.VerifyPdbStatus(t, pdbName, 0, 0, 3, 0, map[string]metav1.Time{}) @@ -443,10 +442,10 @@ func TestNoSelector(t *testing.T) { // Verify that available/expected counts go up as we add pods, then verify that // available count goes down when we make a pod unavailable. func TestUnavailable(t *testing.T) { - dc, ps := newFakeDisruptionController() + _, ctx := ktesting.NewTestContext(t) + dc, ps := newFakeDisruptionController(ctx) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromInt32(3)) - ctx := context.TODO() add(t, dc.pdbStore, pdb) dc.sync(ctx, pdbName) @@ -473,11 +472,11 @@ func TestUnavailable(t *testing.T) { // Verify that an integer MaxUnavailable won't // allow a disruption for pods with no controller. func TestIntegerMaxUnavailable(t *testing.T) { - dc, ps := newFakeDisruptionController() + _, ctx := ktesting.NewTestContext(t) + dc, ps := newFakeDisruptionController(ctx) pdb, pdbName := newMaxUnavailablePodDisruptionBudget(t, intstr.FromInt32(1)) add(t, dc.pdbStore, pdb) - ctx := context.TODO() dc.sync(ctx, pdbName) // This verifies that when a PDB has 0 pods, disruptions are not allowed. ps.VerifyDisruptionAllowed(t, pdbName, 0) @@ -494,7 +493,8 @@ func TestIntegerMaxUnavailable(t *testing.T) { // Verify that an integer MaxUnavailable will recompute allowed disruptions when the scale of // the selected pod's controller is modified. func TestIntegerMaxUnavailableWithScaling(t *testing.T) { - dc, ps := newFakeDisruptionController() + _, ctx := ktesting.NewTestContext(t) + dc, ps := newFakeDisruptionController(ctx) pdb, pdbName := newMaxUnavailablePodDisruptionBudget(t, intstr.FromInt32(2)) add(t, dc.pdbStore, pdb) @@ -504,7 +504,6 @@ func TestIntegerMaxUnavailableWithScaling(t *testing.T) { pod, _ := newPod(t, "pod") updatePodOwnerToRs(t, pod, rs) - ctx := context.TODO() add(t, dc.podStore, pod) dc.sync(ctx, pdbName) ps.VerifyPdbStatus(t, pdbName, 0, 1, 5, 7, map[string]metav1.Time{}) @@ -520,7 +519,8 @@ func TestIntegerMaxUnavailableWithScaling(t *testing.T) { // Verify that an percentage MaxUnavailable will recompute allowed disruptions when the scale of // the selected pod's controller is modified. func TestPercentageMaxUnavailableWithScaling(t *testing.T) { - dc, ps := newFakeDisruptionController() + _, ctx := ktesting.NewTestContext(t) + dc, ps := newFakeDisruptionController(ctx) pdb, pdbName := newMaxUnavailablePodDisruptionBudget(t, intstr.FromString("30%")) add(t, dc.pdbStore, pdb) @@ -531,7 +531,6 @@ func TestPercentageMaxUnavailableWithScaling(t *testing.T) { pod, _ := newPod(t, "pod") updatePodOwnerToRs(t, pod, rs) add(t, dc.podStore, pod) - ctx := context.TODO() dc.sync(ctx, pdbName) ps.VerifyPdbStatus(t, pdbName, 0, 1, 4, 7, map[string]metav1.Time{}) @@ -546,11 +545,11 @@ func TestPercentageMaxUnavailableWithScaling(t *testing.T) { // Create a pod with no controller, and verify that a PDB with a percentage // specified won't allow a disruption. func TestNakedPod(t *testing.T) { - dc, ps := newFakeDisruptionController() + _, ctx := ktesting.NewTestContext(t) + dc, ps := newFakeDisruptionController(ctx) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%")) add(t, dc.pdbStore, pdb) - ctx := context.TODO() dc.sync(ctx, pdbName) // This verifies that when a PDB has 0 pods, disruptions are not allowed. ps.VerifyDisruptionAllowed(t, pdbName, 0) @@ -566,11 +565,11 @@ func TestNakedPod(t *testing.T) { // Create a pod with unsupported controller, and verify that a PDB with a percentage // specified won't allow a disruption. func TestUnsupportedControllerPod(t *testing.T) { - dc, ps := newFakeDisruptionController() + _, ctx := ktesting.NewTestContext(t) + dc, ps := newFakeDisruptionController(ctx) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%")) add(t, dc.pdbStore, pdb) - ctx := context.TODO() dc.sync(ctx, pdbName) // This verifies that when a PDB has 0 pods, disruptions are not allowed. ps.VerifyDisruptionAllowed(t, pdbName, 0) @@ -594,11 +593,11 @@ func TestUnsupportedControllerPod(t *testing.T) { // Verify that disruption controller is not erroring when unmanaged pods are found func TestStatusForUnmanagedPod(t *testing.T) { - dc, ps := newFakeDisruptionController() + _, ctx := ktesting.NewTestContext(t) + dc, ps := newFakeDisruptionController(ctx) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%")) add(t, dc.pdbStore, pdb) - ctx := context.TODO() dc.sync(ctx, pdbName) // This verifies that when a PDB has 0 pods, disruptions are not allowed. ps.VerifyDisruptionAllowed(t, pdbName, 0) @@ -612,11 +611,11 @@ func TestStatusForUnmanagedPod(t *testing.T) { // Check if the unmanaged pods are correctly collected or not func TestTotalUnmanagedPods(t *testing.T) { - dc, ps := newFakeDisruptionController() + _, ctx := ktesting.NewTestContext(t) + dc, ps := newFakeDisruptionController(ctx) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%")) add(t, dc.pdbStore, pdb) - ctx := context.TODO() dc.sync(ctx, pdbName) // This verifies that when a PDB has 0 pods, disruptions are not allowed. ps.VerifyDisruptionAllowed(t, pdbName, 0) @@ -636,14 +635,14 @@ func TestTotalUnmanagedPods(t *testing.T) { // Verify that we count the scale of a ReplicaSet even when it has no Deployment. func TestReplicaSet(t *testing.T) { - dc, ps := newFakeDisruptionController() + _, ctx := ktesting.NewTestContext(t) + dc, ps := newFakeDisruptionController(ctx) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("20%")) add(t, dc.pdbStore, pdb) rs, _ := newReplicaSet(t, 10) add(t, dc.rsStore, rs) - ctx := context.TODO() pod, _ := newPod(t, "pod") updatePodOwnerToRs(t, pod, rs) add(t, dc.podStore, pod) @@ -657,7 +656,8 @@ func TestScaleResource(t *testing.T) { pods := int32(4) maxUnavailable := int32(5) - dc, ps := newFakeDisruptionController() + _, ctx := ktesting.NewTestContext(t) + dc, ps := newFakeDisruptionController(ctx) dc.scaleClient.AddReactor("get", "customresources", func(action core.Action) (handled bool, ret runtime.Object, err error) { obj := &autoscalingapi.Scale{ @@ -688,7 +688,6 @@ func TestScaleResource(t *testing.T) { }) add(t, dc.podStore, pod) } - ctx := context.TODO() dc.sync(ctx, pdbName) disruptionsAllowed := int32(0) if replicas-pods < maxUnavailable { @@ -750,7 +749,8 @@ func TestScaleFinderNoResource(t *testing.T) { t.Run(tn, func(t *testing.T) { customResourceUID := uuid.NewUUID() - dc, _ := newFakeDisruptionController() + _, ctx := ktesting.NewTestContext(t) + dc, _ := newFakeDisruptionController(ctx) dc.scaleClient.AddReactor("get", resourceName, func(action core.Action) (handled bool, ret runtime.Object, err error) { gr := schema.GroupResource{ @@ -774,7 +774,7 @@ func TestScaleFinderNoResource(t *testing.T) { UID: customResourceUID, } - _, err := dc.getScaleController(context.TODO(), ownerRef, "default") + _, err := dc.getScaleController(ctx, ownerRef, "default") if tc.expectError && err == nil { t.Error("expected error, but didn't get one") @@ -790,8 +790,8 @@ func TestScaleFinderNoResource(t *testing.T) { // Verify that multiple controllers doesn't allow the PDB to be set true. func TestMultipleControllers(t *testing.T) { const podCount = 2 - - dc, ps := newFakeDisruptionController() + _, ctx := ktesting.NewTestContext(t) + dc, ps := newFakeDisruptionController(ctx) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("1%")) add(t, dc.pdbStore, pdb) @@ -802,7 +802,6 @@ func TestMultipleControllers(t *testing.T) { pods = append(pods, pod) add(t, dc.podStore, pod) } - ctx := context.TODO() dc.sync(ctx, pdbName) // No controllers yet => no disruption allowed @@ -840,8 +839,8 @@ func TestReplicationController(t *testing.T) { "foo": "bar", "baz": "quux", } - - dc, ps := newFakeDisruptionController() + _, ctx := ktesting.NewTestContext(t) + dc, ps := newFakeDisruptionController(ctx) // 34% should round up to 2 pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("34%")) @@ -849,7 +848,6 @@ func TestReplicationController(t *testing.T) { rc, _ := newReplicationController(t, 3) rc.Spec.Selector = labels add(t, dc.rcStore, rc) - ctx := context.TODO() dc.sync(ctx, pdbName) // It starts out at 0 expected because, with no pods, the PDB doesn't know @@ -881,14 +879,14 @@ func TestStatefulSetController(t *testing.T) { "baz": "quux", } - dc, ps := newFakeDisruptionController() + _, ctx := ktesting.NewTestContext(t) + dc, ps := newFakeDisruptionController(ctx) // 34% should round up to 2 pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("34%")) add(t, dc.pdbStore, pdb) ss, _ := newStatefulSet(t, 3) add(t, dc.ssStore, ss) - ctx := context.TODO() dc.sync(ctx, pdbName) // It starts out at 0 expected because, with no pods, the PDB doesn't know @@ -920,7 +918,8 @@ func TestTwoControllers(t *testing.T) { "foo": "bar", "baz": "quuux", } - dc, ps := newFakeDisruptionController() + _, ctx := ktesting.NewTestContext(t) + dc, ps := newFakeDisruptionController(ctx) // These constants are related, but I avoid calculating the correct values in // code. If you update a parameter here, recalculate the correct values for @@ -935,7 +934,6 @@ func TestTwoControllers(t *testing.T) { rc, _ := newReplicationController(t, collectionSize) rc.Spec.Selector = rcLabels add(t, dc.rcStore, rc) - ctx := context.TODO() dc.sync(ctx, pdbName) ps.VerifyPdbStatus(t, pdbName, 0, 0, 0, 0, map[string]metav1.Time{}) @@ -1019,16 +1017,18 @@ func TestTwoControllers(t *testing.T) { // Test pdb doesn't exist func TestPDBNotExist(t *testing.T) { - dc, _ := newFakeDisruptionController() + _, ctx := ktesting.NewTestContext(t) + dc, _ := newFakeDisruptionController(ctx) pdb, _ := newMinAvailablePodDisruptionBudget(t, intstr.FromString("67%")) add(t, dc.pdbStore, pdb) - if err := dc.sync(context.TODO(), "notExist"); err != nil { + if err := dc.sync(ctx, "notExist"); err != nil { t.Errorf("Unexpected error: %v, expect nil", err) } } func TestUpdateDisruptedPods(t *testing.T) { - dc, ps := newFakeDisruptionController() + _, ctx := ktesting.NewTestContext(t) + dc, ps := newFakeDisruptionController(ctx) dc.recheckQueue = workqueue.NewNamedDelayingQueue("pdb_queue") pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromInt32(1)) currentTime := dc.clock.Now() @@ -1049,13 +1049,14 @@ func TestUpdateDisruptedPods(t *testing.T) { add(t, dc.podStore, pod2) add(t, dc.podStore, pod3) - dc.sync(context.TODO(), pdbName) + dc.sync(ctx, pdbName) ps.VerifyPdbStatus(t, pdbName, 0, 1, 1, 3, map[string]metav1.Time{"p3": {Time: currentTime.Add(-time.Minute)}}) } func TestBasicFinderFunctions(t *testing.T) { - dc, _ := newFakeDisruptionController() + _, ctx := ktesting.NewTestContext(t) + dc, _ := newFakeDisruptionController(ctx) rs, _ := newReplicaSet(t, 10) add(t, dc.rsStore, rs) @@ -1135,7 +1136,7 @@ func TestBasicFinderFunctions(t *testing.T) { UID: tc.uid, } - controllerAndScale, _ := tc.finderFunc(context.TODO(), controllerRef, metav1.NamespaceDefault) + controllerAndScale, _ := tc.finderFunc(ctx, controllerRef, metav1.NamespaceDefault) if controllerAndScale == nil { if tc.findsScale { @@ -1208,7 +1209,8 @@ func TestDeploymentFinderFunction(t *testing.T) { for tn, tc := range testCases { t.Run(tn, func(t *testing.T) { - dc, _ := newFakeDisruptionController() + _, ctx := ktesting.NewTestContext(t) + dc, _ := newFakeDisruptionController(ctx) dep, _ := newDeployment(t, 10) dep.Spec.Selector = newSel(labels) @@ -1233,7 +1235,7 @@ func TestDeploymentFinderFunction(t *testing.T) { UID: rs.UID, } - controllerAndScale, _ := dc.getPodDeployment(context.TODO(), controllerRef, metav1.NamespaceDefault) + controllerAndScale, _ := dc.getPodDeployment(ctx, controllerRef, metav1.NamespaceDefault) if controllerAndScale == nil { if tc.findsScale { @@ -1267,10 +1269,10 @@ func TestDeploymentFinderFunction(t *testing.T) { // (C) If the DisruptionController writes DisruptionsAllowed=1 despite the // resource conflict error, then there is a bug. func TestUpdatePDBStatusRetries(t *testing.T) { - dc, _ := newFakeDisruptionController() + _, ctx := ktesting.NewTestContext(t) + dc, _ := newFakeDisruptionController(ctx) // Inject the production code over our fake impl dc.getUpdater = func() updater { return dc.writePdbStatus } - ctx := context.TODO() // Create a PDB and 3 pods that match it. pdb, pdbKey := newMinAvailablePodDisruptionBudget(t, intstr.FromInt32(1)) pdb, err := dc.coreClient.PolicyV1().PodDisruptionBudgets(pdb.Namespace).Create(ctx, pdb, metav1.CreateOptions{}) @@ -1380,8 +1382,6 @@ func TestUpdatePDBStatusRetries(t *testing.T) { } func TestInvalidSelectors(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() testCases := map[string]struct { labelSelector *metav1.LabelSelector }{ @@ -1407,7 +1407,9 @@ func TestInvalidSelectors(t *testing.T) { for tn, tc := range testCases { t.Run(tn, func(t *testing.T) { - dc, ps := newFakeDisruptionController() + _, ctx := ktesting.NewTestContext(t) + + dc, ps := newFakeDisruptionController(ctx) pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromInt32(3)) pdb.Spec.Selector = tc.labelSelector @@ -1536,7 +1538,8 @@ func TestStalePodDisruption(t *testing.T) { } for name, tc := range cases { t.Run(name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() dc, _ := newFakeDisruptionControllerWithTime(ctx, now) go dc.Run(ctx) @@ -1584,9 +1587,3 @@ func verifyEventEmitted(t *testing.T, dc *disruptionController, expectedEvent st } } } - -// TestMain adds klog flags to make debugging tests easier. -func TestMain(m *testing.M) { - klog.InitFlags(flag.CommandLine) - os.Exit(m.Run()) -} diff --git a/test/integration/disruption/disruption_test.go b/test/integration/disruption/disruption_test.go index 8d2b47b3c2d..04c9d65a320 100644 --- a/test/integration/disruption/disruption_test.go +++ b/test/integration/disruption/disruption_test.go @@ -50,13 +50,14 @@ import ( "k8s.io/kubernetes/test/integration/etcd" "k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/util" + "k8s.io/kubernetes/test/utils/ktesting" "k8s.io/utils/clock" "k8s.io/utils/pointer" ) const stalePodDisruptionTimeout = 3 * time.Second -func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.DisruptionController, informers.SharedInformerFactory, clientset.Interface, *apiextensionsclientset.Clientset, dynamic.Interface) { +func setup(ctx context.Context, t *testing.T) (*kubeapiservertesting.TestServer, *disruption.DisruptionController, informers.SharedInformerFactory, clientset.Interface, *apiextensionsclientset.Clientset, dynamic.Interface) { server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins", "ServiceAccount"}, framework.SharedEtcd()) clientSet, err := clientset.NewForConfig(server.ClientConfig) @@ -88,6 +89,7 @@ func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.Disrupti } pdbc := disruption.NewDisruptionControllerInternal( + ctx, informers.Core().V1().Pods(), informers.Policy().V1().PodDisruptionBudgets(), informers.Core().V1().ReplicationControllers(), @@ -105,11 +107,13 @@ func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.Disrupti } func TestPDBWithScaleSubresource(t *testing.T) { - s, pdbc, informers, clientSet, apiExtensionClient, dynamicClient := setup(t) - defer s.TearDownFn() + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) - ctx, cancel := context.WithCancel(context.Background()) + s, pdbc, informers, clientSet, apiExtensionClient, dynamicClient := setup(ctx, t) + defer s.TearDownFn() defer cancel() + nsName := "pdb-scale-subresource" createNs(ctx, t, nsName, clientSet) @@ -238,10 +242,11 @@ func TestEmptySelector(t *testing.T) { for i, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - s, pdbc, informers, clientSet, _, _ := setup(t) - defer s.TearDownFn() + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) - ctx, cancel := context.WithCancel(context.Background()) + s, pdbc, informers, clientSet, _, _ := setup(ctx, t) + defer s.TearDownFn() defer cancel() nsName := fmt.Sprintf("pdb-empty-selector-%d", i) @@ -354,10 +359,11 @@ func TestSelectorsForPodsWithoutLabels(t *testing.T) { for i, tc := range testcases { t.Run(tc.name, func(t *testing.T) { - s, pdbc, informers, clientSet, _, _ := setup(t) - defer s.TearDownFn() + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) - ctx, cancel := context.WithCancel(context.Background()) + s, pdbc, informers, clientSet, _, _ := setup(ctx, t) + defer s.TearDownFn() defer cancel() nsName := fmt.Sprintf("pdb-selectors-%d", i) @@ -508,12 +514,12 @@ func waitToObservePods(t *testing.T, podInformer cache.SharedIndexInformer, podN } func TestPatchCompatibility(t *testing.T) { - s, pdbc, _, clientSet, _, _ := setup(t) - defer s.TearDownFn() + ctx, cancel := context.WithCancel(context.Background()) + s, pdbc, _, clientSet, _, _ := setup(ctx, t) + defer s.TearDownFn() // Even though pdbc isn't used in this test, its creation is already // spawning some goroutines. So we need to run it to ensure they won't leak. - ctx, cancel := context.WithCancel(context.Background()) cancel() pdbc.Run(ctx) @@ -653,10 +659,11 @@ func TestPatchCompatibility(t *testing.T) { } func TestStalePodDisruption(t *testing.T) { - s, pdbc, informers, clientSet, _, _ := setup(t) - defer s.TearDownFn() + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) - ctx, cancel := context.WithCancel(context.Background()) + s, pdbc, informers, clientSet, _, _ := setup(ctx, t) + defer s.TearDownFn() defer cancel() nsName := "pdb-stale-pod-disruption" diff --git a/test/integration/evictions/evictions_test.go b/test/integration/evictions/evictions_test.go index 6c9e70e02e3..32f285651d3 100644 --- a/test/integration/evictions/evictions_test.go +++ b/test/integration/evictions/evictions_test.go @@ -57,6 +57,7 @@ import ( "k8s.io/kubernetes/pkg/controller/disruption" "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/test/integration/framework" + "k8s.io/kubernetes/test/utils/ktesting" ) const ( @@ -68,14 +69,16 @@ const ( func TestConcurrentEvictionRequests(t *testing.T) { podNameFormat := "test-pod-%d" - closeFn, rm, informers, _, clientSet := rmSetup(t) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + + closeFn, rm, informers, _, clientSet := rmSetup(ctx, t) defer closeFn() ns := framework.CreateNamespaceOrDie(clientSet, "concurrent-eviction-requests", t) defer framework.DeleteNamespaceOrDie(clientSet, ns, t) - - ctx, cancel := context.WithCancel(context.Background()) defer cancel() + informers.Start(ctx.Done()) go rm.Run(ctx) @@ -181,14 +184,16 @@ func TestConcurrentEvictionRequests(t *testing.T) { // TestTerminalPodEviction ensures that PDB is not checked for terminal pods. func TestTerminalPodEviction(t *testing.T) { - closeFn, rm, informers, _, clientSet := rmSetup(t) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + + closeFn, rm, informers, _, clientSet := rmSetup(ctx, t) defer closeFn() ns := framework.CreateNamespaceOrDie(clientSet, "terminalpod-eviction", t) defer framework.DeleteNamespaceOrDie(clientSet, ns, t) - - ctx, cancel := context.WithCancel(context.Background()) defer cancel() + informers.Start(ctx.Done()) go rm.Run(ctx) @@ -255,11 +260,13 @@ func TestTerminalPodEviction(t *testing.T) { // TestEvictionVersions ensures the eviction endpoint accepts and returns the correct API versions func TestEvictionVersions(t *testing.T) { - closeFn, rm, informers, config, clientSet := rmSetup(t) - defer closeFn() + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) - ctx, cancel := context.WithCancel(context.Background()) + closeFn, rm, informers, config, clientSet := rmSetup(ctx, t) + defer closeFn() defer cancel() + informers.Start(ctx.Done()) go rm.Run(ctx) @@ -379,15 +386,17 @@ func TestEvictionWithFinalizers(t *testing.T) { } for name, tc := range cases { t.Run(name, func(t *testing.T) { - closeFn, rm, informers, _, clientSet := rmSetup(t) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + + closeFn, rm, informers, _, clientSet := rmSetup(ctx, t) defer closeFn() ns := framework.CreateNamespaceOrDie(clientSet, "eviction-with-finalizers", t) defer framework.DeleteNamespaceOrDie(clientSet, ns, t) defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PodDisruptionConditions, tc.enablePodDisruptionConditions)() - - ctx, cancel := context.WithCancel(context.Background()) defer cancel() + informers.Start(ctx.Done()) go rm.Run(ctx) @@ -465,15 +474,17 @@ func TestEvictionWithUnhealthyPodEvictionPolicy(t *testing.T) { } for name, tc := range cases { t.Run(name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.PDBUnhealthyPodEvictionPolicy, tc.enableUnhealthyPodEvictionPolicy)() - closeFn, rm, informers, _, clientSet := rmSetup(t) + closeFn, rm, informers, _, clientSet := rmSetup(ctx, t) defer closeFn() ns := framework.CreateNamespaceOrDie(clientSet, "eviction-with-pdb-pod-healthy-policy", t) defer framework.DeleteNamespaceOrDie(clientSet, ns, t) - - ctx, cancel := context.WithCancel(context.Background()) defer cancel() + informers.Start(ctx.Done()) go rm.Run(ctx) @@ -559,13 +570,15 @@ func TestEvictionWithPrecondition(t *testing.T) { } for name, tc := range cases { t.Run(name, func(t *testing.T) { - closeFn, rm, informers, _, clientSet := rmSetup(t) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + + closeFn, rm, informers, _, clientSet := rmSetup(ctx, t) defer closeFn() ns := framework.CreateNamespaceOrDie(clientSet, "eviction-with-preconditions", t) defer framework.DeleteNamespaceOrDie(clientSet, ns, t) - ctx, cancel := context.WithCancel(context.Background()) defer cancel() informers.Start(ctx.Done()) go rm.Run(ctx) @@ -685,7 +698,7 @@ func newV1Eviction(ns, evictionName string, deleteOption metav1.DeleteOptions) * } } -func rmSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, *disruption.DisruptionController, informers.SharedInformerFactory, *restclient.Config, clientset.Interface) { +func rmSetup(ctx context.Context, t *testing.T) (kubeapiservertesting.TearDownFunc, *disruption.DisruptionController, informers.SharedInformerFactory, *restclient.Config, clientset.Interface) { // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd()) @@ -709,6 +722,7 @@ func rmSetup(t *testing.T) (kubeapiservertesting.TearDownFunc, *disruption.Disru } rm := disruption.NewDisruptionController( + ctx, informers.Core().V1().Pods(), informers.Policy().V1().PodDisruptionBudgets(), informers.Core().V1().ReplicationControllers(), diff --git a/test/integration/util/util.go b/test/integration/util/util.go index 548bb66e663..94df419ae53 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -605,6 +605,7 @@ func PodScheduled(c clientset.Interface, podNamespace, podName string) wait.Cond // InitDisruptionController initializes and runs a Disruption Controller to properly // update PodDisuptionBudget objects. func InitDisruptionController(t *testing.T, testCtx *TestContext) *disruption.DisruptionController { + _, ctx := ktesting.NewTestContext(t) informers := informers.NewSharedInformerFactory(testCtx.ClientSet, 12*time.Hour) discoveryClient := cacheddiscovery.NewMemCacheClient(testCtx.ClientSet.Discovery()) @@ -618,6 +619,7 @@ func InitDisruptionController(t *testing.T, testCtx *TestContext) *disruption.Di } dc := disruption.NewDisruptionController( + ctx, informers.Core().V1().Pods(), informers.Policy().V1().PodDisruptionBudgets(), informers.Core().V1().ReplicationControllers(),