diff --git a/pkg/api/endpoints/util.go b/pkg/api/endpoints/util.go index 8fa72b56819..c834a18f3f1 100644 --- a/pkg/api/endpoints/util.go +++ b/pkg/api/endpoints/util.go @@ -38,12 +38,12 @@ func RepackSubsets(subsets []api.EndpointSubset) []api.EndpointSubset { allAddrs := map[addressKey]*api.EndpointAddress{} portToAddrReadyMap := map[api.EndpointPort]addressSet{} for i := range subsets { - for _, port := range subsets[i].Ports { - for k := range subsets[i].Addresses { - mapAddressByPort(&subsets[i].Addresses[k], port, true, allAddrs, portToAddrReadyMap) - } - for k := range subsets[i].NotReadyAddresses { - mapAddressByPort(&subsets[i].NotReadyAddresses[k], port, false, allAddrs, portToAddrReadyMap) + if len(subsets[i].Ports) == 0 { + // Don't discard endpoints with no ports defined, use a sentinel. + mapAddressesByPort(&subsets[i], api.EndpointPort{Port: -1}, allAddrs, portToAddrReadyMap) + } else { + for _, port := range subsets[i].Ports { + mapAddressesByPort(&subsets[i], port, allAddrs, portToAddrReadyMap) } } } @@ -58,7 +58,14 @@ func RepackSubsets(subsets []api.EndpointSubset) []api.EndpointSubset { for port, addrs := range portToAddrReadyMap { key := keyString(hashAddresses(addrs)) keyToAddrReadyMap[key] = addrs - addrReadyMapKeyToPorts[key] = append(addrReadyMapKeyToPorts[key], port) + if port.Port > 0 { // avoid sentinels + addrReadyMapKeyToPorts[key] = append(addrReadyMapKeyToPorts[key], port) + } else { + if _, found := addrReadyMapKeyToPorts[key]; !found { + // Force it to be present in the map + addrReadyMapKeyToPorts[key] = nil + } + } } // Next, build the N-to-M association the API wants. @@ -85,7 +92,17 @@ type addressKey struct { uid types.UID } -// mapAddressByPort adds an address into a map by its ports, registering the address with a unique pointer, and preserving +// mapAddressesByPort adds all ready and not-ready addresses into a map by a single port. +func mapAddressesByPort(subset *api.EndpointSubset, port api.EndpointPort, allAddrs map[addressKey]*api.EndpointAddress, portToAddrReadyMap map[api.EndpointPort]addressSet) { + for k := range subset.Addresses { + mapAddressByPort(&subset.Addresses[k], port, true, allAddrs, portToAddrReadyMap) + } + for k := range subset.NotReadyAddresses { + mapAddressByPort(&subset.NotReadyAddresses[k], port, false, allAddrs, portToAddrReadyMap) + } +} + +// mapAddressByPort adds one address into a map by port, registering the address with a unique pointer, and preserving // any existing ready state. func mapAddressByPort(addr *api.EndpointAddress, port api.EndpointPort, ready bool, allAddrs map[addressKey]*api.EndpointAddress, portToAddrReadyMap map[api.EndpointPort]addressSet) *api.EndpointAddress { // use addressKey to distinguish between two endpoints that are identical addresses diff --git a/pkg/api/endpoints/util_test.go b/pkg/api/endpoints/util_test.go index f35ea9ea5f8..df242a01a10 100644 --- a/pkg/api/endpoints/util_test.go +++ b/pkg/api/endpoints/util_test.go @@ -51,11 +51,11 @@ func TestPackSubsets(t *testing.T) { }, { name: "empty ports", given: []api.EndpointSubset{{Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, Ports: []api.EndpointPort{}}}, - expect: []api.EndpointSubset{}, + expect: []api.EndpointSubset{{Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, Ports: nil}}, }, { name: "empty ports", given: []api.EndpointSubset{{NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, Ports: []api.EndpointPort{}}}, - expect: []api.EndpointSubset{}, + expect: []api.EndpointSubset{{NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, Ports: nil}}, }, { name: "one set, one ip, one port", given: []api.EndpointSubset{{ diff --git a/pkg/api/v1/endpoints/util.go b/pkg/api/v1/endpoints/util.go index 833af440c32..db551c68a6f 100644 --- a/pkg/api/v1/endpoints/util.go +++ b/pkg/api/v1/endpoints/util.go @@ -38,12 +38,12 @@ func RepackSubsets(subsets []v1.EndpointSubset) []v1.EndpointSubset { allAddrs := map[addressKey]*v1.EndpointAddress{} portToAddrReadyMap := map[v1.EndpointPort]addressSet{} for i := range subsets { - for _, port := range subsets[i].Ports { - for k := range subsets[i].Addresses { - mapAddressByPort(&subsets[i].Addresses[k], port, true, allAddrs, portToAddrReadyMap) - } - for k := range subsets[i].NotReadyAddresses { - mapAddressByPort(&subsets[i].NotReadyAddresses[k], port, false, allAddrs, portToAddrReadyMap) + if len(subsets[i].Ports) == 0 { + // Don't discard endpoints with no ports defined, use a sentinel. + mapAddressesByPort(&subsets[i], v1.EndpointPort{Port: -1}, allAddrs, portToAddrReadyMap) + } else { + for _, port := range subsets[i].Ports { + mapAddressesByPort(&subsets[i], port, allAddrs, portToAddrReadyMap) } } } @@ -58,7 +58,14 @@ func RepackSubsets(subsets []v1.EndpointSubset) []v1.EndpointSubset { for port, addrs := range portToAddrReadyMap { key := keyString(hashAddresses(addrs)) keyToAddrReadyMap[key] = addrs - addrReadyMapKeyToPorts[key] = append(addrReadyMapKeyToPorts[key], port) + if port.Port > 0 { // avoid sentinels + addrReadyMapKeyToPorts[key] = append(addrReadyMapKeyToPorts[key], port) + } else { + if _, found := addrReadyMapKeyToPorts[key]; !found { + // Force it to be present in the map + addrReadyMapKeyToPorts[key] = nil + } + } } // Next, build the N-to-M association the API wants. @@ -85,7 +92,17 @@ type addressKey struct { uid types.UID } -// mapAddressByPort adds an address into a map by its ports, registering the address with a unique pointer, and preserving +// mapAddressesByPort adds all ready and not-ready addresses into a map by a single port. +func mapAddressesByPort(subset *v1.EndpointSubset, port v1.EndpointPort, allAddrs map[addressKey]*v1.EndpointAddress, portToAddrReadyMap map[v1.EndpointPort]addressSet) { + for k := range subset.Addresses { + mapAddressByPort(&subset.Addresses[k], port, true, allAddrs, portToAddrReadyMap) + } + for k := range subset.NotReadyAddresses { + mapAddressByPort(&subset.NotReadyAddresses[k], port, false, allAddrs, portToAddrReadyMap) + } +} + +// mapAddressByPort adds one address into a map by port, registering the address with a unique pointer, and preserving // any existing ready state. func mapAddressByPort(addr *v1.EndpointAddress, port v1.EndpointPort, ready bool, allAddrs map[addressKey]*v1.EndpointAddress, portToAddrReadyMap map[v1.EndpointPort]addressSet) *v1.EndpointAddress { // use addressKey to distinguish between two endpoints that are identical addresses diff --git a/pkg/api/v1/endpoints/util_test.go b/pkg/api/v1/endpoints/util_test.go index 0a6df65a308..5b50e267612 100644 --- a/pkg/api/v1/endpoints/util_test.go +++ b/pkg/api/v1/endpoints/util_test.go @@ -51,11 +51,11 @@ func TestPackSubsets(t *testing.T) { }, { name: "empty ports", given: []v1.EndpointSubset{{Addresses: []v1.EndpointAddress{{IP: "1.2.3.4"}}, Ports: []v1.EndpointPort{}}}, - expect: []v1.EndpointSubset{}, + expect: []v1.EndpointSubset{{Addresses: []v1.EndpointAddress{{IP: "1.2.3.4"}}, Ports: nil}}, }, { name: "empty ports", given: []v1.EndpointSubset{{NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.4"}}, Ports: []v1.EndpointPort{}}}, - expect: []v1.EndpointSubset{}, + expect: []v1.EndpointSubset{{NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.4"}}, Ports: nil}}, }, { name: "one set, one ip, one port", given: []v1.EndpointSubset{{ diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index ce6fa2a6154..61c0f920547 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -472,9 +472,9 @@ func (e *EndpointController) syncService(key string) error { totalReadyEps = totalReadyEps + readyEps totalNotReadyEps = totalNotReadyEps + notReadyEps } - subsets = endpoints.RepackSubsets(subsets) } } + subsets = endpoints.RepackSubsets(subsets) // See if there's actually an update here. currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name) diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 0f9f2d319d6..24446913b8c 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -863,6 +863,34 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhase endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data) } +func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) { + ns := metav1.NamespaceDefault + testServer, endpointsHandler := makeTestServer(t, ns) + defer testServer.Close() + endpoints := newController(testServer.URL) + endpoints.serviceStore.Add(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: v1.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + ClusterIP: "None", + Ports: nil, + }, + }) + addPods(endpoints.podStore, ns, 1, 1, 0) + endpoints.syncService(ns + "/foo") + endpointsHandler.ValidateRequestCount(t, 1) + data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, + Ports: nil, + }}, + }) + endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, ""), "POST", &data) +} + // There are 3*5 possibilities(3 types of RestartPolicy by 5 types of PodPhase). Not list them all here. // Just list all of the 3 false cases and 3 of the 12 true cases. func TestShouldPodBeInEndpoints(t *testing.T) { diff --git a/pkg/printers/internalversion/printers.go b/pkg/printers/internalversion/printers.go index 1ef024267ca..311d8d92c5d 100644 --- a/pkg/printers/internalversion/printers.go +++ b/pkg/printers/internalversion/printers.go @@ -482,19 +482,33 @@ func formatEndpoints(endpoints *api.Endpoints, ports sets.String) string { count := 0 for i := range endpoints.Subsets { ss := &endpoints.Subsets[i] - for i := range ss.Ports { - port := &ss.Ports[i] - if ports == nil || ports.Has(port.Name) { - for i := range ss.Addresses { - if len(list) == max { - more = true + if len(ss.Ports) == 0 { + // It's possible to have headless services with no ports. + for i := range ss.Addresses { + if len(list) == max { + more = true + } + if !more { + list = append(list, ss.Addresses[i].IP) + } + count++ + } + } else { + // "Normal" services with ports defined. + for i := range ss.Ports { + port := &ss.Ports[i] + if ports == nil || ports.Has(port.Name) { + for i := range ss.Addresses { + if len(list) == max { + more = true + } + addr := &ss.Addresses[i] + if !more { + hostPort := net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))) + list = append(list, hostPort) + } + count++ } - addr := &ss.Addresses[i] - if !more { - hostPort := net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port))) - list = append(list, hostPort) - } - count++ } } }