diff --git a/contrib/mesos/pkg/scheduler/meta/annotations.go b/contrib/mesos/pkg/scheduler/meta/annotations.go index dc8dde231e7..7c9ee089380 100644 --- a/contrib/mesos/pkg/scheduler/meta/annotations.go +++ b/contrib/mesos/pkg/scheduler/meta/annotations.go @@ -30,4 +30,5 @@ const ( PortMappingKeyFormat = PortMappingKeyPrefix + "%s_%d" PortNameMappingKeyPrefix = "k8s.mesosphere.io/portName_" PortNameMappingKeyFormat = PortNameMappingKeyPrefix + "%s_%s" + ContainerPortKeyFormat = "k8s.mesosphere.io/containerPort_%s_%s_%d" ) diff --git a/contrib/mesos/pkg/service/endpoints_controller.go b/contrib/mesos/pkg/service/endpoints_controller.go index 324e459e0ae..ca9489757f3 100644 --- a/contrib/mesos/pkg/service/endpoints_controller.go +++ b/contrib/mesos/pkg/service/endpoints_controller.go @@ -243,6 +243,7 @@ func (e *endpointController) worker() { } } +// HACK(sttts): add annotations to the endpoint about the respective container ports func (e *endpointController) syncService(key string) { startTime := time.Now() defer func() { @@ -287,6 +288,7 @@ func (e *endpointController) syncService(key string) { } subsets := []api.EndpointSubset{} + containerPortAnnotations := map[string]string{} // by : for i := range pods.Items { pod := &pods.Items[i] @@ -295,7 +297,7 @@ func (e *endpointController) syncService(key string) { portName := servicePort.Name portProto := servicePort.Protocol - portNum, err := findPort(pod, servicePort) + portNum, containerPort, err := findPort(pod, servicePort) if err != nil { glog.V(4).Infof("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err) continue @@ -320,6 +322,7 @@ func (e *endpointController) syncService(key string) { ResourceVersion: pod.ObjectMeta.ResourceVersion, }} subsets = append(subsets, api.EndpointSubset{Addresses: []api.EndpointAddress{epa}, Ports: []api.EndpointPort{epp}}) + containerPortAnnotations[fmt.Sprintf(meta.ContainerPortKeyFormat, portProto, pod.Status.HostIP, portNum)] = strconv.Itoa(containerPort) } } subsets = endpoints.RepackSubsets(subsets) @@ -348,6 +351,13 @@ func (e *endpointController) syncService(key string) { newEndpoints.Subsets = subsets newEndpoints.Labels = service.Labels + if newEndpoints.Annotations == nil { + newEndpoints.Annotations = map[string]string{} + } + for hostIpPort, containerPort := range containerPortAnnotations { + newEndpoints.Annotations[hostIpPort] = containerPort + } + if len(currentEndpoints.ResourceVersion) == 0 { // No previous endpoints, create them _, err = e.client.Endpoints(service.Namespace).Create(newEndpoints) @@ -389,8 +399,8 @@ func (e *endpointController) checkLeftoverEndpoints() { // string up in all named ports in all containers in the target pod. If no // match is found, fail. // -// HACK(jdef): return the HostPort instead of the ContainerPort for generic mesos compat. -func findPort(pod *api.Pod, svcPort *api.ServicePort) (int, error) { +// HACK(jdef): return the HostPort in addition to the ContainerPort for generic mesos compatibility +func findPort(pod *api.Pod, svcPort *api.ServicePort) (int, int, error) { portName := svcPort.TargetPort switch portName.Kind { case util.IntstrString: @@ -398,7 +408,8 @@ func findPort(pod *api.Pod, svcPort *api.ServicePort) (int, error) { for _, container := range pod.Spec.Containers { for _, port := range container.Ports { if port.Name == name && port.Protocol == svcPort.Protocol { - return findMappedPortName(pod, port.Protocol, name) + hostPort, err := findMappedPortName(pod, port.Protocol, name) + return hostPort, port.ContainerPort, err } } } @@ -412,12 +423,13 @@ func findPort(pod *api.Pod, svcPort *api.ServicePort) (int, error) { for _, container := range pod.Spec.Containers { for _, port := range container.Ports { if port.ContainerPort == p && port.Protocol == svcPort.Protocol { - return findMappedPort(pod, port.Protocol, p) + hostPort, err := findMappedPort(pod, port.Protocol, p) + return hostPort, port.ContainerPort, err } } } } - return 0, fmt.Errorf("no suitable port for manifest: %s", pod.UID) + return 0, 0, fmt.Errorf("no suitable port for manifest: %s", pod.UID) } func findMappedPort(pod *api.Pod, protocol api.Protocol, port int) (int, error) {