diff --git a/test/e2e/storage/persistent_volumes-disruptive.go b/test/e2e/storage/persistent_volumes-disruptive.go
index fbe7ea0522f..8eb10f59229 100644
--- a/test/e2e/storage/persistent_volumes-disruptive.go
+++ b/test/e2e/storage/persistent_volumes-disruptive.go
@@ -18,10 +18,12 @@ package storage
 
 import (
 	"fmt"
+	"strings"
 	"time"
 
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
+	apierrs "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/kubernetes/pkg/api/v1"
@@ -58,10 +60,11 @@ var _ = framework.KubeDescribe("PersistentVolumes [Volume][Disruptive][Flaky]",
 		volLabel labels.Set
 		selector *metav1.LabelSelector
 	)
-
 	BeforeEach(func() {
 		// To protect the NFS volume pod from the kubelet restart, we isolate it on its own node.
 		framework.SkipUnlessNodeCountIsAtLeast(MinNodes)
+		framework.SkipIfProviderIs("local")
+
 		c = f.ClientSet
 		ns = f.Namespace.Name
 		volLabel = labels.Set{framework.VolumeSelectorKey: ns}
@@ -70,7 +73,7 @@ var _ = framework.KubeDescribe("PersistentVolumes [Volume][Disruptive][Flaky]",
 		// Start the NFS server pod.
 		framework.Logf("[BeforeEach] Creating NFS Server Pod")
 		nfsServerPod = initNFSserverPod(c, ns)
-
+		framework.Logf("NFS server Pod %q created on Node %q", nfsServerPod.Name, nfsServerPod.Spec.NodeName)
 		framework.Logf("[BeforeEach] Configuring PersistentVolume")
 		nfsServerIP = nfsServerPod.Status.PodIP
 		Expect(nfsServerIP).NotTo(BeEmpty())
@@ -105,11 +108,9 @@ var _ = framework.KubeDescribe("PersistentVolumes [Volume][Disruptive][Flaky]",
 			Expect(clientNodeIP).NotTo(BeEmpty())
 		}
 	})
-
 	AfterEach(func() {
 		framework.DeletePodWithWait(f, c, nfsServerPod)
 	})
-
 	Context("when kubelet restarts", func() {
 		var (
@@ -117,24 +118,21 @@ var _ = framework.KubeDescribe("PersistentVolumes [Volume][Disruptive][Flaky]",
 			pv        *v1.PersistentVolume
 			pvc       *v1.PersistentVolumeClaim
 		)
-
 		BeforeEach(func() {
 			framework.Logf("Initializing test spec")
 			clientPod, pv, pvc = initTestCase(f, c, nfsPVconfig, pvcConfig, ns, clientNode.Name)
 		})
-
 		AfterEach(func() {
 			framework.Logf("Tearing down test spec")
 			tearDownTestCase(c, f, ns, clientPod, pvc, pv)
 			pv, pvc, clientPod = nil, nil, nil
 		})
-
 		// Test table housing the It() title string and test spec. runTest is type testBody, defined at
 		// the start of this file. To add tests, define a function mirroring the testBody signature and assign
 		// to runTest.
 		disruptiveTestTable := []disruptiveTest{
 			{
-				testItStmt: "Should test that a file written to the mount before kubelet restart can be read after restart.",
+				testItStmt: "Should test that a file written to the mount before kubelet restart is readable after restart.",
				runTest:    testKubeletRestartsAndRestoresMount,
 			},
 			{
@@ -142,7 +140,6 @@ var _ = framework.KubeDescribe("PersistentVolumes [Volume][Disruptive][Flaky]",
 				runTest:    testVolumeUnmountsFromDeletedPod,
 			},
 		}
-
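Aside, not part of the diff: the table above feeds the test loop that follows it. Each runTest entry must satisfy the testBody type declared at the top of the file, which this hunk does not show; the sketch below reconstructs the likely shape from the test functions further down, so the exact field names and formatting are assumptions.

```go
// Sketch only (reusing this file's imports): testBody is the common signature
// every table entry's runTest must satisfy, and disruptiveTest pairs an It()
// title with that function so new cases are added by appending a struct literal.
type testBody func(c clientset.Interface, f *framework.Framework, clientPod *v1.Pod, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume)

type disruptiveTest struct {
	testItStmt string   // text passed to It()
	runTest    testBody // the test logic to execute
}
```

Passing each entry through the immediately invoked func(t disruptiveTest) in the loop below captures the loop variable by value, so every generated It() closure runs its own table entry rather than the last one.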
 		// Test loop executes each disruptiveTest iteratively.
 		for _, test := range disruptiveTestTable {
 			func(t disruptiveTest) {
@@ -159,14 +156,16 @@ var _ = framework.KubeDescribe("PersistentVolumes [Volume][Disruptive][Flaky]",
 func testKubeletRestartsAndRestoresMount(c clientset.Interface, f *framework.Framework, clientPod *v1.Pod, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) {
 	By("Writing to the volume.")
 	file := "/mnt/_SUCCESS"
-	_, err := podExec(clientPod, fmt.Sprintf("touch %s", file))
+	out, err := podExec(clientPod, fmt.Sprintf("touch %s", file))
+	framework.Logf(out)
 	Expect(err).NotTo(HaveOccurred())
 
 	By("Restarting kubelet")
 	kubeletCommand(kRestart, c, clientPod)
 
 	By("Testing that written file is accessible.")
-	_, err = podExec(clientPod, fmt.Sprintf("cat %s", file))
+	out, err = podExec(clientPod, fmt.Sprintf("cat %s", file))
+	framework.Logf(out)
 	Expect(err).NotTo(HaveOccurred())
 	framework.Logf("Volume mount detected on pod %s and written file %s is readable post-restart.", clientPod.Name, file)
 }
@@ -178,36 +177,61 @@ func testVolumeUnmountsFromDeletedPod(c clientset.Interface, f *framework.Framew
 	nodeIP = nodeIP + ":22"
 
 	By("Expecting the volume mount to be found.")
-	result, err := framework.SSH(fmt.Sprintf("mount| grep %s", string(clientPod.UID)), nodeIP, framework.TestContext.Provider)
-	Expect(err).NotTo(HaveOccurred())
-	Expect(result.Code).To(BeZero())
+	result, err := framework.SSH(fmt.Sprintf("mount | grep %s", clientPod.UID), nodeIP, framework.TestContext.Provider)
+	framework.LogSSHResult(result)
+	Expect(err).NotTo(HaveOccurred(), "Encountered SSH error.")
+	Expect(result.Code).To(BeZero(), fmt.Sprintf("Expected grep exit code of 0, got %d", result.Code))
 
-	By("Restarting the kubelet.")
+	By("Stopping the kubelet.")
 	kubeletCommand(kStop, c, clientPod)
-	framework.ExpectNoError(framework.DeletePodWithWait(f, c, clientPod), "Failed to delete pod ", clientPod.Name)
+	defer func() {
+		if err != nil {
+			kubeletCommand(kStart, c, clientPod)
+		}
+	}()
+	By(fmt.Sprintf("Deleting Pod %q", clientPod.Name))
+	err = c.Core().Pods(clientPod.Namespace).Delete(clientPod.Name, &metav1.DeleteOptions{})
+	Expect(err).NotTo(HaveOccurred())
+	By("Starting the kubelet and waiting for pod to delete.")
 	kubeletCommand(kStart, c, clientPod)
+	err = f.WaitForPodTerminated(clientPod.Name, "")
+	if !apierrs.IsNotFound(err) && err != nil {
+		Expect(err).NotTo(HaveOccurred(), "Expected pod to terminate.")
+	}
 
 	By("Expecting the volume mount not to be found.")
-	result, err = framework.SSH(fmt.Sprintf("mount| grep %s", string(clientPod.UID)), nodeIP, framework.TestContext.Provider)
-	Expect(err).NotTo(HaveOccurred())
-	Expect(result.Code).NotTo(BeZero())
+	result, err = framework.SSH(fmt.Sprintf("mount | grep %s", clientPod.UID), nodeIP, framework.TestContext.Provider)
+	framework.LogSSHResult(result)
+	Expect(err).NotTo(HaveOccurred(), "Encountered SSH error.")
+	Expect(result.Stdout).To(BeEmpty(), "Expected grep stdout to be empty (i.e. no mount found).")
 	framework.Logf("Volume unmounted on node %s", clientPod.Spec.NodeName)
 }
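Aside, not part of the diff: the before/after assertions above grep the node's mount table for the client pod's UID over SSH. A minimal sketch of that check as a standalone helper, reusing only framework.SSH, framework.LogSSHResult, and this file's imports (podMountExists is a hypothetical name, not something this change introduces):

```go
// podMountExists is a hypothetical helper illustrating the check used above: a
// volume mounted into a pod shows up in the node's mount table under a path
// containing the pod UID, so grepping `mount` output for the UID is sufficient.
func podMountExists(pod *v1.Pod, nodeIP string) (bool, error) {
	cmd := fmt.Sprintf("mount | grep %s", pod.UID)
	result, err := framework.SSH(cmd, nodeIP, framework.TestContext.Provider)
	framework.LogSSHResult(result)
	if err != nil {
		return false, err
	}
	// grep exits 0 when it finds at least one match and 1 when it finds none.
	return result.Code == 0, nil
}
```

The same helper could back both assertions: expect a mount before the kubelet is stopped, and none once the pod has been deleted and the kubelet restarted.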
 // initTestCase initializes spec resources (pv, pvc, and pod) and returns pointers to be consumed
 // by the test.
 func initTestCase(f *framework.Framework, c clientset.Interface, pvConfig framework.PersistentVolumeConfig, pvcConfig framework.PersistentVolumeClaimConfig, ns, nodeName string) (*v1.Pod, *v1.PersistentVolume, *v1.PersistentVolumeClaim) {
 	pv, pvc, err := framework.CreatePVPVC(c, pvConfig, pvcConfig, ns, false)
+	defer func() {
+		if err != nil {
+			framework.DeletePersistentVolumeClaim(c, pvc.Name, ns)
+			framework.DeletePersistentVolume(c, pv.Name)
+		}
+	}()
 	Expect(err).NotTo(HaveOccurred())
 	pod := framework.MakePod(ns, []*v1.PersistentVolumeClaim{pvc}, true, "")
 	pod.Spec.NodeName = nodeName
-	framework.Logf("Creating nfs client Pod %s on node %s", pod.Name, nodeName)
+	framework.Logf("Creating NFS client pod.")
 	pod, err = c.CoreV1().Pods(ns).Create(pod)
+	framework.Logf("NFS client Pod %q created on Node %q", pod.Name, nodeName)
 	Expect(err).NotTo(HaveOccurred())
+	defer func() {
+		if err != nil {
+			framework.DeletePodWithWait(f, c, pod)
+		}
+	}()
 	err = framework.WaitForPodRunningInNamespace(c, pod)
-	Expect(err).NotTo(HaveOccurred())
-
+	Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Pod %q timed out waiting for phase: Running", pod.Name))
+	// Return created api objects
 	pod, err = c.CoreV1().Pods(ns).Get(pod.Name, metav1.GetOptions{})
 	Expect(err).NotTo(HaveOccurred())
 	pvc, err = c.CoreV1().PersistentVolumeClaims(ns).Get(pvc.Name, metav1.GetOptions{})
@@ -218,22 +242,36 @@ func initTestCase(f *framework.Framework, c clientset.Interface, pvConfig framew
 }
 
 // tearDownTestCase destroy resources created by initTestCase.
-func tearDownTestCase(c clientset.Interface, f *framework.Framework, ns string, pod *v1.Pod, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) {
-	framework.ExpectNoError(framework.DeletePodWithWait(f, c, pod), "tearDown: Failed to delete pod ", pod.Name)
-	framework.ExpectNoError(framework.DeletePersistentVolumeClaim(c, pvc.Name, ns), "tearDown: Failed to delete PVC ", pvc.Name)
-	framework.ExpectNoError(framework.DeletePersistentVolume(c, pv.Name), "tearDown: Failed to delete PV ", pv.Name)
+func tearDownTestCase(c clientset.Interface, f *framework.Framework, ns string, client *v1.Pod, pvc *v1.PersistentVolumeClaim, pv *v1.PersistentVolume) {
+	// Ignore deletion errors. Failing on them will interrupt test cleanup.
+	framework.DeletePodWithWait(f, c, client)
+	framework.DeletePersistentVolumeClaim(c, pvc.Name, ns)
+	framework.DeletePersistentVolume(c, pv.Name)
 }
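Aside, not part of the diff: initTestCase relies on a defer-on-error idiom. A failed Gomega assertion unwinds the spec before any explicit cleanup can run, so each creation step registers a deferred closure that deletes what was already built whenever err is still non-nil. A minimal sketch of the idiom in isolation (hypothetical helper name; the nil checks are a safety addition, not taken from the change):

```go
// createPVandPVC is a hypothetical illustration of the cleanup idiom used in
// initTestCase: deferred closures inspect err during unwinding and delete
// whatever was already created, so a failed assertion does not leak resources.
func createPVandPVC(c clientset.Interface, pvConfig framework.PersistentVolumeConfig, pvcConfig framework.PersistentVolumeClaimConfig, ns string) (pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim, err error) {
	pv, pvc, err = framework.CreatePVPVC(c, pvConfig, pvcConfig, ns, false)
	defer func() {
		if err != nil { // only fires when a later step failed
			if pvc != nil {
				framework.DeletePersistentVolumeClaim(c, pvc.Name, ns)
			}
			if pv != nil {
				framework.DeletePersistentVolume(c, pv.Name)
			}
		}
	}()
	Expect(err).NotTo(HaveOccurred())
	// ... further setup that may assign err ...
	return pv, pvc, err
}
```

Because the return values are named, the deferred closure observes whatever err holds at unwind time, including the value that caused an Expect call to fail.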
-// kubeletCommand performs `start`, `restart`, or `stop` on the kubelet running on the node of the target pod.
+// kubeletCommand performs `start`, `restart`, or `stop` on the kubelet running on the node of the target pod and waits
+// for the desired state.
+// - First issues the command via `systemctl`.
+// - If `systemctl` returns stderr "command not found", issues the command via `service`.
+// - If `service` also returns stderr "command not found", the test is aborted.
 // Allowed kubeletOps are `kStart`, `kStop`, and `kRestart`
 func kubeletCommand(kOp kubeletOpt, c clientset.Interface, pod *v1.Pod) {
 	nodeIP, err := framework.GetHostExternalAddress(c, pod)
 	Expect(err).NotTo(HaveOccurred())
 	nodeIP = nodeIP + ":22"
-	sshResult, err := framework.SSH("sudo /etc/init.d/kubelet "+string(kOp), nodeIP, framework.TestContext.Provider)
-	Expect(err).NotTo(HaveOccurred())
+	systemctlCmd := fmt.Sprintf("sudo systemctl %s kubelet", string(kOp))
+	framework.Logf("Attempting `%s`", systemctlCmd)
+	sshResult, err := framework.SSH(systemctlCmd, nodeIP, framework.TestContext.Provider)
+	Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("SSH to Node %q errored.", pod.Spec.NodeName))
 	framework.LogSSHResult(sshResult)
-
+	if strings.Contains(sshResult.Stderr, "command not found") {
+		serviceCmd := fmt.Sprintf("sudo service kubelet %s", string(kOp))
+		framework.Logf("Attempting `%s`", serviceCmd)
+		sshResult, err = framework.SSH(serviceCmd, nodeIP, framework.TestContext.Provider)
+		Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("SSH to Node %q errored.", pod.Spec.NodeName))
+		framework.LogSSHResult(sshResult)
+	}
+	Expect(sshResult.Code).To(BeZero(), "Failed to [%s] kubelet:\n%#v", string(kOp), sshResult)
 	// On restart, waiting for node NotReady prevents a race condition where the node takes a few moments to leave the
 	// Ready state which in turn short circuits WaitForNodeToBeReady()
 	if kOp == kStop || kOp == kRestart {
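Aside, not part of the diff: the systemctl/service fallback above generalizes to any node service. A hedged sketch of the same logic as a standalone helper, reusing framework.SSH, framework.LogSSHResult, and strings.Contains from this file (sshServiceCommand is a hypothetical name):

```go
// sshServiceCommand is a hypothetical helper mirroring kubeletCommand's
// fallback: try systemctl first and, if the node has no systemd ("command not
// found" on stderr), retry through the legacy service wrapper.
func sshServiceCommand(nodeIP, service, op string) error {
	cmd := fmt.Sprintf("sudo systemctl %s %s", op, service)
	result, err := framework.SSH(cmd, nodeIP, framework.TestContext.Provider)
	framework.LogSSHResult(result)
	if err != nil {
		return err
	}
	if strings.Contains(result.Stderr, "command not found") {
		cmd = fmt.Sprintf("sudo service %s %s", service, op)
		result, err = framework.SSH(cmd, nodeIP, framework.TestContext.Provider)
		framework.LogSSHResult(result)
		if err != nil {
			return err
		}
	}
	if result.Code != 0 {
		return fmt.Errorf("%q failed with exit code %d: %s", cmd, result.Code, result.Stderr)
	}
	return nil
}
```

Matching stderr, rather than only the exit code, distinguishes a node that simply lacks systemctl from a genuine service failure, since both cases return a non-zero exit code.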