test: Add node e2e test to verify static pod termination

Add node e2e test to verify that static pods can be started after a
previous static pod with the same config temporarily failed to
terminate.

The scenario is:

1. Static pod is started
2. Static pod is deleted
3. Static pod termination fails (internally `syncTerminatedPod` fails)
4. At a later time, pod termination should succeed
5. A new static pod with the same config is re-added
6. The new static pod is expected to start successfully

To reproduce this scenario, set up a pod with an NFS mount. Stopping
the NFS server causes the volume unmount to fail, which in turn makes
`syncTerminatedPod` fail. Starting the NFS server again later allows
the volume to unmount successfully, as sketched below.
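
A sketch of the failure injection, mirroring the stop/restart helpers added
in this commit (assumes a framework instance `f` and the NFS server pod
returned by the volume fixture; names are illustrative):

    // Zero NFS server threads: open connections close, the kubelet can no
    // longer unmount the pod's NFS volume, and syncTerminatedPod keeps failing.
    _, _, err := e2evolume.PodExec(f, nfsServerPod, "/usr/sbin/rpc.nfsd 0")
    framework.ExpectNoError(err)
    // ...delete the static pod here; termination stays stuck on the unmount...
    // One server thread is enough for the pending unmount to go through.
    _, _, err = e2evolume.PodExec(f, nfsServerPod, "/usr/sbin/rpc.nfsd 1")
    framework.ExpectNoError(err)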

xref:

1. https://github.com/kubernetes/kubernetes/pull/113145#issuecomment-1289587988
2. https://github.com/kubernetes/kubernetes/pull/113065
3. https://github.com/kubernetes/kubernetes/pull/113093

Signed-off-by: David Porter <david@porter.me>
Author: David Porter, 2022-11-01 16:53:12 -07:00 (committed by Clayton Coleman)
commit c5a1f0188b (parent 1c75c2cda8)
4 changed files with 197 additions and 13 deletions


@@ -121,6 +121,15 @@ func LookForStringInLog(ns, podName, container, expectedString string, timeout t
})
}
// LookForStringInLogWithoutKubectl looks for the given string in the log of a specific pod container
func LookForStringInLogWithoutKubectl(ctx context.Context, client clientset.Interface, ns string, podName string, container string, expectedString string, timeout time.Duration) (result string, err error) {
return lookForString(expectedString, timeout, func() string {
podLogs, err := e2epod.GetPodLogs(ctx, client, ns, podName, container)
framework.ExpectNoError(err)
return podLogs
})
}
// CreateEmptyFileOnPod creates an empty file at the given path on the pod.
func CreateEmptyFileOnPod(namespace string, podName string, filePath string) error {
_, err := e2ekubectl.RunKubectl(namespace, "exec", podName, "--", "/bin/sh", "-c", fmt.Sprintf("touch %s", filePath))

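A hedged usage sketch of the new helper (namespace, pod/container names, and
timeout are illustrative):

    // Fetch logs through the API via client-go instead of shelling out to
    // kubectl, avoiding the kubectl dependency of LookForStringInLog.
    result, err := e2epodoutput.LookForStringInLogWithoutKubectl(
        ctx, f.ClientSet, ns, "nfs-server", "nfs-server", "NFS started", 3*time.Minute)
    framework.ExpectNoError(err)
    framework.Logf("matched log output: %s", result)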

@@ -149,6 +149,10 @@ type Test struct {
// NewNFSServer is an NFS-specific wrapper for CreateStorageServer.
func NewNFSServer(ctx context.Context, cs clientset.Interface, namespace string, args []string) (config TestConfig, pod *v1.Pod, host string) {
return NewNFSServerWithNodeName(ctx, cs, namespace, args, "")
}
func NewNFSServerWithNodeName(ctx context.Context, cs clientset.Interface, namespace string, args []string, nodeName string) (config TestConfig, pod *v1.Pod, host string) {
config = TestConfig{
Namespace: namespace,
Prefix: "nfs",
@@ -157,6 +161,10 @@ func NewNFSServer(ctx context.Context, cs clientset.Interface, namespace string,
ServerVolumes: map[string]string{"": "/exports"},
ServerReadyMessage: "NFS started",
}
if nodeName != "" {
config.ClientNodeSelection = e2epod.NodeSelection{Name: nodeName}
}
if len(args) > 0 {
config.ServerArgs = args
}
@@ -329,6 +337,10 @@ func startVolumeServer(ctx context.Context, client clientset.Interface, config T
},
}
if config.ClientNodeSelection.Name != "" {
serverPod.Spec.NodeName = config.ClientNodeSelection.Name
}
var pod *v1.Pod
serverPod, err := podClient.Create(ctx, serverPod, metav1.CreateOptions{})
// ok if the server pod already exists. TODO: make this controllable by callers
@@ -355,7 +367,7 @@ func startVolumeServer(ctx context.Context, client clientset.Interface, config T
}
}
if config.ServerReadyMessage != "" {
_, err := e2epodoutput.LookForStringInLog(pod.Namespace, pod.Name, serverPodName, config.ServerReadyMessage, VolumeServerPodStartupTimeout)
_, err := e2epodoutput.LookForStringInLogWithoutKubectl(ctx, client, pod.Namespace, pod.Name, serverPodName, config.ServerReadyMessage, VolumeServerPodStartupTimeout)
framework.ExpectNoError(err, "Failed to find %q in pod logs: %s", config.ServerReadyMessage, err)
}
return pod

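The new wrapper lets the node e2e test below pin the NFS server pod to the
node under test. A minimal call sketch with illustrative variable names (the
arguments match the test's invocation):

    // Spec.NodeName is set directly, bypassing the scheduler; in node e2e
    // only the kubelet under test is running, so scheduling is not an option.
    config, serverPod, serverHost := e2evolume.NewNFSServerWithNodeName(
        ctx, f.ClientSet, ns, []string{"-G", "777", "/exports"}, nodeName)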

@@ -40,11 +40,13 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"k8s.io/cli-runtime/pkg/printers"
e2evolume "k8s.io/kubernetes/test/e2e/framework/volume"
)
var _ = SIGDescribe("MirrorPod", func() {
f := framework.NewDefaultFramework("mirror-pod")
f.NamespacePodSecurityEnforceLevel = admissionapi.LevelBaseline
f.NamespacePodSecurityEnforceLevel = admissionapi.LevelPrivileged
ginkgo.Context("when create a mirror pod ", func() {
var ns, podPath, staticPodName, mirrorPodName string
ginkgo.BeforeEach(func(ctx context.Context) {
@@ -196,8 +198,179 @@ var _ = SIGDescribe("MirrorPod", func() {
}, 2*time.Minute, time.Second*4).Should(gomega.BeNil())
})
})
ginkgo.Context("when recreating a static pod", func() {
var ns, podPath, staticPodName, mirrorPodName string
ginkgo.It("it should launch successfully even if it temporarily failed termination due to volume failing to unmount [NodeConformance] [Serial]", func(ctx context.Context) {
node := getNodeName(ctx, f)
ns = f.Namespace.Name
c := f.ClientSet
nfsTestConfig, nfsServerPod, nfsServerHost := e2evolume.NewNFSServerWithNodeName(ctx, c, ns, []string{"-G", "777", "/exports"}, node)
ginkgo.DeferCleanup(func(ctx context.Context) {
framework.Logf("Cleaning up NFS server pod")
e2evolume.TestServerCleanup(ctx, f, nfsTestConfig)
})
podPath = framework.TestContext.KubeletConfig.StaticPodPath
staticPodName = "static-pod-nfs-test-pod" + string(uuid.NewUUID())
mirrorPodName = staticPodName + "-" + framework.TestContext.NodeName
ginkgo.By(fmt.Sprintf("Creating nfs test pod: %s", staticPodName))
err := createStaticPodUsingNfs(nfsServerHost, node, "sleep 999999", podPath, staticPodName, ns)
framework.ExpectNoError(err)
ginkgo.By(fmt.Sprintf("Wating for nfs test pod: %s to start running...", staticPodName))
gomega.Eventually(func() error {
return checkMirrorPodRunning(ctx, f.ClientSet, mirrorPodName, ns)
}, 2*time.Minute, time.Second*4).Should(gomega.BeNil())
mirrorPod, err := c.CoreV1().Pods(ns).Get(ctx, mirrorPodName, metav1.GetOptions{})
framework.ExpectNoError(err)
hash, ok := mirrorPod.Annotations[kubetypes.ConfigHashAnnotationKey]
if !ok || hash == "" {
framework.Failf("Failed to get hash for mirrorPod")
}
ginkgo.By("Stopping the NFS server")
stopNfsServer(f, nfsServerPod)
ginkgo.By("Waiting for NFS server to stop...")
time.Sleep(30 * time.Second)
ginkgo.By(fmt.Sprintf("Deleting the static nfs test pod: %s", staticPodName))
err = deleteStaticPod(podPath, staticPodName, ns)
framework.ExpectNoError(err)
// Wait 5 minutes for syncTerminatedPod to fail. The pod volume should not be cleaned up while the NFS server is down.
gomega.Consistently(func() bool {
return podVolumeDirectoryExists(types.UID(hash))
}, 5*time.Minute, 10*time.Second).Should(gomega.BeTrue(), "pod volume should exist while nfs server is stopped")
ginkgo.By("Start the NFS server")
restartNfsServer(f, nfsServerPod)
ginkgo.By("Waiting for the pod volume to deleted after the NFS server is started")
gomega.Eventually(func() bool {
return podVolumeDirectoryExists(types.UID(hash))
}, 5*time.Minute, 10*time.Second).Should(gomega.BeFalse(), "pod volume should be deleted after nfs server is started")
// Create the static pod again with the same config and expect it to start running
err = createStaticPodUsingNfs(nfsServerHost, node, "sleep 999999", podPath, staticPodName, ns)
framework.ExpectNoError(err)
ginkgo.By(fmt.Sprintf("Wating for nfs test pod: %s to start running (after being recreated)", staticPodName))
gomega.Eventually(func() error {
return checkMirrorPodRunning(ctx, f.ClientSet, mirrorPodName, ns)
}, 5*time.Minute, 5*time.Second).Should(gomega.BeNil())
})
ginkgo.AfterEach(func(ctx context.Context) {
ginkgo.By("delete the static pod")
err := deleteStaticPod(podPath, staticPodName, ns)
framework.ExpectNoError(err)
ginkgo.By("wait for the mirror pod to disappear")
gomega.Eventually(ctx, func(ctx context.Context) error {
return checkMirrorPodDisappear(ctx, f.ClientSet, mirrorPodName, ns)
}, 2*time.Minute, time.Second*4).Should(gomega.BeNil())
})
})
})
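// podVolumeDirectoryExists reports whether the kubelet still has a volumes
// directory under /var/lib/kubelet/pods for the given pod UID. The test above
// passes the mirror pod's config hash annotation as the UID, since the kubelet
// uses a static pod's config hash as its UID.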
func podVolumeDirectoryExists(uid types.UID) bool {
podVolumePath := fmt.Sprintf("/var/lib/kubelet/pods/%s/volumes/", uid)
_, err := os.Stat(podVolumePath)
return !os.IsNotExist(err)
}
// Restart the passed-in nfs-server by issuing a `/usr/sbin/rpc.nfsd 1` command in the
// pod's (only) container. This command changes the number of NFS server threads from
// (presumably) zero back to 1, and therefore allows NFS to accept connections again.
func restartNfsServer(f *framework.Framework, serverPod *v1.Pod) {
const startcmd = "/usr/sbin/rpc.nfsd 1"
_, _, err := e2evolume.PodExec(f, serverPod, startcmd)
framework.ExpectNoError(err)
}
// Stop the passed-in nfs-server by issuing a `/usr/sbin/rpc.nfsd 0` command in the
// pod's (only) container. This command changes the number of NFS server threads to 0,
// thus closing all open NFS connections.
func stopNfsServer(f *framework.Framework, serverPod *v1.Pod) {
const stopcmd = "/usr/sbin/rpc.nfsd 0"
_, _, err := e2evolume.PodExec(f, serverPod, stopcmd)
framework.ExpectNoError(err)
}
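// createStaticPodUsingNfs writes a static pod manifest into the kubelet's
// static pod directory. The pod mounts the NFS export at /mnt, runs the given
// shell command, and is picked up by the kubelet, which then creates a mirror
// pod for it on the API server.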
func createStaticPodUsingNfs(nfsIP string, nodeName string, cmd string, dir string, name string, ns string) error {
ginkgo.By("create pod using nfs volume")
isPrivileged := true
cmdLine := []string{"-c", cmd}
pod := &v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: ns,
},
Spec: v1.PodSpec{
NodeName: nodeName,
Containers: []v1.Container{
{
Name: "pod-nfs-vol",
Image: imageutils.GetE2EImage(imageutils.BusyBox),
Command: []string{"/bin/sh"},
Args: cmdLine,
VolumeMounts: []v1.VolumeMount{
{
Name: "nfs-vol",
MountPath: "/mnt",
},
},
SecurityContext: &v1.SecurityContext{
Privileged: &isPrivileged,
},
},
},
RestartPolicy: v1.RestartPolicyNever, // don't restart the pod
Volumes: []v1.Volume{
{
Name: "nfs-vol",
VolumeSource: v1.VolumeSource{
NFS: &v1.NFSVolumeSource{
Server: nfsIP,
Path: "/exports",
ReadOnly: false,
},
},
},
},
},
}
file := staticPodPath(dir, name, ns)
f, err := os.OpenFile(file, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0666)
if err != nil {
return err
}
defer f.Close()
y := printers.YAMLPrinter{}
return y.PrintObj(pod, f)
}
func staticPodPath(dir, name, namespace string) string {
return filepath.Join(dir, namespace+"-"+name+".yaml")
}
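
For reference, a hypothetical invocation of the path helper (directory and
names are made up):

    // staticPodPath("/etc/kubernetes/manifests", "static-pod-nfs-test-pod-1234", "mirror-pod-5678")
    // returns "/etc/kubernetes/manifests/mirror-pod-5678-static-pod-nfs-test-pod-1234.yaml"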


@@ -87,15 +87,6 @@ func (n *NodeE2ERemote) SetupTestPackage(tardir, systemSpecName string) error {
return nil
}
// prependCOSMounterFlag prepends the flag for setting the GCI mounter path to
// args and returns the result.
func prependCOSMounterFlag(args, host, workspace string) (string, error) {
klog.V(2).Infof("GCI/COS node and GCI/COS mounter both detected, modifying --experimental-mounter-path accordingly")
mounterPath := filepath.Join(workspace, "mounter")
args = fmt.Sprintf("--kubelet-flags=--experimental-mounter-path=%s ", mounterPath) + args
return args, nil
}
// prependMemcgNotificationFlag prepends the flag for enabling memcg
// notification to args and returns the result.
func prependMemcgNotificationFlag(args string) string {
@@ -124,8 +115,7 @@ func osSpecificActions(args, host, workspace string) (string, error) {
return args, setKubeletSELinuxLabels(host, workspace)
case strings.Contains(output, "gci"), strings.Contains(output, "cos"):
args = prependMemcgNotificationFlag(args)
args = prependGCPCredentialProviderFlag(args, workspace)
return prependCOSMounterFlag(args, host, workspace)
return prependGCPCredentialProviderFlag(args, workspace), nil
case strings.Contains(output, "ubuntu"):
args = prependGCPCredentialProviderFlag(args, workspace)
return prependMemcgNotificationFlag(args), nil