Merge pull request #41659 from jeffvance/kubelet-wedge2

Automatic merge from submit-queue

add kubelet tests to verify host clean up

**What this PR does / why we need it**:
We are seeing increasingly frequent failures in the persistent volume e2e tests when they run in parallel with disruptive tests. The quick workaround has been to tag the PV tests as [Flaky]. This PR addresses one cause of that flakiness and adds a disruptive kubelet test.
Once this PR is shown not to produce flakes, the [Flaky] tag on the "HostCleanup" tests will be removed in a separate PR.
+ Adds volume tests to _kubelet.go_, motivated by issues [31272](https://github.com/kubernetes/kubernetes/issues/31272) and [37657](https://github.com/kubernetes/kubernetes/issues/37657)
+ Addresses the reverted PR [41178](https://github.com/kubernetes/kubernetes/pull/41178) and removes the need for PR [41229](https://github.com/kubernetes/kubernetes/pull/41229)

**Which issue this PR fixes**:
Adds regression tests covering issues #31272 and #37657.

**Special notes for your reviewer**:
It's possible that one of the new tests, which relies on the existence of _/usr/sbin/rpc.nfsd_ in the nfs-server pod, will not work in the GCI container environment. If that turns out to be true, I will add a `SkipIfProviderIs("gke")` to the `It` block, as sketched below.
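For illustration only, a minimal sketch of where that skip would go, assuming the e2e framework's `SkipIfProviderIs` helper (the counterpart of the `SkipUnlessProviderIs` call added in this PR) and showing just the top of the affected `It` block; the description string, variables, and the rest of the test body are the ones added by this PR:

```go
// Sketch: guard the rpc.nfsd-dependent test if it proves not to work on GKE/GCI.
It("after stopping the nfs-server and deleting the (sleeping) client pod, the NFS mount and the pod's UID directory should be removed.", func() {
    framework.SkipIfProviderIs("gke") // bail out early where rpc.nfsd may be unavailable
    pod = createPodUsingNfs(f, c, ns, nfsIP, t.podCmd)
    // ... remainder of the test body unchanged ...
})
```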

**Release note**:
```release-note
NONE
```
Authored by Kubernetes Submit Queue, committed via GitHub on 2017-04-13 07:12:15 -07:00
commit b0a724173a
2 changed files with 114 additions and 54 deletions


@@ -412,14 +412,13 @@ func DeletePodWithWait(f *Framework, c clientset.Interface, pod *v1.Pod) {
 		Expect(err).NotTo(HaveOccurred())
 	}
-	// wait for pod to terminate. Expect apierr NotFound
+	// wait for pod to terminate
 	err = f.WaitForPodTerminated(pod.Name, "")
-	Expect(err).To(HaveOccurred())
-	if !apierrs.IsNotFound(err) {
-		Logf("Error! Expected IsNotFound error deleting pod %q, instead got: %v", pod.Name, err)
-		Expect(apierrs.IsNotFound(err)).To(BeTrue())
+	if err != nil {
+		Expect(apierrs.IsNotFound(err)).To(BeTrue(), fmt.Sprintf("Expected 'IsNotFound' error deleting pod \"%v/%v\", instead got: %v", pod.Namespace, pod.Name, err))
+		Logf("Ignore \"not found\" error above")
 	}
-	Logf("Ignore \"not found\" error above. Pod %v successfully deleted", pod.Name)
+	Logf("Pod %q successfully deleted", pod.Name)
 }
 
 // Sanity check for GCE testing. Verify the persistent disk attached to the node.
@@ -616,17 +615,13 @@ func deletePD(pdName string) error {
 	}
 }
 
-// Create the test pod, wait for (hopefully) success, and then delete the pod.
+// Create the test pod, wait for success, and then delete the pod.
 func CreateWaitAndDeletePod(f *Framework, c clientset.Interface, ns string, pvc *v1.PersistentVolumeClaim) {
 	Logf("Creating nfs test pod")
-	// Make pod spec
 	pod := MakeWritePod(ns, pvc)
-	// Instantiate pod (Create)
 	runPod, err := c.CoreV1().Pods(ns).Create(pod)
 	Expect(err).NotTo(HaveOccurred())
 	Expect(runPod).NotTo(BeNil())
 	defer DeletePodWithWait(f, c, runPod)
 
 	// Wait for the test pod to complete its lifecycle


@@ -104,7 +104,7 @@ func updateNodeLabels(c clientset.Interface, nodeNames sets.String, toAdd, toRem
 	var node *v1.Node
 	var err error
 	for i := 0; i < maxRetries; i++ {
-		node, err = c.Core().Nodes().Get(nodeName, metav1.GetOptions{})
+		node, err = c.CoreV1().Nodes().Get(nodeName, metav1.GetOptions{})
 		if err != nil {
 			framework.Logf("Error getting node %s: %v", nodeName, err)
 			continue
@@ -119,7 +119,7 @@ func updateNodeLabels(c clientset.Interface, nodeNames sets.String, toAdd, toRem
 				delete(node.ObjectMeta.Labels, k)
 			}
 		}
-		_, err = c.Core().Nodes().Update(node)
+		_, err = c.CoreV1().Nodes().Update(node)
 		if err != nil {
 			framework.Logf("Error updating node %s: %v", nodeName, err)
 		} else {
@@ -145,6 +145,26 @@ func createNfsServerPod(c clientset.Interface, config framework.VolumeTestConfig
 	return pod, ip
 }
 
+// Restart the passed-in nfs-server by issuing a `/usr/sbin/rpc.nfsd 1` command in the
+// pod's (only) container. This command changes the number of nfs server threads from
+// (presumably) zero back to 1, and therefore allows nfs to open connections again.
+func restartNfsServer(serverPod *v1.Pod) {
+	const startcmd = "/usr/sbin/rpc.nfsd 1"
+	ns := fmt.Sprintf("--namespace=%v", serverPod.Namespace)
+	framework.RunKubectlOrDie("exec", ns, serverPod.Name, "--", "/bin/sh", "-c", startcmd)
+}
+
+// Stop the passed-in nfs-server by issuing a `/usr/sbin/rpc.nfsd 0` command in the
+// pod's (only) container. This command changes the number of nfs server threads to 0,
+// thus closing all open nfs connections.
+func stopNfsServer(serverPod *v1.Pod) {
+	const stopcmd = "/usr/sbin/rpc.nfsd 0"
+	ns := fmt.Sprintf("--namespace=%v", serverPod.Namespace)
+	framework.RunKubectlOrDie("exec", ns, serverPod.Name, "--", "/bin/sh", "-c", stopcmd)
+}
+
 // Creates a pod that mounts an nfs volume that is served by the nfs-server pod. The container
 // will execute the passed in shell cmd. Waits for the pod to start.
 // Note: the nfs plugin is defined inline, no PV or PVC.
@@ -196,18 +216,35 @@ func createPodUsingNfs(f *framework.Framework, c clientset.Interface, ns, nfsIP,
 			},
 		},
 	}
-	rtnPod, err := c.Core().Pods(ns).Create(pod)
+	rtnPod, err := c.CoreV1().Pods(ns).Create(pod)
 	Expect(err).NotTo(HaveOccurred())
 	err = f.WaitForPodReady(rtnPod.Name) // running & ready
 	Expect(err).NotTo(HaveOccurred())
-	rtnPod, err = c.Core().Pods(ns).Get(rtnPod.Name, metav1.GetOptions{}) // return fresh pod
+	rtnPod, err = c.CoreV1().Pods(ns).Get(rtnPod.Name, metav1.GetOptions{}) // return fresh pod
 	Expect(err).NotTo(HaveOccurred())
 	return rtnPod
 }
 
+// move the passed-in pod's UID directory to /tmp.
+func movePodUidDir(c clientset.Interface, pod *v1.Pod) {
+	dest := "/tmp"
+	podDir := filepath.Join("/var/lib/kubelet/pods", string(pod.UID))
+	cmd := fmt.Sprintf("mv %v %v", podDir, dest)
+	// use ip rather than hostname in GCE
+	nodeIP, err := framework.GetHostExternalAddress(c, pod)
+	Expect(err).NotTo(HaveOccurred())
+	// excute cmd over ssh
+	result, _ := nodeExec(nodeIP, cmd)
+	framework.LogSSHResult(result)
+	Expect(result.Code).To(BeZero())
+	Expect(len(result.Stderr)).To(BeZero())
+}
+
 // Checks for a lingering nfs mount and/or uid directory on the pod's host. The host IP is used
 // so that this test runs in GCE, where it appears that SSH cannot resolve the hostname.
 // If expectClean is true then we expect the node to be cleaned up and thus commands like
@@ -218,42 +255,48 @@ func checkPodCleanup(c clientset.Interface, pod *v1.Pod, expectClean bool) {
 	timeout := 5 * time.Minute
 	poll := 20 * time.Second
-	podUID := string(pod.UID)
-	podDir := filepath.Join("/var/lib/kubelet/pods", podUID)
+	podDir := filepath.Join("/var/lib/kubelet/pods", string(pod.UID))
 	mountDir := filepath.Join(podDir, "volumes", "kubernetes.io~nfs")
 	// use ip rather than hostname in GCE
 	nodeIP, err := framework.GetHostExternalAddress(c, pod)
 	Expect(err).NotTo(HaveOccurred())
-	condMsg := map[bool]string{
-		true:  "deleted",
-		false: "present",
-	}
+	condMsg := "deleted"
+	if !expectClean {
+		condMsg = "present"
+	}
-	// table of host tests to perform
-	tests := map[string]string{ //["what-to-test"] "remote-command"
-		"pod UID directory": fmt.Sprintf("sudo ls %v", podDir),
-		"pod nfs mount":     fmt.Sprintf("sudo mount | grep %v", mountDir),
-	}
+	// table of host tests to perform (order may matter so not using a map)
+	type testT struct {
+		feature string // feature to test
+		cmd     string // remote command to execute on node
+	}
+	tests := []testT{
+		{
+			feature: "pod UID directory",
+			cmd:     fmt.Sprintf("sudo ls %v", podDir),
+		},
+		{
+			feature: "pod nfs mount",
+			cmd:     fmt.Sprintf("sudo mount | grep %v", mountDir),
+		},
+	}
-	for test, cmd := range tests {
-		framework.Logf("Wait up to %v for host's (%v) %q to be %v", timeout, nodeIP, test, condMsg[expectClean])
+	for _, test := range tests {
+		framework.Logf("Wait up to %v for host's (%v) %q to be %v", timeout, nodeIP, test.feature, condMsg)
 		err = wait.Poll(poll, timeout, func() (bool, error) {
-			result, _ := nodeExec(nodeIP, cmd)
+			result, _ := nodeExec(nodeIP, test.cmd)
 			framework.LogSSHResult(result)
-			sawFiles := result.Code == 0
-			if expectClean && sawFiles { // keep trying
+			ok := (result.Code == 0 && len(result.Stdout) > 0 && len(result.Stderr) == 0)
+			if expectClean && ok { // keep trying
 				return false, nil
 			}
-			if !expectClean && !sawFiles { // stop wait loop
-				return true, fmt.Errorf("%v is gone but expected to exist", test)
+			if !expectClean && !ok { // stop wait loop
+				return true, fmt.Errorf("%v is gone but expected to exist", test.feature)
 			}
 			return true, nil // done, host is as expected
 		})
-		if err != nil {
-			framework.Logf("Host (%v) cleanup error: %v. Expected %q to be %v", nodeIP, err, test, condMsg[expectClean])
-			Expect(err).NotTo(HaveOccurred())
-		}
+		Expect(err).NotTo(HaveOccurred(), fmt.Sprintf("Host (%v) cleanup error: %v. Expected %q to be %v", nodeIP, err, test.feature, condMsg))
 	}
 
 	if expectClean {
@@ -375,14 +418,20 @@ var _ = framework.KubeDescribe("kubelet", func() {
 		}
 	})
 
-	// Delete nfs server pod after another pods accesses the mounted nfs volume.
-	framework.KubeDescribe("host cleanup with volume mounts [HostCleanup][Flaky]", func() {
+	// Test host cleanup when disrupting the volume environment.
+	framework.KubeDescribe("host cleanup with volume mounts [Volume][HostCleanup][Flaky]", func() {
 		type hostCleanupTest struct {
 			itDescr string
 			podCmd  string
 		}
-		Context("Host cleanup after pod using NFS mount is deleted [Volume][NFS]", func() {
+		// Disrupt the nfs-server pod after a client pod accesses the nfs volume.
+		// Note: the nfs-server is stopped NOT deleted. This is done to preserve its ip addr.
+		// If the nfs-server pod is deleted the client pod's mount can not be unmounted.
+		// If the nfs-server pod is deleted and re-created, due to having a different ip
+		// addr, the client pod's mount still cannot be unmounted.
+		Context("Host cleanup after disrupting NFS volume [NFS]", func() {
 			// issue #31272
 			var (
 				nfsServerPod *v1.Pod
@@ -394,16 +443,17 @@ var _ = framework.KubeDescribe("kubelet", func() {
 			// fill in test slice for this context
 			testTbl := []hostCleanupTest{
 				{
-					itDescr: "after deleting the nfs-server, the host should be cleaned-up when deleting sleeping pod which mounts an NFS vol",
-					podCmd:  "sleep 6000",
+					itDescr: "after stopping the nfs-server and deleting the (sleeping) client pod, the NFS mount and the pod's UID directory should be removed.",
+					podCmd:  "sleep 6000", // keep pod running
 				},
 				{
-					itDescr: "after deleting the nfs-server, the host should be cleaned-up when deleting a pod accessing the NFS vol",
-					podCmd:  "while true; do echo FeFieFoFum >>/mnt/SUCCESS; cat /mnt/SUCCESS; done",
+					itDescr: "after stopping the nfs-server and deleting the (active) client pod, the NFS mount and the pod's UID directory should be removed.",
+					podCmd:  "while true; do echo FeFieFoFum >>/mnt/SUCCESS; sleep 1; cat /mnt/SUCCESS; done",
 				},
 			}
 
 			BeforeEach(func() {
+				framework.SkipUnlessProviderIs(framework.ProvidersWithSSH...)
 				NFSconfig = framework.VolumeTestConfig{
 					Namespace: ns,
 					Prefix:    "nfs",
@@ -420,31 +470,46 @@ var _ = framework.KubeDescribe("kubelet", func() {
 			})
 
 			// execute It blocks from above table of tests
-			for _, test := range testTbl {
-				t := test // local copy for closure
-				It(fmt.Sprintf("%v [Serial]", t.itDescr), func() {
-					// create a pod which uses the nfs server's volume
+			for _, t := range testTbl {
+				It(t.itDescr, func() {
 					pod = createPodUsingNfs(f, c, ns, nfsIP, t.podCmd)
-					By("Delete the NFS server pod")
-					framework.DeletePodWithWait(f, c, nfsServerPod)
-					nfsServerPod = nil
+					By("Stop the NFS server")
+					stopNfsServer(nfsServerPod)
 					By("Delete the pod mounted to the NFS volume")
 					framework.DeletePodWithWait(f, c, pod)
 					// pod object is now stale, but is intentionally not nil
-					By("Check if host running deleted pod has been cleaned up -- expect not")
-					// expect the pod's host *not* to be cleaned up
+					By("Check if pod's host has been cleaned up -- expect not")
 					checkPodCleanup(c, pod, false)
-					By("Recreate the nfs server pod")
-					nfsServerPod, nfsIP = createNfsServerPod(c, NFSconfig)
+					By("Restart the nfs server")
+					restartNfsServer(nfsServerPod)
 					By("Verify host running the deleted pod is now cleaned up")
-					// expect the pod's host to be cleaned up
 					checkPodCleanup(c, pod, true)
 				})
 			}
+
+			// Move a pod's uid dir to /tmp and delete the pod.
+			// Addresses issue #37657.
+			// Note: the pod's vol mount (as a side effect) ends up being moved to /tmp
+			// and can be unmounted via `umount -f`.
+			It("move NFS client pod's UID directory then delete pod", func() {
+				pod = createPodUsingNfs(f, c, ns, nfsIP, "sleep 6000")
+				By("Move pod's uid dir to /tmp")
+				movePodUidDir(c, pod)
+				By("Delete the pod mounted to the NFS volume")
+				framework.DeletePodWithWait(f, c, pod)
+				// pod object is now stale, but is intentionally not nil
+				// Note: the pod's nfs mount, now in /tmp, will not be unmounted
+				By("Verify host running the deleted pod is cleaned up")
+				checkPodCleanup(c, pod, true)
+			})
 		})
 	})
 })