Annotate endpoints in mesos endpoint controller with container ports

The EndpointPort struct only stores one port: the port which is used
to connect to the container from outside. In the case of the Mesos
endpoint controller this is the host port. The container port is not part
of the endpoint structure at all.

A number of e2e tests need the container port information to validate correct
endpoint creation. Therefore this patch annotates the Endpoint struct with a
number of annotations mapping "<HostIP>:<HostPort>" to "<ContainerPort>". In a
follow-up commit these annotations are used to validate endpoints in a Mesos
setup.
This commit is contained in:
Dr. Stefan Schimanski 2015-08-04 10:31:00 +02:00
parent 8848e26154
commit c55e7bf731
2 changed files with 19 additions and 6 deletions

View File

@ -30,4 +30,5 @@ const (
PortMappingKeyFormat = PortMappingKeyPrefix + "%s_%d" PortMappingKeyFormat = PortMappingKeyPrefix + "%s_%d"
PortNameMappingKeyPrefix = "k8s.mesosphere.io/portName_" PortNameMappingKeyPrefix = "k8s.mesosphere.io/portName_"
PortNameMappingKeyFormat = PortNameMappingKeyPrefix + "%s_%s" PortNameMappingKeyFormat = PortNameMappingKeyPrefix + "%s_%s"
ContainerPortKeyFormat = "k8s.mesosphere.io/containerPort_%s_%s_%d"
) )

View File

@ -243,6 +243,7 @@ func (e *endpointController) worker() {
} }
} }
// HACK(sttts): add annotations to the endpoint about the respective container ports
func (e *endpointController) syncService(key string) { func (e *endpointController) syncService(key string) {
startTime := time.Now() startTime := time.Now()
defer func() { defer func() {
@ -287,6 +288,7 @@ func (e *endpointController) syncService(key string) {
} }
subsets := []api.EndpointSubset{} subsets := []api.EndpointSubset{}
containerPortAnnotations := map[string]string{} // by <HostIP>:<Port>
for i := range pods.Items { for i := range pods.Items {
pod := &pods.Items[i] pod := &pods.Items[i]
@ -295,7 +297,7 @@ func (e *endpointController) syncService(key string) {
portName := servicePort.Name portName := servicePort.Name
portProto := servicePort.Protocol portProto := servicePort.Protocol
portNum, err := findPort(pod, servicePort) portNum, containerPort, err := findPort(pod, servicePort)
if err != nil { if err != nil {
glog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err) glog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
continue continue
@ -320,6 +322,7 @@ func (e *endpointController) syncService(key string) {
ResourceVersion: pod.ObjectMeta.ResourceVersion, ResourceVersion: pod.ObjectMeta.ResourceVersion,
}} }}
subsets = append(subsets, api.EndpointSubset{Addresses: []api.EndpointAddress{epa}, Ports: []api.EndpointPort{epp}}) subsets = append(subsets, api.EndpointSubset{Addresses: []api.EndpointAddress{epa}, Ports: []api.EndpointPort{epp}})
containerPortAnnotations[fmt.Sprintf(meta.ContainerPortKeyFormat, portProto, pod.Status.HostIP, portNum)] = strconv.Itoa(containerPort)
} }
} }
subsets = endpoints.RepackSubsets(subsets) subsets = endpoints.RepackSubsets(subsets)
@ -348,6 +351,13 @@ func (e *endpointController) syncService(key string) {
newEndpoints.Subsets = subsets newEndpoints.Subsets = subsets
newEndpoints.Labels = service.Labels newEndpoints.Labels = service.Labels
if newEndpoints.Annotations == nil {
newEndpoints.Annotations = map[string]string{}
}
for hostIpPort, containerPort := range containerPortAnnotations {
newEndpoints.Annotations[hostIpPort] = containerPort
}
if len(currentEndpoints.ResourceVersion) == 0 { if len(currentEndpoints.ResourceVersion) == 0 {
// No previous endpoints, create them // No previous endpoints, create them
_, err = e.client.Endpoints(service.Namespace).Create(newEndpoints) _, err = e.client.Endpoints(service.Namespace).Create(newEndpoints)
@ -389,8 +399,8 @@ func (e *endpointController) checkLeftoverEndpoints() {
// string up in all named ports in all containers in the target pod. If no // string up in all named ports in all containers in the target pod. If no
// match is found, fail. // match is found, fail.
// //
// HACK(jdef): return the HostPort instead of the ContainerPort for generic mesos compat. // HACK(jdef): return the HostPort in addition to the ContainerPort for generic mesos compatibility
func findPort(pod *api.Pod, svcPort *api.ServicePort) (int, error) { func findPort(pod *api.Pod, svcPort *api.ServicePort) (int, int, error) {
portName := svcPort.TargetPort portName := svcPort.TargetPort
switch portName.Kind { switch portName.Kind {
case util.IntstrString: case util.IntstrString:
@ -398,7 +408,8 @@ func findPort(pod *api.Pod, svcPort *api.ServicePort) (int, error) {
for _, container := range pod.Spec.Containers { for _, container := range pod.Spec.Containers {
for _, port := range container.Ports { for _, port := range container.Ports {
if port.Name == name && port.Protocol == svcPort.Protocol { if port.Name == name && port.Protocol == svcPort.Protocol {
return findMappedPortName(pod, port.Protocol, name) hostPort, err := findMappedPortName(pod, port.Protocol, name)
return hostPort, port.ContainerPort, err
} }
} }
} }
@ -412,12 +423,13 @@ func findPort(pod *api.Pod, svcPort *api.ServicePort) (int, error) {
for _, container := range pod.Spec.Containers { for _, container := range pod.Spec.Containers {
for _, port := range container.Ports { for _, port := range container.Ports {
if port.ContainerPort == p && port.Protocol == svcPort.Protocol { if port.ContainerPort == p && port.Protocol == svcPort.Protocol {
return findMappedPort(pod, port.Protocol, p) hostPort, err := findMappedPort(pod, port.Protocol, p)
return hostPort, port.ContainerPort, err
} }
} }
} }
} }
return 0, fmt.Errorf("no suitable port for manifest: %s", pod.UID) return 0, 0, fmt.Errorf("no suitable port for manifest: %s", pod.UID)
} }
func findMappedPort(pod *api.Pod, protocol api.Protocol, port int) (int, error) { func findMappedPort(pod *api.Pod, protocol api.Protocol, port int) (int, error) {