Merge pull request #99244 from chrishenzie/stress-concurrent-cleanup

Clean up e2e stress test resources concurrently
This commit is contained in:
Kubernetes Prow Robot 2021-02-19 23:07:41 -08:00 committed by GitHub
commit 8eef1a9bf7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 49 additions and 28 deletions

View File

@@ -194,10 +194,9 @@ func (t *snapshottableStressTestSuite) DefineTests(driver storageframework.TestD
wg sync.WaitGroup
)
for i, snapshot := range stressTest.snapshots {
wg.Add(1)
go func(i int, snapshot *storageframework.SnapshotResource) {
wg.Add(len(stressTest.snapshots))
for _, snapshot := range stressTest.snapshots {
go func(snapshot *storageframework.SnapshotResource) {
defer ginkgo.GinkgoRecover()
defer wg.Done()
@@ -206,14 +205,13 @@ func (t *snapshottableStressTestSuite) DefineTests(driver storageframework.TestD
mu.Lock()
defer mu.Unlock()
errs = append(errs, err)
}(i, snapshot)
}(snapshot)
}
wg.Wait()
for i, pod := range stressTest.pods {
wg.Add(1)
go func(i int, pod *v1.Pod) {
wg.Add(len(stressTest.pods))
for _, pod := range stressTest.pods {
go func(pod *v1.Pod) {
defer ginkgo.GinkgoRecover()
defer wg.Done()
@@ -222,14 +220,13 @@ func (t *snapshottableStressTestSuite) DefineTests(driver storageframework.TestD
mu.Lock()
defer mu.Unlock()
errs = append(errs, err)
}(i, pod)
}(pod)
}
wg.Wait()
for i, volume := range stressTest.volumes {
wg.Add(1)
go func(i int, volume *storageframework.VolumeResource) {
wg.Add(len(stressTest.volumes))
for _, volume := range stressTest.volumes {
go func(volume *storageframework.VolumeResource) {
defer ginkgo.GinkgoRecover()
defer wg.Done()
@@ -238,7 +235,7 @@ func (t *snapshottableStressTestSuite) DefineTests(driver storageframework.TestD
mu.Lock()
defer mu.Unlock()
errs = append(errs, err)
}(i, volume)
}(volume)
}
wg.Wait()

View File

@@ -46,8 +46,8 @@ type volumeStressTest struct {
migrationCheck *migrationOpCheck
resources []*storageframework.VolumeResource
pods []*v1.Pod
volumes []*storageframework.VolumeResource
pods []*v1.Pod
// stop and wait for any async routines
wg sync.WaitGroup
ctx context.Context
@@ -121,7 +121,7 @@ func (t *volumeStressTestSuite) DefineTests(driver storageframework.TestDriver,
// Now do the more expensive test initialization.
l.config, l.driverCleanup = driver.PrepareTest(f)
l.migrationCheck = newMigrationOpCheck(f.ClientSet, dInfo.InTreePluginName)
l.resources = []*storageframework.VolumeResource{}
l.volumes = []*storageframework.VolumeResource{}
l.pods = []*v1.Pod{}
l.testOptions = *dInfo.StressTestOptions
l.ctx, l.cancel = context.WithCancel(context.Background())
@@ -131,7 +131,7 @@ func (t *volumeStressTestSuite) DefineTests(driver storageframework.TestDriver,
for i := 0; i < l.testOptions.NumPods; i++ {
framework.Logf("Creating resources for pod %v/%v", i, l.testOptions.NumPods-1)
r := storageframework.CreateVolumeResource(driver, l.config, pattern, t.GetTestSuiteInfo().SupportedSizeRange)
l.resources = append(l.resources, r)
l.volumes = append(l.volumes, r)
podConfig := e2epod.Config{
NS: f.Namespace.Name,
PVCs: []*v1.PersistentVolumeClaim{r.Pvc},
@@ -145,21 +145,45 @@ func (t *volumeStressTestSuite) DefineTests(driver storageframework.TestDriver,
}
cleanup := func() {
var errs []error
framework.Logf("Stopping and waiting for all test routines to finish")
l.cancel()
l.wg.Wait()
for _, pod := range l.pods {
framework.Logf("Deleting pod %v", pod.Name)
err := e2epod.DeletePodWithWait(cs, pod)
errs = append(errs, err)
}
var (
errs []error
mu sync.Mutex
wg sync.WaitGroup
)
for _, resource := range l.resources {
errs = append(errs, resource.CleanupResource())
wg.Add(len(l.pods))
for _, pod := range l.pods {
go func(pod *v1.Pod) {
defer ginkgo.GinkgoRecover()
defer wg.Done()
framework.Logf("Deleting pod %v", pod.Name)
err := e2epod.DeletePodWithWait(cs, pod)
mu.Lock()
defer mu.Unlock()
errs = append(errs, err)
}(pod)
}
wg.Wait()
wg.Add(len(l.volumes))
for _, volume := range l.volumes {
go func(volume *storageframework.VolumeResource) {
defer ginkgo.GinkgoRecover()
defer wg.Done()
framework.Logf("Deleting volume %s", volume.Pvc.GetName())
err := volume.CleanupResource()
mu.Lock()
defer mu.Unlock()
errs = append(errs, err)
}(volume)
}
wg.Wait()
errs = append(errs, storageutils.TryFunc(l.driverCleanup))
framework.ExpectNoError(errors.NewAggregate(errs), "while cleaning up resource")