From c0435352505754627c53ba2a5ff73987c8435357 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Wed, 11 May 2022 14:09:29 +0200 Subject: [PATCH 1/3] Clear shutdown in replicaset integration test --- .../integration/replicaset/replicaset_test.go | 58 +++++++++---------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/test/integration/replicaset/replicaset_test.go b/test/integration/replicaset/replicaset_test.go index 910fa9a8f10..e0c7feaedde 100644 --- a/test/integration/replicaset/replicaset_test.go +++ b/test/integration/replicaset/replicaset_test.go @@ -152,12 +152,12 @@ func rmSimpleSetup(t *testing.T) (*httptest.Server, framework.CloseFunc, clients } // Run RS controller and informers -func runControllerAndInformers(t *testing.T, rm *replicaset.ReplicaSetController, informers informers.SharedInformerFactory, podNum int) chan struct{} { - stopCh := make(chan struct{}) - informers.Start(stopCh) +func runControllerAndInformers(t *testing.T, rm *replicaset.ReplicaSetController, informers informers.SharedInformerFactory, podNum int) func() { + ctx, cancelFn := context.WithCancel(context.Background()) + informers.Start(ctx.Done()) waitToObservePods(t, informers.Core().V1().Pods().Informer(), podNum) - go rm.Run(context.TODO(), 5) - return stopCh + go rm.Run(ctx, 5) + return cancelFn } // wait for the podInformer to observe the pods. Call this function before @@ -443,8 +443,8 @@ func TestAdoption(t *testing.T) { t.Fatalf("Failed to create Pod: %v", err) } - stopCh := runControllerAndInformers(t, rm, informers, 1) - defer close(stopCh) + stopControllers := runControllerAndInformers(t, rm, informers, 1) + defer stopControllers() if err := wait.PollImmediate(interval, timeout, func() (bool, error) { updatedPod, err := podClient.Get(context.TODO(), pod.Name, metav1.GetOptions{}) if err != nil { @@ -498,8 +498,8 @@ func TestSpecReplicasChange(t *testing.T) { defer closeFn() ns := framework.CreateTestingNamespace("test-spec-replicas-change", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - stopCh := runControllerAndInformers(t, rm, informers, 0) - defer close(stopCh) + stopControllers := runControllerAndInformers(t, rm, informers, 0) + defer stopControllers() rs := newRS("rs", ns.Name, 2) rss, _ := createRSsPods(t, c, []*apps.ReplicaSet{rs}, []*v1.Pod{}) @@ -540,8 +540,8 @@ func TestDeletingAndFailedPods(t *testing.T) { defer closeFn() ns := framework.CreateTestingNamespace("test-deleting-and-failed-pods", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - stopCh := runControllerAndInformers(t, rm, informers, 0) - defer close(stopCh) + stopControllers := runControllerAndInformers(t, rm, informers, 0) + defer stopControllers() rs := newRS("rs", ns.Name, 2) rss, _ := createRSsPods(t, c, []*apps.ReplicaSet{rs}, []*v1.Pod{}) @@ -643,8 +643,8 @@ func TestPodDeletionCost(t *testing.T) { defer closeFn() ns := framework.CreateTestingNamespace(tc.name, s, t) defer framework.DeleteTestingNamespace(ns, s, t) - stopCh := runControllerAndInformers(t, rm, informers, 0) - defer close(stopCh) + stopControllers := runControllerAndInformers(t, rm, informers, 0) + defer stopControllers() rs := newRS("rs", ns.Name, 2) rss, _ := createRSsPods(t, c, []*apps.ReplicaSet{rs}, []*v1.Pod{}) @@ -702,8 +702,8 @@ func TestOverlappingRSs(t *testing.T) { defer closeFn() ns := framework.CreateTestingNamespace("test-overlapping-rss", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - stopCh := runControllerAndInformers(t, rm, informers, 0) - defer close(stopCh) + stopControllers := runControllerAndInformers(t, rm, informers, 0) + defer stopControllers() // Create 2 RSs with identical selectors for i := 0; i < 2; i++ { @@ -737,8 +737,8 @@ func TestPodOrphaningAndAdoptionWhenLabelsChange(t *testing.T) { defer closeFn() ns := framework.CreateTestingNamespace("test-pod-orphaning-and-adoption-when-labels-change", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - stopCh := runControllerAndInformers(t, rm, informers, 0) - defer close(stopCh) + stopControllers := runControllerAndInformers(t, rm, informers, 0) + defer stopControllers() rs := newRS("rs", ns.Name, 1) rss, _ := createRSsPods(t, c, []*apps.ReplicaSet{rs}, []*v1.Pod{}) @@ -814,8 +814,8 @@ func TestGeneralPodAdoption(t *testing.T) { defer closeFn() ns := framework.CreateTestingNamespace("test-general-pod-adoption", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - stopCh := runControllerAndInformers(t, rm, informers, 0) - defer close(stopCh) + stopControllers := runControllerAndInformers(t, rm, informers, 0) + defer stopControllers() rs := newRS("rs", ns.Name, 1) rss, _ := createRSsPods(t, c, []*apps.ReplicaSet{rs}, []*v1.Pod{}) @@ -846,8 +846,8 @@ func TestReadyAndAvailableReplicas(t *testing.T) { defer closeFn() ns := framework.CreateTestingNamespace("test-ready-and-available-replicas", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - stopCh := runControllerAndInformers(t, rm, informers, 0) - defer close(stopCh) + stopControllers := runControllerAndInformers(t, rm, informers, 0) + defer stopControllers() rs := newRS("rs", ns.Name, 3) rs.Spec.MinReadySeconds = 3600 @@ -898,8 +898,8 @@ func TestRSScaleSubresource(t *testing.T) { defer closeFn() ns := framework.CreateTestingNamespace("test-rs-scale-subresource", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - stopCh := runControllerAndInformers(t, rm, informers, 0) - defer close(stopCh) + stopControllers := runControllerAndInformers(t, rm, informers, 0) + defer stopControllers() rs := newRS("rs", ns.Name, 1) rss, _ := createRSsPods(t, c, []*apps.ReplicaSet{rs}, []*v1.Pod{}) @@ -928,8 +928,8 @@ func TestExtraPodsAdoptionAndDeletion(t *testing.T) { } rss, _ := createRSsPods(t, c, []*apps.ReplicaSet{rs}, podList) rs = rss[0] - stopCh := runControllerAndInformers(t, rm, informers, 3) - defer close(stopCh) + stopControllers := runControllerAndInformers(t, rm, informers, 3) + defer stopControllers() waitRSStable(t, c, rs) // Verify the extra pod is deleted eventually by determining whether number of @@ -949,8 +949,8 @@ func TestFullyLabeledReplicas(t *testing.T) { defer closeFn() ns := framework.CreateTestingNamespace("test-fully-labeled-replicas", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - stopCh := runControllerAndInformers(t, rm, informers, 0) - defer close(stopCh) + stopControllers := runControllerAndInformers(t, rm, informers, 0) + defer stopControllers() extraLabelMap := map[string]string{"foo": "bar", "extraKey": "extraValue"} rs := newRS("rs", ns.Name, 2) @@ -992,8 +992,8 @@ func TestReplicaSetsAppsV1DefaultGCPolicy(t *testing.T) { defer closeFn() ns := framework.CreateTestingNamespace("test-default-gc-v1", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - stopCh := runControllerAndInformers(t, rm, informers, 0) - defer close(stopCh) + stopControllers := runControllerAndInformers(t, rm, informers, 0) + defer stopControllers() rs := newRS("rs", ns.Name, 2) fakeFinalizer := "kube.io/dummy-finalizer" From ca68691feb7bff622c7bbd5ece4de0a7827d08b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Wed, 11 May 2022 14:15:16 +0200 Subject: [PATCH 2/3] Clear shutdown in replicationcontroller integration test --- .../replicationcontroller_test.go | 54 +++++++++---------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/test/integration/replicationcontroller/replicationcontroller_test.go b/test/integration/replicationcontroller/replicationcontroller_test.go index edd5139a6a4..1b408de4efa 100644 --- a/test/integration/replicationcontroller/replicationcontroller_test.go +++ b/test/integration/replicationcontroller/replicationcontroller_test.go @@ -133,12 +133,12 @@ func rmSetup(t *testing.T) (*httptest.Server, framework.CloseFunc, *replication. } // Run RC controller and informers -func runControllerAndInformers(t *testing.T, rm *replication.ReplicationManager, informers informers.SharedInformerFactory, podNum int) chan struct{} { - stopCh := make(chan struct{}) - informers.Start(stopCh) +func runControllerAndInformers(t *testing.T, rm *replication.ReplicationManager, informers informers.SharedInformerFactory, podNum int) func() { + ctx, cancelFn := context.WithCancel(context.Background()) + informers.Start(ctx.Done()) waitToObservePods(t, informers.Core().V1().Pods().Informer(), podNum) - go rm.Run(context.TODO(), 5) - return stopCh + go rm.Run(ctx, 5) + return cancelFn } // wait for the podInformer to observe the pods. Call this function before @@ -431,8 +431,8 @@ func TestAdoption(t *testing.T) { t.Fatalf("Failed to create Pod: %v", err) } - stopCh := runControllerAndInformers(t, rm, informers, 1) - defer close(stopCh) + stopControllers := runControllerAndInformers(t, rm, informers, 1) + defer stopControllers() if err := wait.PollImmediate(interval, timeout, func() (bool, error) { updatedPod, err := podClient.Get(context.TODO(), pod.Name, metav1.GetOptions{}) if err != nil { @@ -458,8 +458,8 @@ func TestSpecReplicasChange(t *testing.T) { defer closeFn() ns := framework.CreateTestingNamespace("test-spec-replicas-change", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - stopCh := runControllerAndInformers(t, rm, informers, 0) - defer close(stopCh) + stopControllers := runControllerAndInformers(t, rm, informers, 0) + defer stopControllers() rc := newRC("rc", ns.Name, 2) rcs, _ := createRCsPods(t, c, []*v1.ReplicationController{rc}, []*v1.Pod{}) @@ -501,8 +501,8 @@ func TestLogarithmicScaleDown(t *testing.T) { defer closeFn() ns := framework.CreateTestingNamespace("test-spec-replicas-change", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - stopCh := runControllerAndInformers(t, rm, informers, 0) - defer close(stopCh) + stopControllers := runControllerAndInformers(t, rm, informers, 0) + defer stopControllers() rc := newRC("rc", ns.Name, 2) rcs, _ := createRCsPods(t, c, []*v1.ReplicationController{rc}, []*v1.Pod{}) @@ -538,8 +538,8 @@ func TestDeletingAndFailedPods(t *testing.T) { defer closeFn() ns := framework.CreateTestingNamespace("test-deleting-and-failed-pods", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - stopCh := runControllerAndInformers(t, rm, informers, 0) - defer close(stopCh) + stopControllers := runControllerAndInformers(t, rm, informers, 0) + defer stopControllers() rc := newRC("rc", ns.Name, 2) rcs, _ := createRCsPods(t, c, []*v1.ReplicationController{rc}, []*v1.Pod{}) @@ -603,8 +603,8 @@ func TestOverlappingRCs(t *testing.T) { defer closeFn() ns := framework.CreateTestingNamespace("test-overlapping-rcs", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - stopCh := runControllerAndInformers(t, rm, informers, 0) - defer close(stopCh) + stopControllers := runControllerAndInformers(t, rm, informers, 0) + defer stopControllers() // Create 2 RCs with identical selectors for i := 0; i < 2; i++ { @@ -638,8 +638,8 @@ func TestPodOrphaningAndAdoptionWhenLabelsChange(t *testing.T) { defer closeFn() ns := framework.CreateTestingNamespace("test-pod-orphaning-and-adoption-when-labels-change", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - stopCh := runControllerAndInformers(t, rm, informers, 0) - defer close(stopCh) + stopControllers := runControllerAndInformers(t, rm, informers, 0) + defer stopControllers() rc := newRC("rc", ns.Name, 1) rcs, _ := createRCsPods(t, c, []*v1.ReplicationController{rc}, []*v1.Pod{}) @@ -715,8 +715,8 @@ func TestGeneralPodAdoption(t *testing.T) { defer closeFn() ns := framework.CreateTestingNamespace("test-general-pod-adoption", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - stopCh := runControllerAndInformers(t, rm, informers, 0) - defer close(stopCh) + stopControllers := runControllerAndInformers(t, rm, informers, 0) + defer stopControllers() rc := newRC("rc", ns.Name, 1) rcs, _ := createRCsPods(t, c, []*v1.ReplicationController{rc}, []*v1.Pod{}) @@ -747,8 +747,8 @@ func TestReadyAndAvailableReplicas(t *testing.T) { defer closeFn() ns := framework.CreateTestingNamespace("test-ready-and-available-replicas", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - stopCh := runControllerAndInformers(t, rm, informers, 0) - defer close(stopCh) + stopControllers := runControllerAndInformers(t, rm, informers, 0) + defer stopControllers() rc := newRC("rc", ns.Name, 3) rc.Spec.MinReadySeconds = 3600 @@ -799,8 +799,8 @@ func TestRCScaleSubresource(t *testing.T) { defer closeFn() ns := framework.CreateTestingNamespace("test-rc-scale-subresource", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - stopCh := runControllerAndInformers(t, rm, informers, 0) - defer close(stopCh) + stopControllers := runControllerAndInformers(t, rm, informers, 0) + defer stopControllers() rc := newRC("rc", ns.Name, 1) rcs, _ := createRCsPods(t, c, []*v1.ReplicationController{rc}, []*v1.Pod{}) @@ -829,8 +829,8 @@ func TestExtraPodsAdoptionAndDeletion(t *testing.T) { } rcs, _ := createRCsPods(t, c, []*v1.ReplicationController{rc}, podList) rc = rcs[0] - stopCh := runControllerAndInformers(t, rm, informers, 3) - defer close(stopCh) + stopControllers := runControllerAndInformers(t, rm, informers, 3) + defer stopControllers() waitRCStable(t, c, rc) // Verify the extra pod is deleted eventually by determining whether number of @@ -850,8 +850,8 @@ func TestFullyLabeledReplicas(t *testing.T) { defer closeFn() ns := framework.CreateTestingNamespace("test-fully-labeled-replicas", s, t) defer framework.DeleteTestingNamespace(ns, s, t) - stopCh := runControllerAndInformers(t, rm, informers, 0) - defer close(stopCh) + stopControllers := runControllerAndInformers(t, rm, informers, 0) + defer stopControllers() extraLabelMap := map[string]string{"foo": "bar", "extraKey": "extraValue"} rc := newRC("rc", ns.Name, 2) From 1fcc5d9eae1ea654a66a82771b930352552651ab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Wojciech=20Tyczy=C5=84ski?= Date: Wed, 11 May 2022 14:49:54 +0200 Subject: [PATCH 3/3] Clear shutdown in deployment integration test --- .../integration/deployment/deployment_test.go | 93 ++++++------------- test/integration/deployment/util.go | 9 ++ 2 files changed, 37 insertions(+), 65 deletions(-) diff --git a/test/integration/deployment/deployment_test.go b/test/integration/deployment/deployment_test.go index 64a346b23b7..7430623aed9 100644 --- a/test/integration/deployment/deployment_test.go +++ b/test/integration/deployment/deployment_test.go @@ -54,11 +54,8 @@ func TestNewDeployment(t *testing.T) { } // Start informer and controllers - stopCh := make(chan struct{}) - defer close(stopCh) - informers.Start(stopCh) - go rm.Run(context.TODO(), 5) - go dc.Run(context.TODO(), 5) + stopControllers := runControllersAndInformers(t, rm, dc, informers) + defer stopControllers() // Wait for the Deployment to be updated to revision 1 if err := tester.waitForDeploymentRevisionAndImage("1", fakeImage); err != nil { @@ -121,11 +118,8 @@ func TestDeploymentRollingUpdate(t *testing.T) { defer framework.DeleteTestingNamespace(ns, s, t) // Start informer and controllers - stopCh := make(chan struct{}) - defer close(stopCh) - informers.Start(stopCh) - go rm.Run(context.TODO(), 5) - go dc.Run(context.TODO(), 5) + stopControllers := runControllersAndInformers(t, rm, dc, informers) + defer stopControllers() replicas := int32(20) tester := &deploymentTester{t: t, c: c, deployment: newDeployment(name, ns.Name, replicas)} @@ -264,11 +258,8 @@ func TestPausedDeployment(t *testing.T) { } // Start informer and controllers - stopCh := make(chan struct{}) - defer close(stopCh) - informers.Start(stopCh) - go rm.Run(context.TODO(), 5) - go dc.Run(context.TODO(), 5) + stopControllers := runControllersAndInformers(t, rm, dc, informers) + defer stopControllers() // Verify that the paused deployment won't create new replica set. if err := tester.expectNoNewReplicaSet(); err != nil { @@ -365,11 +356,8 @@ func TestScalePausedDeployment(t *testing.T) { } // Start informer and controllers - stopCh := make(chan struct{}) - defer close(stopCh) - informers.Start(stopCh) - go rm.Run(context.TODO(), 5) - go dc.Run(context.TODO(), 5) + stopControllers := runControllersAndInformers(t, rm, dc, informers) + defer stopControllers() // Wait for the Deployment to be updated to revision 1 if err := tester.waitForDeploymentRevisionAndImage("1", fakeImage); err != nil { @@ -446,11 +434,8 @@ func TestDeploymentHashCollision(t *testing.T) { } // Start informer and controllers - stopCh := make(chan struct{}) - defer close(stopCh) - informers.Start(stopCh) - go rm.Run(context.TODO(), 5) - go dc.Run(context.TODO(), 5) + stopControllers := runControllersAndInformers(t, rm, dc, informers) + defer stopControllers() // Wait for the Deployment to be updated to revision 1 if err := tester.waitForDeploymentRevisionAndImage("1", fakeImage); err != nil { @@ -549,11 +534,8 @@ func TestFailedDeployment(t *testing.T) { } // Start informer and controllers - stopCh := make(chan struct{}) - defer close(stopCh) - informers.Start(stopCh) - go rm.Run(context.TODO(), 5) - go dc.Run(context.TODO(), 5) + stopControllers := runControllersAndInformers(t, rm, dc, informers) + defer stopControllers() if err = tester.waitForDeploymentUpdatedReplicasGTE(replicas); err != nil { t.Fatal(err) @@ -590,12 +572,10 @@ func TestOverlappingDeployments(t *testing.T) { {t: t, c: c, deployment: newDeployment(firstDeploymentName, ns.Name, replicas)}, {t: t, c: c, deployment: newDeployment(secondDeploymentName, ns.Name, replicas)}, } + // Start informer and controllers - stopCh := make(chan struct{}) - defer close(stopCh) - informers.Start(stopCh) - go rm.Run(context.TODO(), 5) - go dc.Run(context.TODO(), 5) + stopControllers := runControllersAndInformers(t, rm, dc, informers) + defer stopControllers() // Create 2 deployments with overlapping selectors var err error @@ -665,11 +645,9 @@ func TestScaledRolloutDeployment(t *testing.T) { ns := framework.CreateTestingNamespace(name, s, t) defer framework.DeleteTestingNamespace(ns, s, t) - stopCh := make(chan struct{}) - defer close(stopCh) - informers.Start(stopCh) - go rm.Run(context.TODO(), 5) - go dc.Run(context.TODO(), 5) + // Start informer and controllers + stopControllers := runControllersAndInformers(t, rm, dc, informers) + defer stopControllers() // Create a deployment with rolling update strategy, max surge = 3, and max unavailable = 2 var err error @@ -868,11 +846,8 @@ func TestSpecReplicasChange(t *testing.T) { } // Start informer and controllers - stopCh := make(chan struct{}) - defer close(stopCh) - informers.Start(stopCh) - go rm.Run(context.TODO(), 5) - go dc.Run(context.TODO(), 5) + stopControllers := runControllersAndInformers(t, rm, dc, informers) + defer stopControllers() // Scale up/down deployment and verify its replicaset has matching .spec.replicas if err = tester.scaleDeployment(2); err != nil { @@ -926,11 +901,8 @@ func TestDeploymentAvailableCondition(t *testing.T) { } // Start informer and controllers - stopCh := make(chan struct{}) - defer close(stopCh) - informers.Start(stopCh) - go rm.Run(context.TODO(), 5) - go dc.Run(context.TODO(), 5) + stopControllers := runControllersAndInformers(t, rm, dc, informers) + defer stopControllers() // Wait for the deployment to be observed by the controller and has at least specified number of updated replicas if err = tester.waitForDeploymentUpdatedReplicasGTE(replicas); err != nil { @@ -1043,11 +1015,8 @@ func TestGeneralReplicaSetAdoption(t *testing.T) { } // Start informer and controllers - stopCh := make(chan struct{}) - defer close(stopCh) - informers.Start(stopCh) - go rm.Run(context.TODO(), 5) - go dc.Run(context.TODO(), 5) + stopControllers := runControllersAndInformers(t, rm, dc, informers) + defer stopControllers() // Wait for the Deployment to be updated to revision 1 if err := tester.waitForDeploymentRevisionAndImage("1", fakeImage); err != nil { @@ -1135,11 +1104,8 @@ func TestDeploymentScaleSubresource(t *testing.T) { } // Start informer and controllers - stopCh := make(chan struct{}) - defer close(stopCh) - informers.Start(stopCh) - go rm.Run(context.TODO(), 5) - go dc.Run(context.TODO(), 5) + stopControllers := runControllersAndInformers(t, rm, dc, informers) + defer stopControllers() // Wait for the Deployment to be updated to revision 1 if err := tester.waitForDeploymentRevisionAndImage("1", fakeImage); err != nil { @@ -1179,11 +1145,8 @@ func TestReplicaSetOrphaningAndAdoptionWhenLabelsChange(t *testing.T) { } // Start informer and controllers - stopCh := make(chan struct{}) - defer close(stopCh) - informers.Start(stopCh) - go rm.Run(context.TODO(), 5) - go dc.Run(context.TODO(), 5) + stopControllers := runControllersAndInformers(t, rm, dc, informers) + defer stopControllers() // Wait for the Deployment to be updated to revision 1 if err := tester.waitForDeploymentRevisionAndImage("1", fakeImage); err != nil { diff --git a/test/integration/deployment/util.go b/test/integration/deployment/util.go index 6f783878095..a49af3e1780 100644 --- a/test/integration/deployment/util.go +++ b/test/integration/deployment/util.go @@ -146,6 +146,15 @@ func dcSimpleSetup(t *testing.T) (*httptest.Server, framework.CloseFunc, clients return s, closeFn, clientSet } +// runControllersAndInformers runs RS and deployment controllers and informers +func runControllersAndInformers(t *testing.T, rm *replicaset.ReplicaSetController, dc *deployment.DeploymentController, informers informers.SharedInformerFactory) func() { + ctx, cancelFn := context.WithCancel(context.Background()) + informers.Start(ctx.Done()) + go rm.Run(ctx, 5) + go dc.Run(ctx, 5) + return cancelFn +} + // addPodConditionReady sets given pod status to ready at given time func addPodConditionReady(pod *v1.Pod, time metav1.Time) { pod.Status = v1.PodStatus{