Merge pull request #124028 from huww98/kubelet-speedup

e2e/storage: speed up kubelet commands
commit 9fe06620f3
Kubernetes Prow Robot, 2024-08-26 08:13:32 +01:00, committed by GitHub
4 changed files with 68 additions and 67 deletions

@@ -495,6 +495,16 @@ func hasNonblockingTaint(node *v1.Node, nonblockingTaints string) bool {
 	return false
 }
 
+// GetNodeHeartbeatTime returns the timestamp of the last status update of the node.
+func GetNodeHeartbeatTime(node *v1.Node) metav1.Time {
+	for _, condition := range node.Status.Conditions {
+		if condition.Type == v1.NodeReady {
+			return condition.LastHeartbeatTime
+		}
+	}
+	return metav1.Time{}
+}
+
 // PodNodePairs return podNode pairs for all pods in a namespace
 func PodNodePairs(ctx context.Context, c clientset.Interface, ns string) ([]PodNode, error) {
 	var result []PodNode
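GetNodeHeartbeatTime keys off the NodeReady condition, whose LastHeartbeatTime the kubelet refreshes on every node-status post. A standalone sketch of the helper's behavior (the node fixture is hypothetical, not part of the commit):

package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// Copy of the helper added above, reproduced so the demo is self-contained.
func GetNodeHeartbeatTime(node *v1.Node) metav1.Time {
	for _, condition := range node.Status.Conditions {
		if condition.Type == v1.NodeReady {
			return condition.LastHeartbeatTime
		}
	}
	return metav1.Time{}
}

func main() {
	// Hypothetical node whose NodeReady heartbeat was posted 10s ago.
	node := &v1.Node{Status: v1.NodeStatus{Conditions: []v1.NodeCondition{
		{Type: v1.NodeMemoryPressure, LastHeartbeatTime: metav1.Now()},
		{Type: v1.NodeReady, LastHeartbeatTime: metav1.NewTime(time.Now().Add(-10 * time.Second))},
	}}}
	// Only the NodeReady condition is consulted; other conditions are ignored.
	fmt.Println("last heartbeat:", GetNodeHeartbeatTime(node).Time)
	// A node with no NodeReady condition yields the zero metav1.Time{}.
}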

@@ -22,6 +22,7 @@ import (
 	"regexp"
 	"time"
 
+	"github.com/onsi/gomega"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
@@ -160,6 +161,23 @@ func WaitForNodeSchedulable(ctx context.Context, c clientset.Interface, name str
 	return false
 }
 
+// WaitForNodeHeartbeatAfter waits up to timeout for node to send the next
+// heartbeat after the given timestamp.
+//
+// To ensure the node status is posted by a restarted kubelet process,
+// after should be retrieved by [GetNodeHeartbeatTime] while the kubelet is down.
+func WaitForNodeHeartbeatAfter(ctx context.Context, c clientset.Interface, name string, after metav1.Time, timeout time.Duration) {
+	framework.Logf("Waiting up to %v for node %s to send a heartbeat after %v", timeout, name, after)
+	gomega.Eventually(ctx, func() (time.Time, error) {
+		node, err := c.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{})
+		if err != nil {
+			framework.Logf("Couldn't get node %s", name)
+			return time.Time{}, err
+		}
+		return GetNodeHeartbeatTime(node).Time, nil
+	}, timeout, poll).Should(gomega.BeTemporally(">", after.Time), "Node %s didn't send a heartbeat", name)
+}
+
 // CheckReady waits up to timeout for cluster to has desired size and
 // there is no not-ready nodes in it. By cluster size we mean number of schedulable Nodes.
 func CheckReady(ctx context.Context, c clientset.Interface, size int, timeout time.Duration) ([]v1.Node, error) {
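The doc comment spells out the intended protocol: snapshot the heartbeat while the kubelet is down, start it, then demand a strictly newer heartbeat. A hedged sketch of that call pattern, assuming the usual e2e names (c of type clientset.Interface, e2enode as this package's import alias, and a nodeName string in scope); the KubeletCommand rewrite in the next file does exactly this:

// Hedged sketch, not from the commit: restart-and-verify with the new helpers.
node, err := c.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
framework.ExpectNoError(err)
lastHeartbeat := e2enode.GetNodeHeartbeatTime(node) // kubelet is stopped at this point

// ... start the kubelet again, e.g. `systemctl start kubelet` over SSH ...

// A heartbeat newer than lastHeartbeat can only have been posted by the new
// kubelet process, so no fixed sleep is needed.
e2enode.WaitForNodeHeartbeatAfter(ctx, c, nodeName, lastHeartbeat, 5*time.Minute)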

@@ -24,7 +24,6 @@ import (
 	"path"
 	"regexp"
 	"strings"
-	"time"
 
 	"github.com/onsi/ginkgo/v2"
 	"github.com/onsi/gomega"
@@ -99,74 +98,50 @@ func StartPodLogs(ctx context.Context, f *framework.Framework, driverNamespace *
 // KubeletCommand performs `start`, `restart`, or `stop` on the kubelet running on the node of the target pod and waits
 // for the desired statues..
-// - First issues the command via `systemctl`
-// - If `systemctl` returns stderr "command not found, issues the command via `service`
-// - If `service` also returns stderr "command not found", the test is aborted.
 // Allowed kubeletOps are `KStart`, `KStop`, and `KRestart`
 func KubeletCommand(ctx context.Context, kOp KubeletOpt, c clientset.Interface, pod *v1.Pod) {
-	command := ""
-	systemctlPresent := false
-	kubeletPid := ""
-
 	nodeIP, err := getHostAddress(ctx, c, pod)
 	framework.ExpectNoError(err)
 	nodeIP = nodeIP + ":22"
+	commandTemplate := "systemctl %s kubelet"
 
-	framework.Logf("Checking if systemctl command is present")
-	sshResult, err := e2essh.SSH(ctx, "systemctl --version", nodeIP, framework.TestContext.Provider)
-	framework.ExpectNoError(err, fmt.Sprintf("SSH to Node %q errored.", pod.Spec.NodeName))
-	if !strings.Contains(sshResult.Stderr, "command not found") {
-		command = fmt.Sprintf("systemctl %s kubelet", string(kOp))
-		systemctlPresent = true
-	} else {
-		command = fmt.Sprintf("service kubelet %s", string(kOp))
-	}
-
 	sudoPresent := isSudoPresent(ctx, nodeIP, framework.TestContext.Provider)
 	if sudoPresent {
-		command = fmt.Sprintf("sudo %s", command)
+		commandTemplate = "sudo " + commandTemplate
 	}
 
-	if kOp == KRestart {
-		kubeletPid = getKubeletMainPid(ctx, nodeIP, sudoPresent, systemctlPresent)
+	runCmd := func(cmd string) {
+		command := fmt.Sprintf(commandTemplate, cmd)
+		framework.Logf("Attempting `%s`", command)
+		sshResult, err := e2essh.SSH(ctx, command, nodeIP, framework.TestContext.Provider)
+		framework.ExpectNoError(err, fmt.Sprintf("SSH to Node %q errored.", pod.Spec.NodeName))
+		e2essh.LogResult(sshResult)
+		gomega.Expect(sshResult.Code).To(gomega.BeZero(), "Failed to [%s] kubelet:\n%#v", cmd, sshResult)
 	}
 
-	framework.Logf("Attempting `%s`", command)
-	sshResult, err = e2essh.SSH(ctx, command, nodeIP, framework.TestContext.Provider)
-	framework.ExpectNoError(err, fmt.Sprintf("SSH to Node %q errored.", pod.Spec.NodeName))
-	e2essh.LogResult(sshResult)
-	gomega.Expect(sshResult.Code).To(gomega.BeZero(), "Failed to [%s] kubelet:\n%#v", string(kOp), sshResult)
-
+	if kOp == KStop || kOp == KRestart {
+		runCmd("stop")
+	}
 	if kOp == KStop {
-		if ok := e2enode.WaitForNodeToBeNotReady(ctx, c, pod.Spec.NodeName, NodeStateTimeout); !ok {
-			framework.Failf("Node %s failed to enter NotReady state", pod.Spec.NodeName)
-		}
+		return
 	}
-	if kOp == KRestart {
-		// Wait for a minute to check if kubelet Pid is getting changed
-		isPidChanged := false
-		for start := time.Now(); time.Since(start) < 1*time.Minute; time.Sleep(2 * time.Second) {
-			if ctx.Err() != nil {
-				framework.Fail("timed out waiting for Kubelet POD change")
-			}
-			kubeletPidAfterRestart := getKubeletMainPid(ctx, nodeIP, sudoPresent, systemctlPresent)
-			if kubeletPid != kubeletPidAfterRestart {
-				isPidChanged = true
-				break
-			}
-		}
-		if !isPidChanged {
-			framework.Fail("Kubelet PID remained unchanged after restarting Kubelet")
-		}
 
-		framework.Logf("Noticed that kubelet PID is changed. Waiting for 30 Seconds for Kubelet to come back")
-		time.Sleep(30 * time.Second)
+	if kOp == KStart && getKubeletRunning(ctx, nodeIP) {
+		framework.Logf("Kubelet is already running on node %q", pod.Spec.NodeName)
+		// Just skip. Or we cannot get a new heartbeat in time.
+		return
 	}
-	if kOp == KStart || kOp == KRestart {
-		// For kubelet start and restart operations, Wait until Node becomes Ready
+
+	node, err := c.CoreV1().Nodes().Get(ctx, pod.Spec.NodeName, metav1.GetOptions{})
+	framework.ExpectNoError(err)
+	heartbeatTime := e2enode.GetNodeHeartbeatTime(node)
+
+	runCmd("start")
+	// Wait for next heartbeat, which must be sent by the new kubelet process.
+	e2enode.WaitForNodeHeartbeatAfter(ctx, c, pod.Spec.NodeName, heartbeatTime, NodeStateTimeout)
+	// Then wait until Node with new process becomes Ready.
 	if ok := e2enode.WaitForNodeToBeReady(ctx, c, pod.Spec.NodeName, NodeStateTimeout); !ok {
 		framework.Failf("Node %s failed to enter Ready state", pod.Spec.NodeName)
 	}
 }
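The speedup is visible in what disappeared: a PID-comparison poll of up to one minute plus an unconditional 30-second sleep are replaced by a single condition, "the NodeReady heartbeat is strictly newer than the timestamp captured before start". A self-contained toy showing that predicate with gomega's Eventually and BeTemporally (all values fabricated; no cluster involved):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/onsi/gomega"
)

func main() {
	// Standalone gomega instance; a failed assertion just panics in this toy.
	g := gomega.NewGomega(func(message string, _ ...int) { panic(message) })

	before := time.Now() // captured while the (pretend) kubelet is down

	// Stand-in heartbeat source; in the e2e code this is GetNodeHeartbeatTime(node).Time.
	heartbeat := func(ctx context.Context) (time.Time, error) {
		return time.Now(), nil // always "fresh", so the wait succeeds immediately
	}

	// Poll until the observed timestamp is strictly newer than `before`.
	g.Eventually(context.Background(), heartbeat).
		WithTimeout(2 * time.Second).
		WithPolling(50 * time.Millisecond).
		Should(gomega.BeTemporally(">", before))
	fmt.Println("observed a heartbeat newer than", before)
}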

@@ -73,24 +73,16 @@ func VerifyFSGroupInPod(f *framework.Framework, filePath, expectedFSGroup string
 	gomega.Expect(expectedFSGroup).To(gomega.Equal(fsGroupResult), "Expected fsGroup of %s, got %s", expectedFSGroup, fsGroupResult)
 }
 
-// getKubeletMainPid return the Main PID of the Kubelet Process
-func getKubeletMainPid(ctx context.Context, nodeIP string, sudoPresent bool, systemctlPresent bool) string {
-	command := ""
-	if systemctlPresent {
-		command = "systemctl status kubelet | grep 'Main PID'"
-	} else {
-		command = "service kubelet status | grep 'Main PID'"
-	}
-	if sudoPresent {
-		command = fmt.Sprintf("sudo %s", command)
-	}
+// getKubeletRunning return if the kubelet is running or not
+func getKubeletRunning(ctx context.Context, nodeIP string) bool {
+	command := "systemctl show kubelet --property ActiveState --value"
 	framework.Logf("Attempting `%s`", command)
 	sshResult, err := e2essh.SSH(ctx, command, nodeIP, framework.TestContext.Provider)
 	framework.ExpectNoError(err, fmt.Sprintf("SSH to Node %q errored.", nodeIP))
 	e2essh.LogResult(sshResult)
-	gomega.Expect(sshResult.Code).To(gomega.BeZero(), "Failed to get kubelet PID")
-	gomega.Expect(sshResult.Stdout).NotTo(gomega.BeEmpty(), "Kubelet Main PID should not be Empty")
-	return sshResult.Stdout
+	gomega.Expect(sshResult.Code).To(gomega.BeZero(), "Failed to get kubelet status")
+	gomega.Expect(sshResult.Stdout).NotTo(gomega.BeEmpty(), "Kubelet status should not be Empty")
+	return strings.TrimSpace(sshResult.Stdout) == "active"
 }
 
 // TestKubeletRestartsAndRestoresMount tests that a volume mounted to a pod remains mounted after a kubelet restarts
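The new probe asks systemd directly: `systemctl show kubelet --property ActiveState --value` prints just the state string, "active" for a running unit and typically "inactive" or "failed" otherwise, which is much easier to parse than grepping "Main PID" out of `systemctl status` and needs no sudo. A local stand-in for what the SSH command does (runnable on any systemd host):

package main

import (
	"fmt"
	"os/exec"
	"strings"
)

func main() {
	// Same command the e2e helper runs over SSH, executed locally here.
	out, err := exec.Command("systemctl", "show", "kubelet",
		"--property", "ActiveState", "--value").Output()
	if err != nil {
		fmt.Println("systemctl failed:", err)
		return
	}
	// Output is a single line such as "active\n"; trim before comparing.
	fmt.Println("kubelet running:", strings.TrimSpace(string(out)) == "active")
}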
@@ -104,6 +96,9 @@ func TestKubeletRestartsAndRestoresMount(ctx context.Context, c clientset.Interf
 	ginkgo.By("Restarting kubelet")
 	KubeletCommand(ctx, KRestart, c, clientPod)
 
+	ginkgo.By("Wait 20s for the volume to become stable")
+	time.Sleep(20 * time.Second)
+
 	ginkgo.By("Testing that written file is accessible.")
 	CheckReadFromPath(f, clientPod, v1.PersistentVolumeFilesystem, false, volumePath, byteLen, seed)
@@ -121,6 +116,9 @@ func TestKubeletRestartsAndRestoresMap(ctx context.Context, c clientset.Interfac
 	ginkgo.By("Restarting kubelet")
 	KubeletCommand(ctx, KRestart, c, clientPod)
 
+	ginkgo.By("Wait 20s for the volume to become stable")
+	time.Sleep(20 * time.Second)
+
 	ginkgo.By("Testing that written pv is accessible.")
 	CheckReadFromPath(f, clientPod, v1.PersistentVolumeBlock, false, volumePath, byteLen, seed)
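These last two hunks carry the cost side of the trade: KubeletCommand no longer sleeps 30 seconds after every restart, so the two tests that genuinely depend on mounts settling after the kubelet comes back now wait an explicit 20 seconds themselves, while every other caller of KubeletCommand gets the faster path.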