From 9bd857ca047cbb5dc86b3232c37a7e0df4416535 Mon Sep 17 00:00:00 2001 From: Swetha Repakula Date: Tue, 6 Jul 2021 11:26:46 -0700 Subject: [PATCH] Truncate endpoints over 1000 addresses * set `endpoints.kubernetes.io/over-capacity` to "truncated" when the number of addresses has been truncated to 1000 * ready addresses are prioritized over non-ready addresses * addresses are proportionally truncated across subsets --- .../endpoint/endpoints_controller.go | 98 +++++++-- .../endpoint/endpoints_controller_test.go | 194 +++++++++++++++--- 2 files changed, 244 insertions(+), 48 deletions(-) diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 8e8c16d875a..87c515f2f22 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -19,6 +19,7 @@ package endpoint import ( "context" "fmt" + "math" "strconv" "time" @@ -78,6 +79,11 @@ const ( // This field is deprecated. v1.Service.PublishNotReadyAddresses will replace it // subsequent releases. It will be removed no sooner than 1.13. TolerateUnreadyEndpointsAnnotation = "service.alpha.kubernetes.io/tolerate-unready-endpoints" + + // truncated is a possible value for the `endpoints.kubernetes.io/over-capacity` annotation on an + // endpoint resource and indicates that the number of endpoints has been truncated to + // maxCapacity + truncated = "truncated" ) // NewEndpointController returns a new *Controller. 
@@ -534,8 +540,8 @@ func (e *Controller) syncService(key string) error { delete(newEndpoints.Annotations, v1.EndpointsLastChangeTriggerTime) } - if overCapacity(newEndpoints.Subsets) { - newEndpoints.Annotations[v1.EndpointsOverCapacity] = "warning" + if truncateEndpoints(newEndpoints) { + newEndpoints.Annotations[v1.EndpointsOverCapacity] = truncated } else { delete(newEndpoints.Annotations, v1.EndpointsOverCapacity) } @@ -659,23 +665,85 @@ func endpointPortFromServicePort(servicePort *v1.ServicePort, portNum int) *v1.E return epp } -// overCapacity returns true if there are more addresses in the provided subsets -// than the maxCapacity. -func overCapacity(subsets []v1.EndpointSubset) bool { +// capacityAnnotationSetCorrectly returns false if the number of endpoints is greater than maxCapacity; +// otherwise it returns true only when the EndpointsOverCapacity annotation is not set. +func capacityAnnotationSetCorrectly(annotations map[string]string, subsets []v1.EndpointSubset) bool { numEndpoints := 0 for _, subset := range subsets { numEndpoints += len(subset.Addresses) + len(subset.NotReadyAddresses) } - return numEndpoints > maxCapacity -} - -// capacityAnnotationSetCorrectly returns true if overCapacity() is true and the -// EndpointsOverCapacity annotation is set to "warning" or if overCapacity() -// is false and the annotation is not set. -func capacityAnnotationSetCorrectly(annotations map[string]string, subsets []v1.EndpointSubset) bool { - val, ok := annotations[v1.EndpointsOverCapacity] - if overCapacity(subsets) { - return ok && val == "warning" + if numEndpoints > maxCapacity { + // If subsets are over capacity, they must be truncated so consider + // the annotation as not set correctly + return false } + _, ok := annotations[v1.EndpointsOverCapacity] return !ok } + +// truncateEndpoints truncates endpoints in place to at most maxCapacity addresses, spreading the kept addresses over the +// subsets in proportion to their size and prioritizing Ready over NotReady addresses. It returns true if it truncated. 
+func truncateEndpoints(endpoints *v1.Endpoints) bool { + totalReady := 0 + totalNotReady := 0 + for _, subset := range endpoints.Subsets { + totalReady += len(subset.Addresses) + totalNotReady += len(subset.NotReadyAddresses) + } + + if totalReady+totalNotReady <= maxCapacity { + return false + } + + truncateReady := false + max := maxCapacity - totalReady + numTotal := totalNotReady + if totalReady > maxCapacity { + truncateReady = true + max = maxCapacity + numTotal = totalReady + } + canBeAdded := max + + for i := range endpoints.Subsets { + subset := endpoints.Subsets[i] + numInSubset := len(subset.Addresses) + if !truncateReady { + numInSubset = len(subset.NotReadyAddresses) + } + + // The number of endpoints per subset will be based on the proportion of endpoints + // in this subset versus the total number of endpoints. The proportion of endpoints + // will be rounded up which most likely will lead to the last subset having fewer + // endpoints than the expected proportion. + toBeAdded := int(math.Ceil((float64(numInSubset) / float64(numTotal)) * float64(max))) + // If the rounded-up share exceeds the capacity still available (typically for the last + // subset), only add as many as the remaining capacity allows + if toBeAdded > canBeAdded { + toBeAdded = canBeAdded + } + + if truncateReady { + // Truncate ready Addresses to allocated proportion and truncate all not ready + // addresses + subset.Addresses = addressSubset(subset.Addresses, toBeAdded) + subset.NotReadyAddresses = []v1.EndpointAddress{} + canBeAdded -= len(subset.Addresses) + } else { + // Only truncate the not ready addresses + subset.NotReadyAddresses = addressSubset(subset.NotReadyAddresses, toBeAdded) + canBeAdded -= len(subset.NotReadyAddresses) + } + endpoints.Subsets[i] = subset + } + return true +} + +// addressSubset returns the first maxNum addresses when the list is longer than maxNum; +// otherwise the entire list is returned unchanged. 
+func addressSubset(addresses []v1.EndpointAddress, maxNum int) []v1.EndpointAddress { + if len(addresses) <= maxNum { + return addresses + } + return addresses[0:maxNum] +} diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 3af2fcf9a47..3edb2e81203 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -2008,41 +2008,85 @@ func TestSyncEndpointsServiceNotFound(t *testing.T) { func TestSyncServiceOverCapacity(t *testing.T) { testCases := []struct { - name string - startingAnnotation *string - numExisting int - numDesired int - expectedAnnotation bool + name string + startingAnnotation *string + numExisting int + numDesired int + numDesiredNotReady int + numExpectedReady int + numExpectedNotReady int + expectedAnnotation bool }{{ - name: "empty", - startingAnnotation: nil, - numExisting: 0, - numDesired: 0, - expectedAnnotation: false, + name: "empty", + startingAnnotation: nil, + numExisting: 0, + numDesired: 0, + numExpectedReady: 0, + numExpectedNotReady: 0, + expectedAnnotation: false, }, { - name: "annotation added past capacity", - startingAnnotation: nil, - numExisting: maxCapacity - 1, - numDesired: maxCapacity + 1, - expectedAnnotation: true, + name: "annotation added past capacity, < than maxCapacity of Ready Addresses", + startingAnnotation: nil, + numExisting: maxCapacity - 1, + numDesired: maxCapacity - 3, + numDesiredNotReady: 4, + numExpectedReady: maxCapacity - 3, + numExpectedNotReady: 3, + expectedAnnotation: true, }, { - name: "annotation removed below capacity", - startingAnnotation: utilpointer.StringPtr("warning"), - numExisting: maxCapacity - 1, - numDesired: maxCapacity - 1, - expectedAnnotation: false, + name: "annotation added past capacity, maxCapacity of Ready Addresses ", + startingAnnotation: nil, + numExisting: maxCapacity - 1, + numDesired: maxCapacity, + numDesiredNotReady: 10, + 
numExpectedReady: maxCapacity, + numExpectedNotReady: 0, + expectedAnnotation: true, }, { - name: "annotation removed at capacity", - startingAnnotation: utilpointer.StringPtr("warning"), - numExisting: maxCapacity, - numDesired: maxCapacity, - expectedAnnotation: false, + name: "annotation removed below capacity", + startingAnnotation: utilpointer.StringPtr("truncated"), + numExisting: maxCapacity - 1, + numDesired: maxCapacity - 1, + numDesiredNotReady: 0, + numExpectedReady: maxCapacity - 1, + numExpectedNotReady: 0, + expectedAnnotation: false, }, { - name: "no endpoints change, annotation value corrected", - startingAnnotation: utilpointer.StringPtr("invalid"), - numExisting: maxCapacity + 1, - numDesired: maxCapacity + 1, - expectedAnnotation: true, + name: "annotation was set to warning previously, annotation removed at capacity", + startingAnnotation: utilpointer.StringPtr("warning"), + numExisting: maxCapacity, + numDesired: maxCapacity, + numDesiredNotReady: 0, + numExpectedReady: maxCapacity, + numExpectedNotReady: 0, + expectedAnnotation: false, + }, { + name: "annotation was set to warning previously but still over capacity", + startingAnnotation: utilpointer.StringPtr("warning"), + numExisting: maxCapacity + 1, + numDesired: maxCapacity + 1, + numDesiredNotReady: 0, + numExpectedReady: maxCapacity, + numExpectedNotReady: 0, + expectedAnnotation: true, + }, { + name: "annotation removed at capacity", + startingAnnotation: utilpointer.StringPtr("truncated"), + numExisting: maxCapacity, + numDesired: maxCapacity, + numDesiredNotReady: 0, + numExpectedReady: maxCapacity, + numExpectedNotReady: 0, + expectedAnnotation: false, + }, { + name: "no endpoints change, annotation value corrected", + startingAnnotation: utilpointer.StringPtr("invalid"), + numExisting: maxCapacity + 1, + numDesired: maxCapacity + 1, + numDesiredNotReady: 0, + numExpectedReady: maxCapacity, + numExpectedNotReady: 0, + expectedAnnotation: true, }} for _, tc := range testCases { @@ 
-2050,7 +2094,7 @@ func TestSyncServiceOverCapacity(t *testing.T) { ns := "test" client, c := newFakeController(0 * time.Second) - addPods(c.podStore, ns, tc.numDesired, 1, 0, ipv4only) + addPods(c.podStore, ns, tc.numDesired, 1, tc.numDesiredNotReady, ipv4only) pods := c.podStore.List() svc := &v1.Service{ @@ -2094,14 +2138,98 @@ func TestSyncServiceOverCapacity(t *testing.T) { if tc.expectedAnnotation { if !ok { t.Errorf("Expected EndpointsOverCapacity annotation to be set") - } else if actualAnnotation != "warning" { - t.Errorf("Expected EndpointsOverCapacity annotation to be 'warning', got %s", actualAnnotation) + } else if actualAnnotation != "truncated" { + t.Errorf("Expected EndpointsOverCapacity annotation to be 'truncated', got %s", actualAnnotation) } } else { if ok { t.Errorf("Expected EndpointsOverCapacity annotation not to be set, got %s", actualAnnotation) } } + numActualReady := 0 + numActualNotReady := 0 + for _, subset := range actualEndpoints.Subsets { + numActualReady += len(subset.Addresses) + numActualNotReady += len(subset.NotReadyAddresses) + } + if numActualReady != tc.numExpectedReady { + t.Errorf("Unexpected number of actual ready Endpoints: got %d endpoints, want %d endpoints", numActualReady, tc.numExpectedReady) + } + if numActualNotReady != tc.numExpectedNotReady { + t.Errorf("Unexpected number of actual not ready Endpoints: got %d endpoints, want %d endpoints", numActualNotReady, tc.numExpectedNotReady) + } + }) + } +} + +func TestTruncateEndpoints(t *testing.T) { + testCases := []struct { + desc string + // subsetsReady, subsetsNotReady, expectedReady, expectedNotReady + // must all be the same length + subsetsReady []int + subsetsNotReady []int + expectedReady []int + expectedNotReady []int + }{{ + desc: "empty", + subsetsReady: []int{}, + subsetsNotReady: []int{}, + expectedReady: []int{}, + expectedNotReady: []int{}, + }, { + desc: "total endpoints < max capacity", + subsetsReady: []int{50, 100, 100, 100, 100}, + subsetsNotReady: 
[]int{50, 100, 100, 100, 100}, + expectedReady: []int{50, 100, 100, 100, 100}, + expectedNotReady: []int{50, 100, 100, 100, 100}, + }, { + desc: "total endpoints = max capacity", + subsetsReady: []int{100, 100, 100, 100, 100}, + subsetsNotReady: []int{100, 100, 100, 100, 100}, + expectedReady: []int{100, 100, 100, 100, 100}, + expectedNotReady: []int{100, 100, 100, 100, 100}, + }, { + desc: "total ready endpoints < max capacity, but total endpoints > max capacity", + subsetsReady: []int{90, 110, 50, 10, 20}, + subsetsNotReady: []int{101, 200, 200, 201, 298}, + expectedReady: []int{90, 110, 50, 10, 20}, + expectedNotReady: []int{73, 144, 144, 145, 214}, + }, { + desc: "total ready endpoints > max capacity", + subsetsReady: []int{205, 400, 402, 400, 693}, + subsetsNotReady: []int{100, 200, 200, 200, 300}, + expectedReady: []int{98, 191, 192, 191, 328}, + expectedNotReady: []int{0, 0, 0, 0, 0}, + }} + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + var subsets []v1.EndpointSubset + for subsetIndex, numReady := range tc.subsetsReady { + subset := v1.EndpointSubset{} + for i := 0; i < numReady; i++ { + subset.Addresses = append(subset.Addresses, v1.EndpointAddress{}) + } + + numNotReady := tc.subsetsNotReady[subsetIndex] + for i := 0; i < numNotReady; i++ { + subset.NotReadyAddresses = append(subset.NotReadyAddresses, v1.EndpointAddress{}) + } + subsets = append(subsets, subset) + } + + endpoints := &v1.Endpoints{Subsets: subsets} + truncateEndpoints(endpoints) + + for i, subset := range endpoints.Subsets { + if len(subset.Addresses) != tc.expectedReady[i] { + t.Errorf("Unexpected number of actual ready Endpoints for subset %d: got %d endpoints, want %d endpoints", i, len(subset.Addresses), tc.expectedReady[i]) + } + if len(subset.NotReadyAddresses) != tc.expectedNotReady[i] { + t.Errorf("Unexpected number of actual not ready Endpoints for subset %d: got %d endpoints, want %d endpoints", i, len(subset.NotReadyAddresses), tc.expectedNotReady[i]) 
+ } + } }) } }