e2e/storage: speed up kubelet commands

Speed up stopping by not waiting for the Node to become NotReady; `systemctl`
ensures the kubelet process has stopped before returning. This should save 40s
per case.

Since the stop command no longer waits for NotReady, the start command needs to
wait for the next heartbeat to ensure we are checking status reported by the new
process.

Implement restart as stop followed by start, so the heartbeat time can be read
while the kubelet is down. The fixed 30s sleep is no longer needed; the sleep is
moved to the callers, which still need it to ensure the volume does not
disappear.

Dropped support for non-systemd systems.
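
A minimal sketch of the resulting restart flow, using the helpers added in this
commit (e2enode.GetNodeHeartbeatTime, e2enode.WaitForNodeHeartbeatAfter); the
restartKubelet wrapper and the 5-minute timeout here are illustrative, not part
of the change:

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
)

// restartKubelet restarts the kubelet on the given node and returns only once
// the Node object has been updated by the new kubelet process.
// nodeIP must include the SSH port, e.g. "10.0.0.1:22".
func restartKubelet(ctx context.Context, c clientset.Interface, nodeName, nodeIP string) {
	runSystemctl := func(op string) {
		// `systemctl stop` blocks until the kubelet process has exited, so no
		// extra wait for the Node to turn NotReady is needed after stopping.
		result, err := e2essh.SSH(ctx, fmt.Sprintf("sudo systemctl %s kubelet", op), nodeIP, framework.TestContext.Provider)
		framework.ExpectNoError(err)
		e2essh.LogResult(result)
	}

	runSystemctl("stop")

	// While the kubelet is down, record the last heartbeat posted by the old
	// process. Any newer heartbeat must then come from the new process.
	node, err := c.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
	framework.ExpectNoError(err)
	heartbeat := e2enode.GetNodeHeartbeatTime(node)

	runSystemctl("start")
	if !e2enode.WaitForNodeHeartbeatAfter(ctx, c, nodeName, heartbeat, 5*time.Minute) {
		framework.Failf("Node %s did not send a heartbeat from the restarted kubelet", nodeName)
	}
	if !e2enode.WaitForNodeToBeReady(ctx, c, nodeName, 5*time.Minute) {
		framework.Failf("Node %s failed to become Ready again", nodeName)
	}
}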
胡玮文 2024-03-22 16:57:06 +08:00
parent 95a6f2e4dc
commit f3f44f70bf
4 changed files with 73 additions and 67 deletions


@@ -495,6 +495,16 @@ func hasNonblockingTaint(node *v1.Node, nonblockingTaints string) bool {
return false
}
// GetNodeHeartbeatTime returns the timestamp of the last status update of the node.
func GetNodeHeartbeatTime(node *v1.Node) metav1.Time {
for _, condition := range node.Status.Conditions {
if condition.Type == v1.NodeReady {
return condition.LastHeartbeatTime
}
}
return metav1.Time{}
}
// PodNodePairs returns podNode pairs for all pods in a namespace
func PodNodePairs(ctx context.Context, c clientset.Interface, ns string) ([]PodNode, error) {
var result []PodNode


@@ -160,6 +160,27 @@ func WaitForNodeSchedulable(ctx context.Context, c clientset.Interface, name str
return false
}
// WaitForNodeHeartbeatAfter waits up to timeout for node to send the next
// heartbeat after the given timestamp.
//
// To ensure the node status is posted by a restarted kubelet process,
// after should be retrieved by [GetNodeHeartbeatTime] while the kubelet is down.
func WaitForNodeHeartbeatAfter(ctx context.Context, c clientset.Interface, name string, after metav1.Time, timeout time.Duration) bool {
framework.Logf("Waiting up to %v for node %s to send a heartbeat after %v", timeout, name, after)
for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
node, err := c.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{})
if err != nil {
framework.Logf("Couldn't get node %s", name)
continue
}
if GetNodeHeartbeatTime(node).After(after.Time) {
return true
}
}
framework.Logf("Node %s didn't send a heartbeat after %v within %v", name, after, timeout)
return false
}
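A short usage sketch, assuming code inside the e2enode package with a clientset c and ctx in scope (the node name and timeout are illustrative). The important detail is that `after` is captured while the kubelet is stopped; a timestamp taken while the old process is still running could be superseded by that same process, making the wait pass spuriously:

// Capture the last heartbeat while the kubelet on "node-1" is stopped.
node, err := c.CoreV1().Nodes().Get(ctx, "node-1", metav1.GetOptions{})
framework.ExpectNoError(err)
before := GetNodeHeartbeatTime(node)

// ... start the kubelet again ...

// Succeeds only once the new kubelet process has posted a fresh heartbeat.
if !WaitForNodeHeartbeatAfter(ctx, c, "node-1", before, 5*time.Minute) {
	framework.Failf("node-1 did not post a heartbeat from the new kubelet process")
}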
// CheckReady waits up to timeout for the cluster to reach the desired size and
// have no not-ready nodes in it. By cluster size we mean the number of schedulable Nodes.
func CheckReady(ctx context.Context, c clientset.Interface, size int, timeout time.Duration) ([]v1.Node, error) {


@@ -24,7 +24,6 @@ import (
"path"
"regexp"
"strings"
"time"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
@@ -99,76 +98,54 @@ func StartPodLogs(ctx context.Context, f *framework.Framework, driverNamespace *
// KubeletCommand performs `start`, `restart`, or `stop` on the kubelet running on the node of the target pod and waits
// for the desired status.
// - First issues the command via `systemctl`
// - If `systemctl` returns stderr "command not found", issues the command via `service`
// - If `service` also returns stderr "command not found", the test is aborted.
// Allowed kubeletOps are `KStart`, `KStop`, and `KRestart`
func KubeletCommand(ctx context.Context, kOp KubeletOpt, c clientset.Interface, pod *v1.Pod) {
command := ""
systemctlPresent := false
kubeletPid := ""
nodeIP, err := getHostAddress(ctx, c, pod)
framework.ExpectNoError(err)
nodeIP = nodeIP + ":22"
framework.Logf("Checking if systemctl command is present")
sshResult, err := e2essh.SSH(ctx, "systemctl --version", nodeIP, framework.TestContext.Provider)
framework.ExpectNoError(err, fmt.Sprintf("SSH to Node %q errored.", pod.Spec.NodeName))
if !strings.Contains(sshResult.Stderr, "command not found") {
command = fmt.Sprintf("systemctl %s kubelet", string(kOp))
systemctlPresent = true
} else {
command = fmt.Sprintf("service kubelet %s", string(kOp))
}
commandTemplate := "systemctl %s kubelet"
sudoPresent := isSudoPresent(ctx, nodeIP, framework.TestContext.Provider)
if sudoPresent {
command = fmt.Sprintf("sudo %s", command)
}
if kOp == KRestart {
kubeletPid = getKubeletMainPid(ctx, nodeIP, sudoPresent, systemctlPresent)
commandTemplate = "sudo " + commandTemplate
}
runCmd := func(cmd string) {
command := fmt.Sprintf(commandTemplate, cmd)
framework.Logf("Attempting `%s`", command)
sshResult, err = e2essh.SSH(ctx, command, nodeIP, framework.TestContext.Provider)
sshResult, err := e2essh.SSH(ctx, command, nodeIP, framework.TestContext.Provider)
framework.ExpectNoError(err, fmt.Sprintf("SSH to Node %q errored.", pod.Spec.NodeName))
e2essh.LogResult(sshResult)
gomega.Expect(sshResult.Code).To(gomega.BeZero(), "Failed to [%s] kubelet:\n%#v", string(kOp), sshResult)
gomega.Expect(sshResult.Code).To(gomega.BeZero(), "Failed to [%s] kubelet:\n%#v", cmd, sshResult)
}
if kOp == KStop || kOp == KRestart {
runCmd("stop")
}
if kOp == KStop {
if ok := e2enode.WaitForNodeToBeNotReady(ctx, c, pod.Spec.NodeName, NodeStateTimeout); !ok {
framework.Failf("Node %s failed to enter NotReady state", pod.Spec.NodeName)
}
}
if kOp == KRestart {
// Wait for a minute to check if kubelet Pid is getting changed
isPidChanged := false
for start := time.Now(); time.Since(start) < 1*time.Minute; time.Sleep(2 * time.Second) {
if ctx.Err() != nil {
framework.Fail("timed out waiting for Kubelet POD change")
}
kubeletPidAfterRestart := getKubeletMainPid(ctx, nodeIP, sudoPresent, systemctlPresent)
if kubeletPid != kubeletPidAfterRestart {
isPidChanged = true
break
}
}
if !isPidChanged {
framework.Fail("Kubelet PID remained unchanged after restarting Kubelet")
return
}
framework.Logf("Noticed that kubelet PID is changed. Waiting for 30 Seconds for Kubelet to come back")
time.Sleep(30 * time.Second)
if kOp == KStart && getKubeletRunning(ctx, nodeIP) {
framework.Logf("Kubelet is already running on node %q", pod.Spec.NodeName)
// Just skip; otherwise we cannot get a new heartbeat in time.
return
}
if kOp == KStart || kOp == KRestart {
// For kubelet start and restart operations, wait until the Node becomes Ready again
node, err := c.CoreV1().Nodes().Get(ctx, pod.Spec.NodeName, metav1.GetOptions{})
framework.ExpectNoError(err)
heartbeatTime := e2enode.GetNodeHeartbeatTime(node)
runCmd("start")
// Wait for next heartbeat, which must be sent by the new kubelet process.
if ok := e2enode.WaitForNodeHeartbeatAfter(ctx, c, pod.Spec.NodeName, heartbeatTime, NodeStateTimeout); !ok {
framework.Failf("Node %s failed to send a heartbeat after %v", pod.Spec.NodeName, heartbeatTime)
}
// Then wait until Node with new process becomes Ready.
if ok := e2enode.WaitForNodeToBeReady(ctx, c, pod.Spec.NodeName, NodeStateTimeout); !ok {
framework.Failf("Node %s failed to enter Ready state", pod.Spec.NodeName)
}
}
}
// getHostAddress gets the node for a pod and returns the first
// address. Returns an error if the node the pod is on doesn't have an


@@ -73,24 +73,16 @@ func VerifyFSGroupInPod(f *framework.Framework, filePath, expectedFSGroup string
gomega.Expect(expectedFSGroup).To(gomega.Equal(fsGroupResult), "Expected fsGroup of %s, got %s", expectedFSGroup, fsGroupResult)
}
// getKubeletMainPid return the Main PID of the Kubelet Process
func getKubeletMainPid(ctx context.Context, nodeIP string, sudoPresent bool, systemctlPresent bool) string {
command := ""
if systemctlPresent {
command = "systemctl status kubelet | grep 'Main PID'"
} else {
command = "service kubelet status | grep 'Main PID'"
}
if sudoPresent {
command = fmt.Sprintf("sudo %s", command)
}
// getKubeletRunning returns whether the kubelet is running or not
func getKubeletRunning(ctx context.Context, nodeIP string) bool {
command := "systemctl show kubelet --property ActiveState --value"
framework.Logf("Attempting `%s`", command)
sshResult, err := e2essh.SSH(ctx, command, nodeIP, framework.TestContext.Provider)
framework.ExpectNoError(err, fmt.Sprintf("SSH to Node %q errored.", nodeIP))
e2essh.LogResult(sshResult)
gomega.Expect(sshResult.Code).To(gomega.BeZero(), "Failed to get kubelet PID")
gomega.Expect(sshResult.Stdout).NotTo(gomega.BeEmpty(), "Kubelet Main PID should not be Empty")
return sshResult.Stdout
gomega.Expect(sshResult.Code).To(gomega.BeZero(), "Failed to get kubelet status")
gomega.Expect(sshResult.Stdout).NotTo(gomega.BeEmpty(), "Kubelet status should not be Empty")
return strings.TrimSpace(sshResult.Stdout) == "active"
}
// TestKubeletRestartsAndRestoresMount tests that a volume mounted to a pod remains mounted after a kubelet restarts
@@ -104,6 +96,9 @@ func TestKubeletRestartsAndRestoresMount(ctx context.Context, c clientset.Interf
ginkgo.By("Restarting kubelet")
KubeletCommand(ctx, KRestart, c, clientPod)
ginkgo.By("Wait 20s for the volume to become stable")
time.Sleep(20 * time.Second)
ginkgo.By("Testing that written file is accessible.")
CheckReadFromPath(f, clientPod, v1.PersistentVolumeFilesystem, false, volumePath, byteLen, seed)
@@ -121,6 +116,9 @@ func TestKubeletRestartsAndRestoresMap(ctx context.Context, c clientset.Interfac
ginkgo.By("Restarting kubelet")
KubeletCommand(ctx, KRestart, c, clientPod)
ginkgo.By("Wait 20s for the volume to become stable")
time.Sleep(20 * time.Second)
ginkgo.By("Testing that written pv is accessible.")
CheckReadFromPath(f, clientPod, v1.PersistentVolumeBlock, false, volumePath, byteLen, seed)