From f3f44f70bf813ea195737e37483eaa081bf7bfd3 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E8=83=A1=E7=8E=AE=E6=96=87?=
Date: Fri, 22 Mar 2024 16:57:06 +0800
Subject: [PATCH 1/2] e2e/storage: speed up kubelet commands

Speed up stopping the kubelet by not waiting for the Node to become
NotReady; `systemctl` already ensures the kubelet process has stopped
before it returns. This should save 40s per case.

Since the stop command no longer waits for NotReady, the start command
needs to wait for the next heartbeat, to ensure the status we check was
posted by the new kubelet process.

Implement restart as stop followed by start, so the heartbeat time can
be captured while the kubelet is down. The fixed 30s sleep is no longer
needed here; it is moved to the callers, which still need it to ensure
the volume does not disappear.

Drop support for non-systemd systems.
---
 test/e2e/framework/node/resource.go | 10 ++++
 test/e2e/framework/node/wait.go     | 21 ++++++++
 test/e2e/storage/utils/pod.go       | 83 +++++++++++------------------
 test/e2e/storage/utils/utils.go     | 26 +++++----
 4 files changed, 73 insertions(+), 67 deletions(-)

diff --git a/test/e2e/framework/node/resource.go b/test/e2e/framework/node/resource.go
index a57473bab83..9a71d4cd3d5 100644
--- a/test/e2e/framework/node/resource.go
+++ b/test/e2e/framework/node/resource.go
@@ -495,6 +495,16 @@ func hasNonblockingTaint(node *v1.Node, nonblockingTaints string) bool {
 	return false
 }
 
+// GetNodeHeartbeatTime returns the timestamp of the last status update of the node.
+func GetNodeHeartbeatTime(node *v1.Node) metav1.Time {
+	for _, condition := range node.Status.Conditions {
+		if condition.Type == v1.NodeReady {
+			return condition.LastHeartbeatTime
+		}
+	}
+	return metav1.Time{}
+}
+
 // PodNodePairs return podNode pairs for all pods in a namespace
 func PodNodePairs(ctx context.Context, c clientset.Interface, ns string) ([]PodNode, error) {
 	var result []PodNode
diff --git a/test/e2e/framework/node/wait.go b/test/e2e/framework/node/wait.go
index d87e59b3db5..7af8c4df285 100644
--- a/test/e2e/framework/node/wait.go
+++ b/test/e2e/framework/node/wait.go
@@ -160,6 +160,27 @@ func WaitForNodeSchedulable(ctx context.Context, c clientset.Interface, name str
 	return false
 }
 
+// WaitForNodeHeartbeatAfter waits up to timeout for node to send the next
+// heartbeat after the given timestamp.
+//
+// To ensure the node status is posted by a restarted kubelet process,
+// after should be retrieved by [GetNodeHeartbeatTime] while the kubelet is down.
+func WaitForNodeHeartbeatAfter(ctx context.Context, c clientset.Interface, name string, after metav1.Time, timeout time.Duration) bool {
+	framework.Logf("Waiting up to %v for node %s to send a heartbeat after %v", timeout, name, after)
+	for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
+		node, err := c.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{})
+		if err != nil {
+			framework.Logf("Couldn't get node %s", name)
+			continue
+		}
+		if GetNodeHeartbeatTime(node).After(after.Time) {
+			return true
+		}
+	}
+	framework.Logf("Node %s didn't send a heartbeat after %v within %v", name, after, timeout)
+	return false
+}
+
 // CheckReady waits up to timeout for cluster to has desired size and
 // there is no not-ready nodes in it. By cluster size we mean number of schedulable Nodes.
 func CheckReady(ctx context.Context, c clientset.Interface, size int, timeout time.Duration) ([]v1.Node, error) {
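The two helpers added above are designed to be used as a pair: capture the last heartbeat while the kubelet is down, then wait for a strictly newer one after starting it again. A minimal sketch of that pattern, not part of the patch (the function name restartAndWaitForHeartbeat and the startKubelet callback are hypothetical, and it uses the boolean-returning signature introduced here, which PATCH 2/2 below replaces with a gomega assertion):

package example

import (
	"context"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"

	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
)

// restartAndWaitForHeartbeat captures the node's last heartbeat while the
// kubelet is stopped, starts the kubelet via the supplied callback, and then
// blocks until a strictly newer heartbeat arrives, which can only have been
// posted by the new kubelet process.
func restartAndWaitForHeartbeat(ctx context.Context, c clientset.Interface, nodeName string, startKubelet func(), timeout time.Duration) {
	node, err := c.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
	framework.ExpectNoError(err)
	before := e2enode.GetNodeHeartbeatTime(node) // taken while the kubelet is down

	startKubelet() // e.g. `systemctl start kubelet` over SSH

	if ok := e2enode.WaitForNodeHeartbeatAfter(ctx, c, nodeName, before, timeout); !ok {
		framework.Failf("node %s never sent a heartbeat newer than %v", nodeName, before)
	}
}
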
diff --git a/test/e2e/storage/utils/pod.go b/test/e2e/storage/utils/pod.go
index b971bd152eb..552650b985e 100644
--- a/test/e2e/storage/utils/pod.go
+++ b/test/e2e/storage/utils/pod.go
@@ -24,7 +24,6 @@ import (
 	"path"
 	"regexp"
 	"strings"
-	"time"
 
 	"github.com/onsi/ginkgo/v2"
 	"github.com/onsi/gomega"
@@ -99,74 +98,52 @@ func StartPodLogs(ctx context.Context, f *framework.Framework, driverNamespace *
 
 // KubeletCommand performs `start`, `restart`, or `stop` on the kubelet running on the node of the target pod and waits
 // for the desired statues..
-// - First issues the command via `systemctl`
-// - If `systemctl` returns stderr "command not found, issues the command via `service`
-// - If `service` also returns stderr "command not found", the test is aborted.
 // Allowed kubeletOps are `KStart`, `KStop`, and `KRestart`
 func KubeletCommand(ctx context.Context, kOp KubeletOpt, c clientset.Interface, pod *v1.Pod) {
-	command := ""
-	systemctlPresent := false
-	kubeletPid := ""
-
 	nodeIP, err := getHostAddress(ctx, c, pod)
 	framework.ExpectNoError(err)
 	nodeIP = nodeIP + ":22"
 
-	framework.Logf("Checking if systemctl command is present")
-	sshResult, err := e2essh.SSH(ctx, "systemctl --version", nodeIP, framework.TestContext.Provider)
-	framework.ExpectNoError(err, fmt.Sprintf("SSH to Node %q errored.", pod.Spec.NodeName))
-	if !strings.Contains(sshResult.Stderr, "command not found") {
-		command = fmt.Sprintf("systemctl %s kubelet", string(kOp))
-		systemctlPresent = true
-	} else {
-		command = fmt.Sprintf("service kubelet %s", string(kOp))
-	}
-
+	commandTemplate := "systemctl %s kubelet"
 	sudoPresent := isSudoPresent(ctx, nodeIP, framework.TestContext.Provider)
 	if sudoPresent {
-		command = fmt.Sprintf("sudo %s", command)
+		commandTemplate = "sudo " + commandTemplate
 	}
 
-	if kOp == KRestart {
-		kubeletPid = getKubeletMainPid(ctx, nodeIP, sudoPresent, systemctlPresent)
+	runCmd := func(cmd string) {
+		command := fmt.Sprintf(commandTemplate, cmd)
+		framework.Logf("Attempting `%s`", command)
+		sshResult, err := e2essh.SSH(ctx, command, nodeIP, framework.TestContext.Provider)
+		framework.ExpectNoError(err, fmt.Sprintf("SSH to Node %q errored.", pod.Spec.NodeName))
+		e2essh.LogResult(sshResult)
+		gomega.Expect(sshResult.Code).To(gomega.BeZero(), "Failed to [%s] kubelet:\n%#v", cmd, sshResult)
 	}
 
-	framework.Logf("Attempting `%s`", command)
-	sshResult, err = e2essh.SSH(ctx, command, nodeIP, framework.TestContext.Provider)
-	framework.ExpectNoError(err, fmt.Sprintf("SSH to Node %q errored.", pod.Spec.NodeName))
-	e2essh.LogResult(sshResult)
-	gomega.Expect(sshResult.Code).To(gomega.BeZero(), "Failed to [%s] kubelet:\n%#v", string(kOp), sshResult)
-
+	if kOp == KStop || kOp == KRestart {
+		runCmd("stop")
+	}
 	if kOp == KStop {
-		if ok := e2enode.WaitForNodeToBeNotReady(ctx, c, pod.Spec.NodeName, NodeStateTimeout); !ok {
-			framework.Failf("Node %s failed to enter NotReady state", pod.Spec.NodeName)
-		}
+		return
 	}
-	if kOp == KRestart {
-		// Wait for a minute to check if kubelet Pid is getting changed
-		isPidChanged := false
-		for start := time.Now(); time.Since(start) < 1*time.Minute; time.Sleep(2 * time.Second) {
-			if ctx.Err() != nil {
-				framework.Fail("timed out waiting for Kubelet POD change")
-			}
-			kubeletPidAfterRestart := getKubeletMainPid(ctx, nodeIP, sudoPresent, systemctlPresent)
-			if kubeletPid != kubeletPidAfterRestart {
-				isPidChanged = true
-				break
-			}
-		}
-		if !isPidChanged {
-			framework.Fail("Kubelet PID remained unchanged after restarting Kubelet")
-		}
-		framework.Logf("Noticed that kubelet PID is changed. Waiting for 30 Seconds for Kubelet to come back")
-		time.Sleep(30 * time.Second)
+	if kOp == KStart && getKubeletRunning(ctx, nodeIP) {
+		framework.Logf("Kubelet is already running on node %q", pod.Spec.NodeName)
+		// Just skip; otherwise we cannot get a new heartbeat in time.
+		return
 	}
-	if kOp == KStart || kOp == KRestart {
-		// For kubelet start and restart operations, Wait until Node becomes Ready
-		if ok := e2enode.WaitForNodeToBeReady(ctx, c, pod.Spec.NodeName, NodeStateTimeout); !ok {
-			framework.Failf("Node %s failed to enter Ready state", pod.Spec.NodeName)
-		}
+
+	node, err := c.CoreV1().Nodes().Get(ctx, pod.Spec.NodeName, metav1.GetOptions{})
+	framework.ExpectNoError(err)
+	heartbeatTime := e2enode.GetNodeHeartbeatTime(node)
+
+	runCmd("start")
+	// Wait for next heartbeat, which must be sent by the new kubelet process.
+	if ok := e2enode.WaitForNodeHeartbeatAfter(ctx, c, pod.Spec.NodeName, heartbeatTime, NodeStateTimeout); !ok {
+		framework.Failf("Node %s failed to send a heartbeat after %v", pod.Spec.NodeName, heartbeatTime)
+	}
+	// Then wait until Node with new process becomes Ready.
+	if ok := e2enode.WaitForNodeToBeReady(ctx, c, pod.Spec.NodeName, NodeStateTimeout); !ok {
+		framework.Failf("Node %s failed to enter Ready state", pod.Spec.NodeName)
 	}
 }
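The refactor above also collapses the old systemctl/service detection into a single command template that the runCmd closure formats per operation, which is what makes the stop-then-start decomposition of restart cheap. The shape of that template, pulled out into a standalone helper for illustration only (kubeletSystemctl is not a function in the patch):

package example

import "fmt"

// kubeletSystemctl builds the command KubeletCommand sends over SSH: a single
// "systemctl <op> kubelet" template, prefixed with sudo when the remote user
// needs it. In the patch this lives inline as commandTemplate and is executed
// by the runCmd closure.
func kubeletSystemctl(op string, sudoPresent bool) string {
	command := fmt.Sprintf("systemctl %s kubelet", op)
	if sudoPresent {
		command = "sudo " + command
	}
	return command
}

With this in place, a restart is simply the "stop" command followed by the "start" command, with the heartbeat timestamp captured in between while the kubelet is down.
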
diff --git a/test/e2e/storage/utils/utils.go b/test/e2e/storage/utils/utils.go
index b6f18a11832..ff93e03a779 100644
--- a/test/e2e/storage/utils/utils.go
+++ b/test/e2e/storage/utils/utils.go
@@ -73,24 +73,16 @@ func VerifyFSGroupInPod(f *framework.Framework, filePath, expectedFSGroup string
 	gomega.Expect(expectedFSGroup).To(gomega.Equal(fsGroupResult), "Expected fsGroup of %s, got %s", expectedFSGroup, fsGroupResult)
 }
 
-// getKubeletMainPid return the Main PID of the Kubelet Process
-func getKubeletMainPid(ctx context.Context, nodeIP string, sudoPresent bool, systemctlPresent bool) string {
-	command := ""
-	if systemctlPresent {
-		command = "systemctl status kubelet | grep 'Main PID'"
-	} else {
-		command = "service kubelet status | grep 'Main PID'"
-	}
-	if sudoPresent {
-		command = fmt.Sprintf("sudo %s", command)
-	}
+// getKubeletRunning returns whether the kubelet is running on the node.
+func getKubeletRunning(ctx context.Context, nodeIP string) bool {
+	command := "systemctl show kubelet --property ActiveState --value"
 	framework.Logf("Attempting `%s`", command)
 	sshResult, err := e2essh.SSH(ctx, command, nodeIP, framework.TestContext.Provider)
 	framework.ExpectNoError(err, fmt.Sprintf("SSH to Node %q errored.", nodeIP))
 	e2essh.LogResult(sshResult)
-	gomega.Expect(sshResult.Code).To(gomega.BeZero(), "Failed to get kubelet PID")
-	gomega.Expect(sshResult.Stdout).NotTo(gomega.BeEmpty(), "Kubelet Main PID should not be Empty")
-	return sshResult.Stdout
+	gomega.Expect(sshResult.Code).To(gomega.BeZero(), "Failed to get kubelet status")
+	gomega.Expect(sshResult.Stdout).NotTo(gomega.BeEmpty(), "Kubelet status should not be empty")
+	return strings.TrimSpace(sshResult.Stdout) == "active"
 }
 
 // TestKubeletRestartsAndRestoresMount tests that a volume mounted to a pod remains mounted after a kubelet restarts
@@ -104,6 +96,9 @@ func TestKubeletRestartsAndRestoresMount(ctx context.Context, c clientset.Interf
 	ginkgo.By("Restarting kubelet")
 	KubeletCommand(ctx, KRestart, c, clientPod)
 
+	ginkgo.By("Wait 20s for the volume to become stable")
+	time.Sleep(20 * time.Second)
+
 	ginkgo.By("Testing that written file is accessible.")
 	CheckReadFromPath(f, clientPod, v1.PersistentVolumeFilesystem, false, volumePath, byteLen, seed)
 
@@ -121,6 +116,9 @@ func TestKubeletRestartsAndRestoresMap(ctx context.Context, c clientset.Interfac
 	ginkgo.By("Restarting kubelet")
 	KubeletCommand(ctx, KRestart, c, clientPod)
 
+	ginkgo.By("Wait 20s for the volume to become stable")
+	time.Sleep(20 * time.Second)
+
 	ginkgo.By("Testing that written pv is accessible.")
 	CheckReadFromPath(f, clientPod, v1.PersistentVolumeBlock, false, volumePath, byteLen, seed)
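getKubeletRunning above relies on `systemctl show kubelet --property ActiveState --value` printing exactly "active" for a running unit. A local, non-SSH mirror of the same check, for illustration only (isUnitActive is not part of the patch):

package example

import (
	"os/exec"
	"strings"
)

// isUnitActive reports whether a systemd unit is currently active on the
// local host, using the same "systemctl show --property ActiveState --value"
// query that getKubeletRunning issues over SSH. States such as "inactive",
// "activating", or "failed" are treated as not running.
func isUnitActive(unit string) (bool, error) {
	out, err := exec.Command("systemctl", "show", unit, "--property", "ActiveState", "--value").Output()
	if err != nil {
		return false, err
	}
	return strings.TrimSpace(string(out)) == "active", nil
}
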
accessible.") CheckReadFromPath(f, clientPod, v1.PersistentVolumeFilesystem, false, volumePath, byteLen, seed) @@ -121,6 +116,9 @@ func TestKubeletRestartsAndRestoresMap(ctx context.Context, c clientset.Interfac ginkgo.By("Restarting kubelet") KubeletCommand(ctx, KRestart, c, clientPod) + ginkgo.By("Wait 20s for the volume to become stable") + time.Sleep(20 * time.Second) + ginkgo.By("Testing that written pv is accessible.") CheckReadFromPath(f, clientPod, v1.PersistentVolumeBlock, false, volumePath, byteLen, seed) From c916f5e755ffc4d2cefdc294c5009a971138b860 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=83=A1=E7=8E=AE=E6=96=87?= Date: Sat, 30 Mar 2024 14:02:48 +0800 Subject: [PATCH 2/2] e2e/framework/node: use gomega.Eventually to poll --- test/e2e/framework/node/wait.go | 15 ++++++--------- test/e2e/storage/utils/pod.go | 4 +--- 2 files changed, 7 insertions(+), 12 deletions(-) diff --git a/test/e2e/framework/node/wait.go b/test/e2e/framework/node/wait.go index 7af8c4df285..cf8b3d388c9 100644 --- a/test/e2e/framework/node/wait.go +++ b/test/e2e/framework/node/wait.go @@ -22,6 +22,7 @@ import ( "regexp" "time" + "github.com/onsi/gomega" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -165,20 +166,16 @@ func WaitForNodeSchedulable(ctx context.Context, c clientset.Interface, name str // // To ensure the node status is posted by a restarted kubelet process, // after should be retrieved by [GetNodeHeartbeatTime] while the kubelet is down. -func WaitForNodeHeartbeatAfter(ctx context.Context, c clientset.Interface, name string, after metav1.Time, timeout time.Duration) bool { +func WaitForNodeHeartbeatAfter(ctx context.Context, c clientset.Interface, name string, after metav1.Time, timeout time.Duration) { framework.Logf("Waiting up to %v for node %s to send a heartbeat after %v", timeout, name, after) - for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) { + gomega.Eventually(ctx, func() (time.Time, error) { node, err := c.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{}) if err != nil { framework.Logf("Couldn't get node %s", name) - continue + return time.Time{}, err } - if GetNodeHeartbeatTime(node).After(after.Time) { - return true - } - } - framework.Logf("Node %s didn't send a heartbeat after %v within %v", name, after, timeout) - return false + return GetNodeHeartbeatTime(node).Time, nil + }, timeout, poll).Should(gomega.BeTemporally(">", after.Time), "Node %s didn't send a heartbeat", name) } // CheckReady waits up to timeout for cluster to has desired size and diff --git a/test/e2e/storage/utils/pod.go b/test/e2e/storage/utils/pod.go index 552650b985e..603f86e2ce8 100644 --- a/test/e2e/storage/utils/pod.go +++ b/test/e2e/storage/utils/pod.go @@ -138,9 +138,7 @@ func KubeletCommand(ctx context.Context, kOp KubeletOpt, c clientset.Interface, runCmd("start") // Wait for next heartbeat, which must be sent by the new kubelet process. - if ok := e2enode.WaitForNodeHeartbeatAfter(ctx, c, pod.Spec.NodeName, heartbeatTime, NodeStateTimeout); !ok { - framework.Failf("Node %s failed to send a heartbeat after %v", pod.Spec.NodeName, heartbeatTime) - } + e2enode.WaitForNodeHeartbeatAfter(ctx, c, pod.Spec.NodeName, heartbeatTime, NodeStateTimeout) // Then wait until Node with new process becomes Ready. if ok := e2enode.WaitForNodeToBeReady(ctx, c, pod.Spec.NodeName, NodeStateTimeout); !ok { framework.Failf("Node %s failed to enter Ready state", pod.Spec.NodeName)