Allow headless svc without ports to have endpoints

As cited in
https://github.com/kubernetes/dns/issues/174 - this is documented to
work, and I don't see why it shouldn't work.  We allowed the definition
of headless services without ports, but apparently nobody tested it very
well.

Manually tested clusterIP services with no ports - validation error.

Manually tested services with negative ports - validation error.

New tests failed, output inspected and verified.  Now pass.
This commit is contained in:
Tim Hockin 2018-08-20 15:32:52 -07:00
parent e9de06d4df
commit 06b785ca52
7 changed files with 109 additions and 33 deletions

View File

@ -38,12 +38,12 @@ func RepackSubsets(subsets []api.EndpointSubset) []api.EndpointSubset {
allAddrs := map[addressKey]*api.EndpointAddress{}
portToAddrReadyMap := map[api.EndpointPort]addressSet{}
for i := range subsets {
for _, port := range subsets[i].Ports {
for k := range subsets[i].Addresses {
mapAddressByPort(&subsets[i].Addresses[k], port, true, allAddrs, portToAddrReadyMap)
}
for k := range subsets[i].NotReadyAddresses {
mapAddressByPort(&subsets[i].NotReadyAddresses[k], port, false, allAddrs, portToAddrReadyMap)
if len(subsets[i].Ports) == 0 {
// Don't discard endpoints with no ports defined, use a sentinel.
mapAddressesByPort(&subsets[i], api.EndpointPort{Port: -1}, allAddrs, portToAddrReadyMap)
} else {
for _, port := range subsets[i].Ports {
mapAddressesByPort(&subsets[i], port, allAddrs, portToAddrReadyMap)
}
}
}
@ -58,7 +58,14 @@ func RepackSubsets(subsets []api.EndpointSubset) []api.EndpointSubset {
for port, addrs := range portToAddrReadyMap {
key := keyString(hashAddresses(addrs))
keyToAddrReadyMap[key] = addrs
addrReadyMapKeyToPorts[key] = append(addrReadyMapKeyToPorts[key], port)
if port.Port > 0 { // avoid sentinels
addrReadyMapKeyToPorts[key] = append(addrReadyMapKeyToPorts[key], port)
} else {
if _, found := addrReadyMapKeyToPorts[key]; !found {
// Force it to be present in the map
addrReadyMapKeyToPorts[key] = nil
}
}
}
// Next, build the N-to-M association the API wants.
@ -85,7 +92,17 @@ type addressKey struct {
uid types.UID
}
// mapAddressByPort adds an address into a map by its ports, registering the address with a unique pointer, and preserving
// mapAddressesByPort adds all ready and not-ready addresses into a map by a single port.
func mapAddressesByPort(subset *api.EndpointSubset, port api.EndpointPort, allAddrs map[addressKey]*api.EndpointAddress, portToAddrReadyMap map[api.EndpointPort]addressSet) {
for k := range subset.Addresses {
mapAddressByPort(&subset.Addresses[k], port, true, allAddrs, portToAddrReadyMap)
}
for k := range subset.NotReadyAddresses {
mapAddressByPort(&subset.NotReadyAddresses[k], port, false, allAddrs, portToAddrReadyMap)
}
}
// mapAddressByPort adds one address into a map by port, registering the address with a unique pointer, and preserving
// any existing ready state.
func mapAddressByPort(addr *api.EndpointAddress, port api.EndpointPort, ready bool, allAddrs map[addressKey]*api.EndpointAddress, portToAddrReadyMap map[api.EndpointPort]addressSet) *api.EndpointAddress {
// use addressKey to distinguish between two endpoints that are identical addresses

View File

@ -51,11 +51,11 @@ func TestPackSubsets(t *testing.T) {
}, {
name: "empty ports",
given: []api.EndpointSubset{{Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, Ports: []api.EndpointPort{}}},
expect: []api.EndpointSubset{},
expect: []api.EndpointSubset{{Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, Ports: nil}},
}, {
name: "empty ports",
given: []api.EndpointSubset{{NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, Ports: []api.EndpointPort{}}},
expect: []api.EndpointSubset{},
expect: []api.EndpointSubset{{NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, Ports: nil}},
}, {
name: "one set, one ip, one port",
given: []api.EndpointSubset{{

View File

@ -38,12 +38,12 @@ func RepackSubsets(subsets []v1.EndpointSubset) []v1.EndpointSubset {
allAddrs := map[addressKey]*v1.EndpointAddress{}
portToAddrReadyMap := map[v1.EndpointPort]addressSet{}
for i := range subsets {
for _, port := range subsets[i].Ports {
for k := range subsets[i].Addresses {
mapAddressByPort(&subsets[i].Addresses[k], port, true, allAddrs, portToAddrReadyMap)
}
for k := range subsets[i].NotReadyAddresses {
mapAddressByPort(&subsets[i].NotReadyAddresses[k], port, false, allAddrs, portToAddrReadyMap)
if len(subsets[i].Ports) == 0 {
// Don't discard endpoints with no ports defined, use a sentinel.
mapAddressesByPort(&subsets[i], v1.EndpointPort{Port: -1}, allAddrs, portToAddrReadyMap)
} else {
for _, port := range subsets[i].Ports {
mapAddressesByPort(&subsets[i], port, allAddrs, portToAddrReadyMap)
}
}
}
@ -58,7 +58,14 @@ func RepackSubsets(subsets []v1.EndpointSubset) []v1.EndpointSubset {
for port, addrs := range portToAddrReadyMap {
key := keyString(hashAddresses(addrs))
keyToAddrReadyMap[key] = addrs
addrReadyMapKeyToPorts[key] = append(addrReadyMapKeyToPorts[key], port)
if port.Port > 0 { // avoid sentinels
addrReadyMapKeyToPorts[key] = append(addrReadyMapKeyToPorts[key], port)
} else {
if _, found := addrReadyMapKeyToPorts[key]; !found {
// Force it to be present in the map
addrReadyMapKeyToPorts[key] = nil
}
}
}
// Next, build the N-to-M association the API wants.
@ -85,7 +92,17 @@ type addressKey struct {
uid types.UID
}
// mapAddressByPort adds an address into a map by its ports, registering the address with a unique pointer, and preserving
// mapAddressesByPort adds all ready and not-ready addresses into a map by a single port.
func mapAddressesByPort(subset *v1.EndpointSubset, port v1.EndpointPort, allAddrs map[addressKey]*v1.EndpointAddress, portToAddrReadyMap map[v1.EndpointPort]addressSet) {
for k := range subset.Addresses {
mapAddressByPort(&subset.Addresses[k], port, true, allAddrs, portToAddrReadyMap)
}
for k := range subset.NotReadyAddresses {
mapAddressByPort(&subset.NotReadyAddresses[k], port, false, allAddrs, portToAddrReadyMap)
}
}
// mapAddressByPort adds one address into a map by port, registering the address with a unique pointer, and preserving
// any existing ready state.
func mapAddressByPort(addr *v1.EndpointAddress, port v1.EndpointPort, ready bool, allAddrs map[addressKey]*v1.EndpointAddress, portToAddrReadyMap map[v1.EndpointPort]addressSet) *v1.EndpointAddress {
// use addressKey to distinguish between two endpoints that are identical addresses

View File

@ -51,11 +51,11 @@ func TestPackSubsets(t *testing.T) {
}, {
name: "empty ports",
given: []v1.EndpointSubset{{Addresses: []v1.EndpointAddress{{IP: "1.2.3.4"}}, Ports: []v1.EndpointPort{}}},
expect: []v1.EndpointSubset{},
expect: []v1.EndpointSubset{{Addresses: []v1.EndpointAddress{{IP: "1.2.3.4"}}, Ports: nil}},
}, {
name: "empty ports",
given: []v1.EndpointSubset{{NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.4"}}, Ports: []v1.EndpointPort{}}},
expect: []v1.EndpointSubset{},
expect: []v1.EndpointSubset{{NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.4"}}, Ports: nil}},
}, {
name: "one set, one ip, one port",
given: []v1.EndpointSubset{{

View File

@ -472,9 +472,9 @@ func (e *EndpointController) syncService(key string) error {
totalReadyEps = totalReadyEps + readyEps
totalNotReadyEps = totalNotReadyEps + notReadyEps
}
subsets = endpoints.RepackSubsets(subsets)
}
}
subsets = endpoints.RepackSubsets(subsets)
// See if there's actually an update here.
currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)

View File

@ -863,6 +863,34 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhase
endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data)
}
func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, endpointsHandler := makeTestServer(t, ns)
defer testServer.Close()
endpoints := newController(testServer.URL)
endpoints.serviceStore.Add(&v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
ClusterIP: "None",
Ports: nil,
},
})
addPods(endpoints.podStore, ns, 1, 1, 0)
endpoints.syncService(ns + "/foo")
endpointsHandler.ValidateRequestCount(t, 1)
data := runtime.EncodeOrDie(testapi.Default.Codec(), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
Ports: nil,
}},
})
endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, ""), "POST", &data)
}
// There are 3*5 possibilities(3 types of RestartPolicy by 5 types of PodPhase). Not list them all here.
// Just list all of the 3 false cases and 3 of the 12 true cases.
func TestShouldPodBeInEndpoints(t *testing.T) {

View File

@ -482,19 +482,33 @@ func formatEndpoints(endpoints *api.Endpoints, ports sets.String) string {
count := 0
for i := range endpoints.Subsets {
ss := &endpoints.Subsets[i]
for i := range ss.Ports {
port := &ss.Ports[i]
if ports == nil || ports.Has(port.Name) {
for i := range ss.Addresses {
if len(list) == max {
more = true
if len(ss.Ports) == 0 {
// It's possible to have headless services with no ports.
for i := range ss.Addresses {
if len(list) == max {
more = true
}
if !more {
list = append(list, ss.Addresses[i].IP)
}
count++
}
} else {
// "Normal" services with ports defined.
for i := range ss.Ports {
port := &ss.Ports[i]
if ports == nil || ports.Has(port.Name) {
for i := range ss.Addresses {
if len(list) == max {
more = true
}
addr := &ss.Addresses[i]
if !more {
hostPort := net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port)))
list = append(list, hostPort)
}
count++
}
addr := &ss.Addresses[i]
if !more {
hostPort := net.JoinHostPort(addr.IP, strconv.Itoa(int(port.Port)))
list = append(list, hostPort)
}
count++
}
}
}