diff --git a/pkg/util/operationmanager/operationmanager.go b/pkg/util/operationmanager/operationmanager.go index eb8fed88ed0..39ff248573d 100644 --- a/pkg/util/operationmanager/operationmanager.go +++ b/pkg/util/operationmanager/operationmanager.go @@ -37,6 +37,9 @@ type OperationManager interface { // Attempts to send msg to the channel associated with ID. // Returns an error if no associated channel exists. Send(id string, msg interface{}) error + + // Returns true if an entry with the specified ID already exists. + Exists(id string) bool } // Returns a new instance of a channel manager. @@ -90,3 +93,11 @@ func (cm *operationManager) Send(id string, msg interface{}) error { cm.chanMap[id] <- msg return nil } + +// Returns true if an entry with the specified ID already exists. +func (cm *operationManager) Exists(id string) (exists bool) { + cm.RLock() + defer cm.RUnlock() + _, exists = cm.chanMap[id] + return +} diff --git a/pkg/util/operationmanager/operationmanager_test.go b/pkg/util/operationmanager/operationmanager_test.go index d19f4f3fe56..3ab23054577 100644 --- a/pkg/util/operationmanager/operationmanager_test.go +++ b/pkg/util/operationmanager/operationmanager_test.go @@ -32,15 +32,10 @@ func TestStart(t *testing.T) { sigErr := cm.Send(chanId, testMsg) // Assert - if startErr != nil { - t.Fatalf("Unexpected error on Start. Expected: Actual: <%v>", startErr) - } - if sigErr != nil { - t.Fatalf("Unexpected error on Send. Expected: Actual: <%v>", sigErr) - } - if actual := <-ch; actual != testMsg { - t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg, actual) - } + verifyNoError(t, startErr, "Start") + verifyNoError(t, sigErr, "Send") + actualMsg := <-ch + verifyMsg(t, testMsg /* expected */, actualMsg.(string) /* actual */) } func TestStartIdExists(t *testing.T) { @@ -53,12 +48,8 @@ func TestStartIdExists(t *testing.T) { _, startErr2 := cm.Start(chanId, 1 /* bufferSize */) // Assert - if startErr1 != nil { - t.Fatalf("Unexpected error on Start1. Expected: Actual: <%v>", startErr1) - } - if startErr2 == nil { - t.Fatalf("Expected error on Start2. Expected: Actual: ") - } + verifyNoError(t, startErr1, "Start1") + verifyError(t, startErr2, "Start2") } func TestStartAndAdd2Chans(t *testing.T) { @@ -76,25 +67,14 @@ func TestStartAndAdd2Chans(t *testing.T) { sigErr2 := cm.Send(chanId2, testMsg2) // Assert - if startErr1 != nil { - t.Fatalf("Unexpected error on Start1. Expected: Actual: <%v>", startErr1) - } - if startErr2 != nil { - t.Fatalf("Unexpected error on Start2. Expected: Actual: <%v>", startErr2) - } - if sigErr1 != nil { - t.Fatalf("Unexpected error on Send1. Expected: Actual: <%v>", sigErr1) - } - if sigErr2 != nil { - t.Fatalf("Unexpected error on Send2. Expected: Actual: <%v>", sigErr2) - } - if actual := <-ch1; actual != testMsg1 { - t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg1, actual) - } - if actual := <-ch2; actual != testMsg2 { - t.Fatalf("Unexpected testMsg value. 
Expected: <%v> Actual: <%v>", testMsg2, actual) - } - + verifyNoError(t, startErr1, "Start1") + verifyNoError(t, startErr2, "Start2") + verifyNoError(t, sigErr1, "Send1") + verifyNoError(t, sigErr2, "Send2") + actualMsg1 := <-ch1 + actualMsg2 := <-ch2 + verifyMsg(t, testMsg1 /* expected */, actualMsg1.(string) /* actual */) + verifyMsg(t, testMsg2 /* expected */, actualMsg2.(string) /* actual */) } func TestStartAndAdd2ChansAndClose(t *testing.T) { @@ -114,26 +94,66 @@ func TestStartAndAdd2ChansAndClose(t *testing.T) { sigErr3 := cm.Send(chanId1, testMsg1) // Assert - if startErr1 != nil { - t.Fatalf("Unexpected error on Start1. Expected: Actual: <%v>", startErr1) - } - if startErr2 != nil { - t.Fatalf("Unexpected error on Start2. Expected: Actual: <%v>", startErr2) - } - if sigErr1 != nil { - t.Fatalf("Unexpected error on Send1. Expected: Actual: <%v>", sigErr1) - } - if sigErr2 != nil { - t.Fatalf("Unexpected error on Send2. Expected: Actual: <%v>", sigErr2) - } - if sigErr3 == nil { - t.Fatalf("Expected error on Send3. Expected: Actual: ", sigErr2) - } - if actual := <-ch1; actual != testMsg1 { - t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg1, actual) - } - if actual := <-ch2; actual != testMsg2 { - t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg2, actual) - } - + verifyNoError(t, startErr1, "Start1") + verifyNoError(t, startErr2, "Start2") + verifyNoError(t, sigErr1, "Send1") + verifyNoError(t, sigErr2, "Send2") + verifyError(t, sigErr3, "Send3") + actualMsg1 := <-ch1 + actualMsg2 := <-ch2 + verifyMsg(t, testMsg1 /* expected */, actualMsg1.(string) /* actual */) + verifyMsg(t, testMsg2 /* expected */, actualMsg2.(string) /* actual */) +} + +func TestExists(t *testing.T) { + // Arrange + cm := NewOperationManager() + chanId1 := "testChanId1" + chanId2 := "testChanId2" + + // Act & Assert + verifyExists(t, cm, chanId1, false /* expected */) + verifyExists(t, cm, chanId2, false /* expected */) + + _, startErr1 := cm.Start(chanId1, 1 /* bufferSize */) + verifyNoError(t, startErr1, "Start1") + verifyExists(t, cm, chanId1, true /* expected */) + verifyExists(t, cm, chanId2, false /* expected */) + + _, startErr2 := cm.Start(chanId2, 1 /* bufferSize */) + verifyNoError(t, startErr2, "Start2") + verifyExists(t, cm, chanId1, true /* expected */) + verifyExists(t, cm, chanId2, true /* expected */) + + cm.Close(chanId1) + verifyExists(t, cm, chanId1, false /* expected */) + verifyExists(t, cm, chanId2, true /* expected */) + + cm.Close(chanId2) + verifyExists(t, cm, chanId1, false /* expected */) + verifyExists(t, cm, chanId2, false /* expected */) +} + +func verifyExists(t *testing.T, cm OperationManager, id string, expected bool) { + if actual := cm.Exists(id); expected != actual { + t.Fatalf("Unexpected Exists(%q) response. Expected: <%v> Actual: <%v>", id, expected, actual) + } +} + +func verifyNoError(t *testing.T, err error, name string) { + if err != nil { + t.Fatalf("Unexpected response on %q. Expected: Actual: <%v>", name, err) + } +} + +func verifyError(t *testing.T, err error, name string) { + if err == nil { + t.Fatalf("Unexpected response on %q. Expected: Actual: ") + } +} + +func verifyMsg(t *testing.T, expected, actual string) { + if actual != expected { + t.Fatalf("Unexpected testMsg value. 
Expected: <%v> Actual: <%v>", expected, actual) + } } diff --git a/pkg/volume/gce_pd/gce_util.go b/pkg/volume/gce_pd/gce_util.go index e855cb886d7..c853a6f9a77 100644 --- a/pkg/volume/gce_pd/gce_util.go +++ b/pkg/volume/gce_pd/gce_util.go @@ -43,6 +43,7 @@ const ( maxChecks = 10 maxRetries = 10 checkSleepDuration = time.Second + errorSleepDuration = 5 * time.Second ) // Singleton operation manager for managing detach clean up go routines @@ -54,24 +55,17 @@ type GCEDiskUtil struct{} // Mounts the disk to it's global path. func (diskUtil *GCEDiskUtil) AttachAndMountDisk(pd *gcePersistentDisk, globalPDPath string) error { glog.V(5).Infof("AttachAndMountDisk(pd, %q) where pd is %#v\r\n", globalPDPath, pd) - // Terminate any in progress verify detach go routines, this will block until the goroutine is ready to exit because the channel is unbuffered + + // Block execution until any pending detach goroutines for this pd have completed detachCleanupManager.Send(pd.pdName, true) + sdBefore, err := filepath.Glob(diskSDPattern) if err != nil { glog.Errorf("Error filepath.Glob(\"%s\"): %v\r\n", diskSDPattern, err) } sdBeforeSet := util.NewStringSet(sdBefore...) - gce, err := cloudprovider.GetCloudProvider("gce", nil) - if err != nil { - return err - } - - if err := gce.(*gce_cloud.GCECloud).AttachDisk(pd.pdName, pd.readOnly); err != nil { - return err - } - - devicePath, err := verifyAttached(pd, sdBeforeSet, gce) + devicePath, err := attachDiskAndVerify(pd, sdBeforeSet) if err != nil { return err } @@ -108,33 +102,51 @@ func (util *GCEDiskUtil) DetachDisk(pd *gcePersistentDisk) error { globalPDPath := makeGlobalPDName(pd.plugin.host, pd.pdName) glog.V(5).Infof("DetachDisk(pd) where pd is %#v and the globalPDPath is %q\r\n", pd, globalPDPath) - // Terminate any in progress verify detach go routines, this will block until the goroutine is ready to exit because the channel is unbuffered - detachCleanupManager.Send(pd.pdName, true) - if err := pd.mounter.Unmount(globalPDPath); err != nil { return err } if err := os.Remove(globalPDPath); err != nil { return err } - // Detach the disk - gce, err := cloudprovider.GetCloudProvider("gce", nil) - if err != nil { - return err - } - if err := gce.(*gce_cloud.GCECloud).DetachDisk(pd.pdName); err != nil { - return err + + if detachCleanupManager.Exists(pd.pdName) { + glog.Warningf("Terminating new DetachDisk call for GCE PD %q. A previous detach call for this PD is still pending.", pd.pdName) + return nil + } - // Verify disk detached, retry if needed. - go verifyDetached(pd, gce) + // Detach disk, retry if needed. + go detachDiskAndVerify(pd) return nil } -// Verifys the disk device to be created has been succesffully attached, and retries if it fails. -func verifyAttached(pd *gcePersistentDisk, sdBeforeSet util.StringSet, gce cloudprovider.Interface) (string, error) { +// Attaches the specified persistent disk device to node, verifies that it is attached, and retries if it fails. +func attachDiskAndVerify(pd *gcePersistentDisk, sdBeforeSet util.StringSet) (string, error) { devicePaths := getDiskByIdPaths(pd) + var gce cloudprovider.Interface for numRetries := 0; numRetries < maxRetries; numRetries++ { + if gce == nil { + var err error + gce, err = cloudprovider.GetCloudProvider("gce", nil) + if err != nil || gce == nil { + // Retry on error. 
See issue #11321 + glog.Errorf("Error getting GCECloudProvider while attaching PD %q: %v", pd.pdName, err) + gce = nil + time.Sleep(errorSleepDuration) + continue + } + } + + if numRetries > 0 { + glog.Warningf("Timed out waiting for GCE PD %q to attach. Retrying attach.", pd.pdName) + } + + if err := gce.(*gce_cloud.GCECloud).AttachDisk(pd.pdName, pd.readOnly); err != nil { + // Retry on error. See issue #11321. Continue and verify if disk is attached, because a + // previous attach operation may still succeed. + glog.Errorf("Error attaching PD %q: %v", pd.pdName, err) + } + for numChecks := 0; numChecks < maxChecks; numChecks++ { if err := udevadmChangeToNewDrives(sdBeforeSet); err != nil { // udevadm errors should not block disk attachment, log and continue @@ -143,84 +155,107 @@ func verifyAttached(pd *gcePersistentDisk, sdBeforeSet util.StringSet, gce cloud for _, path := range devicePaths { if pathExists, err := pathExists(path); err != nil { - return "", err + // Retry on error. See issue #11321 + glog.Errorf("Error checking if path exists: %v", err) } else if pathExists { // A device path has succesfully been created for the PD - glog.V(5).Infof("Succesfully attached GCE PD %q.", pd.pdName) + glog.Infof("Succesfully attached GCE PD %q.", pd.pdName) return path, nil } } // Sleep then check again - glog.V(5).Infof("Waiting for GCE PD %q to attach.", pd.pdName) + glog.V(3).Infof("Waiting for GCE PD %q to attach.", pd.pdName) time.Sleep(checkSleepDuration) } - - // Try attaching the disk again - glog.Warningf("Timed out waiting for GCE PD %q to attach. Retrying attach.", pd.pdName) - if err := gce.(*gce_cloud.GCECloud).AttachDisk(pd.pdName, pd.readOnly); err != nil { - return "", err - } } return "", fmt.Errorf("Could not attach GCE PD %q. Timeout waiting for mount paths to be created.", pd.pdName) } -// Veify the specified persistent disk device has been succesfully detached, and retries if it fails. +// Detaches the specified persistent disk device from node, verifies that it is detached, and retries if it fails. // This function is intended to be called asynchronously as a go routine. -func verifyDetached(pd *gcePersistentDisk, gce cloudprovider.Interface) { +// It starts the detachCleanupManager with the specified pdName so that callers can wait for completion. +func detachDiskAndVerify(pd *gcePersistentDisk) { + glog.V(5).Infof("detachDiskAndVerify for pd %q.", pd.pdName) defer util.HandleCrash() - // Setting bufferSize to 0 so that when senders send, they are blocked until we recieve. This avoids the need to have a separate exit check. + // Start operation, so that other threads can wait on this detach operation. + // Set bufferSize to 0 so senders are blocked on send until we recieve. ch, err := detachCleanupManager.Start(pd.pdName, 0 /* bufferSize */) if err != nil { glog.Errorf("Error adding %q to detachCleanupManager: %v", pd.pdName, err) return } + defer detachCleanupManager.Close(pd.pdName) - devicePaths := getDiskByIdPaths(pd) - for numRetries := 0; numRetries < maxRetries; numRetries++ { - for numChecks := 0; numChecks < maxChecks; numChecks++ { + defer func() { + // Unblock any callers that have been waiting for this detach routine to complete. + for { select { case <-ch: - glog.Warningf("Terminating GCE PD %q detach verification. 
Another attach/detach call was made for this PD.", pd.pdName) - return + glog.V(5).Infof("detachDiskAndVerify for pd %q clearing chan.", pd.pdName) default: - allPathsRemoved := true - for _, path := range devicePaths { - if err := udevadmChangeToDrive(path); err != nil { - // udevadm errors should not block disk detachment, log and continue - glog.Errorf("%v", err) - } - if exists, err := pathExists(path); err != nil { - glog.Errorf("Error check path: %v", err) - return - } else { - allPathsRemoved = allPathsRemoved && !exists - } - } - if allPathsRemoved { - // All paths to the PD have been succefully removed - glog.V(5).Infof("Succesfully detached GCE PD %q.", pd.pdName) - return - } + glog.V(5).Infof("detachDiskAndVerify for pd %q done clearing chans.", pd.pdName) + return + } + } + }() - // Sleep then check again - glog.V(5).Infof("Waiting for GCE PD %q to detach.", pd.pdName) - time.Sleep(checkSleepDuration) + devicePaths := getDiskByIdPaths(pd) + var gce cloudprovider.Interface + for numRetries := 0; numRetries < maxRetries; numRetries++ { + if gce == nil { + var err error + gce, err = cloudprovider.GetCloudProvider("gce", nil) + if err != nil || gce == nil { + // Retry on error. See issue #11321 + glog.Errorf("Error getting GCECloudProvider while detaching PD %q: %v", pd.pdName, err) + gce = nil + time.Sleep(errorSleepDuration) + continue } } - // Try detaching disk again - glog.Warningf("Timed out waiting for GCE PD %q to detach. Retrying detach.", pd.pdName) - if err := gce.(*gce_cloud.GCECloud).DetachDisk(pd.pdName); err != nil { - glog.Errorf("Error on retry detach PD %q: %v", pd.pdName, err) - return + if numRetries > 0 { + glog.Warningf("Timed out waiting for GCE PD %q to detach. Retrying detach.", pd.pdName) } + + if err := gce.(*gce_cloud.GCECloud).DetachDisk(pd.pdName); err != nil { + // Retry on error. See issue #11321. Continue and verify if disk is detached, because a + // previous detach operation may still succeed. + glog.Errorf("Error detaching PD %q: %v", pd.pdName, err) + } + + for numChecks := 0; numChecks < maxChecks; numChecks++ { + allPathsRemoved := true + for _, path := range devicePaths { + if err := udevadmChangeToDrive(path); err != nil { + // udevadm errors should not block disk detachment, log and continue + glog.Errorf("%v", err) + } + if exists, err := pathExists(path); err != nil { + // Retry on error. See issue #11321 + glog.Errorf("Error checking if path exists: %v", err) + } else { + allPathsRemoved = allPathsRemoved && !exists + } + } + if allPathsRemoved { + // All paths to the PD have been succefully removed + glog.Infof("Succesfully detached GCE PD %q.", pd.pdName) + return + } + + // Sleep then check again + glog.V(3).Infof("Waiting for GCE PD %q to detach.", pd.pdName) + time.Sleep(checkSleepDuration) + } + } - glog.Errorf("Could not detach GCE PD %q. One or more mount paths was not removed.", pd.pdName) + glog.Errorf("Failed to detach GCE PD %q. One or more mount paths was not removed.", pd.pdName) } // Returns list of all /dev/disk/by-id/* paths for given PD. @@ -326,7 +361,7 @@ func (mounter *gceSafeFormatAndMount) Mount(source string, target string, fstype cmd := mounter.runner.Command("/usr/share/google/safe_format_and_mount", args...) 
dataOut, err := cmd.CombinedOutput() if err != nil { - glog.V(5).Infof("error running /usr/share/google/safe_format_and_mount\n%s", string(dataOut)) + glog.Errorf("error running /usr/share/google/safe_format_and_mount\n%s", string(dataOut)) } return err } diff --git a/test/e2e/framework.go b/test/e2e/framework.go index 092eeffc82b..d5a430f2f29 100644 --- a/test/e2e/framework.go +++ b/test/e2e/framework.go @@ -17,7 +17,9 @@ limitations under the License. package e2e import ( + "bytes" "fmt" + "strings" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -155,3 +157,50 @@ func (f *Framework) WaitForAnEndpoint(serviceName string) error { } } } + +// Write a file using kubectl exec echo > via specified container +// Because of the primitive technique we're using here, we only allow ASCII alphanumeric characters +func (f *Framework) WriteFileViaContainer(podName, containerName string, path string, contents string) error { + By("writing a file in the container") + allowedCharacters := "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" + for _, c := range contents { + if !strings.ContainsRune(allowedCharacters, c) { + return fmt.Errorf("Unsupported character in string to write: %v", c) + } + } + command := fmt.Sprintf("echo '%s' > '%s'", contents, path) + stdout, stderr, err := kubectlExec(f.Namespace.Name, podName, containerName, "--", "/bin/sh", "-c", command) + if err != nil { + Logf("error running kubectl exec to write file: %v\nstdout=%v\nstderr=%v)", err, string(stdout), string(stderr)) + } + return err +} + +// Read a file using kubectl exec cat +func (f *Framework) ReadFileViaContainer(podName, containerName string, path string) (string, error) { + By("reading a file in the container") + + stdout, stderr, err := kubectlExec(f.Namespace.Name, podName, containerName, "--", "cat", path) + if err != nil { + Logf("error running kubectl exec to read file: %v\nstdout=%v\nstderr=%v)", err, string(stdout), string(stderr)) + } + return string(stdout), err +} + +func kubectlExec(namespace string, podName, containerName string, args ...string) ([]byte, []byte, error) { + var stdout, stderr bytes.Buffer + cmdArgs := []string{ + "exec", + fmt.Sprintf("--namespace=%v", namespace), + podName, + fmt.Sprintf("-c=%v", containerName), + } + cmdArgs = append(cmdArgs, args...) + + cmd := kubectlCmd(cmdArgs...) 
+ cmd.Stdout, cmd.Stderr = &stdout, &stderr + + Logf("Running '%s %s'", cmd.Path, strings.Join(cmd.Args, " ")) + err := cmd.Run() + return stdout.Bytes(), stderr.Bytes(), err +} diff --git a/test/e2e/pd.go b/test/e2e/pd.go index 0236f9b6fef..c49147028c1 100644 --- a/test/e2e/pd.go +++ b/test/e2e/pd.go @@ -23,9 +23,9 @@ import ( "strings" "time" - "bytes" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/aws" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" @@ -55,6 +55,8 @@ var _ = Describe("Pod Disks", func() { host0Name = nodes.Items[0].ObjectMeta.Name host1Name = nodes.Items[1].ObjectMeta.Name + + math_rand.Seed(time.Now().UTC().UnixNano()) }) It("should schedule a pod w/ a RW PD, remove it, then schedule it on another host", func() { @@ -64,8 +66,8 @@ var _ = Describe("Pod Disks", func() { diskName, err := createPD() expectNoError(err, "Error creating PD") - host0Pod := testPDPod(diskName, host0Name, false) - host1Pod := testPDPod(diskName, host1Name, false) + host0Pod := testPDPod(diskName, host0Name, false /* readOnly */, 1 /* numContainers */) + host1Pod := testPDPod(diskName, host1Name, false /* readOnly */, 1 /* numContainers */) defer func() { By("cleaning up PD-RW test environment") @@ -87,7 +89,7 @@ var _ = Describe("Pod Disks", func() { testFile := "/testpd/tracker" testFileContents := fmt.Sprintf("%v", math_rand.Int()) - expectNoError(writeFileOnPod(framework.Client, host0Pod.Name, testFile, testFileContents)) + expectNoError(framework.WriteFileViaContainer(host0Pod.Name, "testpd" /* containerName */, testFile, testFileContents)) Logf("Wrote value: %v", testFileContents) By("deleting host0Pod") @@ -99,7 +101,7 @@ var _ = Describe("Pod Disks", func() { expectNoError(framework.WaitForPodRunning(host1Pod.Name)) - v, err := readFileOnPod(framework.Client, host1Pod.Name, testFile) + v, err := framework.ReadFileViaContainer(host1Pod.Name, "testpd", testFile) expectNoError(err) Logf("Read value: %v", v) @@ -109,15 +111,7 @@ var _ = Describe("Pod Disks", func() { expectNoError(podClient.Delete(host1Pod.Name, nil), "Failed to delete host1Pod") By(fmt.Sprintf("deleting PD %q", diskName)) - for start := time.Now(); time.Since(start) < 180*time.Second; time.Sleep(5 * time.Second) { - if err = deletePD(diskName); err != nil { - Logf("Couldn't delete PD. 
Sleeping 5 seconds (%v)", err) - continue - } - Logf("Deleted PD %v", diskName) - break - } - expectNoError(err, "Error deleting PD") + deletePDWithRetry(diskName) return }) @@ -129,9 +123,9 @@ var _ = Describe("Pod Disks", func() { diskName, err := createPD() expectNoError(err, "Error creating PD") - rwPod := testPDPod(diskName, host0Name, false) - host0ROPod := testPDPod(diskName, host0Name, true) - host1ROPod := testPDPod(diskName, host1Name, true) + rwPod := testPDPod(diskName, host0Name, false /* readOnly */, 1 /* numContainers */) + host0ROPod := testPDPod(diskName, host0Name, true /* readOnly */, 1 /* numContainers */) + host1ROPod := testPDPod(diskName, host1Name, true /* readOnly */, 1 /* numContainers */) defer func() { By("cleaning up PD-RO test environment") @@ -171,58 +165,89 @@ var _ = Describe("Pod Disks", func() { expectNoError(podClient.Delete(host1ROPod.Name, nil), "Failed to delete host1ROPod") By(fmt.Sprintf("deleting PD %q", diskName)) - for start := time.Now(); time.Since(start) < 180*time.Second; time.Sleep(5 * time.Second) { - if err = deletePD(diskName); err != nil { - Logf("Couldn't delete PD. Sleeping 5 seconds") - continue - } - Logf("Successfully deleted PD %q", diskName) - break - } + deletePDWithRetry(diskName) + expectNoError(err, "Error deleting PD") }) + + It("should schedule a pod w/ a RW PD shared between multiple containers, write to PD, delete pod, verify contents, and repeat in rapid succession", func() { + SkipUnlessProviderIs("gce", "gke", "aws") + + By("creating PD") + diskName, err := createPD() + expectNoError(err, "Error creating PD") + numContainers := 4 + + host0Pod := testPDPod(diskName, host0Name, false /* readOnly */, numContainers) + + defer func() { + By("cleaning up PD-RW test environment") + // Teardown pods, PD. Ignore errors. + // Teardown should do nothing unless test failed. 
+ podClient.Delete(host0Pod.Name, nil) + detachPD(host0Name, diskName) + deletePD(diskName) + }() + + fileAndContentToVerify := make(map[string]string) + for i := 0; i < 3; i++ { + Logf("PD Read/Writer Iteration #%v", i) + By("submitting host0Pod to kubernetes") + _, err = podClient.Create(host0Pod) + expectNoError(err, fmt.Sprintf("Failed to create host0Pod: %v", err)) + + expectNoError(framework.WaitForPodRunning(host0Pod.Name)) + + // randomly select a container and read/verify pd contents from it + containerName := fmt.Sprintf("testpd%v", math_rand.Intn(numContainers)+1) + verifyPDContentsViaContainer(framework, host0Pod.Name, containerName, fileAndContentToVerify) + + // Randomly select a container to write a file to PD from + containerName = fmt.Sprintf("testpd%v", math_rand.Intn(numContainers)+1) + testFile := fmt.Sprintf("/testpd/tracker%v", i) + testFileContents := fmt.Sprintf("%v", math_rand.Int()) + fileAndContentToVerify[testFile] = testFileContents + expectNoError(framework.WriteFileViaContainer(host0Pod.Name, containerName, testFile, testFileContents)) + Logf("Wrote value: \"%v\" to PD %q from pod %q container %q", testFileContents, diskName, host0Pod.Name, containerName) + + // Randomly select a container and read/verify pd contents from it + containerName = fmt.Sprintf("testpd%v", math_rand.Intn(numContainers)+1) + verifyPDContentsViaContainer(framework, host0Pod.Name, containerName, fileAndContentToVerify) + + By("deleting host0Pod") + expectNoError(podClient.Delete(host0Pod.Name, nil), "Failed to delete host0Pod") + } + + By(fmt.Sprintf("deleting PD %q", diskName)) + deletePDWithRetry(diskName) + + return + }) }) -func kubectlExec(namespace string, podName string, args ...string) ([]byte, []byte, error) { - var stdout, stderr bytes.Buffer - cmdArgs := []string{"exec", fmt.Sprintf("--namespace=%v", namespace), podName} - cmdArgs = append(cmdArgs, args...) - - cmd := kubectlCmd(cmdArgs...) - cmd.Stdout, cmd.Stderr = &stdout, &stderr - - Logf("Running '%s %s'", cmd.Path, strings.Join(cmd.Args, " ")) - err := cmd.Run() - return stdout.Bytes(), stderr.Bytes(), err -} - -// Write a file using kubectl exec echo > -// Because of the primitive technique we're using here, we only allow ASCII alphanumeric characters -func writeFileOnPod(c *client.Client, podName string, path string, contents string) error { - By("writing a file in the container") - allowedCharacters := "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" - for _, c := range contents { - if !strings.ContainsRune(allowedCharacters, c) { - return fmt.Errorf("Unsupported character in string to write: %v", c) +func deletePDWithRetry(diskName string) { + var err error + for start := time.Now(); time.Since(start) < 180*time.Second; time.Sleep(5 * time.Second) { + if err = deletePD(diskName); err != nil { + Logf("Couldn't delete PD %q. 
Sleeping 5 seconds (%v)", diskName, err) + continue } + Logf("Deleted PD %v", diskName) + break } - command := fmt.Sprintf("echo '%s' > '%s'", contents, path) - stdout, stderr, err := kubectlExec(api.NamespaceDefault, podName, "--", "/bin/sh", "-c", command) - if err != nil { - Logf("error running kubectl exec to write file: %v\nstdout=%v\nstderr=%v)", err, string(stdout), string(stderr)) - } - return err + expectNoError(err, "Error deleting PD") } -// Read a file using kubectl exec cat -func readFileOnPod(c *client.Client, podName string, path string) (string, error) { - By("reading a file in the container") - - stdout, stderr, err := kubectlExec(api.NamespaceDefault, podName, "--", "cat", path) - if err != nil { - Logf("error running kubectl exec to read file: %v\nstdout=%v\nstderr=%v)", err, string(stdout), string(stderr)) +func verifyPDContentsViaContainer(f *Framework, podName, containerName string, fileAndContentToVerify map[string]string) { + for filePath, expectedContents := range fileAndContentToVerify { + v, err := f.ReadFileViaContainer(podName, containerName, filePath) + if err != nil { + Logf("Error reading file: %v", err) + } + expectNoError(err) + Logf("Read file %q with content: %v", filePath, v) + Expect(strings.TrimSpace(v)).To(Equal(strings.TrimSpace(expectedContents))) } - return string(stdout), err } func createPD() (string, error) { @@ -284,7 +309,30 @@ func detachPD(hostName, pdName string) error { } } -func testPDPod(diskName, targetHost string, readOnly bool) *api.Pod { +func testPDPod(diskName, targetHost string, readOnly bool, numContainers int) *api.Pod { + containers := make([]api.Container, numContainers) + for i := range containers { + containers[i].Name = "testpd" + if numContainers > 1 { + containers[i].Name = fmt.Sprintf("testpd%v", i+1) + } + + containers[i].Image = "gcr.io/google_containers/busybox" + + containers[i].Command = []string{"sleep", "6000"} + + containers[i].VolumeMounts = []api.VolumeMount{ + { + Name: "testpd", + MountPath: "/testpd", + }, + } + + containers[i].Resources.Limits = api.ResourceList{} + containers[i].Resources.Limits[api.ResourceCPU] = *resource.NewQuantity(int64(0), resource.DecimalSI) + + } + pod := &api.Pod{ TypeMeta: api.TypeMeta{ Kind: "Pod", @@ -294,20 +342,8 @@ func testPDPod(diskName, targetHost string, readOnly bool) *api.Pod { Name: "pd-test-" + string(util.NewUUID()), }, Spec: api.PodSpec{ - Containers: []api.Container{ - { - Name: "testpd", - Image: "gcr.io/google_containers/busybox", - Command: []string{"sleep", "600"}, - VolumeMounts: []api.VolumeMount{ - { - Name: "testpd", - MountPath: "/testpd", - }, - }, - }, - }, - NodeName: targetHost, + Containers: containers, + NodeName: targetHost, }, }
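Reviewer note: the heart of this change is the handshake between the GCE PD attach path and the detach-cleanup goroutine, coordinated through the new OperationManager methods. The standalone Go sketch below distills that pattern using only the API surface visible in this diff (NewOperationManager, Start, Send, Exists, Close). It is illustrative only, not code from this PR: the import path, the channel element type, and the sleeps standing in for real attach/detach work are assumptions.

// Illustrative sketch only (not part of the PR): the detach/attach handshake
// pattern built on the OperationManager API added in this diff.
// The import path and channel element type are assumptions.
package main

import (
	"fmt"
	"time"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/util/operationmanager"
)

func main() {
	cm := operationmanager.NewOperationManager()
	const id = "example-pd"

	// Cleanup goroutine (the detachDiskAndVerify role): registers the operation
	// so other callers can discover it and wait on it.
	go func() {
		// bufferSize 0: senders block until this goroutine receives, which is
		// what lets a caller wait for the cleanup to finish.
		ch, err := cm.Start(id, 0 /* bufferSize */)
		if err != nil {
			fmt.Printf("Start failed: %v\n", err)
			return
		}
		defer cm.Close(id)

		time.Sleep(2 * time.Second) // stand-in for the real detach/verify work

		// Release any caller blocked in Send before exiting.
		select {
		case <-ch:
			fmt.Println("released a waiting caller")
		default:
		}
	}()

	time.Sleep(100 * time.Millisecond) // let the goroutine register (sketch-only ordering)

	// Attach path (the AttachAndMountDisk role): if a cleanup is pending for
	// this ID, block on Send until it finishes instead of racing it.
	if cm.Exists(id) {
		fmt.Println("detach pending; waiting for it to complete")
		if err := cm.Send(id, true); err != nil {
			fmt.Printf("Send failed: %v\n", err)
		}
	}
	fmt.Println("safe to attach now")
}

Passing bufferSize 0 to Start is what makes Send block until the cleanup goroutine receives; that is how AttachAndMountDisk waits out a pending detach for the same PD instead of racing it, and how DetachDisk can use Exists to drop a duplicate detach request while one is still in flight.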