Merge pull request #95065 from JornShen/replace_restarting_kas_kp_e2e_network_provider

refactor some service e2e cases to make them run on multiple providers
This commit is contained in:
Kubernetes Prow Robot 2020-11-03 17:04:03 -08:00 committed by GitHub
commit e25f3d75b8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 74 additions and 126 deletions

View File

@@ -584,6 +584,17 @@ func GetPodsInNamespace(c clientset.Interface, ns string, ignoreLabels map[strin
	return filtered, nil
}

+// GetPods returns the pods matching the given labels in the given namespace
+func GetPods(c clientset.Interface, ns string, matchLabels map[string]string) ([]v1.Pod, error) {
+	label := labels.SelectorFromSet(matchLabels)
+	listOpts := metav1.ListOptions{LabelSelector: label.String()}
+	pods, err := c.CoreV1().Pods(ns).List(context.TODO(), listOpts)
+	if err != nil {
+		return []v1.Pod{}, err
+	}
+	return pods.Items, nil
+}
+
// GetPodSecretUpdateTimeout returns the timeout duration for updating pod secret.
func GetPodSecretUpdateTimeout(c clientset.Interface) time.Duration {
	// With SecretManager(ConfigMapManager), we may have to wait up to full sync period +
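For context, a minimal sketch of how the new GetPods helper is meant to be consumed; this mirrors the restartComponent caller added later in this commit, and assumes cs is a clientset.Interface available to the test:

	pods, err := e2epod.GetPods(cs, metav1.NamespaceSystem, map[string]string{"component": "kube-apiserver"})
	if err != nil {
		framework.Failf("failed to get kube-apiserver pods: %v", err)
	}
	framework.Logf("found %d kube-apiserver pod(s)", len(pods))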

View File

@@ -8,6 +8,7 @@ go_library(
    deps = [
        "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+       "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/version:go_default_library",
        "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",

View File

@@ -30,6 +30,7 @@ import (
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime/schema"
	utilversion "k8s.io/apimachinery/pkg/util/version"
	utilfeature "k8s.io/apiserver/pkg/util/feature"
@@ -285,3 +286,25 @@ func RunIfSystemSpecNameIs(names ...string) {
	}
	skipInternalf(1, "Skipped because system spec name %q is not in %v", framework.TestContext.SystemSpecName, names)
}
+
+// SkipUnlessComponentRunsAsPodsAndClientCanDeleteThem skips unless the component runs as pods and the client can delete them
+func SkipUnlessComponentRunsAsPodsAndClientCanDeleteThem(componentName string, c clientset.Interface, ns string, labelSet labels.Set) {
+	// verify that the component runs as pods
+	label := labels.SelectorFromSet(labelSet)
+	listOpts := metav1.ListOptions{LabelSelector: label.String()}
+	pods, err := c.CoreV1().Pods(ns).List(context.TODO(), listOpts)
+	framework.Logf("SkipUnlessComponentRunsAsPodsAndClientCanDeleteThem: %v, %v", pods, err)
+	if err != nil {
+		skipInternalf(1, "Skipped because client failed to get component:%s pod err:%v", componentName, err)
+	}
+	if len(pods.Items) == 0 {
+		skipInternalf(1, "Skipped because component:%s is not running as pod.", componentName)
+	}
+	// verify that the client can delete the pod (server-side dry-run, so nothing is actually removed)
+	pod := pods.Items[0]
+	if err := c.CoreV1().Pods(ns).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{DryRun: []string{metav1.DryRunAll}}); err != nil {
+		skipInternalf(1, "Skipped because client failed to delete component:%s pod, err:%v", componentName, err)
+	}
+}
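A minimal usage sketch, mirroring how the service tests later in this commit call the new skip helper; cs is assumed to be the suite's clientset.Interface:

	// skip the test unless kube-proxy runs as pods the client may delete
	kubeProxyLabelSet := map[string]string{"k8s-app": "kube-proxy"}
	e2eskipper.SkipUnlessComponentRunsAsPodsAndClientCanDeleteThem("kube-proxy", cs, metav1.NamespaceSystem, kubeProxyLabelSet)

The dry-run delete matters here: it proves the client holds delete permission without actually disturbing the component before the test body runs.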

View File

@@ -60,7 +60,6 @@ import (
	e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
	e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
-	e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
	"k8s.io/kubernetes/test/e2e/storage/utils"
	testutils "k8s.io/kubernetes/test/utils"
	imageutils "k8s.io/kubernetes/test/utils/image"
@@ -83,6 +82,12 @@ const (
	// AffinityConfirmCount is the number of needed continuous requests to confirm that
	// affinity is enabled.
	AffinityConfirmCount = 15
+	// labels used to find the kube-proxy and kube-apiserver pods
+	kubeProxyLabelName     = "kube-proxy"
+	clusterAddonLabelKey   = "k8s-app"
+	kubeAPIServerLabelName = "kube-apiserver"
+	clusterComponentKey    = "component"
)

var (
@@ -703,45 +708,8 @@ func getServeHostnameService(name string) *v1.Service {
	return svc
}

-// restartKubeProxy restarts kube-proxy on the given host.
-func restartKubeProxy(host string) error {
-	// TODO: Make it work for all providers.
-	if !framework.ProviderIs("gce", "gke", "aws") {
-		return fmt.Errorf("unsupported provider for restartKubeProxy: %s", framework.TestContext.Provider)
-	}
-	// kubelet will restart the kube-proxy since it's running in a static pod
-	framework.Logf("Killing kube-proxy on node %v", host)
-	result, err := e2essh.SSH("sudo pkill kube-proxy", host, framework.TestContext.Provider)
-	if err != nil || result.Code != 0 {
-		e2essh.LogResult(result)
-		return fmt.Errorf("couldn't restart kube-proxy: %v", err)
-	}
-	// wait for kube-proxy to come back up
-	sshCmd := "sudo /bin/sh -c 'pgrep kube-proxy | wc -l'"
-	err = wait.Poll(5*time.Second, 60*time.Second, func() (bool, error) {
-		framework.Logf("Waiting for kubeproxy to come back up with %v on %v", sshCmd, host)
-		result, err := e2essh.SSH(sshCmd, host, framework.TestContext.Provider)
-		if err != nil {
-			return false, err
-		}
-		if result.Code != 0 {
-			e2essh.LogResult(result)
-			return false, fmt.Errorf("failed to run command, exited %d", result.Code)
-		}
-		if result.Stdout == "0\n" {
-			return false, nil
-		}
-		framework.Logf("kube-proxy is back up.")
-		return true, nil
-	})
-	if err != nil {
-		return fmt.Errorf("kube-proxy didn't recover: %v", err)
-	}
-	return nil
-}
-
-// waitForApiserverUp waits for the kube-apiserver to be up.
-func waitForApiserverUp(c clientset.Interface) error {
+// waitForAPIServerUp waits for the kube-apiserver to be up.
+func waitForAPIServerUp(c clientset.Interface) error {
	for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(5 * time.Second) {
		body, err := c.CoreV1().RESTClient().Get().AbsPath("/healthz").Do(context.TODO()).Raw()
		if err == nil && string(body) == "ok" {
@@ -1101,10 +1069,10 @@ var _ = SIGDescribe("Services", func() {
	})

	ginkgo.It("should work after restarting kube-proxy [Disruptive]", func() {
-		// TODO: use the ServiceTestJig here
-		e2eskipper.SkipUnlessProviderIs("gce", "gke")
-		e2eskipper.SkipUnlessSSHKeyPresent()
+		kubeProxyLabelSet := map[string]string{clusterAddonLabelKey: kubeProxyLabelName}
+		e2eskipper.SkipUnlessComponentRunsAsPodsAndClientCanDeleteThem(kubeProxyLabelName, cs, metav1.NamespaceSystem, kubeProxyLabelSet)
+		// TODO: use the ServiceTestJig here

		ns := f.Namespace.Name
		numPods, servicePort := 3, defaultServeHostnameServicePort
@@ -1127,18 +1095,10 @@
			framework.Failf("VIPs conflict: %v", svc1IP)
		}

-		hosts, err := e2essh.NodeSSHHosts(cs)
-		framework.ExpectNoError(err, "failed to find external/internal IPs for every node")
-		if len(hosts) == 0 {
-			framework.Failf("No ssh-able nodes")
-		}
-		host := hosts[0]

		framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, podNames1, svc1IP, servicePort))
		framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, podNames2, svc2IP, servicePort))

-		ginkgo.By(fmt.Sprintf("Restarting kube-proxy on %v", host))
-		if err := restartKubeProxy(host); err != nil {
+		if err := restartComponent(cs, kubeProxyLabelName, metav1.NamespaceSystem, kubeProxyLabelSet); err != nil {
			framework.Failf("error restarting kube-proxy: %v", err)
		}
		framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, podNames1, svc1IP, servicePort))
@@ -1146,12 +1106,14 @@
	})

	ginkgo.It("should work after restarting apiserver [Disruptive]", func() {
-		// TODO: use the ServiceTestJig here
-		e2eskipper.SkipUnlessProviderIs("gce", "gke")
-		e2eskipper.SkipUnlessSSHKeyPresent()
+		if !framework.ProviderIs("gke") {
+			e2eskipper.SkipUnlessComponentRunsAsPodsAndClientCanDeleteThem(kubeAPIServerLabelName, cs, metav1.NamespaceSystem, map[string]string{clusterComponentKey: kubeAPIServerLabelName})
+		}
+		// TODO: use the ServiceTestJig here

		ns := f.Namespace.Name
-		numPods, servicePort := 3, 80
+		numPods, servicePort := 3, defaultServeHostnameServicePort

		svc1 := "restart-apiserver-1"
		svc2 := "restart-apiserver-2"
@@ -1170,7 +1132,7 @@
			framework.Failf("error restarting apiserver: %v", err)
		}
		ginkgo.By("Waiting for apiserver to come up by polling /healthz")
-		if err := waitForApiserverUp(cs); err != nil {
+		if err := waitForAPIServerUp(cs); err != nil {
			framework.Failf("error while waiting for apiserver up: %v", err)
		}
		framework.ExpectNoError(verifyServeHostnameServiceUp(cs, ns, podNames1, svc1IP, servicePort))
@@ -3739,24 +3701,9 @@ func validateEndpointsPorts(c clientset.Interface, namespace, serviceName string
	return nil
}

// restartApiserver restarts the kube-apiserver.
func restartApiserver(namespace string, cs clientset.Interface) error {
-	// TODO: Make it work for all providers.
-	if !framework.ProviderIs("gce", "gke", "aws") {
-		return fmt.Errorf("unsupported provider for RestartApiserver: %s", framework.TestContext.Provider)
-	}
-	if framework.ProviderIs("gce", "aws") {
-		initialRestartCount, err := getApiserverRestartCount(cs)
-		if err != nil {
-			return fmt.Errorf("failed to get apiserver's restart count: %v", err)
-		}
-		if err := sshRestartMaster(); err != nil {
-			return fmt.Errorf("failed to restart apiserver: %v", err)
-		}
-		return waitForApiserverRestarted(cs, initialRestartCount)
-	}
-	// GKE doesn't allow ssh access, so use a same-version master
-	// upgrade to teardown/recreate master.
+	if framework.ProviderIs("gke") {
+		// GKE uses a same-version master upgrade to teardown/recreate the master.
	v, err := cs.Discovery().ServerVersion()
	if err != nil {
		return err
@@ -3764,59 +3711,25 @@ func restartApiserver(namespace string, cs clientset.Interface) error {
	return framework.MasterUpgradeGKE(namespace, v.GitVersion[1:]) // strip leading 'v'
}

-func sshRestartMaster() error {
-	if !framework.ProviderIs("gce", "aws") {
-		return fmt.Errorf("unsupported provider for sshRestartMaster: %s", framework.TestContext.Provider)
-	}
-	var command string
-	if framework.ProviderIs("gce") {
-		command = "pidof kube-apiserver | xargs sudo kill"
-	} else {
-		command = "sudo /etc/init.d/kube-apiserver restart"
-	}
-	framework.Logf("Restarting master via ssh, running: %v", command)
-	result, err := e2essh.SSH(command, net.JoinHostPort(framework.APIAddress(), e2essh.SSHPort), framework.TestContext.Provider)
-	if err != nil || result.Code != 0 {
-		e2essh.LogResult(result)
-		return fmt.Errorf("couldn't restart apiserver: %v", err)
-	}
-	return nil
-}
+	return restartComponent(cs, kubeAPIServerLabelName, metav1.NamespaceSystem, map[string]string{clusterComponentKey: kubeAPIServerLabelName})
+}

-// waitForApiserverRestarted waits until apiserver's restart count increased.
-func waitForApiserverRestarted(c clientset.Interface, initialRestartCount int32) error {
-	for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(5 * time.Second) {
-		restartCount, err := getApiserverRestartCount(c)
-		if err != nil {
-			framework.Logf("Failed to get apiserver's restart count: %v", err)
-			continue
-		}
-		if restartCount > initialRestartCount {
-			framework.Logf("Apiserver has restarted.")
-			return nil
-		}
-		framework.Logf("Waiting for apiserver restart count to increase")
-	}
-	return fmt.Errorf("timed out waiting for apiserver to be restarted")
-}
+// restartComponent restarts the component's static pods
+func restartComponent(cs clientset.Interface, cName, ns string, matchLabels map[string]string) error {
+	pods, err := e2epod.GetPods(cs, ns, matchLabels)
+	if err != nil {
+		return fmt.Errorf("failed to get %s's pods, err: %v", cName, err)
+	}
+	if len(pods) == 0 {
+		return fmt.Errorf("%s pod count is 0", cName)
+	}
+	if err := e2epod.DeletePodsWithGracePeriod(cs, pods, 0); err != nil {
+		return fmt.Errorf("failed to restart component: %s, err: %v", cName, err)
+	}
+
+	_, err = e2epod.PodsCreatedByLabel(cs, ns, cName, int32(len(pods)), labels.SelectorFromSet(matchLabels))
+	return err
+}

-func getApiserverRestartCount(c clientset.Interface) (int32, error) {
-	label := labels.SelectorFromSet(labels.Set(map[string]string{"component": "kube-apiserver"}))
-	listOpts := metav1.ListOptions{LabelSelector: label.String()}
-	pods, err := c.CoreV1().Pods(metav1.NamespaceSystem).List(context.TODO(), listOpts)
-	if err != nil {
-		return -1, err
-	}
-	if len(pods.Items) != 1 {
-		return -1, fmt.Errorf("unexpected number of apiserver pod: %d", len(pods.Items))
-	}
-	for _, s := range pods.Items[0].Status.ContainerStatuses {
-		if s.Name != "kube-apiserver" {
-			continue
-		}
-		return s.RestartCount, nil
-	}
-	return -1, fmt.Errorf("Failed to find kube-apiserver container in pod")
-}
var _ = SIGDescribe("SCTP [Feature:SCTP] [LinuxOnly]", func() {
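For reference, a minimal sketch of how restartComponent is driven from the kube-proxy test above, using the label constants added in this commit; cs is assumed to be the suite's clientset.Interface:

	// restart kube-proxy by deleting its static pods and waiting for replacements
	kubeProxyLabelSet := map[string]string{clusterAddonLabelKey: kubeProxyLabelName} // i.e. {"k8s-app": "kube-proxy"}
	if err := restartComponent(cs, kubeProxyLabelName, metav1.NamespaceSystem, kubeProxyLabelSet); err != nil {
		framework.Failf("error restarting kube-proxy: %v", err)
	}

Deleting the static pods with a zero grace period forces the kubelet to recreate them, and PodsCreatedByLabel then waits until the same number of pods is back. This replaces the old SSH-based pkill/pgrep polling, which is why the tests no longer need provider or SSH-key skips.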