Merge pull request #12410 from mesosphere/non-unique-endpoint-ip-no-port-names

Fix e2e endpoints tests on Mesos
This commit is contained in:
Alex Robinson 2015-08-10 09:49:55 -07:00
commit 1ad9015566
4 changed files with 93 additions and 52 deletions

View File

@ -30,4 +30,5 @@ const (
PortMappingKeyFormat = PortMappingKeyPrefix + "%s_%d"
PortNameMappingKeyPrefix = "k8s.mesosphere.io/portName_"
PortNameMappingKeyFormat = PortNameMappingKeyPrefix + "%s_%s"
ContainerPortKeyFormat = "k8s.mesosphere.io/containerPort_%s_%s_%d"
)

View File

@ -243,6 +243,7 @@ func (e *endpointController) worker() {
}
}
// HACK(sttts): add annotations to the endpoint about the respective container ports
func (e *endpointController) syncService(key string) {
startTime := time.Now()
defer func() {
@ -287,6 +288,7 @@ func (e *endpointController) syncService(key string) {
}
subsets := []api.EndpointSubset{}
containerPortAnnotations := map[string]string{} // by <HostIP>:<Port>
for i := range pods.Items {
pod := &pods.Items[i]
@ -295,7 +297,7 @@ func (e *endpointController) syncService(key string) {
portName := servicePort.Name
portProto := servicePort.Protocol
portNum, err := findPort(pod, servicePort)
portNum, containerPort, err := findPort(pod, servicePort)
if err != nil {
glog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
continue
@ -320,6 +322,7 @@ func (e *endpointController) syncService(key string) {
ResourceVersion: pod.ObjectMeta.ResourceVersion,
}}
subsets = append(subsets, api.EndpointSubset{Addresses: []api.EndpointAddress{epa}, Ports: []api.EndpointPort{epp}})
containerPortAnnotations[fmt.Sprintf(meta.ContainerPortKeyFormat, portProto, pod.Status.HostIP, portNum)] = strconv.Itoa(containerPort)
}
}
subsets = endpoints.RepackSubsets(subsets)
@ -348,6 +351,13 @@ func (e *endpointController) syncService(key string) {
newEndpoints.Subsets = subsets
newEndpoints.Labels = service.Labels
if newEndpoints.Annotations == nil {
newEndpoints.Annotations = map[string]string{}
}
for hostIpPort, containerPort := range containerPortAnnotations {
newEndpoints.Annotations[hostIpPort] = containerPort
}
if len(currentEndpoints.ResourceVersion) == 0 {
// No previous endpoints, create them
_, err = e.client.Endpoints(service.Namespace).Create(newEndpoints)
@ -389,8 +399,8 @@ func (e *endpointController) checkLeftoverEndpoints() {
// string up in all named ports in all containers in the target pod. If no
// match is found, fail.
//
// HACK(jdef): return the HostPort instead of the ContainerPort for generic mesos compat.
func findPort(pod *api.Pod, svcPort *api.ServicePort) (int, error) {
// HACK(jdef): return the HostPort in addition to the ContainerPort for generic mesos compatibility
func findPort(pod *api.Pod, svcPort *api.ServicePort) (int, int, error) {
portName := svcPort.TargetPort
switch portName.Kind {
case util.IntstrString:
@ -398,7 +408,8 @@ func findPort(pod *api.Pod, svcPort *api.ServicePort) (int, error) {
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.Name == name && port.Protocol == svcPort.Protocol {
return findMappedPortName(pod, port.Protocol, name)
hostPort, err := findMappedPortName(pod, port.Protocol, name)
return hostPort, port.ContainerPort, err
}
}
}
@ -412,12 +423,13 @@ func findPort(pod *api.Pod, svcPort *api.ServicePort) (int, error) {
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.ContainerPort == p && port.Protocol == svcPort.Protocol {
return findMappedPort(pod, port.Protocol, p)
hostPort, err := findMappedPort(pod, port.Protocol, p)
return hostPort, port.ContainerPort, err
}
}
}
}
return 0, fmt.Errorf("no suitable port for manifest: %s", pod.UID)
return 0, 0, fmt.Errorf("no suitable port for manifest: %s", pod.UID)
}
func findMappedPort(pod *api.Pod, protocol api.Protocol, port int) (int, error) {

View File

@ -348,12 +348,15 @@ var _ = Describe("Kubectl client", func() {
endpoints, err := c.Endpoints(ns).Get(name)
Expect(err).NotTo(HaveOccurred())
ipToPort := getPortsByIp(endpoints.Subsets)
if len(ipToPort) != 1 {
Logf("No IP found, retrying")
uidToPort := getContainerPortsByPodUID(endpoints)
if len(uidToPort) == 0 {
Logf("No endpoint found, retrying")
continue
}
for _, port := range ipToPort {
if len(uidToPort) > 1 {
Fail("To many endpoints found")
}
for _, port := range uidToPort {
if port[0] != redisPort {
Failf("Wrong endpoint port: %d", port[0])
}

View File

@ -22,6 +22,7 @@ import (
"math/rand"
"net/http"
"sort"
"strconv"
"strings"
"time"
@ -32,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/client"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/wait"
)
@ -101,7 +103,7 @@ var _ = Describe("Services", func() {
_, err := c.Services(ns).Create(service)
Expect(err).NotTo(HaveOccurred())
validateEndpointsOrFail(c, ns, serviceName, map[string][]int{})
validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{})
var names []string
defer func() {
@ -115,25 +117,25 @@ var _ = Describe("Services", func() {
addEndpointPodOrFail(c, ns, name1, labels, []api.ContainerPort{{ContainerPort: 80}})
names = append(names, name1)
validateEndpointsOrFail(c, ns, serviceName, map[string][]int{name1: {80}})
validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{name1: {80}})
name2 := "test2"
addEndpointPodOrFail(c, ns, name2, labels, []api.ContainerPort{{ContainerPort: 80}})
names = append(names, name2)
validateEndpointsOrFail(c, ns, serviceName, map[string][]int{name1: {80}, name2: {80}})
validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{name1: {80}, name2: {80}})
err = c.Pods(ns).Delete(name1, nil)
Expect(err).NotTo(HaveOccurred())
names = []string{name2}
validateEndpointsOrFail(c, ns, serviceName, map[string][]int{name2: {80}})
validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{name2: {80}})
err = c.Pods(ns).Delete(name2, nil)
Expect(err).NotTo(HaveOccurred())
names = []string{}
validateEndpointsOrFail(c, ns, serviceName, map[string][]int{})
validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{})
})
It("should serve multiport endpoints from pods", func() {
@ -175,7 +177,7 @@ var _ = Describe("Services", func() {
Expect(err).NotTo(HaveOccurred())
port1 := 100
port2 := 101
validateEndpointsOrFail(c, ns, serviceName, map[string][]int{})
validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{})
var names []string
defer func() {
@ -201,35 +203,35 @@ var _ = Describe("Services", func() {
podname1 := "podname1"
addEndpointPodOrFail(c, ns, podname1, labels, containerPorts1)
names = append(names, podname1)
validateEndpointsOrFail(c, ns, serviceName, map[string][]int{podname1: {port1}})
validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{podname1: {port1}})
podname2 := "podname2"
addEndpointPodOrFail(c, ns, podname2, labels, containerPorts2)
names = append(names, podname2)
validateEndpointsOrFail(c, ns, serviceName, map[string][]int{podname1: {port1}, podname2: {port2}})
validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{podname1: {port1}, podname2: {port2}})
podname3 := "podname3"
addEndpointPodOrFail(c, ns, podname3, labels, append(containerPorts1, containerPorts2...))
names = append(names, podname3)
validateEndpointsOrFail(c, ns, serviceName, map[string][]int{podname1: {port1}, podname2: {port2}, podname3: {port1, port2}})
validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{podname1: {port1}, podname2: {port2}, podname3: {port1, port2}})
err = c.Pods(ns).Delete(podname1, nil)
Expect(err).NotTo(HaveOccurred())
names = []string{podname2, podname3}
validateEndpointsOrFail(c, ns, serviceName, map[string][]int{podname2: {port2}, podname3: {port1, port2}})
validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{podname2: {port2}, podname3: {port1, port2}})
err = c.Pods(ns).Delete(podname2, nil)
Expect(err).NotTo(HaveOccurred())
names = []string{podname3}
validateEndpointsOrFail(c, ns, serviceName, map[string][]int{podname3: {port1, port2}})
validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{podname3: {port1, port2}})
err = c.Pods(ns).Delete(podname3, nil)
Expect(err).NotTo(HaveOccurred())
names = []string{}
validateEndpointsOrFail(c, ns, serviceName, map[string][]int{})
validateEndpointsOrFail(c, ns, serviceName, PortsByPodName{})
})
It("should be able to up and down services", func() {
@ -1022,60 +1024,82 @@ func validateUniqueOrFail(s []string) {
}
}
func getPortsByIp(subsets []api.EndpointSubset) map[string][]int {
m := make(map[string][]int)
for _, ss := range subsets {
func getContainerPortsByPodUID(endpoints *api.Endpoints) PortsByPodUID {
m := PortsByPodUID{}
for _, ss := range endpoints.Subsets {
for _, port := range ss.Ports {
for _, addr := range ss.Addresses {
Logf("Found IP %v and port %v", addr.IP, port.Port)
if _, ok := m[addr.IP]; !ok {
m[addr.IP] = make([]int, 0)
containerPort := port.Port
hostPort := port.Port
// use endpoint annotations to recover the container port in a Mesos setup
// compare contrib/mesos/pkg/service/endpoints_controller.syncService
if providerIs("mesos/docker") {
key := fmt.Sprintf("k8s.mesosphere.io/containerPort_%s_%s_%d", port.Protocol, addr.IP, hostPort)
containerPortString := endpoints.Annotations[key]
if containerPortString == "" {
continue
}
var err error
containerPort, err = strconv.Atoi(containerPortString)
if err != nil {
continue
}
Logf("Mapped mesos host port %d to container port %d via annotation %s=%s", hostPort, containerPort, key, containerPortString)
}
m[addr.IP] = append(m[addr.IP], port.Port)
Logf("Found pod %v, host port %d and container port %d", addr.TargetRef.UID, hostPort, containerPort)
if _, ok := m[addr.TargetRef.UID]; !ok {
m[addr.TargetRef.UID] = make([]int, 0)
}
m[addr.TargetRef.UID] = append(m[addr.TargetRef.UID], containerPort)
}
}
}
return m
}
func translatePodNameToIpOrFail(c *client.Client, ns string, expectedEndpoints map[string][]int) map[string][]int {
portsByIp := make(map[string][]int)
type PortsByPodName map[string][]int
type PortsByPodUID map[types.UID][]int
func translatePodNameToUIDOrFail(c *client.Client, ns string, expectedEndpoints PortsByPodName) PortsByPodUID {
portsByUID := make(PortsByPodUID)
for name, portList := range expectedEndpoints {
pod, err := c.Pods(ns).Get(name)
if err != nil {
Failf("failed to get pod %s, that's pretty weird. validation failed: %s", name, err)
}
portsByIp[pod.Status.PodIP] = portList
portsByUID[pod.ObjectMeta.UID] = portList
By(fmt.Sprintf(""))
}
By(fmt.Sprintf("successfully translated pod names to ips: %v -> %v on namespace %s", expectedEndpoints, portsByIp, ns))
return portsByIp
By(fmt.Sprintf("successfully translated pod names to UIDs: %v -> %v on namespace %s", expectedEndpoints, portsByUID, ns))
return portsByUID
}
func validatePortsOrFail(endpoints map[string][]int, expectedEndpoints map[string][]int) {
func validatePortsOrFail(endpoints PortsByPodUID, expectedEndpoints PortsByPodUID) {
if len(endpoints) != len(expectedEndpoints) {
// should not happen because we check this condition before
Failf("invalid number of endpoints got %v, expected %v", endpoints, expectedEndpoints)
}
for ip := range expectedEndpoints {
if _, ok := endpoints[ip]; !ok {
Failf("endpoint %v not found", ip)
for podUID := range expectedEndpoints {
if _, ok := endpoints[podUID]; !ok {
Failf("endpoint %v not found", podUID)
}
if len(endpoints[ip]) != len(expectedEndpoints[ip]) {
Failf("invalid list of ports for ip %v. Got %v, expected %v", ip, endpoints[ip], expectedEndpoints[ip])
if len(endpoints[podUID]) != len(expectedEndpoints[podUID]) {
Failf("invalid list of ports for uid %v. Got %v, expected %v", podUID, endpoints[podUID], expectedEndpoints[podUID])
}
sort.Ints(endpoints[ip])
sort.Ints(expectedEndpoints[ip])
for index := range endpoints[ip] {
if endpoints[ip][index] != expectedEndpoints[ip][index] {
Failf("invalid list of ports for ip %v. Got %v, expected %v", ip, endpoints[ip], expectedEndpoints[ip])
sort.Ints(endpoints[podUID])
sort.Ints(expectedEndpoints[podUID])
for index := range endpoints[podUID] {
if endpoints[podUID][index] != expectedEndpoints[podUID][index] {
Failf("invalid list of ports for uid %v. Got %v, expected %v", podUID, endpoints[podUID], expectedEndpoints[podUID])
}
}
}
}
func validateEndpointsOrFail(c *client.Client, namespace, serviceName string, expectedEndpoints map[string][]int) {
func validateEndpointsOrFail(c *client.Client, namespace, serviceName string, expectedEndpoints PortsByPodName) {
By(fmt.Sprintf("Waiting up to %v for service %s in namespace %s to expose endpoints %v", serviceStartTimeout, serviceName, namespace, expectedEndpoints))
for start := time.Now(); time.Since(start) < serviceStartTimeout; time.Sleep(5 * time.Second) {
endpoints, err := c.Endpoints(namespace).Get(serviceName)
@ -1085,16 +1109,17 @@ func validateEndpointsOrFail(c *client.Client, namespace, serviceName string, ex
}
Logf("Found endpoints %v", endpoints)
portsByIp := getPortsByIp(endpoints.Subsets)
Logf("Found ports by ip %v", portsByIp)
portsByPodUID := getContainerPortsByPodUID(endpoints)
Logf("Found port by pod UID %v", portsByPodUID)
if len(portsByIp) == len(expectedEndpoints) {
expectedPortsByIp := translatePodNameToIpOrFail(c, namespace, expectedEndpoints)
validatePortsOrFail(portsByIp, expectedPortsByIp)
expectedPortsByPodUID := translatePodNameToUIDOrFail(c, namespace, expectedEndpoints)
if len(portsByPodUID) == len(expectedEndpoints) {
validatePortsOrFail(portsByPodUID, expectedPortsByPodUID)
By(fmt.Sprintf("Successfully validated that service %s in namespace %s exposes endpoints %v (%v elapsed)", serviceName, namespace, expectedEndpoints, time.Since(start)))
return
}
Logf("Unexpected number of endpoints: found %v, expected %v (%v elapsed, ignoring for 5s)", portsByIp, expectedEndpoints, time.Since(start))
Logf("Unexpected number of endpoints: found %v, expected %v (%v elapsed, ignoring for 5s)", portsByPodUID, expectedEndpoints, time.Since(start))
}
Failf("Timed out waiting for service %s in namespace %s to expose endpoints %v (%v elapsed)", serviceName, namespace, expectedEndpoints, serviceStartTimeout)
}