Clean up e2e stress test resources concurrently

Author: Chris Henzie
Date:   2021-02-19 12:51:05 -08:00
Commit: 5130db416d
Parent: 93d288e2a4

2 changed files with 49 additions and 28 deletions
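Both files apply the same fan-out cleanup pattern: size a sync.WaitGroup once with wg.Add(len(items)), delete each resource in its own goroutine, and collect the per-resource errors under a mutex before aggregating them. The sketch below is a minimal, self-contained illustration of that pattern using only the standard library; cleanupAll, deleteItem, and the string items are placeholders, not part of the Kubernetes test framework.

package main

import (
	"fmt"
	"sync"
)

// cleanupAll deletes every item concurrently and returns all collected errors.
// It mirrors the fan-out used in the stress test cleanup: the WaitGroup is
// sized once up front, and the shared error slice is guarded by a mutex.
func cleanupAll(items []string, deleteItem func(string) error) []error {
	var (
		errs []error
		mu   sync.Mutex
		wg   sync.WaitGroup
	)

	wg.Add(len(items))
	for _, item := range items {
		go func(item string) {
			defer wg.Done()

			err := deleteItem(item)
			mu.Lock()
			defer mu.Unlock()
			errs = append(errs, err)
		}(item) // pass the loop variable so each goroutine gets its own copy
	}
	wg.Wait()

	return errs
}

func main() {
	errs := cleanupAll([]string{"pod-0", "pod-1", "pod-2"}, func(name string) error {
		fmt.Println("deleting", name)
		return nil
	})
	fmt.Println("errors:", errs)
}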

Changed file 1 of 2

@@ -194,10 +194,9 @@ func (t *snapshottableStressTestSuite) DefineTests(driver storageframework.TestD
             wg   sync.WaitGroup
         )

-        for i, snapshot := range stressTest.snapshots {
-            wg.Add(1)
-
-            go func(i int, snapshot *storageframework.SnapshotResource) {
+        wg.Add(len(stressTest.snapshots))
+        for _, snapshot := range stressTest.snapshots {
+            go func(snapshot *storageframework.SnapshotResource) {
                 defer ginkgo.GinkgoRecover()
                 defer wg.Done()
@@ -206,14 +205,13 @@ func (t *snapshottableStressTestSuite) DefineTests(driver storageframework.TestD
                 mu.Lock()
                 defer mu.Unlock()
                 errs = append(errs, err)
-            }(i, snapshot)
+            }(snapshot)
         }
         wg.Wait()

-        for i, pod := range stressTest.pods {
-            wg.Add(1)
-
-            go func(i int, pod *v1.Pod) {
+        wg.Add(len(stressTest.pods))
+        for _, pod := range stressTest.pods {
+            go func(pod *v1.Pod) {
                 defer ginkgo.GinkgoRecover()
                 defer wg.Done()
@@ -222,14 +220,13 @@ func (t *snapshottableStressTestSuite) DefineTests(driver storageframework.TestD
                 mu.Lock()
                 defer mu.Unlock()
                 errs = append(errs, err)
-            }(i, pod)
+            }(pod)
         }
         wg.Wait()

-        for i, volume := range stressTest.volumes {
-            wg.Add(1)
-
-            go func(i int, volume *storageframework.VolumeResource) {
+        wg.Add(len(stressTest.volumes))
+        for _, volume := range stressTest.volumes {
+            go func(volume *storageframework.VolumeResource) {
                 defer ginkgo.GinkgoRecover()
                 defer wg.Done()
@@ -238,7 +235,7 @@ func (t *snapshottableStressTestSuite) DefineTests(driver storageframework.TestD
                 mu.Lock()
                 defer mu.Unlock()
                 errs = append(errs, err)
-            }(i, volume)
+            }(volume)
         }
         wg.Wait()
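With the unused index gone, the goroutine signature in the snapshot, pod, and volume loops shrinks from func(i int, ...) to func(...), but the resource is still passed as an argument so each goroutine operates on its own copy of the loop variable. The standalone sketch below shows why that argument matters (hypothetical pod names, standard library only); before Go 1.22, capturing the loop variable directly could observe a later iteration's value.

package main

import (
	"fmt"
	"sync"
)

func main() {
	pods := []string{"pod-0", "pod-1", "pod-2"}

	var wg sync.WaitGroup
	wg.Add(len(pods))
	for _, pod := range pods {
		// Passing pod as an argument gives each goroutine its own copy.
		// Before Go 1.22, capturing the loop variable directly,
		// e.g. go func() { use(pod) }(), could see a later iteration's value.
		go func(pod string) {
			defer wg.Done()
			fmt.Println("cleaning up", pod)
		}(pod)
	}
	wg.Wait()
}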

Changed file 2 of 2

@@ -46,8 +46,8 @@ type volumeStressTest struct {
     migrationCheck *migrationOpCheck

-    resources []*storageframework.VolumeResource
+    volumes []*storageframework.VolumeResource
     pods    []*v1.Pod
     // stop and wait for any async routines
     wg     sync.WaitGroup
     ctx    context.Context
@@ -121,7 +121,7 @@ func (t *volumeStressTestSuite) DefineTests(driver storageframework.TestDriver,
         // Now do the more expensive test initialization.
         l.config, l.driverCleanup = driver.PrepareTest(f)
         l.migrationCheck = newMigrationOpCheck(f.ClientSet, dInfo.InTreePluginName)
-        l.resources = []*storageframework.VolumeResource{}
+        l.volumes = []*storageframework.VolumeResource{}
         l.pods = []*v1.Pod{}
         l.testOptions = *dInfo.StressTestOptions
         l.ctx, l.cancel = context.WithCancel(context.Background())
@@ -131,7 +131,7 @@ func (t *volumeStressTestSuite) DefineTests(driver storageframework.TestDriver,
         for i := 0; i < l.testOptions.NumPods; i++ {
             framework.Logf("Creating resources for pod %v/%v", i, l.testOptions.NumPods-1)
             r := storageframework.CreateVolumeResource(driver, l.config, pattern, t.GetTestSuiteInfo().SupportedSizeRange)
-            l.resources = append(l.resources, r)
+            l.volumes = append(l.volumes, r)
             podConfig := e2epod.Config{
                 NS:   f.Namespace.Name,
                 PVCs: []*v1.PersistentVolumeClaim{r.Pvc},
@@ -145,21 +145,45 @@ func (t *volumeStressTestSuite) DefineTests(driver storageframework.TestDriver,
     }

     cleanup := func() {
-        var errs []error
-
         framework.Logf("Stopping and waiting for all test routines to finish")
         l.cancel()
         l.wg.Wait()

-        for _, pod := range l.pods {
-            framework.Logf("Deleting pod %v", pod.Name)
-            err := e2epod.DeletePodWithWait(cs, pod)
-            errs = append(errs, err)
-        }
-
-        for _, resource := range l.resources {
-            errs = append(errs, resource.CleanupResource())
+        var (
+            errs []error
+            mu   sync.Mutex
+            wg   sync.WaitGroup
+        )
+
+        wg.Add(len(l.pods))
+        for _, pod := range l.pods {
+            go func(pod *v1.Pod) {
+                defer ginkgo.GinkgoRecover()
+                defer wg.Done()
+
+                framework.Logf("Deleting pod %v", pod.Name)
+                err := e2epod.DeletePodWithWait(cs, pod)
+                mu.Lock()
+                defer mu.Unlock()
+                errs = append(errs, err)
+            }(pod)
         }
+        wg.Wait()
+
+        wg.Add(len(l.volumes))
+        for _, volume := range l.volumes {
+            go func(volume *storageframework.VolumeResource) {
+                defer ginkgo.GinkgoRecover()
+                defer wg.Done()
+
+                framework.Logf("Deleting volume %s", volume.Pvc.GetName())
+                err := volume.CleanupResource()
+                mu.Lock()
+                defer mu.Unlock()
+                errs = append(errs, err)
+            }(volume)
+        }
+        wg.Wait()

         errs = append(errs, storageutils.TryFunc(l.driverCleanup))
         framework.ExpectNoError(errors.NewAggregate(errs), "while cleaning up resource")
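Both cleanup paths still feed every result, including nil ones, into errors.NewAggregate from k8s.io/apimachinery, which skips nil entries and folds the remaining errors into a single error value. Below is a small sketch of that final aggregation step, assuming the k8s.io/apimachinery module is on the module path; the fake delete errors are illustrative.

package main

import (
	"fmt"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

func main() {
	// Collected results from concurrent cleanup; nil means that item succeeded.
	errs := []error{
		nil,
		fmt.Errorf("failed to delete pod-1: timed out"),
		nil,
		fmt.Errorf("failed to delete volume-2: still attached"),
	}

	// NewAggregate drops nil entries and combines the rest into one error,
	// so a fully successful cleanup yields a nil aggregate.
	if agg := utilerrors.NewAggregate(errs); agg != nil {
		fmt.Println("cleanup failed:", agg)
	} else {
		fmt.Println("cleanup succeeded")
	}
}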