Truncate endpoints over a 1000 addresses

* set `endpoints.kubernetes.io/over-capacity` to "truncated" when
 number of addresses has been truncated to a 1000
 * ready addresses are prioritized over non-ready addresses
 * addresses are proportionally truncated across subsets
This commit is contained in:
Swetha Repakula 2021-07-06 11:26:46 -07:00
parent 006d5b8539
commit 9bd857ca04
2 changed files with 244 additions and 48 deletions

View File

@ -19,6 +19,7 @@ package endpoint
import (
"context"
"fmt"
"math"
"strconv"
"time"
@ -78,6 +79,11 @@ const (
// This field is deprecated. v1.Service.PublishNotReadyAddresses will replace it
// subsequent releases. It will be removed no sooner than 1.13.
TolerateUnreadyEndpointsAnnotation = "service.alpha.kubernetes.io/tolerate-unready-endpoints"
// truncated is a possible value for `endpoints.kubernetes.io/over-capacity` annotation on an
// endpoint resource and indicates that the number of endpoints have been truncated to
// maxCapacity
truncated = "truncated"
)
// NewEndpointController returns a new *Controller.
@ -534,8 +540,8 @@ func (e *Controller) syncService(key string) error {
delete(newEndpoints.Annotations, v1.EndpointsLastChangeTriggerTime)
}
if overCapacity(newEndpoints.Subsets) {
newEndpoints.Annotations[v1.EndpointsOverCapacity] = "warning"
if truncateEndpoints(newEndpoints) {
newEndpoints.Annotations[v1.EndpointsOverCapacity] = truncated
} else {
delete(newEndpoints.Annotations, v1.EndpointsOverCapacity)
}
@ -659,23 +665,85 @@ func endpointPortFromServicePort(servicePort *v1.ServicePort, portNum int) *v1.E
return epp
}
// overCapacity returns true if there are more addresses in the provided subsets
// than the maxCapacity.
func overCapacity(subsets []v1.EndpointSubset) bool {
// capacityAnnotationSetCorrectly returns false if number of endpoints is greater than maxCapacity or
// returns true if underCapacity and the annotation is not set.
func capacityAnnotationSetCorrectly(annotations map[string]string, subsets []v1.EndpointSubset) bool {
numEndpoints := 0
for _, subset := range subsets {
numEndpoints += len(subset.Addresses) + len(subset.NotReadyAddresses)
}
return numEndpoints > maxCapacity
}
// capacityAnnotationSetCorrectly returns true if overCapacity() is true and the
// EndpointsOverCapacity annotation is set to "warning" or if overCapacity()
// is false and the annotation is not set.
func capacityAnnotationSetCorrectly(annotations map[string]string, subsets []v1.EndpointSubset) bool {
val, ok := annotations[v1.EndpointsOverCapacity]
if overCapacity(subsets) {
return ok && val == "warning"
if numEndpoints > maxCapacity {
// If subsets are over capacity, they must be truncated so consider
// the annotation as not set correctly
return false
}
_, ok := annotations[v1.EndpointsOverCapacity]
return !ok
}
// truncateEndpoints by best effort will distribute the endpoints over the subsets based on the proportion
// of endpoints per subset and will prioritize Ready Endpoints over NotReady Endpoints.
func truncateEndpoints(endpoints *v1.Endpoints) bool {
totalReady := 0
totalNotReady := 0
for _, subset := range endpoints.Subsets {
totalReady += len(subset.Addresses)
totalNotReady += len(subset.NotReadyAddresses)
}
if totalReady+totalNotReady <= maxCapacity {
return false
}
truncateReady := false
max := maxCapacity - totalReady
numTotal := totalNotReady
if totalReady > maxCapacity {
truncateReady = true
max = maxCapacity
numTotal = totalReady
}
canBeAdded := max
for i := range endpoints.Subsets {
subset := endpoints.Subsets[i]
numInSubset := len(subset.Addresses)
if !truncateReady {
numInSubset = len(subset.NotReadyAddresses)
}
// The number of endpoints per subset will be based on the propotion of endpoints
// in this subset versus the total number of endpoints. The proportion of endpoints
// will be rounded up which most likely will lead to the last subset having less
// endpoints than the expected proportion.
toBeAdded := int(math.Ceil((float64(numInSubset) / float64(numTotal)) * float64(max)))
// If there is not enough endpoints for the last subset, ensure only the number up
// to the capacity are added
if toBeAdded > canBeAdded {
toBeAdded = canBeAdded
}
if truncateReady {
// Truncate ready Addresses to allocated proportion and truncate all not ready
// addresses
subset.Addresses = addressSubset(subset.Addresses, toBeAdded)
subset.NotReadyAddresses = []v1.EndpointAddress{}
canBeAdded -= len(subset.Addresses)
} else {
// Only truncate the not ready addresses
subset.NotReadyAddresses = addressSubset(subset.NotReadyAddresses, toBeAdded)
canBeAdded -= len(subset.NotReadyAddresses)
}
endpoints.Subsets[i] = subset
}
return true
}
// addressSubset takes a list of addresses and returns a subset if the length is greater
// than the maxNum. If less than the maxNum, the entire list is returned.
func addressSubset(addresses []v1.EndpointAddress, maxNum int) []v1.EndpointAddress {
if len(addresses) <= maxNum {
return addresses
}
return addresses[0:maxNum]
}

View File

@ -2008,41 +2008,85 @@ func TestSyncEndpointsServiceNotFound(t *testing.T) {
func TestSyncServiceOverCapacity(t *testing.T) {
testCases := []struct {
name string
startingAnnotation *string
numExisting int
numDesired int
expectedAnnotation bool
name string
startingAnnotation *string
numExisting int
numDesired int
numDesiredNotReady int
numExpectedReady int
numExpectedNotReady int
expectedAnnotation bool
}{{
name: "empty",
startingAnnotation: nil,
numExisting: 0,
numDesired: 0,
expectedAnnotation: false,
name: "empty",
startingAnnotation: nil,
numExisting: 0,
numDesired: 0,
numExpectedReady: 0,
numExpectedNotReady: 0,
expectedAnnotation: false,
}, {
name: "annotation added past capacity",
startingAnnotation: nil,
numExisting: maxCapacity - 1,
numDesired: maxCapacity + 1,
expectedAnnotation: true,
name: "annotation added past capacity, < than maxCapacity of Ready Addresses",
startingAnnotation: nil,
numExisting: maxCapacity - 1,
numDesired: maxCapacity - 3,
numDesiredNotReady: 4,
numExpectedReady: maxCapacity - 3,
numExpectedNotReady: 3,
expectedAnnotation: true,
}, {
name: "annotation removed below capacity",
startingAnnotation: utilpointer.StringPtr("warning"),
numExisting: maxCapacity - 1,
numDesired: maxCapacity - 1,
expectedAnnotation: false,
name: "annotation added past capacity, maxCapacity of Ready Addresses ",
startingAnnotation: nil,
numExisting: maxCapacity - 1,
numDesired: maxCapacity,
numDesiredNotReady: 10,
numExpectedReady: maxCapacity,
numExpectedNotReady: 0,
expectedAnnotation: true,
}, {
name: "annotation removed at capacity",
startingAnnotation: utilpointer.StringPtr("warning"),
numExisting: maxCapacity,
numDesired: maxCapacity,
expectedAnnotation: false,
name: "annotation removed below capacity",
startingAnnotation: utilpointer.StringPtr("truncated"),
numExisting: maxCapacity - 1,
numDesired: maxCapacity - 1,
numDesiredNotReady: 0,
numExpectedReady: maxCapacity - 1,
numExpectedNotReady: 0,
expectedAnnotation: false,
}, {
name: "no endpoints change, annotation value corrected",
startingAnnotation: utilpointer.StringPtr("invalid"),
numExisting: maxCapacity + 1,
numDesired: maxCapacity + 1,
expectedAnnotation: true,
name: "annotation was set to warning previously, annotation removed at capacity",
startingAnnotation: utilpointer.StringPtr("warning"),
numExisting: maxCapacity,
numDesired: maxCapacity,
numDesiredNotReady: 0,
numExpectedReady: maxCapacity,
numExpectedNotReady: 0,
expectedAnnotation: false,
}, {
name: "annotation was set to warning previously but still over capacity",
startingAnnotation: utilpointer.StringPtr("warning"),
numExisting: maxCapacity + 1,
numDesired: maxCapacity + 1,
numDesiredNotReady: 0,
numExpectedReady: maxCapacity,
numExpectedNotReady: 0,
expectedAnnotation: true,
}, {
name: "annotation removed at capacity",
startingAnnotation: utilpointer.StringPtr("truncated"),
numExisting: maxCapacity,
numDesired: maxCapacity,
numDesiredNotReady: 0,
numExpectedReady: maxCapacity,
numExpectedNotReady: 0,
expectedAnnotation: false,
}, {
name: "no endpoints change, annotation value corrected",
startingAnnotation: utilpointer.StringPtr("invalid"),
numExisting: maxCapacity + 1,
numDesired: maxCapacity + 1,
numDesiredNotReady: 0,
numExpectedReady: maxCapacity,
numExpectedNotReady: 0,
expectedAnnotation: true,
}}
for _, tc := range testCases {
@ -2050,7 +2094,7 @@ func TestSyncServiceOverCapacity(t *testing.T) {
ns := "test"
client, c := newFakeController(0 * time.Second)
addPods(c.podStore, ns, tc.numDesired, 1, 0, ipv4only)
addPods(c.podStore, ns, tc.numDesired, 1, tc.numDesiredNotReady, ipv4only)
pods := c.podStore.List()
svc := &v1.Service{
@ -2094,14 +2138,98 @@ func TestSyncServiceOverCapacity(t *testing.T) {
if tc.expectedAnnotation {
if !ok {
t.Errorf("Expected EndpointsOverCapacity annotation to be set")
} else if actualAnnotation != "warning" {
t.Errorf("Expected EndpointsOverCapacity annotation to be 'warning', got %s", actualAnnotation)
} else if actualAnnotation != "truncated" {
t.Errorf("Expected EndpointsOverCapacity annotation to be 'truncated', got %s", actualAnnotation)
}
} else {
if ok {
t.Errorf("Expected EndpointsOverCapacity annotation not to be set, got %s", actualAnnotation)
}
}
numActualReady := 0
numActualNotReady := 0
for _, subset := range actualEndpoints.Subsets {
numActualReady += len(subset.Addresses)
numActualNotReady += len(subset.NotReadyAddresses)
}
if numActualReady != tc.numExpectedReady {
t.Errorf("Unexpected number of actual ready Endpoints: got %d endpoints, want %d endpoints", numActualReady, tc.numExpectedReady)
}
if numActualNotReady != tc.numExpectedNotReady {
t.Errorf("Unexpected number of actual not ready Endpoints: got %d endpoints, want %d endpoints", numActualNotReady, tc.numExpectedNotReady)
}
})
}
}
func TestTruncateEndpoints(t *testing.T) {
testCases := []struct {
desc string
// subsetsReady, subsetsNotReady, expectedReady, expectedNotReady
// must all be the same length
subsetsReady []int
subsetsNotReady []int
expectedReady []int
expectedNotReady []int
}{{
desc: "empty",
subsetsReady: []int{},
subsetsNotReady: []int{},
expectedReady: []int{},
expectedNotReady: []int{},
}, {
desc: "total endpoints < max capacity",
subsetsReady: []int{50, 100, 100, 100, 100},
subsetsNotReady: []int{50, 100, 100, 100, 100},
expectedReady: []int{50, 100, 100, 100, 100},
expectedNotReady: []int{50, 100, 100, 100, 100},
}, {
desc: "total endpoints = max capacity",
subsetsReady: []int{100, 100, 100, 100, 100},
subsetsNotReady: []int{100, 100, 100, 100, 100},
expectedReady: []int{100, 100, 100, 100, 100},
expectedNotReady: []int{100, 100, 100, 100, 100},
}, {
desc: "total ready endpoints < max capacity, but total endpoints > max capacity",
subsetsReady: []int{90, 110, 50, 10, 20},
subsetsNotReady: []int{101, 200, 200, 201, 298},
expectedReady: []int{90, 110, 50, 10, 20},
expectedNotReady: []int{73, 144, 144, 145, 214},
}, {
desc: "total ready endpoints > max capacity",
subsetsReady: []int{205, 400, 402, 400, 693},
subsetsNotReady: []int{100, 200, 200, 200, 300},
expectedReady: []int{98, 191, 192, 191, 328},
expectedNotReady: []int{0, 0, 0, 0, 0},
}}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
var subsets []v1.EndpointSubset
for subsetIndex, numReady := range tc.subsetsReady {
subset := v1.EndpointSubset{}
for i := 0; i < numReady; i++ {
subset.Addresses = append(subset.Addresses, v1.EndpointAddress{})
}
numNotReady := tc.subsetsNotReady[subsetIndex]
for i := 0; i < numNotReady; i++ {
subset.NotReadyAddresses = append(subset.NotReadyAddresses, v1.EndpointAddress{})
}
subsets = append(subsets, subset)
}
endpoints := &v1.Endpoints{Subsets: subsets}
truncateEndpoints(endpoints)
for i, subset := range endpoints.Subsets {
if len(subset.Addresses) != tc.expectedReady[i] {
t.Errorf("Unexpected number of actual ready Endpoints for subset %d: got %d endpoints, want %d endpoints", i, len(subset.Addresses), tc.expectedReady[i])
}
if len(subset.NotReadyAddresses) != tc.expectedNotReady[i] {
t.Errorf("Unexpected number of actual not ready Endpoints for subset %d: got %d endpoints, want %d endpoints", i, len(subset.NotReadyAddresses), tc.expectedNotReady[i])
}
}
})
}
}