From 76f76450ca764a3244cbc6dd8545dfc454ec8661 Mon Sep 17 00:00:00 2001 From: Jiatong Wang Date: Thu, 9 May 2019 22:22:32 -0700 Subject: [PATCH] Move service_util endpoints related to framework/endpoints Signed-off-by: Jiatong Wang --- test/e2e/framework/endpoints/BUILD | 9 +- test/e2e/framework/endpoints/ports.go | 136 ++++++++++++++++++++++++++ test/e2e/framework/service_util.go | 99 ------------------- test/e2e/kubectl/BUILD | 1 + test/e2e/kubectl/kubectl.go | 8 +- test/e2e/network/service.go | 34 ++++--- 6 files changed, 172 insertions(+), 115 deletions(-) create mode 100644 test/e2e/framework/endpoints/ports.go diff --git a/test/e2e/framework/endpoints/BUILD b/test/e2e/framework/endpoints/BUILD index 29eae19fe48..2310197e48c 100644 --- a/test/e2e/framework/endpoints/BUILD +++ b/test/e2e/framework/endpoints/BUILD @@ -2,14 +2,21 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", - srcs = ["wait.go"], + srcs = [ + "ports.go", + "wait.go", + ], importpath = "k8s.io/kubernetes/test/e2e/framework/endpoints", visibility = ["//visibility:public"], deps = [ + "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//test/e2e/framework:go_default_library", + "//test/e2e/framework/log:go_default_library", + "//vendor/github.com/onsi/ginkgo:go_default_library", ], ) diff --git a/test/e2e/framework/endpoints/ports.go b/test/e2e/framework/endpoints/ports.go new file mode 100644 index 00000000000..c358e507fff --- /dev/null +++ b/test/e2e/framework/endpoints/ports.go @@ -0,0 +1,136 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +This soak tests places a specified number of pods on each node and then +repeatedly sends queries to a service running on these pods via +a serivce +*/ + +package endpoints + +import ( + "fmt" + "sort" + "time" + + "github.com/onsi/ginkgo" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + clientset "k8s.io/client-go/kubernetes" + e2elog "k8s.io/kubernetes/test/e2e/framework/log" +) + +// ServiceStartTimeout is how long to wait for a service endpoint to be resolvable. +const ServiceStartTimeout = 3 * time.Minute + +// PortsByPodName is a map that maps pod name to container ports. +type PortsByPodName map[string][]int + +// PortsByPodUID is a map that maps pod UID to container ports. +type PortsByPodUID map[types.UID][]int + +// GetContainerPortsByPodUID returns a PortsByPodUID map on the given endpoints. +func GetContainerPortsByPodUID(ep *v1.Endpoints) PortsByPodUID { + m := PortsByPodUID{} + for _, ss := range ep.Subsets { + for _, port := range ss.Ports { + for _, addr := range ss.Addresses { + containerPort := port.Port + if _, ok := m[addr.TargetRef.UID]; !ok { + m[addr.TargetRef.UID] = make([]int, 0) + } + m[addr.TargetRef.UID] = append(m[addr.TargetRef.UID], int(containerPort)) + } + } + } + return m +} + +func translatePodNameToUID(c clientset.Interface, ns string, expectedEndpoints PortsByPodName) (PortsByPodUID, error) { + portsByUID := make(PortsByPodUID) + for name, portList := range expectedEndpoints { + pod, err := c.CoreV1().Pods(ns).Get(name, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to get pod %s, that's pretty weird. validation failed: %s", name, err) + } + portsByUID[pod.ObjectMeta.UID] = portList + } + return portsByUID, nil +} + +func validatePorts(ep PortsByPodUID, expectedEndpoints PortsByPodUID) error { + if len(ep) != len(expectedEndpoints) { + // should not happen because we check this condition before + return fmt.Errorf("invalid number of endpoints got %v, expected %v", ep, expectedEndpoints) + } + for podUID := range expectedEndpoints { + if _, ok := ep[podUID]; !ok { + return fmt.Errorf("endpoint %v not found", podUID) + } + if len(ep[podUID]) != len(expectedEndpoints[podUID]) { + return fmt.Errorf("invalid list of ports for uid %v. Got %v, expected %v", podUID, ep[podUID], expectedEndpoints[podUID]) + } + sort.Ints(ep[podUID]) + sort.Ints(expectedEndpoints[podUID]) + for index := range ep[podUID] { + if ep[podUID][index] != expectedEndpoints[podUID][index] { + return fmt.Errorf("invalid list of ports for uid %v. Got %v, expected %v", podUID, ep[podUID], expectedEndpoints[podUID]) + } + } + } + return nil +} + +// ValidateEndpointsPorts validates that the given service exists and is served by the given expectedEndpoints. +func ValidateEndpointsPorts(c clientset.Interface, namespace, serviceName string, expectedEndpoints PortsByPodName) error { + ginkgo.By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to expose endpoints %v", ServiceStartTimeout, serviceName, namespace, expectedEndpoints)) + i := 1 + for start := time.Now(); time.Since(start) < ServiceStartTimeout; time.Sleep(1 * time.Second) { + ep, err := c.CoreV1().Endpoints(namespace).Get(serviceName, metav1.GetOptions{}) + if err != nil { + e2elog.Logf("Get endpoints failed (%v elapsed, ignoring for 5s): %v", time.Since(start), err) + continue + } + portsByPodUID := GetContainerPortsByPodUID(ep) + expectedPortsByPodUID, err := translatePodNameToUID(c, namespace, expectedEndpoints) + if err != nil { + return err + } + if len(portsByPodUID) == len(expectedEndpoints) { + err := validatePorts(portsByPodUID, expectedPortsByPodUID) + if err != nil { + return err + } + e2elog.Logf("successfully validated that service %s in namespace %s exposes endpoints %v (%v elapsed)", + serviceName, namespace, expectedEndpoints, time.Since(start)) + return nil + } + if i%5 == 0 { + e2elog.Logf("Unexpected endpoints: found %v, expected %v (%v elapsed, will retry)", portsByPodUID, expectedEndpoints, time.Since(start)) + } + i++ + } + if pods, err := c.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{}); err == nil { + for _, pod := range pods.Items { + e2elog.Logf("Pod %s\t%s\t%s\t%s", pod.Namespace, pod.Name, pod.Spec.NodeName, pod.DeletionTimestamp) + } + } else { + e2elog.Logf("Can't list pod debug info: %v", err) + } + return fmt.Errorf("Timed out waiting for service %s in namespace %s to expose endpoints %v (%v elapsed)", serviceName, namespace, expectedEndpoints, ServiceStartTimeout) +} diff --git a/test/e2e/framework/service_util.go b/test/e2e/framework/service_util.go index f2d99b54094..434f199dab9 100644 --- a/test/e2e/framework/service_util.go +++ b/test/e2e/framework/service_util.go @@ -30,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/sets" @@ -1238,104 +1237,6 @@ func UpdateService(c clientset.Interface, namespace, serviceName string, update return service, err } -// GetContainerPortsByPodUID returns a PortsByPodUID map on the given endpoints. -func GetContainerPortsByPodUID(endpoints *v1.Endpoints) PortsByPodUID { - m := PortsByPodUID{} - for _, ss := range endpoints.Subsets { - for _, port := range ss.Ports { - for _, addr := range ss.Addresses { - containerPort := port.Port - if _, ok := m[addr.TargetRef.UID]; !ok { - m[addr.TargetRef.UID] = make([]int, 0) - } - m[addr.TargetRef.UID] = append(m[addr.TargetRef.UID], int(containerPort)) - } - } - } - return m -} - -// PortsByPodName maps pod name to ports. -type PortsByPodName map[string][]int - -// PortsByPodUID maps UID to ports. -type PortsByPodUID map[types.UID][]int - -func translatePodNameToUIDOrFail(c clientset.Interface, ns string, expectedEndpoints PortsByPodName) PortsByPodUID { - portsByUID := make(PortsByPodUID) - - for name, portList := range expectedEndpoints { - pod, err := c.CoreV1().Pods(ns).Get(name, metav1.GetOptions{}) - if err != nil { - Failf("failed to get pod %s, that's pretty weird. validation failed: %s", name, err) - } - portsByUID[pod.ObjectMeta.UID] = portList - } - // Logf("successfully translated pod names to UIDs: %v -> %v on namespace %s", expectedEndpoints, portsByUID, ns) - return portsByUID -} - -func validatePortsOrFail(endpoints PortsByPodUID, expectedEndpoints PortsByPodUID) { - if len(endpoints) != len(expectedEndpoints) { - // should not happen because we check this condition before - Failf("invalid number of endpoints got %v, expected %v", endpoints, expectedEndpoints) - } - for podUID := range expectedEndpoints { - if _, ok := endpoints[podUID]; !ok { - Failf("endpoint %v not found", podUID) - } - if len(endpoints[podUID]) != len(expectedEndpoints[podUID]) { - Failf("invalid list of ports for uid %v. Got %v, expected %v", podUID, endpoints[podUID], expectedEndpoints[podUID]) - } - sort.Ints(endpoints[podUID]) - sort.Ints(expectedEndpoints[podUID]) - for index := range endpoints[podUID] { - if endpoints[podUID][index] != expectedEndpoints[podUID][index] { - Failf("invalid list of ports for uid %v. Got %v, expected %v", podUID, endpoints[podUID], expectedEndpoints[podUID]) - } - } - } -} - -// ValidateEndpointsOrFail validates that the given service exists and is served by the given expectedEndpoints. -func ValidateEndpointsOrFail(c clientset.Interface, namespace, serviceName string, expectedEndpoints PortsByPodName) { - ginkgo.By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to expose endpoints %v", ServiceStartTimeout, serviceName, namespace, expectedEndpoints)) - i := 1 - for start := time.Now(); time.Since(start) < ServiceStartTimeout; time.Sleep(1 * time.Second) { - endpoints, err := c.CoreV1().Endpoints(namespace).Get(serviceName, metav1.GetOptions{}) - if err != nil { - Logf("Get endpoints failed (%v elapsed, ignoring for 5s): %v", time.Since(start), err) - continue - } - // Logf("Found endpoints %v", endpoints) - - portsByPodUID := GetContainerPortsByPodUID(endpoints) - // Logf("Found port by pod UID %v", portsByPodUID) - - expectedPortsByPodUID := translatePodNameToUIDOrFail(c, namespace, expectedEndpoints) - if len(portsByPodUID) == len(expectedEndpoints) { - validatePortsOrFail(portsByPodUID, expectedPortsByPodUID) - Logf("successfully validated that service %s in namespace %s exposes endpoints %v (%v elapsed)", - serviceName, namespace, expectedEndpoints, time.Since(start)) - return - } - - if i%5 == 0 { - Logf("Unexpected endpoints: found %v, expected %v (%v elapsed, will retry)", portsByPodUID, expectedEndpoints, time.Since(start)) - } - i++ - } - - if pods, err := c.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{}); err == nil { - for _, pod := range pods.Items { - Logf("Pod %s\t%s\t%s\t%s", pod.Namespace, pod.Name, pod.Spec.NodeName, pod.DeletionTimestamp) - } - } else { - Logf("Can't list pod debug info: %v", err) - } - Failf("Timed out waiting for service %s in namespace %s to expose endpoints %v (%v elapsed)", serviceName, namespace, expectedEndpoints, ServiceStartTimeout) -} - // StartServeHostnameService creates a replication controller that serves its // hostname and a service on top of it. func StartServeHostnameService(c clientset.Interface, svc *v1.Service, ns string, replicas int) ([]string, string, error) { diff --git a/test/e2e/kubectl/BUILD b/test/e2e/kubectl/BUILD index f740c529be0..2880d843f03 100644 --- a/test/e2e/kubectl/BUILD +++ b/test/e2e/kubectl/BUILD @@ -33,6 +33,7 @@ go_library( "//test/e2e/common:go_default_library", "//test/e2e/framework:go_default_library", "//test/e2e/framework/auth:go_default_library", + "//test/e2e/framework/endpoints:go_default_library", "//test/e2e/framework/job:go_default_library", "//test/e2e/framework/log:go_default_library", "//test/e2e/framework/testfiles:go_default_library", diff --git a/test/e2e/kubectl/kubectl.go b/test/e2e/kubectl/kubectl.go index 6249da57d74..157f6d922ec 100644 --- a/test/e2e/kubectl/kubectl.go +++ b/test/e2e/kubectl/kubectl.go @@ -40,8 +40,6 @@ import ( "time" "github.com/elazarl/goproxy" - "sigs.k8s.io/yaml" - v1 "k8s.io/api/core/v1" rbacv1beta1 "k8s.io/api/rbac/v1beta1" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" @@ -60,6 +58,7 @@ import ( commonutils "k8s.io/kubernetes/test/e2e/common" "k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework/auth" + e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints" jobutil "k8s.io/kubernetes/test/e2e/framework/job" e2elog "k8s.io/kubernetes/test/e2e/framework/log" "k8s.io/kubernetes/test/e2e/framework/testfiles" @@ -67,6 +66,7 @@ import ( testutils "k8s.io/kubernetes/test/utils" "k8s.io/kubernetes/test/utils/crd" uexec "k8s.io/utils/exec" + "sigs.k8s.io/yaml" "github.com/onsi/ginkgo" "github.com/onsi/gomega" @@ -1075,7 +1075,7 @@ metadata: }) validateService := func(name string, servicePort int, timeout time.Duration) { err := wait.Poll(framework.Poll, timeout, func() (bool, error) { - endpoints, err := c.CoreV1().Endpoints(ns).Get(name, metav1.GetOptions{}) + ep, err := c.CoreV1().Endpoints(ns).Get(name, metav1.GetOptions{}) if err != nil { // log the real error e2elog.Logf("Get endpoints failed (interval %v): %v", framework.Poll, err) @@ -1089,7 +1089,7 @@ metadata: return false, err } - uidToPort := framework.GetContainerPortsByPodUID(endpoints) + uidToPort := e2eendpoints.GetContainerPortsByPodUID(ep) if len(uidToPort) == 0 { e2elog.Logf("No endpoint found, retrying") return false, nil diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index 187b07122a8..97ca2675c41 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -37,6 +37,7 @@ import ( cloudprovider "k8s.io/cloud-provider" "k8s.io/kubernetes/pkg/controller/endpoint" "k8s.io/kubernetes/test/e2e/framework" + e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints" e2elog "k8s.io/kubernetes/test/e2e/framework/log" "k8s.io/kubernetes/test/e2e/framework/providers/gce" e2essh "k8s.io/kubernetes/test/e2e/framework/ssh" @@ -138,7 +139,8 @@ var _ = SIGDescribe("Services", func() { framework.ExpectNoError(err, "failed to create service with ServicePorts in namespace: %s", ns) - framework.ValidateEndpointsOrFail(cs, ns, serviceName, framework.PortsByPodName{}) + err = e2eendpoints.ValidateEndpointsPorts(cs, ns, serviceName, e2eendpoints.PortsByPodName{}) + framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns) names := map[string]bool{} defer func() { @@ -153,19 +155,23 @@ var _ = SIGDescribe("Services", func() { framework.CreatePodOrFail(cs, ns, name1, labels, []v1.ContainerPort{{ContainerPort: 80}}) names[name1] = true - framework.ValidateEndpointsOrFail(cs, ns, serviceName, framework.PortsByPodName{name1: {80}}) + err = e2eendpoints.ValidateEndpointsPorts(cs, ns, serviceName, e2eendpoints.PortsByPodName{name1: {80}}) + framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns) framework.CreatePodOrFail(cs, ns, name2, labels, []v1.ContainerPort{{ContainerPort: 80}}) names[name2] = true - framework.ValidateEndpointsOrFail(cs, ns, serviceName, framework.PortsByPodName{name1: {80}, name2: {80}}) + err = e2eendpoints.ValidateEndpointsPorts(cs, ns, serviceName, e2eendpoints.PortsByPodName{name1: {80}, name2: {80}}) + framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns) framework.DeletePodOrFail(cs, ns, name1) delete(names, name1) - framework.ValidateEndpointsOrFail(cs, ns, serviceName, framework.PortsByPodName{name2: {80}}) + err = e2eendpoints.ValidateEndpointsPorts(cs, ns, serviceName, e2eendpoints.PortsByPodName{name2: {80}}) + framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns) framework.DeletePodOrFail(cs, ns, name2) delete(names, name2) - framework.ValidateEndpointsOrFail(cs, ns, serviceName, framework.PortsByPodName{}) + err = e2eendpoints.ValidateEndpointsPorts(cs, ns, serviceName, e2eendpoints.PortsByPodName{}) + framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns) }) /* @@ -206,7 +212,8 @@ var _ = SIGDescribe("Services", func() { framework.ExpectNoError(err, "failed to create service with ServicePorts in namespace: %s", ns) port1 := 100 port2 := 101 - framework.ValidateEndpointsOrFail(cs, ns, serviceName, framework.PortsByPodName{}) + err = e2eendpoints.ValidateEndpointsPorts(cs, ns, serviceName, e2eendpoints.PortsByPodName{}) + framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns) names := map[string]bool{} defer func() { @@ -234,19 +241,23 @@ var _ = SIGDescribe("Services", func() { framework.CreatePodOrFail(cs, ns, podname1, labels, containerPorts1) names[podname1] = true - framework.ValidateEndpointsOrFail(cs, ns, serviceName, framework.PortsByPodName{podname1: {port1}}) + err = e2eendpoints.ValidateEndpointsPorts(cs, ns, serviceName, e2eendpoints.PortsByPodName{podname1: {port1}}) + framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns) framework.CreatePodOrFail(cs, ns, podname2, labels, containerPorts2) names[podname2] = true - framework.ValidateEndpointsOrFail(cs, ns, serviceName, framework.PortsByPodName{podname1: {port1}, podname2: {port2}}) + err = e2eendpoints.ValidateEndpointsPorts(cs, ns, serviceName, e2eendpoints.PortsByPodName{podname1: {port1}, podname2: {port2}}) + framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns) framework.DeletePodOrFail(cs, ns, podname1) delete(names, podname1) - framework.ValidateEndpointsOrFail(cs, ns, serviceName, framework.PortsByPodName{podname2: {port2}}) + err = e2eendpoints.ValidateEndpointsPorts(cs, ns, serviceName, e2eendpoints.PortsByPodName{podname2: {port2}}) + framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns) framework.DeletePodOrFail(cs, ns, podname2) delete(names, podname2) - framework.ValidateEndpointsOrFail(cs, ns, serviceName, framework.PortsByPodName{}) + err = e2eendpoints.ValidateEndpointsPorts(cs, ns, serviceName, e2eendpoints.PortsByPodName{}) + framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns) }) ginkgo.It("should preserve source pod IP for traffic thru service cluster IP", func() { @@ -297,7 +308,8 @@ var _ = SIGDescribe("Services", func() { }() // Waiting for service to expose endpoint. - framework.ValidateEndpointsOrFail(cs, ns, serviceName, framework.PortsByPodName{serverPodName: {servicePort}}) + err := e2eendpoints.ValidateEndpointsPorts(cs, ns, serviceName, e2eendpoints.PortsByPodName{serverPodName: {servicePort}}) + framework.ExpectNoError(err, "failed to validate endpoints for service %s in namespace: %s", serviceName, ns) ginkgo.By("Retrieve sourceip from a pod on the same node") sourceIP1, execPodIP1 := execSourceipTest(f, cs, ns, node1.Name, serviceIP, servicePort)