Fix for DaemonRestart [Disruptive] tests

Signed-off-by: Davanum Srinivas <davanum@gmail.com>
Davanum Srinivas 2024-08-29 13:36:25 -04:00
parent df577d7fbc
commit 0a124e28b6
2 changed files with 91 additions and 37 deletions

test/e2e/apps/daemon_restart.go

@@ -40,6 +40,7 @@ import (
 	e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
 	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
 	e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
+	testfwk "k8s.io/kubernetes/test/integration/framework"
 	testutils "k8s.io/kubernetes/test/utils"
 	imageutils "k8s.io/kubernetes/test/utils/image"
 	admissionapi "k8s.io/pod-security-admission/api"
@@ -106,17 +107,20 @@ func (r *RestartDaemonConfig) waitUp(ctx context.Context) {
 	var healthzCheck string
 	if r.enableHTTPS {
 		healthzCheck = fmt.Sprintf(
-			"curl -sk -o %v -I -w \"%%{http_code}\" https://localhost:%v/healthz", nullDev, r.healthzPort)
+			"curl -sk -o %v -I -w \"%%{http_code}\" https://127.0.0.1:%v/healthz", nullDev, r.healthzPort)
 	} else {
 		healthzCheck = fmt.Sprintf(
-			"curl -s -o %v -I -w \"%%{http_code}\" http://localhost:%v/healthz", nullDev, r.healthzPort)
+			"curl -s -o %v -I -w \"%%{http_code}\" http://127.0.0.1:%v/healthz", nullDev, r.healthzPort)
 	}
 	err := wait.PollUntilContextTimeout(ctx, r.pollInterval, r.pollTimeout, false, func(ctx context.Context) (bool, error) {
 		result, err := e2essh.NodeExec(ctx, r.nodeName, healthzCheck, framework.TestContext.Provider)
 		if err != nil {
 			return false, err
 		}
+		e2essh.LogResult(result)
 		if result.Code == 0 {
 			httpCode, err := strconv.Atoi(result.Stdout)
 			if err != nil {
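
Aside, not part of the diff: a runnable sketch of the probe built above. Switching localhost to 127.0.0.1 pins the check to IPv4, so it cannot be skewed by localhost resolving to ::1 on the node (my reading of the change, not stated in the commit). The ports in main are the kube-controller-manager secure port and the kubelet healthz port.

package main

import "fmt"

// healthzCommand mirrors the command construction in waitUp: curl fetches
// only headers (-I) and prints the HTTP status code, discarding the body
// to nullDev; -k skips certificate verification for the HTTPS case.
func healthzCommand(enableHTTPS bool, healthzPort int) string {
	const nullDev = "/dev/null"
	if enableHTTPS {
		return fmt.Sprintf("curl -sk -o %v -I -w \"%%{http_code}\" https://127.0.0.1:%v/healthz", nullDev, healthzPort)
	}
	return fmt.Sprintf("curl -s -o %v -I -w \"%%{http_code}\" http://127.0.0.1:%v/healthz", nullDev, healthzPort)
}

func main() {
	fmt.Println(healthzCommand(true, 10257))  // kube-controller-manager secure port
	fmt.Println(healthzCommand(false, 10248)) // kubelet /healthz port
}
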
@@ -274,49 +278,57 @@ var _ = SIGDescribe("DaemonRestart", framework.WithDisruptive(), func() {
 		// Requires master ssh access.
 		e2eskipper.SkipUnlessProviderIs("gce", "aws")
-		restarter := NewRestartConfig(
-			framework.APIAddress(), "kube-controller", ports.KubeControllerManagerPort, restartPollInterval, restartTimeout, true)
-		restarter.restart(ctx)
+		nodes, err := getControlPlaneNodes(ctx, f.ClientSet)
+		framework.ExpectNoError(err)
+		for i := range nodes.Items {
-		// The intent is to ensure the replication controller manager has observed and reported status of
-		// the replication controller at least once since the manager restarted, so that we can determine
-		// that it had the opportunity to create/delete pods, if it were going to do so. Scaling the RC
-		// to the same size achieves this, because the scale operation advances the RC's sequence number
-		// and awaits it to be observed and reported back in the RC's status.
-		e2erc.ScaleRC(ctx, f.ClientSet, f.ScalesGetter, ns, rcName, numPods, true)
+			restarter := NewRestartConfig(
+				getFirstIPforNode(&nodes.Items[i]), "kube-controller", ports.KubeControllerManagerPort, restartPollInterval, restartTimeout, true)
+			restarter.restart(ctx)
-		// Only check the keys, the pods can be different if the kubelet updated it.
-		// TODO: Can it really?
-		existingKeys := sets.NewString()
-		newKeys := sets.NewString()
-		for _, k := range existingPods.ListKeys() {
-			existingKeys.Insert(k)
-		}
-		for _, k := range newPods.ListKeys() {
-			newKeys.Insert(k)
-		}
-		if len(newKeys.List()) != len(existingKeys.List()) ||
-			!newKeys.IsSuperset(existingKeys) {
-			framework.Failf("RcManager created/deleted pods after restart \n\n %+v", tracker)
+			// The intent is to ensure the replication controller manager has observed and reported status of
+			// the replication controller at least once since the manager restarted, so that we can determine
+			// that it had the opportunity to create/delete pods, if it were going to do so. Scaling the RC
+			// to the same size achieves this, because the scale operation advances the RC's sequence number
+			// and awaits it to be observed and reported back in the RC's status.
+			framework.ExpectNoError(e2erc.ScaleRC(ctx, f.ClientSet, f.ScalesGetter, ns, rcName, numPods, true))
+			// Only check the keys, the pods can be different if the kubelet updated it.
+			// TODO: Can it really?
+			existingKeys := sets.NewString()
+			newKeys := sets.NewString()
+			for _, k := range existingPods.ListKeys() {
+				existingKeys.Insert(k)
+			}
+			for _, k := range newPods.ListKeys() {
+				newKeys.Insert(k)
+			}
+			if len(newKeys.List()) != len(existingKeys.List()) ||
+				!newKeys.IsSuperset(existingKeys) {
+				framework.Failf("RcManager created/deleted pods after restart \n\n %+v", tracker)
+			}
 		}
 	})
 	ginkgo.It("Scheduler should continue assigning pods to nodes across restart", func(ctx context.Context) {
 		// Requires master ssh access.
 		e2eskipper.SkipUnlessProviderIs("gce", "aws")
-		restarter := NewRestartConfig(
-			framework.APIAddress(), "kube-scheduler", kubeschedulerconfig.DefaultKubeSchedulerPort, restartPollInterval, restartTimeout, true)
+		nodes, err := getControlPlaneNodes(ctx, f.ClientSet)
+		framework.ExpectNoError(err)
+		for i := range nodes.Items {
+			restarter := NewRestartConfig(
+				getFirstIPforNode(&nodes.Items[i]), "kube-scheduler", kubeschedulerconfig.DefaultKubeSchedulerPort, restartPollInterval, restartTimeout, true)
-		// Create pods while the scheduler is down and make sure the scheduler picks them up by
-		// scaling the rc to the same size.
-		restarter.waitUp(ctx)
-		restarter.kill(ctx)
-		// This is best effort to try and create pods while the scheduler is down,
-		// since we don't know exactly when it is restarted after the kill signal.
-		framework.ExpectNoError(e2erc.ScaleRC(ctx, f.ClientSet, f.ScalesGetter, ns, rcName, numPods+5, false))
-		restarter.waitUp(ctx)
-		framework.ExpectNoError(e2erc.ScaleRC(ctx, f.ClientSet, f.ScalesGetter, ns, rcName, numPods+5, true))
+			// Create pods while the scheduler is down and make sure the scheduler picks them up by
+			// scaling the rc to the same size.
+			restarter.waitUp(ctx)
+			restarter.kill(ctx)
+			// This is best effort to try and create pods while the scheduler is down,
+			// since we don't know exactly when it is restarted after the kill signal.
+			framework.ExpectNoError(e2erc.ScaleRC(ctx, f.ClientSet, f.ScalesGetter, ns, rcName, numPods+5, false))
+			restarter.waitUp(ctx)
+			framework.ExpectNoError(e2erc.ScaleRC(ctx, f.ClientSet, f.ScalesGetter, ns, rcName, numPods+5, true))
+		}
 	})
 	ginkgo.It("Kubelet should not restart containers across restart", func(ctx context.Context) {
@@ -331,7 +343,7 @@ var _ = SIGDescribe("DaemonRestart", framework.WithDisruptive(), func() {
 		}
 		for _, ip := range nodeIPs {
 			restarter := NewRestartConfig(
-				ip, "kubelet", ports.KubeletReadOnlyPort, restartPollInterval, restartTimeout, false)
+				ip, "kubelet", ports.KubeletHealthzPort, restartPollInterval, restartTimeout, false)
 			restarter.restart(ctx)
 		}
 		postRestarts, badNodes := getContainerRestarts(ctx, f.ClientSet, ns, labelSelector)
@@ -355,3 +367,42 @@ var _ = SIGDescribe("DaemonRestart", framework.WithDisruptive(), func() {
 		}
 	})
 })
+
+func getFirstIPforNode(node *v1.Node) string {
+	var ips []string
+	ips = append(ips, getAddresses(node, v1.NodeExternalIP)...)
+	if len(ips) == 0 {
+		// If ExternalIP isn't set, assume the test programs can reach the InternalIP
+		ips = append(ips, getAddresses(node, v1.NodeInternalIP)...)
+	}
+	if len(ips) == 0 {
+		framework.Failf("did not find any ip(s) for node: %v", node)
+	}
+	return ips[0]
+}
+
+func getAddresses(node *v1.Node, addressType v1.NodeAddressType) (ips []string) {
+	for j := range node.Status.Addresses {
+		nodeAddress := &node.Status.Addresses[j]
+		if nodeAddress.Type == addressType && nodeAddress.Address != "" {
+			ips = append(ips, nodeAddress.Address)
+		}
+	}
+	return
+}
+
+func getControlPlaneNodes(ctx context.Context, c clientset.Interface) (nodes *v1.NodeList, err error) {
+	nodes, err = c.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
+	if err != nil {
+		return nil, err
+	}
+	testfwk.Filter(nodes, func(node v1.Node) bool {
+		_, isMaster := node.Labels["node-role.kubernetes.io/master"]
+		_, isControlPlane := node.Labels["node-role.kubernetes.io/control-plane"]
+		return isMaster || isControlPlane
+	})
+	if len(nodes.Items) == 0 {
+		return nil, fmt.Errorf("there are currently no ready, schedulable control plane nodes in the cluster")
+	}
+	return nodes, nil
+}
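
Aside, not part of the diff: the three helpers added above select the targets for the per-node restarts. A self-contained sketch of their selection logic without the e2e framework (isControlPlane and firstIP are my names, not the commit's):

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// isControlPlane mirrors the label check in getControlPlaneNodes: either the
// legacy "master" role label or the current "control-plane" one qualifies.
func isControlPlane(node *v1.Node) bool {
	_, isMaster := node.Labels["node-role.kubernetes.io/master"]
	_, isCP := node.Labels["node-role.kubernetes.io/control-plane"]
	return isMaster || isCP
}

// firstIP mirrors getFirstIPforNode: prefer an ExternalIP, and fall back to
// an InternalIP on clusters where no external address is published.
func firstIP(node *v1.Node) (string, bool) {
	for _, want := range []v1.NodeAddressType{v1.NodeExternalIP, v1.NodeInternalIP} {
		for _, addr := range node.Status.Addresses {
			if addr.Type == want && addr.Address != "" {
				return addr.Address, true
			}
		}
	}
	return "", false
}

func main() {
	node := v1.Node{
		ObjectMeta: metav1.ObjectMeta{
			Labels: map[string]string{"node-role.kubernetes.io/control-plane": ""},
		},
		Status: v1.NodeStatus{
			Addresses: []v1.NodeAddress{{Type: v1.NodeInternalIP, Address: "10.0.0.2"}},
		},
	}
	if ip, ok := firstIP(&node); ok && isControlPlane(&node) {
		fmt.Println("would restart daemons on", ip)
	}
}
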

test/e2e/framework/ssh/ssh.go

@@ -68,6 +68,9 @@ func GetSigner(provider string) (ssh.Signer, error) {
 	switch provider {
 	case "gce", "gke", "kubemark":
 		keyfile = os.Getenv("GCE_SSH_KEY")
+		if keyfile == "" {
+			keyfile = os.Getenv("GCE_SSH_PRIVATE_KEY_FILE")
+		}
 		if keyfile == "" {
 			keyfile = "google_compute_engine"
 		}