From 3ce538812854d4223618ee6024b30adad89f9144 Mon Sep 17 00:00:00 2001
From: Michelle Au
Date: Fri, 9 Feb 2018 16:03:07 -0800
Subject: [PATCH] Add Local PV stress test

---
 test/e2e/storage/persistent_volumes-local.go | 313 +++++++++++++++----
 1 file changed, 251 insertions(+), 62 deletions(-)

diff --git a/test/e2e/storage/persistent_volumes-local.go b/test/e2e/storage/persistent_volumes-local.go
index da75a0552f1..d3065ee0bbc 100644
--- a/test/e2e/storage/persistent_volumes-local.go
+++ b/test/e2e/storage/persistent_volumes-local.go
@@ -18,11 +18,11 @@ package storage
 
 import (
 	"fmt"
-	"math/rand"
 	"path"
 	"path/filepath"
 	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/ghodss/yaml"
@@ -49,12 +49,13 @@ import (
 )
 
 type localTestConfig struct {
-	ns       string
-	nodes    []v1.Node
-	node0    *v1.Node
-	client   clientset.Interface
-	scName   string
-	ssTester *framework.StatefulSetTester
+	ns           string
+	nodes        []v1.Node
+	node0        *v1.Node
+	client       clientset.Interface
+	scName       string
+	ssTester     *framework.StatefulSetTester
+	discoveryDir string
 }
 
 type localVolumeType string
@@ -97,8 +98,6 @@ const (
 	// TODO: This may not be available/writable on all images.
 	hostBase      = "/tmp"
 	containerBase = "/myvol"
-	// 'hostBase + discoveryDir' is the path for volume discovery.
-	discoveryDir = "disks"
 	// Path to the first volume in the test containers
 	// created via createLocalPod or makeLocalPod
 	// leveraging pv_util.MakePod
@@ -153,18 +152,19 @@ var _ = utils.SIGDescribe("PersistentVolumes-local ", func() {
 		// Get all the schedulable nodes
 		nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
 		Expect(len(nodes.Items)).NotTo(BeZero(), "No available nodes for scheduling")
-		scName = fmt.Sprintf("%v-%v-%v", testSCPrefix, f.Namespace.Name, rand.Int())
+		scName = fmt.Sprintf("%v-%v", testSCPrefix, f.Namespace.Name)
 
 		// Choose the first node
 		node0 := &nodes.Items[0]
 
 		ssTester := framework.NewStatefulSetTester(f.ClientSet)
 		config = &localTestConfig{
-			ns:       f.Namespace.Name,
-			client:   f.ClientSet,
-			nodes:    nodes.Items,
-			node0:    node0,
-			scName:   scName,
-			ssTester: ssTester,
+			ns:           f.Namespace.Name,
+			client:       f.ClientSet,
+			nodes:        nodes.Items,
+			node0:        node0,
+			scName:       scName,
+			ssTester:     ssTester,
+			discoveryDir: filepath.Join(hostBase, f.Namespace.Name),
 		}
 	})
 
@@ -333,24 +333,21 @@ var _ = utils.SIGDescribe("PersistentVolumes-local ", func() {
 		BeforeEach(func() {
 			setupStorageClass(config, &immediateMode)
 			setupLocalVolumeProvisioner(config)
-			volumePath = path.Join(
-				hostBase, discoveryDir, fmt.Sprintf("vol-%v", string(uuid.NewUUID())))
-			setupLocalVolumeProvisionerMountPoint(config, volumePath)
+			volumePath = path.Join(config.discoveryDir, fmt.Sprintf("vol-%v", string(uuid.NewUUID())))
+			setupLocalVolumeProvisionerMountPoint(config, volumePath, config.node0)
 		})
 
 		AfterEach(func() {
-			cleanupLocalVolumeProvisionerMountPoint(config, volumePath)
-			cleanupLocalVolumeProvisioner(config, volumePath)
+			cleanupLocalVolumeProvisionerMountPoint(config, volumePath, config.node0)
+			cleanupLocalVolumeProvisioner(config)
 			cleanupStorageClass(config)
 		})
 
 		It("should create and recreate local persistent volume", func() {
 			By("Starting a provisioner daemonset")
 			createProvisionerDaemonset(config)
 
-			kind := schema.GroupKind{Group: "extensions", Kind: "DaemonSet"}
-			framework.WaitForControlledPodsRunning(config.client, config.ns, daemonSetName, kind)
-			By("Waiting for a PersitentVolume to be created")
+			By("Waiting for a PersistentVolume to be created")
 			oldPV, err := waitForLocalPersistentVolume(config.client, volumePath)
 			Expect(err).NotTo(HaveOccurred())
@@ -381,6 +378,9 @@ var _ = utils.SIGDescribe("PersistentVolumes-local ", func() {
 			fileDoesntExistCmd := createFileDoesntExistCmd(volumePath, testFile)
 			err = framework.IssueSSHCommand(fileDoesntExistCmd, framework.TestContext.Provider, config.node0)
 			Expect(err).NotTo(HaveOccurred())
+
+			By("Deleting provisioner daemonset")
+			deleteProvisionerDaemonset(config)
 		})
 	})
 
@@ -420,9 +420,158 @@ var _ = utils.SIGDescribe("PersistentVolumes-local ", func() {
 		})
 	})
 
-	// TODO: add stress test that creates many pods in parallel across multiple nodes
+	Context("Stress with local volume provisioner [Serial]", func() {
+		var testVols [][]string
+
+		const (
+			volsPerNode = 10 // Make this non-divisible by volsPerPod to increase chances of partial binding failure
+			volsPerPod  = 3
+			podsFactor  = 5
+		)
+
+		BeforeEach(func() {
+			setupStorageClass(config, &waitMode)
+			setupLocalVolumeProvisioner(config)
+
+			testVols = [][]string{}
+			for i, node := range config.nodes {
+				By(fmt.Sprintf("Setting up local volumes on node %q", node.Name))
+				paths := []string{}
+				for j := 0; j < volsPerNode; j++ {
+					volumePath := path.Join(config.discoveryDir, fmt.Sprintf("vol-%v", string(uuid.NewUUID())))
+					setupLocalVolumeProvisionerMountPoint(config, volumePath, &config.nodes[i])
+					paths = append(paths, volumePath)
+				}
+				testVols = append(testVols, paths)
+			}
+
+			By("Starting the local volume provisioner")
+			createProvisionerDaemonset(config)
+		})
+
+		AfterEach(func() {
+			By("Deleting provisioner daemonset")
+			deleteProvisionerDaemonset(config)
+
+			for i, paths := range testVols {
+				for _, volumePath := range paths {
+					cleanupLocalVolumeProvisionerMountPoint(config, volumePath, &config.nodes[i])
+				}
+			}
+			cleanupLocalVolumeProvisioner(config)
+			cleanupStorageClass(config)
+		})
+
+		It("should be able to process many pods and reuse local volumes", func() {
+			var (
+				podsLock sync.Mutex
+				// Have one extra pod pending
+				numConcurrentPods = volsPerNode/volsPerPod*len(config.nodes) + 1
+				totalPods         = numConcurrentPods * podsFactor
+				numCreated        = 0
+				numFinished       = 0
+				pods              = map[string]*v1.Pod{}
+			)
+
+			// Create pods gradually instead of all at once because the scheduler has
+			// exponential backoff
+			// TODO: this is still a bit slow because of the provisioner polling period
+			By(fmt.Sprintf("Creating %v pods periodically", numConcurrentPods))
+			stop := make(chan struct{})
+			go wait.Until(func() {
+				podsLock.Lock()
+				defer podsLock.Unlock()
+
+				if numCreated >= totalPods {
+					// Created all the pods for the test
+					return
+				}
+
+				if len(pods) > numConcurrentPods/2 {
+					// Too many outstanding pods
+					return
+				}
+
+				for i := 0; i < numConcurrentPods; i++ {
+					pvcs := []*v1.PersistentVolumeClaim{}
+					for j := 0; j < volsPerPod; j++ {
+						pvc := framework.MakePersistentVolumeClaim(makeLocalPVCConfig(config), config.ns)
+						pvc, err := framework.CreatePVC(config.client, config.ns, pvc)
+						framework.ExpectNoError(err)
+						pvcs = append(pvcs, pvc)
+					}
+
+					pod := framework.MakeSecPod(config.ns, pvcs, false, "sleep 1", false, false, selinuxLabel)
+					pod, err := config.client.CoreV1().Pods(config.ns).Create(pod)
+					Expect(err).NotTo(HaveOccurred())
+					pods[pod.Name] = pod
+					numCreated++
+				}
+			}, 2*time.Second, stop)
+
+			defer func() {
+				close(stop)
+				podsLock.Lock()
+				defer podsLock.Unlock()
+
+				for _, pod := range pods {
+					if err := deletePodAndPVCs(config, pod); err != nil {
+						framework.Logf("Deleting pod %v failed: %v", pod.Name, err)
+					}
+				}
+			}()
+
+			By("Waiting for all pods to complete successfully")
+			err := wait.PollImmediate(time.Second, 5*time.Minute, func() (done bool, err error) {
+				podsList, err := config.client.CoreV1().Pods(config.ns).List(metav1.ListOptions{})
+				if err != nil {
+					return false, err
+				}
+
+				podsLock.Lock()
+				defer podsLock.Unlock()
+
+				for _, pod := range podsList.Items {
+					switch pod.Status.Phase {
+					case v1.PodSucceeded:
+						// Delete pod and its PVCs
+						if err := deletePodAndPVCs(config, &pod); err != nil {
+							return false, err
+						}
+						delete(pods, pod.Name)
+						numFinished++
+						framework.Logf("%v/%v pods finished", numFinished, totalPods)
+					case v1.PodFailed:
+					case v1.PodUnknown:
+						return false, fmt.Errorf("pod %v is in %v phase", pod.Name, pod.Status.Phase)
+					}
+				}
+
+				return numFinished == totalPods, nil
+			})
+			Expect(err).ToNot(HaveOccurred())
+		})
+	})
 })
 
+func deletePodAndPVCs(config *localTestConfig, pod *v1.Pod) error {
+	framework.Logf("Deleting pod %v", pod.Name)
+	if err := config.client.CoreV1().Pods(config.ns).Delete(pod.Name, nil); err != nil {
+		return err
+	}
+
+	// Delete PVCs
+	for _, vol := range pod.Spec.Volumes {
+		pvcSource := vol.VolumeSource.PersistentVolumeClaim
+		if pvcSource != nil {
+			if err := framework.DeletePersistentVolumeClaim(config.client, pvcSource.ClaimName, config.ns); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
 type makeLocalPodWith func(config *localTestConfig, volume *localTestVolume, nodeName string) *v1.Pod
 
 func testPodWithNodeConflict(config *localTestConfig, testVolType localVolumeType, nodeName string, makeLocalPodFunc makeLocalPodWith, bindingMode storagev1.VolumeBindingMode) {
@@ -869,50 +1018,56 @@ func setupLocalVolumeProvisioner(config *localTestConfig) {
 	createProvisionerClusterRoleBinding(config)
 	createVolumeConfigMap(config)
 
-	By("Initializing local volume discovery base path")
-	mkdirCmd := fmt.Sprintf("mkdir -p %v -m 777", path.Join(hostBase, discoveryDir))
-	err := framework.IssueSSHCommand(mkdirCmd, framework.TestContext.Provider, config.node0)
-	Expect(err).NotTo(HaveOccurred())
+	for _, node := range config.nodes {
+		By(fmt.Sprintf("Initializing local volume discovery base path on node %v", node.Name))
+		mkdirCmd := fmt.Sprintf("mkdir -p %v -m 777", config.discoveryDir)
+		err := framework.IssueSSHCommand(mkdirCmd, framework.TestContext.Provider, &node)
+		Expect(err).NotTo(HaveOccurred())
+	}
 }
 
-func cleanupLocalVolumeProvisioner(config *localTestConfig, volumePath string) {
+func cleanupLocalVolumeProvisioner(config *localTestConfig) {
 	By("Cleaning up cluster role binding")
 	deleteClusterRoleBinding(config)
 
-	By("Removing the test discovery directory")
-	removeCmd := fmt.Sprintf("rm -r %s", path.Join(hostBase, discoveryDir))
-	err := framework.IssueSSHCommand(removeCmd, framework.TestContext.Provider, config.node0)
+	for _, node := range config.nodes {
+		By(fmt.Sprintf("Removing the test discovery directory on node %v", node.Name))
+		removeCmd := fmt.Sprintf("[ ! -e %v ] || rm -r %v", config.discoveryDir, config.discoveryDir)
+		err := framework.IssueSSHCommand(removeCmd, framework.TestContext.Provider, &node)
+		Expect(err).NotTo(HaveOccurred())
+	}
+}
+
+func setupLocalVolumeProvisionerMountPoint(config *localTestConfig, volumePath string, node *v1.Node) {
+	By(fmt.Sprintf("Creating local directory at path %q", volumePath))
+	mkdirCmd := fmt.Sprintf("mkdir %v -m 777", volumePath)
+	err := framework.IssueSSHCommand(mkdirCmd, framework.TestContext.Provider, node)
+	Expect(err).NotTo(HaveOccurred())
+
+	By(fmt.Sprintf("Mounting local directory at path %q", volumePath))
+	mntCmd := fmt.Sprintf("sudo mount --bind %v %v", volumePath, volumePath)
+	err = framework.IssueSSHCommand(mntCmd, framework.TestContext.Provider, node)
+	Expect(err).NotTo(HaveOccurred())
+}
+
+func cleanupLocalVolumeProvisionerMountPoint(config *localTestConfig, volumePath string, node *v1.Node) {
+	By(fmt.Sprintf("Unmounting the test mount point from %q", volumePath))
+	umountCmd := fmt.Sprintf("[ ! -e %v ] || sudo umount %v", volumePath, volumePath)
+	err := framework.IssueSSHCommand(umountCmd, framework.TestContext.Provider, node)
+	Expect(err).NotTo(HaveOccurred())
+
+	By("Removing the test mount point")
+	removeCmd := fmt.Sprintf("[ ! -e %v ] || rm -r %v", volumePath, volumePath)
+	err = framework.IssueSSHCommand(removeCmd, framework.TestContext.Provider, node)
 	Expect(err).NotTo(HaveOccurred())
 
 	By("Cleaning up persistent volume")
 	pv, err := findLocalPersistentVolume(config.client, volumePath)
 	Expect(err).NotTo(HaveOccurred())
-	err = config.client.CoreV1().PersistentVolumes().Delete(pv.Name, &metav1.DeleteOptions{})
-	Expect(err).NotTo(HaveOccurred())
-}
-
-func setupLocalVolumeProvisionerMountPoint(config *localTestConfig, volumePath string) {
-	By(fmt.Sprintf("Creating local directory at path %q", volumePath))
-	mkdirCmd := fmt.Sprintf("mkdir %v -m 777", volumePath)
-	err := framework.IssueSSHCommand(mkdirCmd, framework.TestContext.Provider, config.node0)
-	Expect(err).NotTo(HaveOccurred())
-
-	By(fmt.Sprintf("Mounting local directory at path %q", volumePath))
-	mntCmd := fmt.Sprintf("sudo mount --bind %v %v", volumePath, volumePath)
-	err = framework.IssueSSHCommand(mntCmd, framework.TestContext.Provider, config.node0)
-	Expect(err).NotTo(HaveOccurred())
-}
-
-func cleanupLocalVolumeProvisionerMountPoint(config *localTestConfig, volumePath string) {
-	By(fmt.Sprintf("Unmounting the test mount point from %q", volumePath))
-	umountCmd := fmt.Sprintf("sudo umount %v", volumePath)
-	err := framework.IssueSSHCommand(umountCmd, framework.TestContext.Provider, config.node0)
-	Expect(err).NotTo(HaveOccurred())
-
-	By("Removing the test mount point")
-	removeCmd := fmt.Sprintf("rm -r %s", volumePath)
-	err = framework.IssueSSHCommand(removeCmd, framework.TestContext.Provider, config.node0)
-
+	if pv != nil {
+		err = config.client.CoreV1().PersistentVolumes().Delete(pv.Name, &metav1.DeleteOptions{})
+		Expect(err).NotTo(HaveOccurred())
+	}
 }
 
 func createServiceAccount(config *localTestConfig) {
@@ -967,6 +1122,7 @@ func createProvisionerClusterRoleBinding(config *localTestConfig) {
 		Subjects: subjects,
 	}
 
+	deleteClusterRoleBinding(config)
 	_, err := config.client.RbacV1beta1().ClusterRoleBindings().Create(&pvBinding)
 	Expect(err).NotTo(HaveOccurred())
 	_, err = config.client.RbacV1beta1().ClusterRoleBindings().Create(&nodeBinding)
@@ -995,7 +1151,7 @@ func createVolumeConfigMap(config *localTestConfig) {
 	var provisionerConfig ProvisionerConfiguration
 	provisionerConfig.StorageClassConfig = map[string]MountConfig{
 		config.scName: {
-			HostDir:  path.Join(hostBase, discoveryDir),
+			HostDir:  config.discoveryDir,
 			MountDir: provisionerDefaultMountRoot,
 		},
 	}
@@ -1016,6 +1172,7 @@ func createVolumeConfigMap(config *localTestConfig) {
 			"storageClassMap": string(data),
 		},
 	}
+
 	_, err = config.client.CoreV1().ConfigMaps(config.ns).Create(&configMap)
 	Expect(err).NotTo(HaveOccurred())
 }
@@ -1093,7 +1250,7 @@ func createProvisionerDaemonset(config *localTestConfig) {
 					Name: "local-disks",
 					VolumeSource: v1.VolumeSource{
 						HostPath: &v1.HostPathVolumeSource{
-							Path: path.Join(hostBase, discoveryDir),
+							Path: config.discoveryDir,
 						},
 					},
 				},
@@ -1104,6 +1261,37 @@ func createProvisionerDaemonset(config *localTestConfig) {
 	}
 	_, err := config.client.ExtensionsV1beta1().DaemonSets(config.ns).Create(provisioner)
 	Expect(err).NotTo(HaveOccurred())
+
+	kind := schema.GroupKind{Group: "extensions", Kind: "DaemonSet"}
+	framework.WaitForControlledPodsRunning(config.client, config.ns, daemonSetName, kind)
+}
+
+func deleteProvisionerDaemonset(config *localTestConfig) {
+	ds, err := config.client.ExtensionsV1beta1().DaemonSets(config.ns).Get(daemonSetName, metav1.GetOptions{})
+	if ds == nil {
+		return
+	}
+
+	err = config.client.ExtensionsV1beta1().DaemonSets(config.ns).Delete(daemonSetName, nil)
+	Expect(err).NotTo(HaveOccurred())
+
+	err = wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
+		pods, err := config.client.CoreV1().Pods(config.ns).List(metav1.ListOptions{})
+		if err != nil {
+			return false, err
+		}
+
+		for _, pod := range pods.Items {
+			if metav1.IsControlledBy(&pod, ds) {
+				// DaemonSet pod still exists
+				return false, nil
+			}
+		}
+
+		// All DaemonSet pods are deleted
+		return true, nil
+	})
+	Expect(err).NotTo(HaveOccurred())
 }
 
 // newLocalClaim creates a new persistent volume claim.
@@ -1192,7 +1380,8 @@ func findLocalPersistentVolume(c clientset.Interface, volumePath string) (*v1.Pe
 			return &p, nil
 		}
 	}
-	return nil, fmt.Errorf("Unable to find local persistent volume with path %v", volumePath)
+	// Doesn't exist, that's fine, it could be invoked by early cleanup
+	return nil, nil
 }
 
 func createStatefulSet(config *localTestConfig, ssReplicas int32, volumeCount int) *appsv1.StatefulSet {
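
Reviewer note on the stress test's throttling pattern: the core of the new It block is a bounded-concurrency loop, where a mutex-protected map of outstanding pods is periodically topped up by a wait.Until goroutine and drained by a wait.PollImmediate loop that reaps succeeded pods until totalPods have completed. The standalone Go sketch below is not part of the patch; it mimics that pattern with plain goroutines and timers in place of the e2e framework and the Kubernetes API. The names totalPods and numConcurrentPods mirror the test's variables, but the values and timings here are arbitrary placeholders.

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const (
		totalPods         = 15 // total units of work to push through (arbitrary for the sketch)
		numConcurrentPods = 4  // upper bound on work submitted per round (arbitrary for the sketch)
	)

	var (
		mu          sync.Mutex
		outstanding = map[int]bool{} // created but not yet finished, like the test's pods map
		numCreated  = 0
		numFinished = 0
	)

	stop := make(chan struct{})

	// Producer: periodically top up the pool, mirroring the test's wait.Until loop.
	// It stops creating once totalPods have been submitted and backs off while more
	// than half of the concurrency budget is still outstanding.
	go func() {
		ticker := time.NewTicker(50 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-stop:
				return
			case <-ticker.C:
			}
			mu.Lock()
			if numCreated >= totalPods || len(outstanding) > numConcurrentPods/2 {
				mu.Unlock()
				continue
			}
			for i := 0; i < numConcurrentPods && numCreated < totalPods; i++ {
				id := numCreated
				outstanding[id] = true
				numCreated++
				// Simulate the pod's short "sleep 1" workload completing later.
				go func(id int) {
					time.Sleep(100 * time.Millisecond)
					mu.Lock()
					delete(outstanding, id)
					numFinished++
					mu.Unlock()
				}(id)
			}
			mu.Unlock()
		}
	}()

	// Consumer: poll until every unit has finished, like the test's wait.PollImmediate loop.
	for {
		mu.Lock()
		done := numFinished == totalPods
		fmt.Printf("%v/%v finished\n", numFinished, totalPods)
		mu.Unlock()
		if done {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}
	close(stop)
}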