diff --git a/cmd/kube-controller-manager/app/policy.go b/cmd/kube-controller-manager/app/policy.go
index 59d379ac333..51512830ad8 100644
--- a/cmd/kube-controller-manager/app/policy.go
+++ b/cmd/kube-controller-manager/app/policy.go
@@ -58,6 +58,6 @@ func startDisruptionController(ctx context.Context, controllerContext Controller
 		controllerContext.RESTMapper,
 		scaleClient,
 		client.Discovery(),
-	).Run(ctx.Done())
+	).Run(ctx)
 	return nil, true, nil
 }
diff --git a/pkg/controller/disruption/disruption.go b/pkg/controller/disruption/disruption.go
index 8e2b6c99d8b..3c8fcbab770 100644
--- a/pkg/controller/disruption/disruption.go
+++ b/pkg/controller/disruption/disruption.go
@@ -68,7 +68,7 @@ const (
 	DeletionTimeout = 2 * 60 * time.Second
 )
 
-type updater func(*policy.PodDisruptionBudget) error
+type updater func(context.Context, *policy.PodDisruptionBudget) error
 
 type DisruptionController struct {
 	kubeClient clientset.Interface
@@ -114,7 +114,7 @@ type controllerAndScale struct {
 
 // podControllerFinder is a function type that maps a pod to a list of
 // controllers and their scale.
-type podControllerFinder func(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error)
+type podControllerFinder func(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error)
 
 func NewDisruptionController(
 	podInformer coreinformers.PodInformer,
@@ -192,7 +192,7 @@ var (
 )
 
 // getPodReplicaSet finds a replicaset which has no matching deployments.
-func (dc *DisruptionController) getPodReplicaSet(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
+func (dc *DisruptionController) getPodReplicaSet(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
 	ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"})
 	if !ok || err != nil {
 		return nil, err
@@ -214,7 +214,7 @@ func (dc *DisruptionController) getPodReplicaSet(controllerRef *metav1.OwnerRefe
 }
 
 // getPodStatefulSet returns the statefulset referenced by the provided controllerRef.
-func (dc *DisruptionController) getPodStatefulSet(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
+func (dc *DisruptionController) getPodStatefulSet(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
 	ok, err := verifyGroupKind(controllerRef, controllerKindSS.Kind, []string{"apps"})
 	if !ok || err != nil {
 		return nil, err
@@ -232,7 +232,7 @@ func (dc *DisruptionController) getPodStatefulSet(controllerRef *metav1.OwnerRef
 }
 
 // getPodDeployments finds deployments for any replicasets which are being managed by deployments.
-func (dc *DisruptionController) getPodDeployment(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
+func (dc *DisruptionController) getPodDeployment(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
 	ok, err := verifyGroupKind(controllerRef, controllerKindRS.Kind, []string{"apps", "extensions"})
 	if !ok || err != nil {
 		return nil, err
@@ -265,7 +265,7 @@ func (dc *DisruptionController) getPodDeployment(controllerRef *metav1.OwnerRefe
 	return &controllerAndScale{deployment.UID, *(deployment.Spec.Replicas)}, nil
 }
 
-func (dc *DisruptionController) getPodReplicationController(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
+func (dc *DisruptionController) getPodReplicationController(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
 	ok, err := verifyGroupKind(controllerRef, controllerKindRC.Kind, []string{""})
 	if !ok || err != nil {
 		return nil, err
@@ -281,7 +281,7 @@ func (dc *DisruptionController) getPodReplicationController(controllerRef *metav
 	return &controllerAndScale{rc.UID, *(rc.Spec.Replicas)}, nil
 }
 
-func (dc *DisruptionController) getScaleController(controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
+func (dc *DisruptionController) getScaleController(ctx context.Context, controllerRef *metav1.OwnerReference, namespace string) (*controllerAndScale, error) {
 	gv, err := schema.ParseGroupVersion(controllerRef.APIVersion)
 	if err != nil {
 		return nil, err
@@ -298,7 +298,7 @@ func (dc *DisruptionController) getScaleController(controllerRef *metav1.OwnerRe
 	}
 	gr := mapping.Resource.GroupResource()
-	scale, err := dc.scaleNamespacer.Scales(namespace).Get(context.TODO(), gr, controllerRef.Name, metav1.GetOptions{})
+	scale, err := dc.scaleNamespacer.Scales(namespace).Get(ctx, gr, controllerRef.Name, metav1.GetOptions{})
 	if err != nil {
 		if errors.IsNotFound(err) {
 			// The IsNotFound error can mean either that the resource does not exist,
@@ -356,14 +356,14 @@ func verifyGroupKind(controllerRef *metav1.OwnerReference, expectedKind string,
 	return false, nil
 }
 
-func (dc *DisruptionController) Run(stopCh <-chan struct{}) {
+func (dc *DisruptionController) Run(ctx context.Context) {
 	defer utilruntime.HandleCrash()
 	defer dc.queue.ShutDown()
 
 	klog.Infof("Starting disruption controller")
 	defer klog.Infof("Shutting down disruption controller")
 
-	if !cache.WaitForNamedCacheSync("disruption", stopCh, dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) {
+	if !cache.WaitForNamedCacheSync("disruption", ctx.Done(), dc.podListerSynced, dc.pdbListerSynced, dc.rcListerSynced, dc.rsListerSynced, dc.dListerSynced, dc.ssListerSynced) {
 		return
 	}
 
@@ -373,10 +373,10 @@ func (dc *DisruptionController) Run(stopCh <-chan struct{}) {
 	} else {
 		klog.Infof("No api server defined - no events will be sent to API server.")
 	}
-	go wait.Until(dc.worker, time.Second, stopCh)
-	go wait.Until(dc.recheckWorker, time.Second, stopCh)
+	go wait.UntilWithContext(ctx, dc.worker, time.Second)
+	go wait.Until(dc.recheckWorker, time.Second, ctx.Done())
 
-	<-stopCh
+	<-ctx.Done()
 }
 
 func (dc *DisruptionController) addDb(obj interface{}) {
@@ -513,19 +513,19 @@ func (dc *DisruptionController) getPodsForPdb(pdb *policy.PodDisruptionBudget) (
 	return pods, nil
 }
 
-func (dc *DisruptionController) worker() {
-	for dc.processNextWorkItem() {
+func (dc *DisruptionController) worker(ctx context.Context) {
+	for dc.processNextWorkItem(ctx) {
 	}
 }
 
-func (dc *DisruptionController) processNextWorkItem() bool {
+func (dc *DisruptionController) processNextWorkItem(ctx context.Context) bool {
 	dKey, quit := dc.queue.Get()
 	if quit {
 		return false
 	}
 	defer dc.queue.Done(dKey)
 
-	err := dc.sync(dKey.(string))
+	err := dc.sync(ctx, dKey.(string))
 	if err == nil {
 		dc.queue.Forget(dKey)
 		return true
@@ -552,7 +552,7 @@ func (dc *DisruptionController) processNextRecheckWorkItem() bool {
 	return true
 }
 
-func (dc *DisruptionController) sync(key string) error {
+func (dc *DisruptionController) sync(ctx context.Context, key string) error {
 	startTime := time.Now()
 	defer func() {
 		klog.V(4).Infof("Finished syncing PodDisruptionBudget %q (%v)", key, time.Since(startTime))
@@ -571,7 +571,7 @@ func (dc *DisruptionController) sync(key string) error {
 		return err
 	}
 
-	err = dc.trySync(pdb)
+	err = dc.trySync(ctx, pdb)
 	// If the reason for failure was a conflict, then allow this PDB update to be
 	// requeued without triggering the failSafe logic.
 	if errors.IsConflict(err) {
@@ -579,13 +579,13 @@ func (dc *DisruptionController) sync(key string) error {
 	}
 	if err != nil {
 		klog.Errorf("Failed to sync pdb %s/%s: %v", pdb.Namespace, pdb.Name, err)
-		return dc.failSafe(pdb, err)
+		return dc.failSafe(ctx, pdb, err)
 	}
 
 	return nil
 }
 
-func (dc *DisruptionController) trySync(pdb *policy.PodDisruptionBudget) error {
+func (dc *DisruptionController) trySync(ctx context.Context, pdb *policy.PodDisruptionBudget) error {
 	pods, err := dc.getPodsForPdb(pdb)
 	if err != nil {
 		dc.recorder.Eventf(pdb, v1.EventTypeWarning, "NoPods", "Failed to get pods: %v", err)
@@ -595,7 +595,7 @@ func (dc *DisruptionController) trySync(pdb *policy.PodDisruptionBudget) error {
 		dc.recorder.Eventf(pdb, v1.EventTypeNormal, "NoPods", "No matching pods found")
 	}
 
-	expectedCount, desiredHealthy, unmanagedPods, err := dc.getExpectedPodCount(pdb, pods)
+	expectedCount, desiredHealthy, unmanagedPods, err := dc.getExpectedPodCount(ctx, pdb, pods)
 	if err != nil {
 		dc.recorder.Eventf(pdb, v1.EventTypeWarning, "CalculateExpectedPodCountFailed", "Failed to calculate the number of expected pods: %v", err)
 		return err
@@ -609,7 +609,7 @@ func (dc *DisruptionController) trySync(pdb *policy.PodDisruptionBudget) error {
 	currentTime := time.Now()
 	disruptedPods, recheckTime := dc.buildDisruptedPodMap(pods, pdb, currentTime)
 	currentHealthy := countHealthyPods(pods, disruptedPods, currentTime)
-	err = dc.updatePdbStatus(pdb, currentHealthy, desiredHealthy, expectedCount, disruptedPods)
+	err = dc.updatePdbStatus(ctx, pdb, currentHealthy, desiredHealthy, expectedCount, disruptedPods)
 
 	if err == nil && recheckTime != nil {
 		// There is always at most one PDB waiting with a particular name in the queue,
@@ -620,7 +620,7 @@ func (dc *DisruptionController) trySync(pdb *policy.PodDisruptionBudget) error {
 	return err
 }
 
-func (dc *DisruptionController) getExpectedPodCount(pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount, desiredHealthy int32, unmanagedPods []string, err error) {
+func (dc *DisruptionController) getExpectedPodCount(ctx context.Context, pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount, desiredHealthy int32, unmanagedPods []string, err error) {
 	err = nil
 	// TODO(davidopp): consider making the way expectedCount and rules about
 	// permitted controller configurations (specifically, considering it an error
@@ -628,7 +628,7 @@ func (dc *DisruptionController) getExpectedPodCount(pdb *policy.PodDisruptionBud
 	// handled the same way for integer and percentage minAvailable
 
 	if pdb.Spec.MaxUnavailable != nil {
-		expectedCount, unmanagedPods, err = dc.getExpectedScale(pdb, pods)
+		expectedCount, unmanagedPods, err = dc.getExpectedScale(ctx, pdb, pods)
 		if err != nil {
 			return
 		}
@@ -646,7 +646,7 @@ func (dc *DisruptionController) getExpectedPodCount(pdb *policy.PodDisruptionBud
 		desiredHealthy = pdb.Spec.MinAvailable.IntVal
 		expectedCount = int32(len(pods))
 	} else if pdb.Spec.MinAvailable.Type == intstr.String {
-		expectedCount, unmanagedPods, err = dc.getExpectedScale(pdb, pods)
+		expectedCount, unmanagedPods, err = dc.getExpectedScale(ctx, pdb, pods)
 		if err != nil {
 			return
 		}
@@ -662,7 +662,7 @@ func (dc *DisruptionController) getExpectedPodCount(pdb *policy.PodDisruptionBud
 	return
 }
 
-func (dc *DisruptionController) getExpectedScale(pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount int32, unmanagedPods []string, err error) {
+func (dc *DisruptionController) getExpectedScale(ctx context.Context, pdb *policy.PodDisruptionBudget, pods []*v1.Pod) (expectedCount int32, unmanagedPods []string, err error) {
 	// When the user specifies a fraction of pods that must be available, we
 	// use as the fraction's denominator
 	// SUM_{all c in C} scale(c)
@@ -701,7 +701,7 @@ func (dc *DisruptionController) getExpectedScale(pdb *policy.PodDisruptionBudget
 		foundController := false
 		for _, finder := range dc.finders() {
 			var controllerNScale *controllerAndScale
-			controllerNScale, err = finder(controllerRef, pod.Namespace)
+			controllerNScale, err = finder(ctx, controllerRef, pod.Namespace)
 			if err != nil {
 				return
 			}
@@ -785,7 +785,7 @@ func (dc *DisruptionController) buildDisruptedPodMap(pods []*v1.Pod, pdb *policy
 // implement the "fail open" part of the design since if we manage to update
 // this field correctly, we will prevent the /evict handler from approving an
 // eviction when it may be unsafe to do so.
-func (dc *DisruptionController) failSafe(pdb *policy.PodDisruptionBudget, err error) error {
+func (dc *DisruptionController) failSafe(ctx context.Context, pdb *policy.PodDisruptionBudget, err error) error {
 	newPdb := pdb.DeepCopy()
 	newPdb.Status.DisruptionsAllowed = 0
 
@@ -800,10 +800,10 @@ func (dc *DisruptionController) failSafe(pdb *policy.PodDisruptionBudget, err er
 		ObservedGeneration: newPdb.Status.ObservedGeneration,
 	})
 
-	return dc.getUpdater()(newPdb)
+	return dc.getUpdater()(ctx, newPdb)
 }
 
-func (dc *DisruptionController) updatePdbStatus(pdb *policy.PodDisruptionBudget, currentHealthy, desiredHealthy, expectedCount int32,
+func (dc *DisruptionController) updatePdbStatus(ctx context.Context, pdb *policy.PodDisruptionBudget, currentHealthy, desiredHealthy, expectedCount int32,
 	disruptedPods map[string]metav1.Time) error {
 
 	// We require expectedCount to be > 0 so that PDBs which currently match no
@@ -837,12 +837,12 @@ func (dc *DisruptionController) updatePdbStatus(pdb *policy.PodDisruptionBudget,
 
 	pdbhelper.UpdateDisruptionAllowedCondition(newPdb)
 
-	return dc.getUpdater()(newPdb)
+	return dc.getUpdater()(ctx, newPdb)
 }
 
-func (dc *DisruptionController) writePdbStatus(pdb *policy.PodDisruptionBudget) error {
+func (dc *DisruptionController) writePdbStatus(ctx context.Context, pdb *policy.PodDisruptionBudget) error {
 	// If this update fails, don't retry it. Allow the failure to get handled &
 	// retried in `processNextWorkItem()`.
-	_, err := dc.kubeClient.PolicyV1().PodDisruptionBudgets(pdb.Namespace).UpdateStatus(context.TODO(), pdb, metav1.UpdateOptions{})
+	_, err := dc.kubeClient.PolicyV1().PodDisruptionBudgets(pdb.Namespace).UpdateStatus(ctx, pdb, metav1.UpdateOptions{})
 	return err
 }
diff --git a/pkg/controller/disruption/disruption_test.go b/pkg/controller/disruption/disruption_test.go
index f29433c7b89..b242b35d761 100644
--- a/pkg/controller/disruption/disruption_test.go
+++ b/pkg/controller/disruption/disruption_test.go
@@ -59,7 +59,7 @@ type pdbStates map[string]policy.PodDisruptionBudget
 
 var alwaysReady = func() bool { return true }
 
-func (ps *pdbStates) Set(pdb *policy.PodDisruptionBudget) error {
+func (ps *pdbStates) Set(ctx context.Context, pdb *policy.PodDisruptionBudget) error {
 	key, err := controller.KeyFunc(pdb)
 	if err != nil {
 		return err
@@ -178,8 +178,8 @@ func newFakeDisruptionController() (*disruptionController, *pdbStates) {
 	dc.rsListerSynced = alwaysReady
 	dc.dListerSynced = alwaysReady
 	dc.ssListerSynced = alwaysReady
-
-	informerFactory.Start(context.TODO().Done())
+	ctx := context.TODO()
+	informerFactory.Start(ctx.Done())
 	informerFactory.WaitForCacheSync(nil)
 
 	return &disruptionController{
@@ -421,11 +421,12 @@ func TestNoSelector(t *testing.T) {
 	pod, _ := newPod(t, "yo-yo-yo")
 
 	add(t, dc.pdbStore, pdb)
-	dc.sync(pdbName)
+	ctx := context.TODO()
+	dc.sync(ctx, pdbName)
 	ps.VerifyPdbStatus(t, pdbName, 0, 0, 3, 0, map[string]metav1.Time{})
 
 	add(t, dc.podStore, pod)
-	dc.sync(pdbName)
+	dc.sync(ctx, pdbName)
 	ps.VerifyPdbStatus(t, pdbName, 0, 1, 3, 1, map[string]metav1.Time{})
 }
 
@@ -435,8 +436,9 @@ func TestUnavailable(t *testing.T) {
 	dc, ps := newFakeDisruptionController()
 
 	pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromInt(3))
+	ctx := context.TODO()
 	add(t, dc.pdbStore, pdb)
-	dc.sync(pdbName)
+	dc.sync(ctx, pdbName)
 
 	// Add three pods, verifying that the counts go up at each step.
 	pods := []*v1.Pod{}
@@ -445,14 +447,14 @@ func TestUnavailable(t *testing.T) {
 		pod, _ := newPod(t, fmt.Sprintf("yo-yo-yo %d", i))
 		pods = append(pods, pod)
 		add(t, dc.podStore, pod)
-		dc.sync(pdbName)
+		dc.sync(ctx, pdbName)
 	}
 	ps.VerifyPdbStatus(t, pdbName, 1, 4, 3, 4, map[string]metav1.Time{})
 
 	// Now set one pod as unavailable
 	pods[0].Status.Conditions = []v1.PodCondition{}
 	update(t, dc.podStore, pods[0])
-	dc.sync(pdbName)
+	dc.sync(ctx, pdbName)
 	// Verify expected update
 	ps.VerifyPdbStatus(t, pdbName, 0, 3, 3, 4, map[string]metav1.Time{})
 }
@@ -465,13 +467,14 @@ func TestIntegerMaxUnavailable(t *testing.T) {
 
 	pdb, pdbName := newMaxUnavailablePodDisruptionBudget(t, intstr.FromInt(1))
 	add(t, dc.pdbStore, pdb)
-	dc.sync(pdbName)
+	ctx := context.TODO()
+	dc.sync(ctx, pdbName)
 	// This verifies that when a PDB has 0 pods, disruptions are not allowed.
 	ps.VerifyDisruptionAllowed(t, pdbName, 0)
 
 	pod, _ := newPod(t, "naked")
 	add(t, dc.podStore, pod)
-	dc.sync(pdbName)
+	dc.sync(ctx, pdbName)
 
 	ps.VerifyDisruptionAllowed(t, pdbName, 0)
 }
@@ -489,15 +492,16 @@ func TestIntegerMaxUnavailableWithScaling(t *testing.T) {
 
 	pod, _ := newPod(t, "pod")
 	updatePodOwnerToRs(t, pod, rs)
+	ctx := context.TODO()
 	add(t, dc.podStore, pod)
-	dc.sync(pdbName)
+	dc.sync(ctx, pdbName)
 	ps.VerifyPdbStatus(t, pdbName, 0, 1, 5, 7, map[string]metav1.Time{})
 
 	// Update scale of ReplicaSet and check PDB
 	rs.Spec.Replicas = utilpointer.Int32Ptr(5)
 	update(t, dc.rsStore, rs)
-	dc.sync(pdbName)
+	dc.sync(ctx, pdbName)
 	ps.VerifyPdbStatus(t, pdbName, 0, 1, 3, 5, map[string]metav1.Time{})
 }
 
@@ -515,14 +519,15 @@ func TestPercentageMaxUnavailableWithScaling(t *testing.T) {
 	pod, _ := newPod(t, "pod")
 	updatePodOwnerToRs(t, pod, rs)
 	add(t, dc.podStore, pod)
-	dc.sync(pdbName)
+	ctx := context.TODO()
+	dc.sync(ctx, pdbName)
 	ps.VerifyPdbStatus(t, pdbName, 0, 1, 4, 7, map[string]metav1.Time{})
 
 	// Update scale of ReplicaSet and check PDB
 	rs.Spec.Replicas = utilpointer.Int32Ptr(3)
 	update(t, dc.rsStore, rs)
-	dc.sync(pdbName)
+	dc.sync(ctx, pdbName)
 	ps.VerifyPdbStatus(t, pdbName, 0, 1, 2, 3, map[string]metav1.Time{})
 }
 
@@ -533,13 +538,14 @@ func TestNakedPod(t *testing.T) {
 
 	pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%"))
 	add(t, dc.pdbStore, pdb)
-	dc.sync(pdbName)
+	ctx := context.TODO()
+	dc.sync(ctx, pdbName)
 	// This verifies that when a PDB has 0 pods, disruptions are not allowed.
 	ps.VerifyDisruptionAllowed(t, pdbName, 0)
 
 	pod, _ := newPod(t, "naked")
 	add(t, dc.podStore, pod)
-	dc.sync(pdbName)
+	dc.sync(ctx, pdbName)
 
 	ps.VerifyDisruptionAllowed(t, pdbName, 0)
 }
@@ -550,13 +556,14 @@ func TestStatusForUnmanagedPod(t *testing.T) {
 
 	pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%"))
 	add(t, dc.pdbStore, pdb)
-	dc.sync(pdbName)
+	ctx := context.TODO()
+	dc.sync(ctx, pdbName)
 	// This verifies that when a PDB has 0 pods, disruptions are not allowed.
 	ps.VerifyDisruptionAllowed(t, pdbName, 0)
 
 	pod, _ := newPod(t, "unmanaged")
 	add(t, dc.podStore, pod)
-	dc.sync(pdbName)
+	dc.sync(ctx, pdbName)
 
 	ps.VerifyNoStatusError(t, pdbName)
 
@@ -568,16 +575,17 @@ func TestTotalUnmanagedPods(t *testing.T) {
 
 	pdb, pdbName := newMinAvailablePodDisruptionBudget(t, intstr.FromString("28%"))
 	add(t, dc.pdbStore, pdb)
-	dc.sync(pdbName)
+	ctx := context.TODO()
+	dc.sync(ctx, pdbName)
 	// This verifies that when a PDB has 0 pods, disruptions are not allowed.
 	ps.VerifyDisruptionAllowed(t, pdbName, 0)
 
 	pod, _ := newPod(t, "unmanaged")
 	add(t, dc.podStore, pod)
-	dc.sync(pdbName)
+	dc.sync(ctx, pdbName)
 	var pods []*v1.Pod
 	pods = append(pods, pod)
-	_, unmanagedPods, _ := dc.getExpectedScale(pdb, pods)
+	_, unmanagedPods, _ := dc.getExpectedScale(ctx, pdb, pods)
 	if len(unmanagedPods) != 1 {
 		t.Fatalf("expected one pod to be unmanaged pod but found %d", len(unmanagedPods))
 	}
@@ -594,11 +602,11 @@ func TestReplicaSet(t *testing.T) {
 
 	rs, _ := newReplicaSet(t, 10)
 	add(t, dc.rsStore, rs)
-
+	ctx := context.TODO()
 	pod, _ := newPod(t, "pod")
 	updatePodOwnerToRs(t, pod, rs)
 	add(t, dc.podStore, pod)
-	dc.sync(pdbName)
+	dc.sync(ctx, pdbName)
 	ps.VerifyPdbStatus(t, pdbName, 0, 1, 2, 10, map[string]metav1.Time{})
 }
 
@@ -639,8 +647,8 @@ func TestScaleResource(t *testing.T) {
 		})
 		add(t, dc.podStore, pod)
 	}
-
-	dc.sync(pdbName)
+	ctx := context.TODO()
+	dc.sync(ctx, pdbName)
 	disruptionsAllowed := int32(0)
 	if replicas-pods < maxUnavailable {
 		disruptionsAllowed = maxUnavailable - (replicas - pods)
 	}
@@ -706,7 +714,7 @@ func TestScaleFinderNoResource(t *testing.T) {
 				UID: customResourceUID,
 			}
 
-			_, err := dc.getScaleController(ownerRef, "default")
+			_, err := dc.getScaleController(context.TODO(), ownerRef, "default")
 
 			if tc.expectError && err == nil {
 				t.Error("expected error, but didn't get one")
@@ -734,8 +742,8 @@ func TestMultipleControllers(t *testing.T) {
 		pods = append(pods, pod)
 		add(t, dc.podStore, pod)
 	}
-
-	dc.sync(pdbName)
+	ctx := context.TODO()
+	dc.sync(ctx, pdbName)
 
 	// No controllers yet => no disruption allowed
 	ps.VerifyDisruptionAllowed(t, pdbName, 0)
@@ -746,7 +754,7 @@ func TestMultipleControllers(t *testing.T) {
 		updatePodOwnerToRc(t, pods[i], rc)
 	}
 	add(t, dc.rcStore, rc)
-	dc.sync(pdbName)
+	dc.sync(ctx, pdbName)
 
 	// One RC and 200%>1% healthy => disruption allowed
 	ps.VerifyDisruptionAllowed(t, pdbName, 1)
@@ -756,7 +764,7 @@ func TestMultipleControllers(t *testing.T) {
 		updatePodOwnerToRc(t, pods[i], rc)
 	}
 	add(t, dc.rcStore, rc)
-	dc.sync(pdbName)
+	dc.sync(ctx, pdbName)
 
 	// 100%>1% healthy BUT two RCs => no disruption allowed
 	// TODO: Find out if this assert is still needed
@@ -781,7 +789,8 @@ func TestReplicationController(t *testing.T) {
 	rc, _ := newReplicationController(t, 3)
 	rc.Spec.Selector = labels
 	add(t, dc.rcStore, rc)
-	dc.sync(pdbName)
+	ctx := context.TODO()
+	dc.sync(ctx, pdbName)
 
 	// It starts out at 0 expected because, with no pods, the PDB doesn't know
 	// about the RC. This is a known bug. TODO(mml): file issue
@@ -792,7 +801,7 @@ func TestReplicationController(t *testing.T) {
 		updatePodOwnerToRc(t, pod, rc)
 		pod.Labels = labels
 		add(t, dc.podStore, pod)
-		dc.sync(pdbName)
+		dc.sync(ctx, pdbName)
 		if i < 2 {
 			ps.VerifyPdbStatus(t, pdbName, 0, i+1, 2, 3, map[string]metav1.Time{})
 		} else {
@@ -802,7 +811,7 @@ func TestReplicationController(t *testing.T) {
 
 	rogue, _ := newPod(t, "rogue")
 	add(t, dc.podStore, rogue)
-	dc.sync(pdbName)
+	dc.sync(ctx, pdbName)
 	ps.VerifyDisruptionAllowed(t, pdbName, 2)
 }
 
@@ -819,7 +828,8 @@ func TestStatefulSetController(t *testing.T) {
 	add(t, dc.pdbStore, pdb)
 	ss, _ := newStatefulSet(t, 3)
 	add(t, dc.ssStore, ss)
-	dc.sync(pdbName)
+	ctx := context.TODO()
+	dc.sync(ctx, pdbName)
 
 	// It starts out at 0 expected because, with no pods, the PDB doesn't know
 	// about the SS. This is a known bug. TODO(mml): file issue
@@ -830,7 +840,7 @@ func TestStatefulSetController(t *testing.T) {
 		updatePodOwnerToSs(t, pod, ss)
 		pod.Labels = labels
 		add(t, dc.podStore, pod)
-		dc.sync(pdbName)
+		dc.sync(ctx, pdbName)
 		if i < 2 {
 			ps.VerifyPdbStatus(t, pdbName, 0, i+1, 2, 3, map[string]metav1.Time{})
 		} else {
@@ -865,7 +875,8 @@ func TestTwoControllers(t *testing.T) {
 	rc, _ := newReplicationController(t, collectionSize)
 	rc.Spec.Selector = rcLabels
 	add(t, dc.rcStore, rc)
-	dc.sync(pdbName)
+	ctx := context.TODO()
+	dc.sync(ctx, pdbName)
 
 	ps.VerifyPdbStatus(t, pdbName, 0, 0, 0, 0, map[string]metav1.Time{})
 
@@ -881,7 +892,7 @@ func TestTwoControllers(t *testing.T) {
 			pod.Status.Conditions = []v1.PodCondition{}
 		}
 		add(t, dc.podStore, pod)
-		dc.sync(pdbName)
+		dc.sync(ctx, pdbName)
 		if i <= unavailablePods {
 			ps.VerifyPdbStatus(t, pdbName, 0, 0, minimumOne, collectionSize, map[string]metav1.Time{})
 		} else if i-unavailablePods <= minimumOne {
@@ -894,14 +905,14 @@ func TestTwoControllers(t *testing.T) {
 	d, _ := newDeployment(t, collectionSize)
 	d.Spec.Selector = newSel(dLabels)
 	add(t, dc.dStore, d)
-	dc.sync(pdbName)
+	dc.sync(ctx, pdbName)
 	ps.VerifyPdbStatus(t, pdbName, 1, minimumOne+1, minimumOne, collectionSize, map[string]metav1.Time{})
 
 	rs, _ := newReplicaSet(t, collectionSize)
 	rs.Spec.Selector = newSel(dLabels)
 	rs.Labels = dLabels
 	add(t, dc.rsStore, rs)
-	dc.sync(pdbName)
+	dc.sync(ctx, pdbName)
 	ps.VerifyPdbStatus(t, pdbName, 1, minimumOne+1, minimumOne, collectionSize, map[string]metav1.Time{})
 
 	// By the end of this loop, the number of ready pods should be N+2 (hence minimumTwo+2).
@@ -915,7 +926,7 @@ func TestTwoControllers(t *testing.T) {
 			pod.Status.Conditions = []v1.PodCondition{}
 		}
 		add(t, dc.podStore, pod)
-		dc.sync(pdbName)
+		dc.sync(ctx, pdbName)
 		if i <= unavailablePods {
 			ps.VerifyPdbStatus(t, pdbName, 0, minimumOne+1, minimumTwo, 2*collectionSize, map[string]metav1.Time{})
 		} else if i-unavailablePods <= minimumTwo-(minimumOne+1) {
@@ -932,17 +943,17 @@ func TestTwoControllers(t *testing.T) {
 	ps.VerifyPdbStatus(t, pdbName, 2, 2+minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{})
 
 	pods[collectionSize-1].Status.Conditions = []v1.PodCondition{}
 	update(t, dc.podStore, pods[collectionSize-1])
-	dc.sync(pdbName)
+	dc.sync(ctx, pdbName)
 	ps.VerifyPdbStatus(t, pdbName, 1, 1+minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{})
 
 	pods[collectionSize-2].Status.Conditions = []v1.PodCondition{}
 	update(t, dc.podStore, pods[collectionSize-2])
-	dc.sync(pdbName)
+	dc.sync(ctx, pdbName)
 	ps.VerifyPdbStatus(t, pdbName, 0, minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{})
 
 	pods[collectionSize-1].Status.Conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue}}
 	update(t, dc.podStore, pods[collectionSize-1])
-	dc.sync(pdbName)
+	dc.sync(ctx, pdbName)
 	ps.VerifyPdbStatus(t, pdbName, 1, 1+minimumTwo, minimumTwo, 2*collectionSize, map[string]metav1.Time{})
 }
 
@@ -951,7 +962,7 @@ func TestPDBNotExist(t *testing.T) {
 	dc, _ := newFakeDisruptionController()
 	pdb, _ := newMinAvailablePodDisruptionBudget(t, intstr.FromString("67%"))
 	add(t, dc.pdbStore, pdb)
-	if err := dc.sync("notExist"); err != nil {
+	if err := dc.sync(context.TODO(), "notExist"); err != nil {
 		t.Errorf("Unexpected error: %v, expect nil", err)
 	}
 }
@@ -978,7 +989,7 @@ func TestUpdateDisruptedPods(t *testing.T) {
 	add(t, dc.podStore, pod2)
 	add(t, dc.podStore, pod3)
 
-	dc.sync(pdbName)
+	dc.sync(context.TODO(), pdbName)
 
 	ps.VerifyPdbStatus(t, pdbName, 0, 1, 1, 3, map[string]metav1.Time{"p3": {Time: currentTime}})
 }
@@ -1064,7 +1075,7 @@ func TestBasicFinderFunctions(t *testing.T) {
 				UID: tc.uid,
 			}
 
-			controllerAndScale, _ := tc.finderFunc(controllerRef, metav1.NamespaceDefault)
+			controllerAndScale, _ := tc.finderFunc(context.TODO(), controllerRef, metav1.NamespaceDefault)
 
 			if controllerAndScale == nil {
 				if tc.findsScale {
@@ -1162,7 +1173,7 @@ func TestDeploymentFinderFunction(t *testing.T) {
 				UID: rs.UID,
 			}
 
-			controllerAndScale, _ := dc.getPodDeployment(controllerRef, metav1.NamespaceDefault)
+			controllerAndScale, _ := dc.getPodDeployment(context.TODO(), controllerRef, metav1.NamespaceDefault)
 
 			if controllerAndScale == nil {
 				if tc.findsScale {
@@ -1199,17 +1210,17 @@ func TestUpdatePDBStatusRetries(t *testing.T) {
 	dc, _ := newFakeDisruptionController()
 	// Inject the production code over our fake impl
 	dc.getUpdater = func() updater { return dc.writePdbStatus }
-
+	ctx := context.TODO()
 	// Create a PDB and 3 pods that match it.
 	pdb, pdbKey := newMinAvailablePodDisruptionBudget(t, intstr.FromInt(1))
-	pdb, err := dc.coreClient.PolicyV1().PodDisruptionBudgets(pdb.Namespace).Create(context.TODO(), pdb, metav1.CreateOptions{})
+	pdb, err := dc.coreClient.PolicyV1().PodDisruptionBudgets(pdb.Namespace).Create(ctx, pdb, metav1.CreateOptions{})
 	if err != nil {
 		t.Fatalf("Failed to create PDB: %v", err)
 	}
 	podNames := []string{"moe", "larry", "curly"}
 	for _, name := range podNames {
 		pod, _ := newPod(t, name)
-		_, err := dc.coreClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
+		_, err := dc.coreClient.CoreV1().Pods(pod.Namespace).Create(ctx, pod, metav1.CreateOptions{})
 		if err != nil {
 			t.Fatalf("Failed to create pod: %v", err)
 		}
@@ -1228,7 +1239,7 @@ func TestUpdatePDBStatusRetries(t *testing.T) {
 	}
 
 	// Sync DisruptionController once to update PDB status.
-	if err := dc.sync(pdbKey); err != nil {
+	if err := dc.sync(ctx, pdbKey); err != nil {
 		t.Fatalf("Failed initial sync: %v", err)
 	}
 
@@ -1281,7 +1292,7 @@ func TestUpdatePDBStatusRetries(t *testing.T) {
 	})
 
 	// (A) Delete one pod
-	if err := dc.coreClient.CoreV1().Pods("default").Delete(context.TODO(), podNames[0], metav1.DeleteOptions{}); err != nil {
+	if err := dc.coreClient.CoreV1().Pods("default").Delete(ctx, podNames[0], metav1.DeleteOptions{}); err != nil {
 		t.Fatal(err)
 	}
 	if err := waitForCacheCount(dc.podStore, len(podNames)-1); err != nil {
@@ -1291,7 +1302,7 @@ func TestUpdatePDBStatusRetries(t *testing.T) {
 	// The sync() function should either write a correct status which takes the
 	// evictions into account, or re-queue the PDB for another sync (by returning
 	// an error)
-	if err := dc.sync(pdbKey); err != nil {
+	if err := dc.sync(ctx, pdbKey); err != nil {
 		t.Logf("sync() returned with error: %v", err)
 	} else {
 		t.Logf("sync() returned with no error")
@@ -1299,7 +1310,7 @@ func TestUpdatePDBStatusRetries(t *testing.T) {
 
 	// (C) Whether or not sync() returned an error, the PDB status should reflect
 	// the evictions that took place.
-	finalPDB, err := dc.coreClient.PolicyV1().PodDisruptionBudgets("default").Get(context.TODO(), pdb.Name, metav1.GetOptions{})
+	finalPDB, err := dc.coreClient.PolicyV1().PodDisruptionBudgets("default").Get(ctx, pdb.Name, metav1.GetOptions{})
 	if err != nil {
 		t.Fatalf("Failed to get PDB: %v", err)
 	}
@@ -1309,6 +1320,8 @@ func TestUpdatePDBStatusRetries(t *testing.T) {
 }
 
 func TestInvalidSelectors(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 	testCases := map[string]struct {
 		labelSelector *metav1.LabelSelector
 	}{
@@ -1340,7 +1353,7 @@ func TestInvalidSelectors(t *testing.T) {
 			pdb.Spec.Selector = tc.labelSelector
 			add(t, dc.pdbStore, pdb)
 
-			dc.sync(pdbName)
+			dc.sync(ctx, pdbName)
 			ps.VerifyPdbStatus(t, pdbName, 0, 0, 0, 0, map[string]metav1.Time{})
 		})
 	}
diff --git a/test/integration/disruption/disruption_test.go b/test/integration/disruption/disruption_test.go
index 0c91c2c7251..1229385cf4e 100644
--- a/test/integration/disruption/disruption_test.go
+++ b/test/integration/disruption/disruption_test.go
@@ -98,14 +98,12 @@ func setup(t *testing.T) (*kubeapiservertesting.TestServer, *disruption.Disrupti
 func TestPDBWithScaleSubresource(t *testing.T) {
 	s, pdbc, informers, clientSet, apiExtensionClient, dynamicClient := setup(t)
 	defer s.TearDownFn()
-
+	ctx := context.TODO()
 	nsName := "pdb-scale-subresource"
-	createNs(t, nsName, clientSet)
+	createNs(ctx, t, nsName, clientSet)
 
-	stopCh := make(chan struct{})
-	informers.Start(stopCh)
-	go pdbc.Run(stopCh)
-	defer close(stopCh)
+	informers.Start(ctx.Done())
+	go pdbc.Run(ctx)
 
 	crdDefinition := newCustomResourceDefinition()
 	etcd.CreateTestCRDs(t, apiExtensionClient, true, crdDefinition)
@@ -128,7 +126,7 @@ func TestPDBWithScaleSubresource(t *testing.T) {
 			},
 		},
 	}
-	createdResource, err := resourceClient.Create(context.TODO(), resource, metav1.CreateOptions{})
+	createdResource, err := resourceClient.Create(ctx, resource, metav1.CreateOptions{})
 	if err != nil {
 		t.Error(err)
 	}
@@ -144,7 +142,7 @@ func TestPDBWithScaleSubresource(t *testing.T) {
 		},
 	}
 	for i := 0; i < replicas; i++ {
-		createPod(t, fmt.Sprintf("pod-%d", i), nsName, map[string]string{"app": "test-crd"}, clientSet, ownerRefs)
+		createPod(ctx, t, fmt.Sprintf("pod-%d", i), nsName, map[string]string{"app": "test-crd"}, clientSet, ownerRefs)
 	}
 	waitToObservePods(t, informers.Core().V1().Pods().Informer(), 4, v1.PodRunning)
 
@@ -163,13 +161,13 @@ func TestPDBWithScaleSubresource(t *testing.T) {
 			},
 		},
 	}
-	if _, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Create(context.TODO(), pdb, metav1.CreateOptions{}); err != nil {
+	if _, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Create(ctx, pdb, metav1.CreateOptions{}); err != nil {
 		t.Errorf("Error creating PodDisruptionBudget: %v", err)
 	}
 
-	waitPDBStable(t, clientSet, 4, nsName, pdb.Name)
+	waitPDBStable(ctx, t, clientSet, 4, nsName, pdb.Name)
 
-	newPdb, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Get(context.TODO(), pdb.Name, metav1.GetOptions{})
+	newPdb, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Get(ctx, pdb.Name, metav1.GetOptions{})
 	if err != nil {
 		t.Errorf("Error getting PodDisruptionBudget: %v", err)
 	}
@@ -186,6 +184,8 @@ func TestPDBWithScaleSubresource(t *testing.T) {
 }
 
 func TestEmptySelector(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 	testcases := []struct {
 		name          string
 		createPDBFunc func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error
@@ -203,7 +203,7 @@ func TestEmptySelector(t *testing.T) {
 						Selector:     &metav1.LabelSelector{},
 					},
 				}
-				_, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(nsName).Create(context.TODO(), pdb, metav1.CreateOptions{})
+				_, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(nsName).Create(ctx, pdb, metav1.CreateOptions{})
 				return err
 			},
 			expectedCurrentHealthy: 0,
@@ -220,7 +220,7 @@ func TestEmptySelector(t *testing.T) {
 						Selector:     &metav1.LabelSelector{},
 					},
 				}
-				_, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Create(context.TODO(), pdb, metav1.CreateOptions{})
+				_, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Create(ctx, pdb, metav1.CreateOptions{})
 				return err
 			},
 			expectedCurrentHealthy: 4,
@@ -233,18 +233,16 @@ func TestEmptySelector(t *testing.T) {
 			defer s.TearDownFn()
 
 			nsName := fmt.Sprintf("pdb-empty-selector-%d", i)
-			createNs(t, nsName, clientSet)
+			createNs(ctx, t, nsName, clientSet)
 
-			stopCh := make(chan struct{})
-			informers.Start(stopCh)
-			go pdbc.Run(stopCh)
-			defer close(stopCh)
+			informers.Start(ctx.Done())
+			go pdbc.Run(ctx)
 
 			replicas := 4
 			minAvailable := intstr.FromInt(2)
 
 			for j := 0; j < replicas; j++ {
-				createPod(t, fmt.Sprintf("pod-%d", j), nsName, map[string]string{"app": "test-crd"},
+				createPod(ctx, t, fmt.Sprintf("pod-%d", j), nsName, map[string]string{"app": "test-crd"},
 					clientSet, []metav1.OwnerReference{})
 			}
 
@@ -255,9 +253,9 @@ func TestEmptySelector(t *testing.T) {
 				t.Errorf("Error creating PodDisruptionBudget: %v", err)
 			}
 
-			waitPDBStable(t, clientSet, tc.expectedCurrentHealthy, nsName, pdbName)
+			waitPDBStable(ctx, t, clientSet, tc.expectedCurrentHealthy, nsName, pdbName)
 
-			newPdb, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Get(context.TODO(), pdbName, metav1.GetOptions{})
+			newPdb, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Get(ctx, pdbName, metav1.GetOptions{})
 			if err != nil {
 				t.Errorf("Error getting PodDisruptionBudget: %v", err)
 			}
@@ -270,6 +268,8 @@ func TestEmptySelector(t *testing.T) {
 }
 
 func TestSelectorsForPodsWithoutLabels(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
 	testcases := []struct {
 		name          string
 		createPDBFunc func(clientSet clientset.Interface, name, nsName string, minAvailable intstr.IntOrString) error
@@ -311,7 +311,7 @@ func TestSelectorsForPodsWithoutLabels(t *testing.T) {
 						},
 					},
 				}
-				_, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Create(context.TODO(), pdb, metav1.CreateOptions{})
+				_, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Create(ctx, pdb, metav1.CreateOptions{})
 				return err
 			},
 			expectedCurrentHealthy: 1,
@@ -335,7 +335,7 @@ func TestSelectorsForPodsWithoutLabels(t *testing.T) {
 						},
 					},
 				}
-				_, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(nsName).Create(context.TODO(), pdb, metav1.CreateOptions{})
+				_, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(nsName).Create(ctx, pdb, metav1.CreateOptions{})
 				return err
 			},
 			expectedCurrentHealthy: 1,
@@ -348,12 +348,10 @@ func TestSelectorsForPodsWithoutLabels(t *testing.T) {
 			defer s.TearDownFn()
 
 			nsName := fmt.Sprintf("pdb-selectors-%d", i)
-			createNs(t, nsName, clientSet)
+			createNs(ctx, t, nsName, clientSet)
 
-			stopCh := make(chan struct{})
-			informers.Start(stopCh)
-			go pdbc.Run(stopCh)
-			defer close(stopCh)
+			informers.Start(ctx.Done())
+			go pdbc.Run(ctx)
 
 			minAvailable := intstr.FromInt(1)
 
@@ -362,16 +360,16 @@ func TestSelectorsForPodsWithoutLabels(t *testing.T) {
 			if err := tc.createPDBFunc(clientSet, pdbName, nsName, minAvailable); err != nil {
 				t.Errorf("Error creating PodDisruptionBudget: %v", err)
 			}
-			waitPDBStable(t, clientSet, 0, nsName, pdbName)
+			waitPDBStable(ctx, t, clientSet, 0, nsName, pdbName)
 
 			// Create a pod and wait for it be reach the running phase.
-			createPod(t, "pod", nsName, map[string]string{}, clientSet, []metav1.OwnerReference{})
+			createPod(ctx, t, "pod", nsName, map[string]string{}, clientSet, []metav1.OwnerReference{})
 			waitToObservePods(t, informers.Core().V1().Pods().Informer(), 1, v1.PodRunning)
 
 			// Then verify that the added pod are picked up by the disruption controller.
-			waitPDBStable(t, clientSet, 1, nsName, pdbName)
+			waitPDBStable(ctx, t, clientSet, 1, nsName, pdbName)
 
-			newPdb, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Get(context.TODO(), pdbName, metav1.GetOptions{})
+			newPdb, err := clientSet.PolicyV1().PodDisruptionBudgets(nsName).Get(ctx, pdbName, metav1.GetOptions{})
 			if err != nil {
 				t.Errorf("Error getting PodDisruptionBudget: %v", err)
 			}
@@ -383,7 +381,7 @@ func TestSelectorsForPodsWithoutLabels(t *testing.T) {
 	}
 }
 
-func createPod(t *testing.T, name, namespace string, labels map[string]string, clientSet clientset.Interface, ownerRefs []metav1.OwnerReference) {
+func createPod(ctx context.Context, t *testing.T, name, namespace string, labels map[string]string, clientSet clientset.Interface, ownerRefs []metav1.OwnerReference) {
 	pod := &v1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: name,
@@ -400,7 +398,7 @@ func createPod(t *testing.T, name, namespace string, labels map[string]string, c
 			},
 		},
 	}
-	_, err := clientSet.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
+	_, err := clientSet.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})
 	if err != nil {
 		t.Error(err)
 	}
@@ -410,8 +408,8 @@ func createPod(t *testing.T, name, namespace string, labels map[string]string, c
 	}
 }
 
-func createNs(t *testing.T, name string, clientSet clientset.Interface) {
-	_, err := clientSet.CoreV1().Namespaces().Create(context.TODO(), &v1.Namespace{
+func createNs(ctx context.Context, t *testing.T, name string, clientSet clientset.Interface) {
+	_, err := clientSet.CoreV1().Namespaces().Create(ctx, &v1.Namespace{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: name,
 		},
@@ -463,9 +461,9 @@ func newCustomResourceDefinition() *apiextensionsv1.CustomResourceDefinition {
 	}
 }
 
-func waitPDBStable(t *testing.T, clientSet clientset.Interface, podNum int32, ns, pdbName string) {
+func waitPDBStable(ctx context.Context, t *testing.T, clientSet clientset.Interface, podNum int32, ns, pdbName string) {
 	if err := wait.PollImmediate(2*time.Second, 60*time.Second, func() (bool, error) {
-		pdb, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns).Get(context.TODO(), pdbName, metav1.GetOptions{})
+		pdb, err := clientSet.PolicyV1beta1().PodDisruptionBudgets(ns).Get(ctx, pdbName, metav1.GetOptions{})
 		if err != nil {
 			return false, err
 		}
diff --git a/test/integration/evictions/evictions_test.go b/test/integration/evictions/evictions_test.go
index 491712a4df0..f6e1987335d 100644
--- a/test/integration/evictions/evictions_test.go
+++ b/test/integration/evictions/evictions_test.go
@@ -67,10 +67,10 @@ func TestConcurrentEvictionRequests(t *testing.T) {
 	ns := framework.CreateTestingNamespace("concurrent-eviction-requests", s, t)
 	defer framework.DeleteTestingNamespace(ns, s, t)
 
-	stopCh := make(chan struct{})
-	informers.Start(stopCh)
-	go rm.Run(stopCh)
-	defer close(stopCh)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	informers.Start(ctx.Done())
+	go rm.Run(ctx)
 
 	config := restclient.Config{Host: s.URL}
 	clientSet, err := clientset.NewForConfig(&config)
@@ -186,10 +186,10 @@ func TestTerminalPodEviction(t *testing.T) {
 	ns := framework.CreateTestingNamespace("terminalpod-eviction", s, t)
 	defer framework.DeleteTestingNamespace(ns, s, t)
 
-	stopCh := make(chan struct{})
-	informers.Start(stopCh)
-	go rm.Run(stopCh)
-	defer close(stopCh)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	informers.Start(ctx.Done())
+	go rm.Run(ctx)
 
 	config := restclient.Config{Host: s.URL}
 	clientSet, err := clientset.NewForConfig(&config)
@@ -262,10 +262,10 @@ func TestEvictionVersions(t *testing.T) {
 	s, closeFn, rm, informers, clientSet := rmSetup(t)
 	defer closeFn()
 
-	stopCh := make(chan struct{})
-	informers.Start(stopCh)
-	go rm.Run(stopCh)
-	defer close(stopCh)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	informers.Start(ctx.Done())
+	go rm.Run(ctx)
 
 	config := restclient.Config{Host: s.URL}
diff --git a/test/integration/scheduler/util.go b/test/integration/scheduler/util.go
index 69dde6e0894..61c793d8933 100644
--- a/test/integration/scheduler/util.go
+++ b/test/integration/scheduler/util.go
@@ -77,7 +77,7 @@ func initDisruptionController(t *testing.T, testCtx *testutils.TestContext) *dis
 	informers.Start(testCtx.Scheduler.StopEverything)
 	informers.WaitForCacheSync(testCtx.Scheduler.StopEverything)
 
-	go dc.Run(testCtx.Scheduler.StopEverything)
+	go dc.Run(testCtx.Ctx)
 	return dc
 }