diff --git a/api/swagger-spec/v1.json b/api/swagger-spec/v1.json index f952c0b8534..4d2b210775e 100644 --- a/api/swagger-spec/v1.json +++ b/api/swagger-spec/v1.json @@ -11213,7 +11213,7 @@ "items": { "$ref": "v1.EndpointSubset" }, - "description": "The set of all endpoints is the union of all subsets. Sets of addresses and ports that comprise a service." + "description": "The set of all endpoints is the union of all subsets. Addresses are placed into subsets according to the IPs they share. A single address with multiple ports, some of which are ready and some of which are not (because they come from different containers) will result in the address being displayed in different subsets for the different ports. No address will appear in both Addresses and NotReadyAddresses in the same subset. Sets of addresses and ports that comprise a service." } } }, @@ -11226,7 +11226,14 @@ "items": { "$ref": "v1.EndpointAddress" }, - "description": "IP addresses which offer the related ports." + "description": "IP addresses which offer the related ports that are marked as ready. These endpoints should be considered safe for load balancers and clients to utilize." + }, + "notReadyAddresses": { + "type": "array", + "items": { + "$ref": "v1.EndpointAddress" + }, + "description": "IP addresses which offer the related ports but are not currently marked as ready because they have not yet finished starting, have recently failed a readiness check, or have recently failed a liveness check." }, "ports": { "type": "array", diff --git a/pkg/api/deep_copy_generated.go b/pkg/api/deep_copy_generated.go index 385f36256dc..d8576505138 100644 --- a/pkg/api/deep_copy_generated.go +++ b/pkg/api/deep_copy_generated.go @@ -398,6 +398,16 @@ func deepCopy_api_EndpointSubset(in EndpointSubset, out *EndpointSubset, c *conv } else { out.Addresses = nil } + if in.NotReadyAddresses != nil { + out.NotReadyAddresses = make([]EndpointAddress, len(in.NotReadyAddresses)) + for i := range in.NotReadyAddresses { + if err := deepCopy_api_EndpointAddress(in.NotReadyAddresses[i], &out.NotReadyAddresses[i], c); err != nil { + return err + } + } + } else { + out.NotReadyAddresses = nil + } if in.Ports != nil { out.Ports = make([]EndpointPort, len(in.Ports)) for i := range in.Ports { diff --git a/pkg/api/endpoints/util.go b/pkg/api/endpoints/util.go index 58e661e95f2..bf9e0b2d3e1 100644 --- a/pkg/api/endpoints/util.go +++ b/pkg/api/endpoints/util.go @@ -34,70 +34,97 @@ import ( // form for things like comparison. The result is a newly allocated slice. func RepackSubsets(subsets []api.EndpointSubset) []api.EndpointSubset { // First map each unique port definition to the sets of hosts that - // offer it. The sets of hosts must be de-duped, using IP+UID as the key. - type addressKey struct { - ip string - uid types.UID - } + // offer it. allAddrs := map[addressKey]*api.EndpointAddress{} - portsToAddrs := map[api.EndpointPort]addressSet{} + portToAddrReadyMap := map[api.EndpointPort]addressSet{} for i := range subsets { - for j := range subsets[i].Ports { - epp := &subsets[i].Ports[j] + for _, port := range subsets[i].Ports { for k := range subsets[i].Addresses { - epa := &subsets[i].Addresses[k] - ak := addressKey{ip: epa.IP} - if epa.TargetRef != nil { - ak.uid = epa.TargetRef.UID - } - // Accumulate the address. - if allAddrs[ak] == nil { - // Make a copy so we don't write to the - // input args of this function. - p := &api.EndpointAddress{} - *p = *epa - allAddrs[ak] = p - } - // Remember that this port maps to this address. - if _, found := portsToAddrs[*epp]; !found { - portsToAddrs[*epp] = addressSet{} - } - portsToAddrs[*epp].Insert(allAddrs[ak]) + mapAddressByPort(&subsets[i].Addresses[k], port, true, allAddrs, portToAddrReadyMap) + } + for k := range subsets[i].NotReadyAddresses { + mapAddressByPort(&subsets[i].NotReadyAddresses[k], port, false, allAddrs, portToAddrReadyMap) } } } // Next, map the sets of hosts to the sets of ports they offer. // Go does not allow maps or slices as keys to maps, so we have - // to synthesize and artificial key and do a sort of 2-part + // to synthesize an artificial key and do a sort of 2-part // associative entity. type keyString string - addrSets := map[keyString]addressSet{} - addrSetsToPorts := map[keyString][]api.EndpointPort{} - for epp, addrs := range portsToAddrs { + keyToAddrReadyMap := map[keyString]addressSet{} + addrReadyMapKeyToPorts := map[keyString][]api.EndpointPort{} + for port, addrs := range portToAddrReadyMap { key := keyString(hashAddresses(addrs)) - addrSets[key] = addrs - addrSetsToPorts[key] = append(addrSetsToPorts[key], epp) + keyToAddrReadyMap[key] = addrs + addrReadyMapKeyToPorts[key] = append(addrReadyMapKeyToPorts[key], port) } // Next, build the N-to-M association the API wants. final := []api.EndpointSubset{} - for key, ports := range addrSetsToPorts { - addrs := []api.EndpointAddress{} - for k := range addrSets[key] { - addrs = append(addrs, *k) + for key, ports := range addrReadyMapKeyToPorts { + var readyAddrs, notReadyAddrs []api.EndpointAddress + for addr, ready := range keyToAddrReadyMap[key] { + if ready { + readyAddrs = append(readyAddrs, *addr) + } else { + notReadyAddrs = append(notReadyAddrs, *addr) + } } - final = append(final, api.EndpointSubset{Addresses: addrs, Ports: ports}) + final = append(final, api.EndpointSubset{Addresses: readyAddrs, NotReadyAddresses: notReadyAddrs, Ports: ports}) } // Finally, sort it. return SortSubsets(final) } -type addressSet map[*api.EndpointAddress]struct{} +// The sets of hosts must be de-duped, using IP+UID as the key. +type addressKey struct { + ip string + uid types.UID +} -func (set addressSet) Insert(addr *api.EndpointAddress) { - set[addr] = struct{}{} +// mapAddressByPort adds an address into a map by its ports, registering the address with a unique pointer, and preserving +// any existing ready state. +func mapAddressByPort(addr *api.EndpointAddress, port api.EndpointPort, ready bool, allAddrs map[addressKey]*api.EndpointAddress, portToAddrReadyMap map[api.EndpointPort]addressSet) *api.EndpointAddress { + // use addressKey to distinguish between two endpoints that are identical addresses + // but may have come from different hosts, for attribution. For instance, Mesos + // assigns pods the node IP, but the pods are distinct. + key := addressKey{ip: addr.IP} + if addr.TargetRef != nil { + key.uid = addr.TargetRef.UID + } + + // Accumulate the address. The full EndpointAddress structure is preserved for use when + // we rebuild the subsets so that the final TargetRef has all of the necessary data. + existingAddress := allAddrs[key] + if existingAddress == nil { + // Make a copy so we don't write to the + // input args of this function. + existingAddress = &api.EndpointAddress{} + *existingAddress = *addr + allAddrs[key] = existingAddress + } + + // Remember that this port maps to this address. + if _, found := portToAddrReadyMap[port]; !found { + portToAddrReadyMap[port] = addressSet{} + } + // if we have not yet recorded this port for this address, or if the previous + // state was ready, write the current ready state. not ready always trumps + // ready. + if wasReady, found := portToAddrReadyMap[port][existingAddress]; !found || wasReady { + portToAddrReadyMap[port][existingAddress] = ready + } + return existingAddress +} + +type addressSet map[*api.EndpointAddress]bool + +type addrReady struct { + addr *api.EndpointAddress + ready bool } func hashAddresses(addrs addressSet) string { @@ -105,16 +132,29 @@ func hashAddresses(addrs addressSet) string { // map key. Unfortunately, DeepHashObject is implemented in terms of // spew, and spew does not handle non-primitive map keys well. So // first we collapse it into a slice, sort the slice, then hash that. - slice := []*api.EndpointAddress{} - for k := range addrs { - slice = append(slice, k) + slice := make([]addrReady, 0, len(addrs)) + for k, ready := range addrs { + slice = append(slice, addrReady{k, ready}) } - sort.Sort(addrPtrsByIpAndUID(slice)) + sort.Sort(addrsReady(slice)) hasher := md5.New() util.DeepHashObject(hasher, slice) return hex.EncodeToString(hasher.Sum(nil)[0:]) } +func lessAddrReady(a, b addrReady) bool { + // ready is not significant to hashing since we can't have duplicate addresses + return LessEndpointAddress(a.addr, b.addr) +} + +type addrsReady []addrReady + +func (sl addrsReady) Len() int { return len(sl) } +func (sl addrsReady) Swap(i, j int) { sl[i], sl[j] = sl[j], sl[i] } +func (sl addrsReady) Less(i, j int) bool { + return lessAddrReady(sl[i], sl[j]) +} + func LessEndpointAddress(a, b *api.EndpointAddress) bool { ipComparison := bytes.Compare([]byte(a.IP), []byte(b.IP)) if ipComparison != 0 { @@ -143,6 +183,7 @@ func SortSubsets(subsets []api.EndpointSubset) []api.EndpointSubset { for i := range subsets { ss := &subsets[i] sort.Sort(addrsByIpAndUID(ss.Addresses)) + sort.Sort(addrsByIpAndUID(ss.NotReadyAddresses)) sort.Sort(portsByHash(ss.Ports)) } sort.Sort(subsetsByHash(subsets)) diff --git a/pkg/api/endpoints/util_test.go b/pkg/api/endpoints/util_test.go index 3f047db6da9..a2f960317df 100644 --- a/pkg/api/endpoints/util_test.go +++ b/pkg/api/endpoints/util_test.go @@ -52,6 +52,10 @@ func TestPackSubsets(t *testing.T) { name: "empty ports", given: []api.EndpointSubset{{Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, Ports: []api.EndpointPort{}}}, expect: []api.EndpointSubset{}, + }, { + name: "empty ports", + given: []api.EndpointSubset{{NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, Ports: []api.EndpointPort{}}}, + expect: []api.EndpointSubset{}, }, { name: "one set, one ip, one port", given: []api.EndpointSubset{{ @@ -62,6 +66,16 @@ func TestPackSubsets(t *testing.T) { Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, Ports: []api.EndpointPort{{Port: 111}}, }}, + }, { + name: "one set, one notReady ip, one port", + given: []api.EndpointSubset{{ + NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []api.EndpointPort{{Port: 111}}, + }}, + expect: []api.EndpointSubset{{ + NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []api.EndpointPort{{Port: 111}}, + }}, }, { name: "one set, one ip, one UID, one port", given: []api.EndpointSubset{{ @@ -72,6 +86,16 @@ func TestPackSubsets(t *testing.T) { Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: podRef("uid-1")}}, Ports: []api.EndpointPort{{Port: 111}}, }}, + }, { + name: "one set, one notReady ip, one UID, one port", + given: []api.EndpointSubset{{ + NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: podRef("uid-1")}}, + Ports: []api.EndpointPort{{Port: 111}}, + }}, + expect: []api.EndpointSubset{{ + NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: podRef("uid-1")}}, + Ports: []api.EndpointPort{{Port: 111}}, + }}, }, { name: "one set, one ip, empty UID, one port", given: []api.EndpointSubset{{ @@ -82,6 +106,16 @@ func TestPackSubsets(t *testing.T) { Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: podRef("")}}, Ports: []api.EndpointPort{{Port: 111}}, }}, + }, { + name: "one set, one notReady ip, empty UID, one port", + given: []api.EndpointSubset{{ + NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: podRef("")}}, + Ports: []api.EndpointPort{{Port: 111}}, + }}, + expect: []api.EndpointSubset{{ + NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: podRef("")}}, + Ports: []api.EndpointPort{{Port: 111}}, + }}, }, { name: "one set, two ips, one port", given: []api.EndpointSubset{{ @@ -92,6 +126,29 @@ func TestPackSubsets(t *testing.T) { Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}, {IP: "5.6.7.8"}}, Ports: []api.EndpointPort{{Port: 111}}, }}, + }, { + name: "one set, two mixed ips, one port", + given: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + NotReadyAddresses: []api.EndpointAddress{{IP: "5.6.7.8"}}, + Ports: []api.EndpointPort{{Port: 111}}, + }}, + expect: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + NotReadyAddresses: []api.EndpointAddress{{IP: "5.6.7.8"}}, + Ports: []api.EndpointPort{{Port: 111}}, + }}, + }, { + name: "one set, two duplicate ips, one port, notReady is covered by ready", + given: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []api.EndpointPort{{Port: 111}}, + }}, + expect: []api.EndpointSubset{{ + NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []api.EndpointPort{{Port: 111}}, + }}, }, { name: "one set, one ip, two ports", given: []api.EndpointSubset{{ @@ -125,6 +182,23 @@ func TestPackSubsets(t *testing.T) { Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &fooObjRef}}, Ports: []api.EndpointPort{{Port: 111}}, }}, + }, { + name: "one set, dup mixed ips with target-refs, one port", + given: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{ + {IP: "1.2.3.4", TargetRef: &fooObjRef}, + }, + NotReadyAddresses: []api.EndpointAddress{ + {IP: "1.2.3.4", TargetRef: &barObjRef}, + }, + Ports: []api.EndpointPort{{Port: 111}}, + }}, + expect: []api.EndpointSubset{{ + // finding the same address twice is considered an error on input, only the first address+port + // reference is preserved + NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &fooObjRef}}, + Ports: []api.EndpointPort{{Port: 111}}, + }}, }, { name: "one set, one ip, dup ports", given: []api.EndpointSubset{{ @@ -148,6 +222,19 @@ func TestPackSubsets(t *testing.T) { Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, Ports: []api.EndpointPort{{Port: 111}}, }}, + }, { + name: "two sets, dup mixed ip, dup port", + given: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []api.EndpointPort{{Port: 111}}, + }, { + NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []api.EndpointPort{{Port: 111}}, + }}, + expect: []api.EndpointSubset{{ + NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []api.EndpointPort{{Port: 111}}, + }}, }, { name: "two sets, dup ip, two ports", given: []api.EndpointSubset{{ @@ -174,6 +261,22 @@ func TestPackSubsets(t *testing.T) { Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: podRef("uid-1")}}, Ports: []api.EndpointPort{{Port: 111}, {Port: 222}}, }}, + }, { + name: "two sets, dup mixed ip, dup uids, two ports", + given: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: podRef("uid-1")}}, + Ports: []api.EndpointPort{{Port: 111}}, + }, { + NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: podRef("uid-1")}}, + Ports: []api.EndpointPort{{Port: 222}}, + }}, + expect: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: podRef("uid-1")}}, + Ports: []api.EndpointPort{{Port: 111}}, + }, { + NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: podRef("uid-1")}}, + Ports: []api.EndpointPort{{Port: 222}}, + }}, }, { name: "two sets, two ips, dup port", given: []api.EndpointSubset{{ @@ -243,6 +346,32 @@ func TestPackSubsets(t *testing.T) { }, Ports: []api.EndpointPort{{Port: 111}}, }}, + }, { + name: "two sets, two mixed ips, two dup ip with uid, dup port, wrong order, ends up with split addresses", + given: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "5.6.7.8"}}, + Ports: []api.EndpointPort{{Port: 111}}, + }, { + NotReadyAddresses: []api.EndpointAddress{{IP: "5.6.7.8", TargetRef: podRef("uid-1")}}, + Ports: []api.EndpointPort{{Port: 111}}, + }, { + NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: podRef("uid-1")}}, + Ports: []api.EndpointPort{{Port: 111}}, + }, { + NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []api.EndpointPort{{Port: 111}}, + }}, + expect: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{ + {IP: "5.6.7.8"}, + }, + NotReadyAddresses: []api.EndpointAddress{ + {IP: "1.2.3.4"}, + {IP: "1.2.3.4", TargetRef: podRef("uid-1")}, + {IP: "5.6.7.8", TargetRef: podRef("uid-1")}, + }, + Ports: []api.EndpointPort{{Port: 111}}, + }}, }, { name: "two sets, two ips, two ports", given: []api.EndpointSubset{{ @@ -281,13 +410,35 @@ func TestPackSubsets(t *testing.T) { Addresses: []api.EndpointAddress{{IP: "1.2.3.5"}}, Ports: []api.EndpointPort{{Port: 222}, {Port: 333}}, }}, + }, { + name: "four sets, three mixed ips, three ports, jumbled", + given: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}}, + Ports: []api.EndpointPort{{Port: 111}}, + }, { + NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.5"}}, + Ports: []api.EndpointPort{{Port: 222}}, + }, { + Addresses: []api.EndpointAddress{{IP: "1.2.3.6"}}, + Ports: []api.EndpointPort{{Port: 111}}, + }, { + NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.5"}}, + Ports: []api.EndpointPort{{Port: 333}}, + }}, + expect: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4"}, {IP: "1.2.3.6"}}, + Ports: []api.EndpointPort{{Port: 111}}, + }, { + NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.5"}}, + Ports: []api.EndpointPort{{Port: 222}, {Port: 333}}, + }}, }, } for _, tc := range testCases { result := RepackSubsets(tc.given) if !reflect.DeepEqual(result, SortSubsets(tc.expect)) { - t.Errorf("case %q: expected %s, got %s", tc.name, spew.Sprintf("%v", SortSubsets(tc.expect)), spew.Sprintf("%v", result)) + t.Errorf("case %q: expected %s, got %s", tc.name, spew.Sprintf("%#v", SortSubsets(tc.expect)), spew.Sprintf("%#v", result)) } } } diff --git a/pkg/api/types.go b/pkg/api/types.go index 2cda72a3c36..64d214816ce 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -1317,8 +1317,9 @@ type Endpoints struct { // a: [ 10.10.1.1:8675, 10.10.2.2:8675 ], // b: [ 10.10.1.1:309, 10.10.2.2:309 ] type EndpointSubset struct { - Addresses []EndpointAddress - Ports []EndpointPort + Addresses []EndpointAddress + NotReadyAddresses []EndpointAddress + Ports []EndpointPort } // EndpointAddress is a tuple that describes single IP address. diff --git a/pkg/api/v1/conversion_generated.go b/pkg/api/v1/conversion_generated.go index 09be630057e..6ad573edf92 100644 --- a/pkg/api/v1/conversion_generated.go +++ b/pkg/api/v1/conversion_generated.go @@ -463,6 +463,16 @@ func convert_api_EndpointSubset_To_v1_EndpointSubset(in *api.EndpointSubset, out } else { out.Addresses = nil } + if in.NotReadyAddresses != nil { + out.NotReadyAddresses = make([]EndpointAddress, len(in.NotReadyAddresses)) + for i := range in.NotReadyAddresses { + if err := convert_api_EndpointAddress_To_v1_EndpointAddress(&in.NotReadyAddresses[i], &out.NotReadyAddresses[i], s); err != nil { + return err + } + } + } else { + out.NotReadyAddresses = nil + } if in.Ports != nil { out.Ports = make([]EndpointPort, len(in.Ports)) for i := range in.Ports { @@ -2888,6 +2898,16 @@ func convert_v1_EndpointSubset_To_api_EndpointSubset(in *EndpointSubset, out *ap } else { out.Addresses = nil } + if in.NotReadyAddresses != nil { + out.NotReadyAddresses = make([]api.EndpointAddress, len(in.NotReadyAddresses)) + for i := range in.NotReadyAddresses { + if err := convert_v1_EndpointAddress_To_api_EndpointAddress(&in.NotReadyAddresses[i], &out.NotReadyAddresses[i], s); err != nil { + return err + } + } + } else { + out.NotReadyAddresses = nil + } if in.Ports != nil { out.Ports = make([]api.EndpointPort, len(in.Ports)) for i := range in.Ports { diff --git a/pkg/api/v1/deep_copy_generated.go b/pkg/api/v1/deep_copy_generated.go index 17ba6023fea..1376e12b393 100644 --- a/pkg/api/v1/deep_copy_generated.go +++ b/pkg/api/v1/deep_copy_generated.go @@ -413,6 +413,16 @@ func deepCopy_v1_EndpointSubset(in EndpointSubset, out *EndpointSubset, c *conve } else { out.Addresses = nil } + if in.NotReadyAddresses != nil { + out.NotReadyAddresses = make([]EndpointAddress, len(in.NotReadyAddresses)) + for i := range in.NotReadyAddresses { + if err := deepCopy_v1_EndpointAddress(in.NotReadyAddresses[i], &out.NotReadyAddresses[i], c); err != nil { + return err + } + } + } else { + out.NotReadyAddresses = nil + } if in.Ports != nil { out.Ports = make([]EndpointPort, len(in.Ports)) for i := range in.Ports { diff --git a/pkg/api/v1/types.go b/pkg/api/v1/types.go index 0d15746a596..17258ae4fe7 100644 --- a/pkg/api/v1/types.go +++ b/pkg/api/v1/types.go @@ -1643,7 +1643,12 @@ type Endpoints struct { // More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata ObjectMeta `json:"metadata,omitempty"` - // The set of all endpoints is the union of all subsets. + // The set of all endpoints is the union of all subsets. Addresses are placed into + // subsets according to the IPs they share. A single address with multiple ports, + // some of which are ready and some of which are not (because they come from + // different containers) will result in the address being displayed in different + // subsets for the different ports. No address will appear in both Addresses and + // NotReadyAddresses in the same subset. // Sets of addresses and ports that comprise a service. Subsets []EndpointSubset `json:"subsets"` } @@ -1659,8 +1664,13 @@ type Endpoints struct { // a: [ 10.10.1.1:8675, 10.10.2.2:8675 ], // b: [ 10.10.1.1:309, 10.10.2.2:309 ] type EndpointSubset struct { - // IP addresses which offer the related ports. + // IP addresses which offer the related ports that are marked as ready. These endpoints + // should be considered safe for load balancers and clients to utilize. Addresses []EndpointAddress `json:"addresses,omitempty"` + // IP addresses which offer the related ports but are not currently marked as ready + // because they have not yet finished starting, have recently failed a readiness check, + // or have recently failed a liveness check. + NotReadyAddresses []EndpointAddress `json:"notReadyAddresses,omitempty"` // Port numbers available on the related IP addresses. Ports []EndpointPort `json:"ports,omitempty"` } diff --git a/pkg/api/v1/types_swagger_doc_generated.go b/pkg/api/v1/types_swagger_doc_generated.go index 23fb848fda7..e1b9544f44a 100644 --- a/pkg/api/v1/types_swagger_doc_generated.go +++ b/pkg/api/v1/types_swagger_doc_generated.go @@ -292,9 +292,10 @@ func (EndpointPort) SwaggerDoc() map[string]string { } var map_EndpointSubset = map[string]string{ - "": "EndpointSubset is a group of addresses with a common set of ports. The expanded set of endpoints is the Cartesian product of Addresses x Ports. For example, given:\n {\n Addresses: [{\"ip\": \"10.10.1.1\"}, {\"ip\": \"10.10.2.2\"}],\n Ports: [{\"name\": \"a\", \"port\": 8675}, {\"name\": \"b\", \"port\": 309}]\n }\nThe resulting set of endpoints can be viewed as:\n a: [ 10.10.1.1:8675, 10.10.2.2:8675 ],\n b: [ 10.10.1.1:309, 10.10.2.2:309 ]", - "addresses": "IP addresses which offer the related ports.", - "ports": "Port numbers available on the related IP addresses.", + "": "EndpointSubset is a group of addresses with a common set of ports. The expanded set of endpoints is the Cartesian product of Addresses x Ports. For example, given:\n {\n Addresses: [{\"ip\": \"10.10.1.1\"}, {\"ip\": \"10.10.2.2\"}],\n Ports: [{\"name\": \"a\", \"port\": 8675}, {\"name\": \"b\", \"port\": 309}]\n }\nThe resulting set of endpoints can be viewed as:\n a: [ 10.10.1.1:8675, 10.10.2.2:8675 ],\n b: [ 10.10.1.1:309, 10.10.2.2:309 ]", + "addresses": "IP addresses which offer the related ports that are marked as ready. These endpoints should be considered safe for load balancers and clients to utilize.", + "notReadyAddresses": "IP addresses which offer the related ports but are not currently marked as ready because they have not yet finished starting, have recently failed a readiness check, or have recently failed a liveness check.", + "ports": "Port numbers available on the related IP addresses.", } func (EndpointSubset) SwaggerDoc() map[string]string { @@ -304,7 +305,7 @@ func (EndpointSubset) SwaggerDoc() map[string]string { var map_Endpoints = map[string]string{ "": "Endpoints is a collection of endpoints that implement the actual service. Example:\n Name: \"mysvc\",\n Subsets: [\n {\n Addresses: [{\"ip\": \"10.10.1.1\"}, {\"ip\": \"10.10.2.2\"}],\n Ports: [{\"name\": \"a\", \"port\": 8675}, {\"name\": \"b\", \"port\": 309}]\n },\n {\n Addresses: [{\"ip\": \"10.10.3.3\"}],\n Ports: [{\"name\": \"a\", \"port\": 93}, {\"name\": \"b\", \"port\": 76}]\n },\n ]", "metadata": "Standard object's metadata. More info: http://releases.k8s.io/HEAD/docs/devel/api-conventions.md#metadata", - "subsets": "The set of all endpoints is the union of all subsets. Sets of addresses and ports that comprise a service.", + "subsets": "The set of all endpoints is the union of all subsets. Addresses are placed into subsets according to the IPs they share. A single address with multiple ports, some of which are ready and some of which are not (because they come from different containers) will result in the address being displayed in different subsets for the different ports. No address will appear in both Addresses and NotReadyAddresses in the same subset. Sets of addresses and ports that comprise a service.", } func (Endpoints) SwaggerDoc() map[string]string { diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index ab06f869a51..f2332e885ee 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -319,11 +319,6 @@ func (e *EndpointController) syncService(key string) { continue } - if !api.IsPodReady(pod) { - glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name) - continue - } - epp := api.EndpointPort{Name: portName, Port: portNum, Protocol: portProto} epa := api.EndpointAddress{IP: pod.Status.PodIP, TargetRef: &api.ObjectReference{ Kind: "Pod", @@ -332,7 +327,18 @@ func (e *EndpointController) syncService(key string) { UID: pod.ObjectMeta.UID, ResourceVersion: pod.ObjectMeta.ResourceVersion, }} - subsets = append(subsets, api.EndpointSubset{Addresses: []api.EndpointAddress{epa}, Ports: []api.EndpointPort{epp}}) + if api.IsPodReady(pod) { + subsets = append(subsets, api.EndpointSubset{ + Addresses: []api.EndpointAddress{epa}, + Ports: []api.EndpointPort{epp}, + }) + } else { + glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name) + subsets = append(subsets, api.EndpointSubset{ + NotReadyAddresses: []api.EndpointAddress{epa}, + Ports: []api.EndpointPort{epp}, + }) + } } } subsets = endpoints.RepackSubsets(subsets) diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 9fb4e1bae55..14993a55b25 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -32,8 +32,8 @@ import ( "k8s.io/kubernetes/pkg/util" ) -func addPods(store cache.Store, namespace string, nPods int, nPorts int) { - for i := 0; i < nPods; i++ { +func addPods(store cache.Store, namespace string, nPods int, nPorts int, nNotReady int) { + for i := 0; i < nPods+nNotReady; i++ { p := &api.Pod{ TypeMeta: api.TypeMeta{APIVersion: testapi.Default.Version()}, ObjectMeta: api.ObjectMeta{ @@ -54,6 +54,9 @@ func addPods(store cache.Store, namespace string, nPods int, nPorts int) { }, }, } + if i >= nPods { + p.Status.Conditions[0].Status = api.ConditionFalse + } for j := 0; j < nPorts; j++ { p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports, api.ContainerPort{Name: fmt.Sprintf("port%d", i), ContainerPort: 8080 + j}) @@ -298,7 +301,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) endpoints := NewEndpointController(client) - addPods(endpoints.podStore.Store, ns, 1, 1) + addPods(endpoints.podStore.Store, ns, 1, 1, 0) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{ @@ -321,6 +324,81 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data) } +func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) { + ns := "other" + testServer, endpointsHandler := makeTestServer(t, ns, + serverResponse{http.StatusOK, &api.Endpoints{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []api.EndpointSubset{}, + }}) + defer testServer.Close() + client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) + endpoints := NewEndpointController(client) + addPods(endpoints.podStore.Store, ns, 0, 1, 1) + endpoints.serviceStore.Store.Add(&api.Service{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: api.ServiceSpec{ + Selector: map[string]string{}, + Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}}, + }, + }) + endpoints.syncService(ns + "/foo") + data := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Endpoints{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []api.EndpointSubset{{ + NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, + Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + }}, + }) + endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data) +} + +func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) { + ns := "other" + testServer, endpointsHandler := makeTestServer(t, ns, + serverResponse{http.StatusOK, &api.Endpoints{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []api.EndpointSubset{}, + }}) + defer testServer.Close() + client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) + endpoints := NewEndpointController(client) + addPods(endpoints.podStore.Store, ns, 1, 1, 1) + endpoints.serviceStore.Store.Add(&api.Service{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: api.ServiceSpec{ + Selector: map[string]string{}, + Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}}, + }, + }) + endpoints.syncService(ns + "/foo") + data := runtime.EncodeOrDie(testapi.Default.Codec(), &api.Endpoints{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, + NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.5", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}}}, + Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + }}, + }) + endpointsHandler.ValidateRequest(t, testapi.Default.ResourcePath("endpoints", ns, "foo"), "PUT", &data) +} + func TestSyncEndpointsItemsPreexisting(t *testing.T) { ns := "bar" testServer, endpointsHandler := makeTestServer(t, ns, @@ -338,7 +416,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) endpoints := NewEndpointController(client) - addPods(endpoints.podStore.Store, ns, 1, 1) + addPods(endpoints.podStore.Store, ns, 1, 1, 0) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{ @@ -378,7 +456,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) endpoints := NewEndpointController(client) - addPods(endpoints.podStore.Store, api.NamespaceDefault, 1, 1) + addPods(endpoints.podStore.Store, api.NamespaceDefault, 1, 1, 0) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, Spec: api.ServiceSpec{ @@ -397,8 +475,8 @@ func TestSyncEndpointsItems(t *testing.T) { defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) endpoints := NewEndpointController(client) - addPods(endpoints.podStore.Store, ns, 3, 2) - addPods(endpoints.podStore.Store, "blah", 5, 2) // make sure these aren't found! + addPods(endpoints.podStore.Store, ns, 3, 2, 0) + addPods(endpoints.podStore.Store, "blah", 5, 2, 0) // make sure these aren't found! endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{ @@ -439,7 +517,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) { defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) endpoints := NewEndpointController(client) - addPods(endpoints.podStore.Store, ns, 3, 2) + addPods(endpoints.podStore.Store, ns, 3, 2, 0) serviceLabels := map[string]string{"foo": "bar"} endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{ @@ -499,7 +577,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) endpoints := NewEndpointController(client) - addPods(endpoints.podStore.Store, ns, 1, 1) + addPods(endpoints.podStore.Store, ns, 1, 1, 0) serviceLabels := map[string]string{"baz": "blah"} endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{ diff --git a/pkg/kubectl/rolling_updater.go b/pkg/kubectl/rolling_updater.go index 5aa4d63e387..0d86b4294de 100644 --- a/pkg/kubectl/rolling_updater.go +++ b/pkg/kubectl/rolling_updater.go @@ -353,7 +353,7 @@ func (r *RollingUpdater) scaleAndWaitWithScaler(rc *api.ReplicationController, r // pollForReadyPods polls oldRc and newRc each interval and returns the old // and new ready counts for their pods. If a pod is observed as being ready, -// it's considered ready even if it later becomes unready. +// it's considered ready even if it later becomes notReady. func (r *RollingUpdater) pollForReadyPods(interval, timeout time.Duration, oldRc, newRc *api.ReplicationController) (int, int, error) { controllers := []*api.ReplicationController{oldRc, newRc} oldReady := 0 diff --git a/pkg/kubelet/container/helpers.go b/pkg/kubelet/container/helpers.go index 346c0fdd9dc..0edb18e46ef 100644 --- a/pkg/kubelet/container/helpers.go +++ b/pkg/kubelet/container/helpers.go @@ -62,7 +62,7 @@ func ShouldContainerBeRestarted(container *api.Container, pod *api.Pod, podStatu } } - // Set dead containers to unready state. + // Set dead containers to notReady state. for _, c := range resultStatus { readinessManager.RemoveReadiness(TrimRuntimePrefix(c.ContainerID)) } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index ece84197f53..4676d6f3981 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -2119,26 +2119,26 @@ func GetPhase(spec *api.PodSpec, info []api.ContainerStatus) api.PodPhase { } } -// getPodReadyCondition returns ready condition if all containers in a pod are ready, else it returns an unready condition. +// getPodReadyCondition returns ready condition if all containers in a pod are ready, else it returns an notReady condition. func getPodReadyCondition(spec *api.PodSpec, statuses []api.ContainerStatus) []api.PodCondition { ready := []api.PodCondition{{ Type: api.PodReady, Status: api.ConditionTrue, }} - unready := []api.PodCondition{{ + notReady := []api.PodCondition{{ Type: api.PodReady, Status: api.ConditionFalse, }} if statuses == nil { - return unready + return notReady } for _, container := range spec.Containers { if containerStatus, ok := api.GetContainerStatus(statuses, container.Name); ok { if !containerStatus.Ready { - return unready + return notReady } } else { - return unready + return notReady } } return ready diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index f10ae37c4b2..561b0a22293 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -1737,7 +1737,7 @@ func TestGetPodReadyCondition(t *testing.T) { Type: api.PodReady, Status: api.ConditionTrue, }} - unready := []api.PodCondition{{ + notReady := []api.PodCondition{{ Type: api.PodReady, Status: api.ConditionFalse, }} @@ -1749,7 +1749,7 @@ func TestGetPodReadyCondition(t *testing.T) { { spec: nil, info: nil, - expected: unready, + expected: notReady, }, { spec: &api.PodSpec{}, @@ -1763,7 +1763,7 @@ func TestGetPodReadyCondition(t *testing.T) { }, }, info: []api.ContainerStatus{}, - expected: unready, + expected: notReady, }, { spec: &api.PodSpec{ @@ -1799,7 +1799,7 @@ func TestGetPodReadyCondition(t *testing.T) { info: []api.ContainerStatus{ getReadyStatus("1234"), }, - expected: unready, + expected: notReady, }, { spec: &api.PodSpec{ @@ -1812,7 +1812,7 @@ func TestGetPodReadyCondition(t *testing.T) { getReadyStatus("1234"), getNotReadyStatus("5678"), }, - expected: unready, + expected: notReady, }, } diff --git a/test/e2e/kubectl.go b/test/e2e/kubectl.go index ba9df47c37b..4ab198f463c 100644 --- a/test/e2e/kubectl.go +++ b/test/e2e/kubectl.go @@ -33,9 +33,11 @@ import ( "time" "k8s.io/kubernetes/pkg/api" + apierrs "k8s.io/kubernetes/pkg/api/errors" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/util/wait" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -371,18 +373,20 @@ var _ = Describe("Kubectl client", func() { lookForStringInLog(ns, pod.Name, "redis-master", "The server is now ready to accept connections", podStartTimeout) }) validateService := func(name string, servicePort int, timeout time.Duration) { - endpointFound := false - for t := time.Now(); time.Since(t) < timeout; time.Sleep(poll) { + err := wait.Poll(poll, timeout, func() (bool, error) { endpoints, err := c.Endpoints(ns).Get(name) if err != nil { - Logf("Get endpoints failed (%v elapsed, ignoring for %v): %v", time.Since(t), poll, err) - continue + if apierrs.IsNotFound(err) { + err = nil + } + Logf("Get endpoints failed (interval %v): %v", poll, err) + return false, err } uidToPort := getContainerPortsByPodUID(endpoints) if len(uidToPort) == 0 { Logf("No endpoint found, retrying") - continue + return false, nil } if len(uidToPort) > 1 { Fail("Too many endpoints found") @@ -392,12 +396,10 @@ var _ = Describe("Kubectl client", func() { Failf("Wrong endpoint port: %d", port[0]) } } - endpointFound = true - break - } - if !endpointFound { - Failf("1 endpoint is expected") - } + return true, nil + }) + Expect(err).NotTo(HaveOccurred()) + service, err := c.Services(ns).Get(name) Expect(err).NotTo(HaveOccurred()) diff --git a/test/e2e/util.go b/test/e2e/util.go index 239738f6f7e..8bf253e7677 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -671,12 +671,22 @@ func waitForRCPodToDisappear(c *client.Client, ns, rcName, podName string) error func waitForService(c *client.Client, namespace, name string, exist bool, interval, timeout time.Duration) error { err := wait.Poll(interval, timeout, func() (bool, error) { _, err := c.Services(namespace).Get(name) - if err != nil { - Logf("Get service %s in namespace %s failed (%v).", name, namespace, err) - return !exist, nil - } else { + switch { + case err == nil: + if !exist { + return false, nil + } Logf("Service %s in namespace %s found.", name, namespace) - return exist, nil + return true, nil + case apierrs.IsNotFound(err): + if exist { + return false, nil + } + Logf("Service %s in namespace %s disappeared.", name, namespace) + return true, nil + default: + Logf("Get service %s in namespace %s failed: %v", name, namespace, err) + return false, nil } }) if err != nil {