diff --git a/contrib/mesos/pkg/scheduler/meta/annotations.go b/contrib/mesos/pkg/scheduler/meta/annotations.go index dc8dde231e7..7c9ee089380 100644 --- a/contrib/mesos/pkg/scheduler/meta/annotations.go +++ b/contrib/mesos/pkg/scheduler/meta/annotations.go @@ -30,4 +30,5 @@ const ( PortMappingKeyFormat = PortMappingKeyPrefix + "%s_%d" PortNameMappingKeyPrefix = "k8s.mesosphere.io/portName_" PortNameMappingKeyFormat = PortNameMappingKeyPrefix + "%s_%s" + ContainerPortKeyFormat = "k8s.mesosphere.io/containerPort_%s_%s_%d" ) diff --git a/contrib/mesos/pkg/service/endpoints_controller.go b/contrib/mesos/pkg/service/endpoints_controller.go index 324e459e0ae..ca9489757f3 100644 --- a/contrib/mesos/pkg/service/endpoints_controller.go +++ b/contrib/mesos/pkg/service/endpoints_controller.go @@ -243,6 +243,7 @@ func (e *endpointController) worker() { } } +// HACK(sttts): add annotations to the endpoint about the respective container ports func (e *endpointController) syncService(key string) { startTime := time.Now() defer func() { @@ -287,6 +288,7 @@ func (e *endpointController) syncService(key string) { } subsets := []api.EndpointSubset{} + containerPortAnnotations := map[string]string{} // by : for i := range pods.Items { pod := &pods.Items[i] @@ -295,7 +297,7 @@ func (e *endpointController) syncService(key string) { portName := servicePort.Name portProto := servicePort.Protocol - portNum, err := findPort(pod, servicePort) + portNum, containerPort, err := findPort(pod, servicePort) if err != nil { glog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err) continue @@ -320,6 +322,7 @@ func (e *endpointController) syncService(key string) { ResourceVersion: pod.ObjectMeta.ResourceVersion, }} subsets = append(subsets, api.EndpointSubset{Addresses: []api.EndpointAddress{epa}, Ports: []api.EndpointPort{epp}}) + containerPortAnnotations[fmt.Sprintf(meta.ContainerPortKeyFormat, portProto, pod.Status.HostIP, portNum)] = strconv.Itoa(containerPort) } } subsets = endpoints.RepackSubsets(subsets) @@ -348,6 +351,13 @@ func (e *endpointController) syncService(key string) { newEndpoints.Subsets = subsets newEndpoints.Labels = service.Labels + if newEndpoints.Annotations == nil { + newEndpoints.Annotations = map[string]string{} + } + for hostIpPort, containerPort := range containerPortAnnotations { + newEndpoints.Annotations[hostIpPort] = containerPort + } + if len(currentEndpoints.ResourceVersion) == 0 { // No previous endpoints, create them _, err = e.client.Endpoints(service.Namespace).Create(newEndpoints) @@ -389,8 +399,8 @@ func (e *endpointController) checkLeftoverEndpoints() { // string up in all named ports in all containers in the target pod. If no // match is found, fail. // -// HACK(jdef): return the HostPort instead of the ContainerPort for generic mesos compat. -func findPort(pod *api.Pod, svcPort *api.ServicePort) (int, error) { +// HACK(jdef): return the HostPort in addition to the ContainerPort for generic mesos compatibility +func findPort(pod *api.Pod, svcPort *api.ServicePort) (int, int, error) { portName := svcPort.TargetPort switch portName.Kind { case util.IntstrString: @@ -398,7 +408,8 @@ func findPort(pod *api.Pod, svcPort *api.ServicePort) (int, error) { for _, container := range pod.Spec.Containers { for _, port := range container.Ports { if port.Name == name && port.Protocol == svcPort.Protocol { - return findMappedPortName(pod, port.Protocol, name) + hostPort, err := findMappedPortName(pod, port.Protocol, name) + return hostPort, port.ContainerPort, err } } } @@ -412,12 +423,13 @@ func findPort(pod *api.Pod, svcPort *api.ServicePort) (int, error) { for _, container := range pod.Spec.Containers { for _, port := range container.Ports { if port.ContainerPort == p && port.Protocol == svcPort.Protocol { - return findMappedPort(pod, port.Protocol, p) + hostPort, err := findMappedPort(pod, port.Protocol, p) + return hostPort, port.ContainerPort, err } } } } - return 0, fmt.Errorf("no suitable port for manifest: %s", pod.UID) + return 0, 0, fmt.Errorf("no suitable port for manifest: %s", pod.UID) } func findMappedPort(pod *api.Pod, protocol api.Protocol, port int) (int, error) { diff --git a/test/e2e/kubectl.go b/test/e2e/kubectl.go index 1555b37bb45..39df40e6a86 100644 --- a/test/e2e/kubectl.go +++ b/test/e2e/kubectl.go @@ -348,12 +348,15 @@ var _ = Describe("Kubectl client", func() { endpoints, err := c.Endpoints(ns).Get(name) Expect(err).NotTo(HaveOccurred()) - ipToPort := getPortsByIp(endpoints.Subsets) - if len(ipToPort) != 1 { - Logf("No IP found, retrying") + uidToPort := getContainerPortsByPodUID(endpoints) + if len(uidToPort) == 0 { + Logf("No endpoint found, retrying") continue } - for _, port := range ipToPort { + if len(uidToPort) > 1 { + Fail("To many endpoints found") + } + for _, port := range uidToPort { if port[0] != redisPort { Failf("Wrong endpoint port: %d", port[0]) } diff --git a/test/e2e/service.go b/test/e2e/service.go index 982f6239a01..5a18e576bea 100644 --- a/test/e2e/service.go +++ b/test/e2e/service.go @@ -22,6 +22,7 @@ import ( "math/rand" "net/http" "sort" + "strconv" "strings" "time" @@ -32,6 +33,7 @@ import ( "k8s.io/kubernetes/pkg/client" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util/wait" ) @@ -101,7 +103,7 @@ var _ = Describe("Services", func() { _, err := c.Services(ns).Create(service) Expect(err).NotTo(HaveOccurred()) - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{}) var names []string defer func() { @@ -115,25 +117,25 @@ var _ = Describe("Services", func() { addEndpointPodOrFail(c, ns, name1, labels, []api.ContainerPort{{ContainerPort: 80}}) names = append(names, name1) - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{name1: {80}}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{name1: {80}}) name2 := "test2" addEndpointPodOrFail(c, ns, name2, labels, []api.ContainerPort{{ContainerPort: 80}}) names = append(names, name2) - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{name1: {80}, name2: {80}}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{name1: {80}, name2: {80}}) err = c.Pods(ns).Delete(name1, nil) Expect(err).NotTo(HaveOccurred()) names = []string{name2} - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{name2: {80}}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{name2: {80}}) err = c.Pods(ns).Delete(name2, nil) Expect(err).NotTo(HaveOccurred()) names = []string{} - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{}) }) It("should serve multiport endpoints from pods", func() { @@ -175,7 +177,7 @@ var _ = Describe("Services", func() { Expect(err).NotTo(HaveOccurred()) port1 := 100 port2 := 101 - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{}) var names []string defer func() { @@ -201,35 +203,35 @@ var _ = Describe("Services", func() { podname1 := "podname1" addEndpointPodOrFail(c, ns, podname1, labels, containerPorts1) names = append(names, podname1) - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{podname1: {port1}}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{podname1: {port1}}) podname2 := "podname2" addEndpointPodOrFail(c, ns, podname2, labels, containerPorts2) names = append(names, podname2) - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{podname1: {port1}, podname2: {port2}}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{podname1: {port1}, podname2: {port2}}) podname3 := "podname3" addEndpointPodOrFail(c, ns, podname3, labels, append(containerPorts1, containerPorts2...)) names = append(names, podname3) - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{podname1: {port1}, podname2: {port2}, podname3: {port1, port2}}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{podname1: {port1}, podname2: {port2}, podname3: {port1, port2}}) err = c.Pods(ns).Delete(podname1, nil) Expect(err).NotTo(HaveOccurred()) names = []string{podname2, podname3} - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{podname2: {port2}, podname3: {port1, port2}}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{podname2: {port2}, podname3: {port1, port2}}) err = c.Pods(ns).Delete(podname2, nil) Expect(err).NotTo(HaveOccurred()) names = []string{podname3} - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{podname3: {port1, port2}}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{podname3: {port1, port2}}) err = c.Pods(ns).Delete(podname3, nil) Expect(err).NotTo(HaveOccurred()) names = []string{} - validateEndpointsOrFail(c, ns, serviceName, map[string][]int{}) + validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{}) }) It("should be able to up and down services", func() { @@ -1022,60 +1024,82 @@ func validateUniqueOrFail(s []string) { } } -func getPortsByIp(subsets []api.EndpointSubset) map[string][]int { - m := make(map[string][]int) - for _, ss := range subsets { +func getContainerPortsByPodUID(endpoints *api.Endpoints) PortsByPodUID { + m := PortsByPodUID{} + for _, ss := range endpoints.Subsets { for _, port := range ss.Ports { for _, addr := range ss.Addresses { - Logf("Found IP %v and port %v", addr.IP, port.Port) - if _, ok := m[addr.IP]; !ok { - m[addr.IP] = make([]int, 0) + containerPort := port.Port + hostPort := port.Port + + // use endpoint annotations to recover the container port in a Mesos setup + // compare contrib/mesos/pkg/service/endpoints_controller.syncService + if providerIs("mesos/docker") { + key := fmt.Sprintf("k8s.mesosphere.io/containerPort_%s_%s_%d", port.Protocol, addr.IP, hostPort) + containerPortString := endpoints.Annotations[key] + if containerPortString == "" { + continue + } + var err error + containerPort, err = strconv.Atoi(containerPortString) + if err != nil { + continue + } + Logf("Mapped mesos host port %d to container port %d via annotation %s=%s", hostPort, containerPort, key, containerPortString) } - m[addr.IP] = append(m[addr.IP], port.Port) + + Logf("Found pod %v, host port %d and container port %d", addr.TargetRef.UID, hostPort, containerPort) + if _, ok := m[addr.TargetRef.UID]; !ok { + m[addr.TargetRef.UID] = make([]int, 0) + } + m[addr.TargetRef.UID] = append(m[addr.TargetRef.UID], containerPort) } } } return m } -func translatePodNameToIpOrFail(c *client.Client, ns string, expectedEndpoints map[string][]int) map[string][]int { - portsByIp := make(map[string][]int) +type PortsByPodName map[string][]int +type PortsByPodUID map[types.UID][]int + +func translatePodNameToUIDOrFail(c *client.Client, ns string, expectedEndpoints PortsByPodName) PortsByPodUID { + portsByUID := make(PortsByPodUID) for name, portList := range expectedEndpoints { pod, err := c.Pods(ns).Get(name) if err != nil { Failf("failed to get pod %s, that's pretty weird. validation failed: %s", name, err) } - portsByIp[pod.Status.PodIP] = portList + portsByUID[pod.ObjectMeta.UID] = portList By(fmt.Sprintf("")) } - By(fmt.Sprintf("successfully translated pod names to ips: %v -> %v on namespace %s", expectedEndpoints, portsByIp, ns)) - return portsByIp + By(fmt.Sprintf("successfully translated pod names to UIDs: %v -> %v on namespace %s", expectedEndpoints, portsByUID, ns)) + return portsByUID } -func validatePortsOrFail(endpoints map[string][]int, expectedEndpoints map[string][]int) { +func validatePortsOrFail(endpoints PortsByPodUID, expectedEndpoints PortsByPodUID) { if len(endpoints) != len(expectedEndpoints) { // should not happen because we check this condition before Failf("invalid number of endpoints got %v, expected %v", endpoints, expectedEndpoints) } - for ip := range expectedEndpoints { - if _, ok := endpoints[ip]; !ok { - Failf("endpoint %v not found", ip) + for podUID := range expectedEndpoints { + if _, ok := endpoints[podUID]; !ok { + Failf("endpoint %v not found", podUID) } - if len(endpoints[ip]) != len(expectedEndpoints[ip]) { - Failf("invalid list of ports for ip %v. Got %v, expected %v", ip, endpoints[ip], expectedEndpoints[ip]) + if len(endpoints[podUID]) != len(expectedEndpoints[podUID]) { + Failf("invalid list of ports for uid %v. Got %v, expected %v", podUID, endpoints[podUID], expectedEndpoints[podUID]) } - sort.Ints(endpoints[ip]) - sort.Ints(expectedEndpoints[ip]) - for index := range endpoints[ip] { - if endpoints[ip][index] != expectedEndpoints[ip][index] { - Failf("invalid list of ports for ip %v. Got %v, expected %v", ip, endpoints[ip], expectedEndpoints[ip]) + sort.Ints(endpoints[podUID]) + sort.Ints(expectedEndpoints[podUID]) + for index := range endpoints[podUID] { + if endpoints[podUID][index] != expectedEndpoints[podUID][index] { + Failf("invalid list of ports for uid %v. Got %v, expected %v", podUID, endpoints[podUID], expectedEndpoints[podUID]) } } } } -func validateEndpointsOrFail(c *client.Client, namespace, serviceName string, expectedEndpoints map[string][]int) { +func validateEndpointsOrFail(c *client.Client, namespace, serviceName string, expectedEndpoints PortsByPodName) { By(fmt.Sprintf("Waiting up to %v for service %s in namespace %s to expose endpoints %v", serviceStartTimeout, serviceName, namespace, expectedEndpoints)) for start := time.Now(); time.Since(start) < serviceStartTimeout; time.Sleep(5 * time.Second) { endpoints, err := c.Endpoints(namespace).Get(serviceName) @@ -1085,16 +1109,17 @@ func validateEndpointsOrFail(c *client.Client, namespace, serviceName string, ex } Logf("Found endpoints %v", endpoints) - portsByIp := getPortsByIp(endpoints.Subsets) - Logf("Found ports by ip %v", portsByIp) + portsByPodUID := getContainerPortsByPodUID(endpoints) + Logf("Found port by pod UID %v", portsByPodUID) - if len(portsByIp) == len(expectedEndpoints) { - expectedPortsByIp := translatePodNameToIpOrFail(c, namespace, expectedEndpoints) - validatePortsOrFail(portsByIp, expectedPortsByIp) + expectedPortsByPodUID := translatePodNameToUIDOrFail(c, namespace, expectedEndpoints) + if len(portsByPodUID) == len(expectedEndpoints) { + validatePortsOrFail(portsByPodUID, expectedPortsByPodUID) By(fmt.Sprintf("Successfully validated that service %s in namespace %s exposes endpoints %v (%v elapsed)", serviceName, namespace, expectedEndpoints, time.Since(start))) return } - Logf("Unexpected number of endpoints: found %v, expected %v (%v elapsed, ignoring for 5s)", portsByIp, expectedEndpoints, time.Since(start)) + + Logf("Unexpected number of endpoints: found %v, expected %v (%v elapsed, ignoring for 5s)", portsByPodUID, expectedEndpoints, time.Since(start)) } Failf("Timed out waiting for service %s in namespace %s to expose endpoints %v (%v elapsed)", serviceName, namespace, expectedEndpoints, serviceStartTimeout) }