diff --git a/test/e2e/framework/node/resource.go b/test/e2e/framework/node/resource.go
index a57473bab83..9a71d4cd3d5 100644
--- a/test/e2e/framework/node/resource.go
+++ b/test/e2e/framework/node/resource.go
@@ -495,6 +495,16 @@ func hasNonblockingTaint(node *v1.Node, nonblockingTaints string) bool {
 	return false
 }
 
+// GetNodeHeartbeatTime returns the timestamp of the last status update of the node.
+func GetNodeHeartbeatTime(node *v1.Node) metav1.Time {
+	for _, condition := range node.Status.Conditions {
+		if condition.Type == v1.NodeReady {
+			return condition.LastHeartbeatTime
+		}
+	}
+	return metav1.Time{}
+}
+
 // PodNodePairs returns podNode pairs for all pods in a namespace
 func PodNodePairs(ctx context.Context, c clientset.Interface, ns string) ([]PodNode, error) {
 	var result []PodNode
diff --git a/test/e2e/framework/node/wait.go b/test/e2e/framework/node/wait.go
index d87e59b3db5..cf8b3d388c9 100644
--- a/test/e2e/framework/node/wait.go
+++ b/test/e2e/framework/node/wait.go
@@ -22,6 +22,7 @@ import (
 	"regexp"
 	"time"
 
+	"github.com/onsi/gomega"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
@@ -160,6 +161,23 @@ func WaitForNodeSchedulable(ctx context.Context, c clientset.Interface, name str
 	return false
 }
 
+// WaitForNodeHeartbeatAfter waits up to timeout for node to send the next
+// heartbeat after the given timestamp.
+//
+// To ensure the node status is posted by a restarted kubelet process,
+// after should be retrieved by [GetNodeHeartbeatTime] while the kubelet is down.
+func WaitForNodeHeartbeatAfter(ctx context.Context, c clientset.Interface, name string, after metav1.Time, timeout time.Duration) {
+	framework.Logf("Waiting up to %v for node %s to send a heartbeat after %v", timeout, name, after)
+	gomega.Eventually(ctx, func() (time.Time, error) {
+		node, err := c.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{})
+		if err != nil {
+			framework.Logf("Couldn't get node %s", name)
+			return time.Time{}, err
+		}
+		return GetNodeHeartbeatTime(node).Time, nil
+	}, timeout, poll).Should(gomega.BeTemporally(">", after.Time), "Node %s didn't send a heartbeat", name)
+}
+
 // CheckReady waits up to timeout for the cluster to reach the desired size and
 // have no not-ready nodes in it. By cluster size we mean the number of schedulable Nodes.
 func CheckReady(ctx context.Context, c clientset.Interface, size int, timeout time.Duration) ([]v1.Node, error) {
diff --git a/test/e2e/storage/utils/pod.go b/test/e2e/storage/utils/pod.go
index b971bd152eb..603f86e2ce8 100644
--- a/test/e2e/storage/utils/pod.go
+++ b/test/e2e/storage/utils/pod.go
@@ -24,7 +24,6 @@ import (
 	"path"
 	"regexp"
 	"strings"
-	"time"
 
 	"github.com/onsi/ginkgo/v2"
 	"github.com/onsi/gomega"
@@ -99,74 +98,50 @@ func StartPodLogs(ctx context.Context, f *framework.Framework, driverNamespace *
 
 // KubeletCommand performs `start`, `restart`, or `stop` on the kubelet running on the node of the target pod and waits
 // for the desired status.
-// - First issues the command via `systemctl`
-// - If `systemctl` returns stderr "command not found, issues the command via `service`
-// - If `service` also returns stderr "command not found", the test is aborted.
 // Allowed kubeletOps are `KStart`, `KStop`, and `KRestart`
 func KubeletCommand(ctx context.Context, kOp KubeletOpt, c clientset.Interface, pod *v1.Pod) {
-	command := ""
-	systemctlPresent := false
-	kubeletPid := ""
-
 	nodeIP, err := getHostAddress(ctx, c, pod)
 	framework.ExpectNoError(err)
 	nodeIP = nodeIP + ":22"
 
-	framework.Logf("Checking if systemctl command is present")
-	sshResult, err := e2essh.SSH(ctx, "systemctl --version", nodeIP, framework.TestContext.Provider)
-	framework.ExpectNoError(err, fmt.Sprintf("SSH to Node %q errored.", pod.Spec.NodeName))
-	if !strings.Contains(sshResult.Stderr, "command not found") {
-		command = fmt.Sprintf("systemctl %s kubelet", string(kOp))
-		systemctlPresent = true
-	} else {
-		command = fmt.Sprintf("service kubelet %s", string(kOp))
-	}
-
+	commandTemplate := "systemctl %s kubelet"
 	sudoPresent := isSudoPresent(ctx, nodeIP, framework.TestContext.Provider)
 	if sudoPresent {
-		command = fmt.Sprintf("sudo %s", command)
+		commandTemplate = "sudo " + commandTemplate
 	}
 
-	if kOp == KRestart {
-		kubeletPid = getKubeletMainPid(ctx, nodeIP, sudoPresent, systemctlPresent)
+	runCmd := func(cmd string) {
+		command := fmt.Sprintf(commandTemplate, cmd)
+		framework.Logf("Attempting `%s`", command)
+		sshResult, err := e2essh.SSH(ctx, command, nodeIP, framework.TestContext.Provider)
+		framework.ExpectNoError(err, fmt.Sprintf("SSH to Node %q errored.", pod.Spec.NodeName))
+		e2essh.LogResult(sshResult)
+		gomega.Expect(sshResult.Code).To(gomega.BeZero(), "Failed to [%s] kubelet:\n%#v", cmd, sshResult)
 	}
 
-	framework.Logf("Attempting `%s`", command)
-	sshResult, err = e2essh.SSH(ctx, command, nodeIP, framework.TestContext.Provider)
-	framework.ExpectNoError(err, fmt.Sprintf("SSH to Node %q errored.", pod.Spec.NodeName))
-	e2essh.LogResult(sshResult)
-	gomega.Expect(sshResult.Code).To(gomega.BeZero(), "Failed to [%s] kubelet:\n%#v", string(kOp), sshResult)
-
+	if kOp == KStop || kOp == KRestart {
+		runCmd("stop")
+	}
 	if kOp == KStop {
-		if ok := e2enode.WaitForNodeToBeNotReady(ctx, c, pod.Spec.NodeName, NodeStateTimeout); !ok {
-			framework.Failf("Node %s failed to enter NotReady state", pod.Spec.NodeName)
-		}
+		return
 	}
 
-	if kOp == KRestart {
-		// Wait for a minute to check if kubelet Pid is getting changed
-		isPidChanged := false
-		for start := time.Now(); time.Since(start) < 1*time.Minute; time.Sleep(2 * time.Second) {
-			if ctx.Err() != nil {
-				framework.Fail("timed out waiting for Kubelet POD change")
-			}
-			kubeletPidAfterRestart := getKubeletMainPid(ctx, nodeIP, sudoPresent, systemctlPresent)
-			if kubeletPid != kubeletPidAfterRestart {
-				isPidChanged = true
-				break
-			}
-		}
-		if !isPidChanged {
-			framework.Fail("Kubelet PID remained unchanged after restarting Kubelet")
-		}
-		framework.Logf("Noticed that kubelet PID is changed. Waiting for 30 Seconds for Kubelet to come back")
-		time.Sleep(30 * time.Second)
+	if kOp == KStart && getKubeletRunning(ctx, nodeIP) {
+		framework.Logf("Kubelet is already running on node %q", pod.Spec.NodeName)
+		// Just skip; otherwise we cannot observe a new heartbeat in time.
+		return
 	}
 
-	if kOp == KStart || kOp == KRestart {
-		// For kubelet start and restart operations, Wait until Node becomes Ready
-		if ok := e2enode.WaitForNodeToBeReady(ctx, c, pod.Spec.NodeName, NodeStateTimeout); !ok {
-			framework.Failf("Node %s failed to enter Ready state", pod.Spec.NodeName)
-		}
-	}
+
+	node, err := c.CoreV1().Nodes().Get(ctx, pod.Spec.NodeName, metav1.GetOptions{})
+	framework.ExpectNoError(err)
+	heartbeatTime := e2enode.GetNodeHeartbeatTime(node)
+
+	runCmd("start")
+	// Wait for the next heartbeat, which must be sent by the new kubelet process.
+	e2enode.WaitForNodeHeartbeatAfter(ctx, c, pod.Spec.NodeName, heartbeatTime, NodeStateTimeout)
+	// Then wait until the Node with the new process becomes Ready.
+	if ok := e2enode.WaitForNodeToBeReady(ctx, c, pod.Spec.NodeName, NodeStateTimeout); !ok {
+		framework.Failf("Node %s failed to enter Ready state", pod.Spec.NodeName)
+	}
 }
diff --git a/test/e2e/storage/utils/utils.go b/test/e2e/storage/utils/utils.go
index 1dea4b13d44..e902d5224d2 100644
--- a/test/e2e/storage/utils/utils.go
+++ b/test/e2e/storage/utils/utils.go
@@ -73,24 +73,16 @@ func VerifyFSGroupInPod(f *framework.Framework, filePath, expectedFSGroup string
 	gomega.Expect(expectedFSGroup).To(gomega.Equal(fsGroupResult), "Expected fsGroup of %s, got %s", expectedFSGroup, fsGroupResult)
 }
 
-// getKubeletMainPid return the Main PID of the Kubelet Process
-func getKubeletMainPid(ctx context.Context, nodeIP string, sudoPresent bool, systemctlPresent bool) string {
-	command := ""
-	if systemctlPresent {
-		command = "systemctl status kubelet | grep 'Main PID'"
-	} else {
-		command = "service kubelet status | grep 'Main PID'"
-	}
-	if sudoPresent {
-		command = fmt.Sprintf("sudo %s", command)
-	}
+// getKubeletRunning returns whether the kubelet is running on the node
+func getKubeletRunning(ctx context.Context, nodeIP string) bool {
+	command := "systemctl show kubelet --property ActiveState --value"
 	framework.Logf("Attempting `%s`", command)
 	sshResult, err := e2essh.SSH(ctx, command, nodeIP, framework.TestContext.Provider)
 	framework.ExpectNoError(err, fmt.Sprintf("SSH to Node %q errored.", nodeIP))
 	e2essh.LogResult(sshResult)
-	gomega.Expect(sshResult.Code).To(gomega.BeZero(), "Failed to get kubelet PID")
-	gomega.Expect(sshResult.Stdout).NotTo(gomega.BeEmpty(), "Kubelet Main PID should not be Empty")
-	return sshResult.Stdout
+	gomega.Expect(sshResult.Code).To(gomega.BeZero(), "Failed to get kubelet status")
+	gomega.Expect(sshResult.Stdout).NotTo(gomega.BeEmpty(), "Kubelet status should not be Empty")
+	return strings.TrimSpace(sshResult.Stdout) == "active"
 }
 
 // TestKubeletRestartsAndRestoresMount tests that a volume mounted to a pod remains mounted after a kubelet restarts
@@ -104,6 +96,9 @@ func TestKubeletRestartsAndRestoresMount(ctx context.Context, c clientset.Interf
 	ginkgo.By("Restarting kubelet")
 	KubeletCommand(ctx, KRestart, c, clientPod)
 
+	ginkgo.By("Wait 20s for the volume to become stable")
+	time.Sleep(20 * time.Second)
+
 	ginkgo.By("Testing that written file is accessible.")
 	CheckReadFromPath(f, clientPod, v1.PersistentVolumeFilesystem, false, volumePath, byteLen, seed)
 
@@ -121,6 +116,9 @@ func TestKubeletRestartsAndRestoresMap(ctx context.Context, c clientset.Interfac
 	ginkgo.By("Restarting kubelet")
 	KubeletCommand(ctx, KRestart, c, clientPod)
 
+	ginkgo.By("Wait 20s for the volume to become stable")
+	time.Sleep(20 * time.Second)
+
 	ginkgo.By("Testing that written pv is accessible.")
 	CheckReadFromPath(f, clientPod, v1.PersistentVolumeBlock, false, volumePath, byteLen, seed)
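For reviewers, the net effect of the pod.go change is easier to see outside diff form. Below is a standalone sketch of the new `KRestart` flow; the helper name `restartKubeletAndWait` and the injected `stop`/`start` callbacks are hypothetical stand-ins for the SSH-driven `runCmd("stop")`/`runCmd("start")` calls, while `GetNodeHeartbeatTime`, `WaitForNodeHeartbeatAfter`, and their ordering come straight from this patch.

```go
package sketch

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"

	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
)

// restartKubeletAndWait (hypothetical) mirrors the new KRestart flow:
// stop the kubelet, capture the heartbeat while it is down, start it
// again, then require a heartbeat strictly newer than the captured one,
// which can only have been posted by the restarted process.
func restartKubeletAndWait(ctx context.Context, c clientset.Interface, nodeName string, stop, start func(), timeout time.Duration) error {
	stop() // e.g. `systemctl stop kubelet` over SSH; synchronous

	// While the kubelet is down nothing can bump LastHeartbeatTime on the
	// NodeReady condition, so this timestamp is a safe lower bound.
	node, err := c.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
	if err != nil {
		return err
	}
	before := e2enode.GetNodeHeartbeatTime(node)

	start() // e.g. `systemctl start kubelet` over SSH

	// Fails the test unless a heartbeat newer than `before` arrives in time.
	e2enode.WaitForNodeHeartbeatAfter(ctx, c, nodeName, before, timeout)
	return nil
}
```

Compared with the old approach, this removes the need to detect the init system (`systemctl` vs `service`) and to poll `Main PID` for up to a minute: `systemctl stop` is synchronous, and a strictly newer `NodeReady` heartbeat is already proof that a fresh kubelet process is posting status.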
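The contract of `GetNodeHeartbeatTime` itself is narrow: it reads `LastHeartbeatTime` from the `NodeReady` condition only, and falls back to the zero `metav1.Time` when that condition is absent. A hypothetical unit-style test (not part of this patch) to pin that down:

```go
package sketch

import (
	"testing"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
)

func TestGetNodeHeartbeatTime(t *testing.T) {
	ready := metav1.NewTime(time.Date(2023, 6, 1, 12, 0, 0, 0, time.UTC))
	node := &v1.Node{Status: v1.NodeStatus{Conditions: []v1.NodeCondition{
		// Heartbeats on other conditions must be ignored, even when newer.
		{Type: v1.NodeMemoryPressure, LastHeartbeatTime: metav1.NewTime(ready.Add(time.Hour))},
		{Type: v1.NodeReady, LastHeartbeatTime: ready},
	}}}

	if got := e2enode.GetNodeHeartbeatTime(node); !got.Equal(&ready) {
		t.Errorf("got %v, want %v", got, ready)
	}

	// A node without a NodeReady condition yields the zero time, which
	// BeTemporally(">") in WaitForNodeHeartbeatAfter treats as older than
	// any real heartbeat.
	if got := e2enode.GetNodeHeartbeatTime(&v1.Node{}); !got.IsZero() {
		t.Errorf("want zero time for a node without conditions, got %v", got)
	}
}
```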