From 8848e26154c8d1b24b416d36184408efe4208cd6 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Fri, 26 Jun 2015 10:08:48 +0200 Subject: [PATCH 1/4] Validate endpoints in e2e tests by pod UID and port Before this patch endpoints were validated by container IP and port. Depending on the endpoint controller logic neither of the two must match for a valid endpoint (e.g. in a Mesos setup). This patch checks that the endpoint targetRef points to the right pod by UID, instead of comparing IPs. A later patch will make sure the compared port is the actual container port, not the host port. /xref mesosphere/kubernetes-mesos#365 --- test/e2e/kubectl.go | 11 +++--- test/e2e/service.go | 87 ++++++++++++++++++++++++--------------------- 2 files changed, 53 insertions(+), 45 deletions(-) diff --git a/test/e2e/kubectl.go b/test/e2e/kubectl.go index 6a21e668b34..1be0a480d9b 100644 --- a/test/e2e/kubectl.go +++ b/test/e2e/kubectl.go @@ -348,12 +348,15 @@ var _ = Describe("Kubectl client", func() { endpoints, err := c.Endpoints(ns).Get(name) Expect(err).NotTo(HaveOccurred()) - ipToPort := getPortsByIp(endpoints.Subsets) - if len(ipToPort) != 1 { - Logf("No IP found, retrying") + uidToPort := getPortsByPodUID(endpoints.Subsets) + if len(uidToPort) == 0 { + Logf("No endpoint found, retrying") continue } - for _, port := range ipToPort { + if len(uidToPort) > 1 { + Fail("To many endpoints found") + } + for _, port := range uidToPort { if port[0] != redisPort { Failf("Wrong endpoint port: %d", port[0]) } diff --git a/test/e2e/service.go b/test/e2e/service.go index 982f6239a01..eec65508674 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/client" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/wait" ) @@ -101,7 +102,7 @@ var _ = Describe("Services", func() { _, err := c.Services(ns).Create(service) Expect(err).NotTo(HaveOccurred()) - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{}) var names []string defer func() { @@ -115,25 +116,25 @@ var _ = Describe("Services", func() { addEndpointPodOrFail(c, ns, name1, labels, []api.ContainerPort{{ContainerPort: 80}}) names = append(names, name1) - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{name1: {80}}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{name1: {80}}) name2 := "test2" addEndpointPodOrFail(c, ns, name2, labels, []api.ContainerPort{{ContainerPort: 80}}) names = append(names, name2) - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{name1: {80}, name2: {80}}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{name1: {80}, name2: {80}}) err = c.Pods(ns).Delete(name1, nil) Expect(err).NotTo(HaveOccurred()) names = []string{name2} - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{name2: {80}}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{name2: {80}}) err = c.Pods(ns).Delete(name2, nil) Expect(err).NotTo(HaveOccurred()) names = []string{} - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{}) }) It("should serve multiport endpoints from pods", func() { @@ -175,7 +176,7 @@ var _ = Describe("Services", func() { Expect(err).NotTo(HaveOccurred()) port1 := 100 port2 := 101 - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{}) var names []string defer func() { @@ -201,35 +202,35 @@ var _ = Describe("Services", func() { podname1 := "podname1" addEndpointPodOrFail(c, ns, podname1, labels, containerPorts1) names = append(names, podname1) - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{podname1: {port1}}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{podname1: {port1}}) podname2 := "podname2" addEndpointPodOrFail(c, ns, podname2, labels, containerPorts2) names = append(names, podname2) - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{podname1: {port1}, podname2: {port2}}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{podname1: {port1}, podname2: {port2}}) podname3 := "podname3" addEndpointPodOrFail(c, ns, podname3, labels, append(containerPorts1, containerPorts2...)) names = append(names, podname3) - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{podname1: {port1}, podname2: {port2}, podname3: {port1, port2}}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{podname1: {port1}, podname2: {port2}, podname3: {port1, port2}}) err = c.Pods(ns).Delete(podname1, nil) Expect(err).NotTo(HaveOccurred()) names = []string{podname2, podname3} - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{podname2: {port2}, podname3: {port1, port2}}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{podname2: {port2}, podname3: {port1, port2}}) err = c.Pods(ns).Delete(podname2, nil) Expect(err).NotTo(HaveOccurred()) names = []string{podname3} - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{podname3: {port1, port2}}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{podname3: {port1, port2}}) err = c.Pods(ns).Delete(podname3, nil) Expect(err).NotTo(HaveOccurred()) names = []string{} - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{}) }) It("should be able to up and down services", func() { @@ -1022,60 +1023,63 @@ func validateUniqueOrFail(s []string) { } } -func getPortsByIp(subsets []api.EndpointSubset) map[string][]int { - m := make(map[string][]int) +func getPortsByPodUID(subsets []api.EndpointSubset) PortsByPodUID { + m := PortsByPodUID{} for _, ss := range subsets { for _, port := range ss.Ports { for _, addr := range ss.Addresses { - Logf("Found IP %v and port %v", addr.IP, port.Port) - if _, ok := m[addr.IP]; !ok { - m[addr.IP] = make([]int, 0) + Logf("Found pod %v and port %v", addr.TargetRef.UID, port.Port) + if _, ok := m[addr.TargetRef.UID]; !ok { + m[addr.TargetRef.UID] = make([]int, 0) } - m[addr.IP] = append(m[addr.IP], port.Port) + m[addr.TargetRef.UID] = append(m[addr.TargetRef.UID], port.Port) } } } return m } -func translatePodNameToIpOrFail(c *client.Client, ns string, expectedEndpoints map[string][]int) map[string][]int { - portsByIp := make(map[string][]int) +type PortsByPodName map[string][]int +type PortsByPodUID map[types.UID][]int + +func translatePodNameToUIDOrFail(c *client.Client, ns string, expectedEndpoints PortsByPodName) PortsByPodUID { + portsByUID := make(PortsByPodUID) for name, portList := range expectedEndpoints { pod, err := c.Pods(ns).Get(name) if err != nil { Failf("failed to get pod %s, that's pretty weird. validation failed: %s", name, err) } - portsByIp[pod.Status.PodIP] = portList + portsByUID[pod.ObjectMeta.UID] = portList By(fmt.Sprintf("")) } - By(fmt.Sprintf("successfully translated pod names to ips: %v -> %v on namespace %s", expectedEndpoints, portsByIp, ns)) - return portsByIp + By(fmt.Sprintf("successfully translated pod names to UIDs: %v -> %v on namespace %s", expectedEndpoints, portsByUID, ns)) + return portsByUID } -func validatePortsOrFail(endpoints map[string][]int, expectedEndpoints map[string][]int) { +func validatePortsOrFail(endpoints PortsByPodUID, expectedEndpoints PortsByPodUID) { if len(endpoints) != len(expectedEndpoints) { // should not happen because we check this condition before Failf("invalid number of endpoints got %v, expected %v", endpoints, expectedEndpoints) } - for ip := range expectedEndpoints { - if _, ok := endpoints[ip]; !ok { - Failf("endpoint %v not found", ip) + for podUID := range expectedEndpoints { + if _, ok := endpoints[podUID]; !ok { + Failf("endpoint %v not found", podUID) } - if len(endpoints[ip]) != len(expectedEndpoints[ip]) { - Failf("invalid list of ports for ip %v. Got %v, expected %v", ip, endpoints[ip], expectedEndpoints[ip]) + if len(endpoints[podUID]) != len(expectedEndpoints[podUID]) { + Failf("invalid list of ports for uid %v. Got %v, expected %v", podUID, endpoints[podUID], expectedEndpoints[podUID]) } - sort.Ints(endpoints[ip]) - sort.Ints(expectedEndpoints[ip]) - for index := range endpoints[ip] { - if endpoints[ip][index] != expectedEndpoints[ip][index] { - Failf("invalid list of ports for ip %v. Got %v, expected %v", ip, endpoints[ip], expectedEndpoints[ip]) + sort.Ints(endpoints[podUID]) + sort.Ints(expectedEndpoints[podUID]) + for index := range endpoints[podUID] { + if endpoints[podUID][index] != expectedEndpoints[podUID][index] { + Failf("invalid list of ports for uid %v. Got %v, expected %v", podUID, endpoints[podUID], expectedEndpoints[podUID]) } } } } -func validateEndpointsOrFail(c *client.Client, namespace, serviceName string, expectedEndpoints map[string][]int) { +func validateEndpointsOrFail(c *client.Client, namespace, serviceName string, expectedEndpoints PortsByPodName) { By(fmt.Sprintf("Waiting up to %v for service %s in namespace %s to expose endpoints %v", serviceStartTimeout, serviceName, namespace, expectedEndpoints)) for start := time.Now(); time.Since(start) < serviceStartTimeout; time.Sleep(5 * time.Second) { endpoints, err := c.Endpoints(namespace).Get(serviceName) @@ -1085,16 +1089,17 @@ func validateEndpointsOrFail(c *client.Client, namespace, serviceName string, ex } Logf("Found endpoints %v", endpoints) - portsByIp := getPortsByIp(endpoints.Subsets) - Logf("Found ports by ip %v", portsByIp) + portsByPodUID := getPortsByPodUID(endpoints.Subsets) + Logf("Found port by pod UID %v", portsByPodUID) - if len(portsByIp) == len(expectedEndpoints) { - expectedPortsByIp := translatePodNameToIpOrFail(c, namespace, expectedEndpoints) - validatePortsOrFail(portsByIp, expectedPortsByIp) + expectedPortsByPodUID := translatePodNameToUIDOrFail(c, namespace, expectedEndpoints) + if len(portsByPodUID) == len(expectedEndpoints) { + validatePortsOrFail(portsByPodUID, expectedPortsByPodUID) By(fmt.Sprintf("Successfully validated that service %s in namespace %s exposes endpoints %v (%v elapsed)", serviceName, namespace, expectedEndpoints, time.Since(start))) return } - Logf("Unexpected number of endpoints: found %v, expected %v (%v elapsed, ignoring for 5s)", portsByIp, expectedEndpoints, time.Since(start)) + + Logf("Unexpected number of endpoints: found %v, expected %v (%v elapsed, ignoring for 5s)", portsByPodUID, expectedEndpoints, time.Since(start)) } Failf("Timed out waiting for service %s in namespace %s to expose endpoints %v (%v elapsed)", serviceName, namespace, expectedEndpoints, serviceStartTimeout) } From c55e7bf731f54dfa4a99e93f5964f5e00c74f9e9 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Tue, 4 Aug 2015 10:31:00 +0200 Subject: [PATCH 2/4] Annotate endpoints in mesos endpoint controller with container ports The EndpointPort struct only stores one port: the port which is used to connect to the container from outside. In the case of the Mesos endpoint controller this is the host port. The container port is not part of the endpoint structure at all. A number of e2e tests need the container port information to validate correct endpoint creation. Therefore this patch annotates the Endpoint struct with a number of annotations mapping ":" to "". In a follow-up commit these annotations are used to validate endpoints in a Mesos setup. --- .../mesos/pkg/scheduler/meta/annotations.go | 1 + .../mesos/pkg/service/endpoints_controller.go | 24 ++++++++++++++----- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/contrib/mesos/pkg/scheduler/meta/annotations.go b/contrib/mesos/pkg/scheduler/meta/annotations.go index dc8dde231e7..7c9ee089380 100644 --- a/contrib/mesos/pkg/scheduler/meta/annotations.go +++ b/contrib/mesos/pkg/scheduler/meta/annotations.go @@ -30,4 +30,5 @@ const ( PortMappingKeyFormat = PortMappingKeyPrefix + "%s_%d" PortNameMappingKeyPrefix = "k8s.mesosphere.io/portName_" PortNameMappingKeyFormat = PortNameMappingKeyPrefix + "%s_%s" + ContainerPortKeyFormat = "k8s.mesosphere.io/containerPort_%s_%s_%d" ) diff --git a/contrib/mesos/pkg/service/endpoints_controller.go b/contrib/mesos/pkg/service/endpoints_controller.go index 324e459e0ae..ca9489757f3 100644 --- a/contrib/mesos/pkg/service/endpoints_controller.go +++ b/contrib/mesos/pkg/service/endpoints_controller.go @@ -243,6 +243,7 @@ func (e *endpointController) worker() { } } +// HACK(sttts): add annotations to the endpoint about the respective container ports func (e *endpointController) syncService(key string) { startTime := time.Now() defer func() { @@ -287,6 +288,7 @@ func (e *endpointController) syncService(key string) { } subsets := []api.EndpointSubset{} + containerPortAnnotations := map[string]string{} // by : for i := range pods.Items { pod := &pods.Items[i] @@ -295,7 +297,7 @@ func (e *endpointController) syncService(key string) { portName := servicePort.Name portProto := servicePort.Protocol - portNum, err := findPort(pod, servicePort) + portNum, containerPort, err := findPort(pod, servicePort) if err != nil { glog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err) continue @@ -320,6 +322,7 @@ func (e *endpointController) syncService(key string) { ResourceVersion: pod.ObjectMeta.ResourceVersion, }} subsets = append(subsets, api.EndpointSubset{Addresses: []api.EndpointAddress{epa}, Ports: []api.EndpointPort{epp}}) + containerPortAnnotations[fmt.Sprintf(meta.ContainerPortKeyFormat, portProto, pod.Status.HostIP, portNum)] = strconv.Itoa(containerPort) } } subsets = endpoints.RepackSubsets(subsets) @@ -348,6 +351,13 @@ func (e *endpointController) syncService(key string) { newEndpoints.Subsets = subsets newEndpoints.Labels = service.Labels + if newEndpoints.Annotations == nil { + newEndpoints.Annotations = map[string]string{} + } + for hostIpPort, containerPort := range containerPortAnnotations { + newEndpoints.Annotations[hostIpPort] = containerPort + } + if len(currentEndpoints.ResourceVersion) == 0 { // No previous endpoints, create them _, err = e.client.Endpoints(service.Namespace).Create(newEndpoints) @@ -389,8 +399,8 @@ func (e *endpointController) checkLeftoverEndpoints() { // string up in all named ports in all containers in the target pod. If no // match is found, fail. // -// HACK(jdef): return the HostPort instead of the ContainerPort for generic mesos compat. -func findPort(pod *api.Pod, svcPort *api.ServicePort) (int, error) { +// HACK(jdef): return the HostPort in addition to the ContainerPort for generic mesos compatibility +func findPort(pod *api.Pod, svcPort *api.ServicePort) (int, int, error) { portName := svcPort.TargetPort switch portName.Kind { case util.IntstrString: @@ -398,7 +408,8 @@ func findPort(pod *api.Pod, svcPort *api.ServicePort) (int, error) { for _, container := range pod.Spec.Containers { for _, port := range container.Ports { if port.Name == name && port.Protocol == svcPort.Protocol { - return findMappedPortName(pod, port.Protocol, name) + hostPort, err := findMappedPortName(pod, port.Protocol, name) + return hostPort, port.ContainerPort, err } } } @@ -412,12 +423,13 @@ func findPort(pod *api.Pod, svcPort *api.ServicePort) (int, error) { for _, container := range pod.Spec.Containers { for _, port := range container.Ports { if port.ContainerPort == p && port.Protocol == svcPort.Protocol { - return findMappedPort(pod, port.Protocol, p) + hostPort, err := findMappedPort(pod, port.Protocol, p) + return hostPort, port.ContainerPort, err } } } } - return 0, fmt.Errorf("no suitable port for manifest: %s", pod.UID) + return 0, 0, fmt.Errorf("no suitable port for manifest: %s", pod.UID) } func findMappedPort(pod *api.Pod, protocol api.Protocol, port int) (int, error) { From f04f31f799d64eb84d1e34de481ff23bb0b7a472 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Tue, 4 Aug 2015 15:24:37 +0200 Subject: [PATCH 3/4] Use endpoint annotation to recover container ports in e2e tests on Mesos --- test/e2e/kubectl.go | 2 +- test/e2e/service.go | 31 ++++++++++++++++++++++++++----- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/test/e2e/kubectl.go b/test/e2e/kubectl.go index 1be0a480d9b..70586d185a5 100644 --- a/test/e2e/kubectl.go +++ b/test/e2e/kubectl.go @@ -348,7 +348,7 @@ var _ = Describe("Kubectl client", func() { endpoints, err := c.Endpoints(ns).Get(name) Expect(err).NotTo(HaveOccurred()) - uidToPort := getPortsByPodUID(endpoints.Subsets) + uidToPort := getContainerPortsByPodUID(endpoints) if len(uidToPort) == 0 { Logf("No endpoint found, retrying") continue diff --git a/test/e2e/service.go b/test/e2e/service.go index eec65508674..91aaac14458 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -22,6 +22,7 @@ import ( "math/rand" "net/http" "sort" + "strconv" "strings" "time" @@ -1023,16 +1024,36 @@ func validateUniqueOrFail(s []string) { } } -func getPortsByPodUID(subsets []api.EndpointSubset) PortsByPodUID { +func getContainerPortsByPodUID(endpoints *api.Endpoints) PortsByPodUID { m := PortsByPodUID{} - for _, ss := range subsets { + for _, ss := range endpoints.Subsets { for _, port := range ss.Ports { for _, addr := range ss.Addresses { - Logf("Found pod %v and port %v", addr.TargetRef.UID, port.Port) + containerPort := port.Port + hostPort := port.Port + + // use endpoint annotations to recover the container port in a Mesos setup + // compare contrib/mesos/pkg/service/endpoints_controller.syncService + // TODO(sttts): add ContainerPort to EndpointPort struct, defaulting to (host) Port + if providerIs("mesos/docker") { + key := fmt.Sprintf("k8s.mesosphere.io/containerPort_%s_%s_%d", port.Protocol, addr.IP, hostPort) + containerPortString := endpoints.Annotations[key] + if containerPortString == "" { + continue + } + var err error + containerPort, err = strconv.Atoi(containerPortString) + if err != nil { + continue + } + Logf("Mapped mesos host port %d to container port %d via annotation %s=%s", hostPort, containerPort, key, containerPortString) + } + + Logf("Found pod %v, host port %d and container port %d", addr.TargetRef.UID, hostPort, containerPort) if _, ok := m[addr.TargetRef.UID]; !ok { m[addr.TargetRef.UID] = make([]int, 0) } - m[addr.TargetRef.UID] = append(m[addr.TargetRef.UID], port.Port) + m[addr.TargetRef.UID] = append(m[addr.TargetRef.UID], containerPort) } } } @@ -1089,7 +1110,7 @@ func validateEndpointsOrFail(c *client.Client, namespace, serviceName string, ex } Logf("Found endpoints %v", endpoints) - portsByPodUID := getPortsByPodUID(endpoints.Subsets) + portsByPodUID := getContainerPortsByPodUID(endpoints) Logf("Found port by pod UID %v", portsByPodUID) expectedPortsByPodUID := translatePodNameToUIDOrFail(c, namespace, expectedEndpoints) From 4786705f8e4e6e19f1052fefc0ad17f26af02475 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Fri, 7 Aug 2015 22:08:50 +0200 Subject: [PATCH 4/4] Move TODO from source into an github issue --- test/e2e/service.go | 1 - 1 file changed, 1 deletion(-) diff --git a/test/e2e/service.go b/test/e2e/service.go index 91aaac14458..5a18e576bea 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -1034,7 +1034,6 @@ func getContainerPortsByPodUID(endpoints *api.Endpoints) PortsByPodUID { // use endpoint annotations to recover the container port in a Mesos setup // compare contrib/mesos/pkg/service/endpoints_controller.syncService - // TODO(sttts): add ContainerPort to EndpointPort struct, defaulting to (host) Port if providerIs("mesos/docker") { key := fmt.Sprintf("k8s.mesosphere.io/containerPort_%s_%s_%d", port.Protocol, addr.IP, hostPort) containerPortString := endpoints.Annotations[key]