mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 03:41:45 +00:00
Merge pull request #103520 from swetharepakula/truncate-endpoints
Truncate endpoints over a 1000 addresses
This commit is contained in:
commit
16af282ee7
@ -19,6 +19,7 @@ package endpoint
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
@ -78,6 +79,11 @@ const (
|
||||
// This field is deprecated. v1.Service.PublishNotReadyAddresses will replace it
|
||||
// subsequent releases. It will be removed no sooner than 1.13.
|
||||
TolerateUnreadyEndpointsAnnotation = "service.alpha.kubernetes.io/tolerate-unready-endpoints"
|
||||
|
||||
// truncated is a possible value for `endpoints.kubernetes.io/over-capacity` annotation on an
|
||||
// endpoint resource and indicates that the number of endpoints have been truncated to
|
||||
// maxCapacity
|
||||
truncated = "truncated"
|
||||
)
|
||||
|
||||
// NewEndpointController returns a new *Controller.
|
||||
@ -534,8 +540,8 @@ func (e *Controller) syncService(key string) error {
|
||||
delete(newEndpoints.Annotations, v1.EndpointsLastChangeTriggerTime)
|
||||
}
|
||||
|
||||
if overCapacity(newEndpoints.Subsets) {
|
||||
newEndpoints.Annotations[v1.EndpointsOverCapacity] = "warning"
|
||||
if truncateEndpoints(newEndpoints) {
|
||||
newEndpoints.Annotations[v1.EndpointsOverCapacity] = truncated
|
||||
} else {
|
||||
delete(newEndpoints.Annotations, v1.EndpointsOverCapacity)
|
||||
}
|
||||
@ -659,23 +665,85 @@ func endpointPortFromServicePort(servicePort *v1.ServicePort, portNum int) *v1.E
|
||||
return epp
|
||||
}
|
||||
|
||||
// overCapacity returns true if there are more addresses in the provided subsets
|
||||
// than the maxCapacity.
|
||||
func overCapacity(subsets []v1.EndpointSubset) bool {
|
||||
// capacityAnnotationSetCorrectly returns false if number of endpoints is greater than maxCapacity or
|
||||
// returns true if underCapacity and the annotation is not set.
|
||||
func capacityAnnotationSetCorrectly(annotations map[string]string, subsets []v1.EndpointSubset) bool {
|
||||
numEndpoints := 0
|
||||
for _, subset := range subsets {
|
||||
numEndpoints += len(subset.Addresses) + len(subset.NotReadyAddresses)
|
||||
}
|
||||
return numEndpoints > maxCapacity
|
||||
}
|
||||
|
||||
// capacityAnnotationSetCorrectly returns true if overCapacity() is true and the
|
||||
// EndpointsOverCapacity annotation is set to "warning" or if overCapacity()
|
||||
// is false and the annotation is not set.
|
||||
func capacityAnnotationSetCorrectly(annotations map[string]string, subsets []v1.EndpointSubset) bool {
|
||||
val, ok := annotations[v1.EndpointsOverCapacity]
|
||||
if overCapacity(subsets) {
|
||||
return ok && val == "warning"
|
||||
if numEndpoints > maxCapacity {
|
||||
// If subsets are over capacity, they must be truncated so consider
|
||||
// the annotation as not set correctly
|
||||
return false
|
||||
}
|
||||
_, ok := annotations[v1.EndpointsOverCapacity]
|
||||
return !ok
|
||||
}
|
||||
|
||||
// truncateEndpoints by best effort will distribute the endpoints over the subsets based on the proportion
|
||||
// of endpoints per subset and will prioritize Ready Endpoints over NotReady Endpoints.
|
||||
func truncateEndpoints(endpoints *v1.Endpoints) bool {
|
||||
totalReady := 0
|
||||
totalNotReady := 0
|
||||
for _, subset := range endpoints.Subsets {
|
||||
totalReady += len(subset.Addresses)
|
||||
totalNotReady += len(subset.NotReadyAddresses)
|
||||
}
|
||||
|
||||
if totalReady+totalNotReady <= maxCapacity {
|
||||
return false
|
||||
}
|
||||
|
||||
truncateReady := false
|
||||
max := maxCapacity - totalReady
|
||||
numTotal := totalNotReady
|
||||
if totalReady > maxCapacity {
|
||||
truncateReady = true
|
||||
max = maxCapacity
|
||||
numTotal = totalReady
|
||||
}
|
||||
canBeAdded := max
|
||||
|
||||
for i := range endpoints.Subsets {
|
||||
subset := endpoints.Subsets[i]
|
||||
numInSubset := len(subset.Addresses)
|
||||
if !truncateReady {
|
||||
numInSubset = len(subset.NotReadyAddresses)
|
||||
}
|
||||
|
||||
// The number of endpoints per subset will be based on the propotion of endpoints
|
||||
// in this subset versus the total number of endpoints. The proportion of endpoints
|
||||
// will be rounded up which most likely will lead to the last subset having less
|
||||
// endpoints than the expected proportion.
|
||||
toBeAdded := int(math.Ceil((float64(numInSubset) / float64(numTotal)) * float64(max)))
|
||||
// If there is not enough endpoints for the last subset, ensure only the number up
|
||||
// to the capacity are added
|
||||
if toBeAdded > canBeAdded {
|
||||
toBeAdded = canBeAdded
|
||||
}
|
||||
|
||||
if truncateReady {
|
||||
// Truncate ready Addresses to allocated proportion and truncate all not ready
|
||||
// addresses
|
||||
subset.Addresses = addressSubset(subset.Addresses, toBeAdded)
|
||||
subset.NotReadyAddresses = []v1.EndpointAddress{}
|
||||
canBeAdded -= len(subset.Addresses)
|
||||
} else {
|
||||
// Only truncate the not ready addresses
|
||||
subset.NotReadyAddresses = addressSubset(subset.NotReadyAddresses, toBeAdded)
|
||||
canBeAdded -= len(subset.NotReadyAddresses)
|
||||
}
|
||||
endpoints.Subsets[i] = subset
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// addressSubset takes a list of addresses and returns a subset if the length is greater
|
||||
// than the maxNum. If less than the maxNum, the entire list is returned.
|
||||
func addressSubset(addresses []v1.EndpointAddress, maxNum int) []v1.EndpointAddress {
|
||||
if len(addresses) <= maxNum {
|
||||
return addresses
|
||||
}
|
||||
return addresses[0:maxNum]
|
||||
}
|
||||
|
@ -2008,41 +2008,85 @@ func TestSyncEndpointsServiceNotFound(t *testing.T) {
|
||||
|
||||
func TestSyncServiceOverCapacity(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
startingAnnotation *string
|
||||
numExisting int
|
||||
numDesired int
|
||||
expectedAnnotation bool
|
||||
name string
|
||||
startingAnnotation *string
|
||||
numExisting int
|
||||
numDesired int
|
||||
numDesiredNotReady int
|
||||
numExpectedReady int
|
||||
numExpectedNotReady int
|
||||
expectedAnnotation bool
|
||||
}{{
|
||||
name: "empty",
|
||||
startingAnnotation: nil,
|
||||
numExisting: 0,
|
||||
numDesired: 0,
|
||||
expectedAnnotation: false,
|
||||
name: "empty",
|
||||
startingAnnotation: nil,
|
||||
numExisting: 0,
|
||||
numDesired: 0,
|
||||
numExpectedReady: 0,
|
||||
numExpectedNotReady: 0,
|
||||
expectedAnnotation: false,
|
||||
}, {
|
||||
name: "annotation added past capacity",
|
||||
startingAnnotation: nil,
|
||||
numExisting: maxCapacity - 1,
|
||||
numDesired: maxCapacity + 1,
|
||||
expectedAnnotation: true,
|
||||
name: "annotation added past capacity, < than maxCapacity of Ready Addresses",
|
||||
startingAnnotation: nil,
|
||||
numExisting: maxCapacity - 1,
|
||||
numDesired: maxCapacity - 3,
|
||||
numDesiredNotReady: 4,
|
||||
numExpectedReady: maxCapacity - 3,
|
||||
numExpectedNotReady: 3,
|
||||
expectedAnnotation: true,
|
||||
}, {
|
||||
name: "annotation removed below capacity",
|
||||
startingAnnotation: utilpointer.StringPtr("warning"),
|
||||
numExisting: maxCapacity - 1,
|
||||
numDesired: maxCapacity - 1,
|
||||
expectedAnnotation: false,
|
||||
name: "annotation added past capacity, maxCapacity of Ready Addresses ",
|
||||
startingAnnotation: nil,
|
||||
numExisting: maxCapacity - 1,
|
||||
numDesired: maxCapacity,
|
||||
numDesiredNotReady: 10,
|
||||
numExpectedReady: maxCapacity,
|
||||
numExpectedNotReady: 0,
|
||||
expectedAnnotation: true,
|
||||
}, {
|
||||
name: "annotation removed at capacity",
|
||||
startingAnnotation: utilpointer.StringPtr("warning"),
|
||||
numExisting: maxCapacity,
|
||||
numDesired: maxCapacity,
|
||||
expectedAnnotation: false,
|
||||
name: "annotation removed below capacity",
|
||||
startingAnnotation: utilpointer.StringPtr("truncated"),
|
||||
numExisting: maxCapacity - 1,
|
||||
numDesired: maxCapacity - 1,
|
||||
numDesiredNotReady: 0,
|
||||
numExpectedReady: maxCapacity - 1,
|
||||
numExpectedNotReady: 0,
|
||||
expectedAnnotation: false,
|
||||
}, {
|
||||
name: "no endpoints change, annotation value corrected",
|
||||
startingAnnotation: utilpointer.StringPtr("invalid"),
|
||||
numExisting: maxCapacity + 1,
|
||||
numDesired: maxCapacity + 1,
|
||||
expectedAnnotation: true,
|
||||
name: "annotation was set to warning previously, annotation removed at capacity",
|
||||
startingAnnotation: utilpointer.StringPtr("warning"),
|
||||
numExisting: maxCapacity,
|
||||
numDesired: maxCapacity,
|
||||
numDesiredNotReady: 0,
|
||||
numExpectedReady: maxCapacity,
|
||||
numExpectedNotReady: 0,
|
||||
expectedAnnotation: false,
|
||||
}, {
|
||||
name: "annotation was set to warning previously but still over capacity",
|
||||
startingAnnotation: utilpointer.StringPtr("warning"),
|
||||
numExisting: maxCapacity + 1,
|
||||
numDesired: maxCapacity + 1,
|
||||
numDesiredNotReady: 0,
|
||||
numExpectedReady: maxCapacity,
|
||||
numExpectedNotReady: 0,
|
||||
expectedAnnotation: true,
|
||||
}, {
|
||||
name: "annotation removed at capacity",
|
||||
startingAnnotation: utilpointer.StringPtr("truncated"),
|
||||
numExisting: maxCapacity,
|
||||
numDesired: maxCapacity,
|
||||
numDesiredNotReady: 0,
|
||||
numExpectedReady: maxCapacity,
|
||||
numExpectedNotReady: 0,
|
||||
expectedAnnotation: false,
|
||||
}, {
|
||||
name: "no endpoints change, annotation value corrected",
|
||||
startingAnnotation: utilpointer.StringPtr("invalid"),
|
||||
numExisting: maxCapacity + 1,
|
||||
numDesired: maxCapacity + 1,
|
||||
numDesiredNotReady: 0,
|
||||
numExpectedReady: maxCapacity,
|
||||
numExpectedNotReady: 0,
|
||||
expectedAnnotation: true,
|
||||
}}
|
||||
|
||||
for _, tc := range testCases {
|
||||
@ -2050,7 +2094,7 @@ func TestSyncServiceOverCapacity(t *testing.T) {
|
||||
ns := "test"
|
||||
client, c := newFakeController(0 * time.Second)
|
||||
|
||||
addPods(c.podStore, ns, tc.numDesired, 1, 0, ipv4only)
|
||||
addPods(c.podStore, ns, tc.numDesired, 1, tc.numDesiredNotReady, ipv4only)
|
||||
pods := c.podStore.List()
|
||||
|
||||
svc := &v1.Service{
|
||||
@ -2094,14 +2138,98 @@ func TestSyncServiceOverCapacity(t *testing.T) {
|
||||
if tc.expectedAnnotation {
|
||||
if !ok {
|
||||
t.Errorf("Expected EndpointsOverCapacity annotation to be set")
|
||||
} else if actualAnnotation != "warning" {
|
||||
t.Errorf("Expected EndpointsOverCapacity annotation to be 'warning', got %s", actualAnnotation)
|
||||
} else if actualAnnotation != "truncated" {
|
||||
t.Errorf("Expected EndpointsOverCapacity annotation to be 'truncated', got %s", actualAnnotation)
|
||||
}
|
||||
} else {
|
||||
if ok {
|
||||
t.Errorf("Expected EndpointsOverCapacity annotation not to be set, got %s", actualAnnotation)
|
||||
}
|
||||
}
|
||||
numActualReady := 0
|
||||
numActualNotReady := 0
|
||||
for _, subset := range actualEndpoints.Subsets {
|
||||
numActualReady += len(subset.Addresses)
|
||||
numActualNotReady += len(subset.NotReadyAddresses)
|
||||
}
|
||||
if numActualReady != tc.numExpectedReady {
|
||||
t.Errorf("Unexpected number of actual ready Endpoints: got %d endpoints, want %d endpoints", numActualReady, tc.numExpectedReady)
|
||||
}
|
||||
if numActualNotReady != tc.numExpectedNotReady {
|
||||
t.Errorf("Unexpected number of actual not ready Endpoints: got %d endpoints, want %d endpoints", numActualNotReady, tc.numExpectedNotReady)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestTruncateEndpoints(t *testing.T) {
|
||||
testCases := []struct {
|
||||
desc string
|
||||
// subsetsReady, subsetsNotReady, expectedReady, expectedNotReady
|
||||
// must all be the same length
|
||||
subsetsReady []int
|
||||
subsetsNotReady []int
|
||||
expectedReady []int
|
||||
expectedNotReady []int
|
||||
}{{
|
||||
desc: "empty",
|
||||
subsetsReady: []int{},
|
||||
subsetsNotReady: []int{},
|
||||
expectedReady: []int{},
|
||||
expectedNotReady: []int{},
|
||||
}, {
|
||||
desc: "total endpoints < max capacity",
|
||||
subsetsReady: []int{50, 100, 100, 100, 100},
|
||||
subsetsNotReady: []int{50, 100, 100, 100, 100},
|
||||
expectedReady: []int{50, 100, 100, 100, 100},
|
||||
expectedNotReady: []int{50, 100, 100, 100, 100},
|
||||
}, {
|
||||
desc: "total endpoints = max capacity",
|
||||
subsetsReady: []int{100, 100, 100, 100, 100},
|
||||
subsetsNotReady: []int{100, 100, 100, 100, 100},
|
||||
expectedReady: []int{100, 100, 100, 100, 100},
|
||||
expectedNotReady: []int{100, 100, 100, 100, 100},
|
||||
}, {
|
||||
desc: "total ready endpoints < max capacity, but total endpoints > max capacity",
|
||||
subsetsReady: []int{90, 110, 50, 10, 20},
|
||||
subsetsNotReady: []int{101, 200, 200, 201, 298},
|
||||
expectedReady: []int{90, 110, 50, 10, 20},
|
||||
expectedNotReady: []int{73, 144, 144, 145, 214},
|
||||
}, {
|
||||
desc: "total ready endpoints > max capacity",
|
||||
subsetsReady: []int{205, 400, 402, 400, 693},
|
||||
subsetsNotReady: []int{100, 200, 200, 200, 300},
|
||||
expectedReady: []int{98, 191, 192, 191, 328},
|
||||
expectedNotReady: []int{0, 0, 0, 0, 0},
|
||||
}}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
var subsets []v1.EndpointSubset
|
||||
for subsetIndex, numReady := range tc.subsetsReady {
|
||||
subset := v1.EndpointSubset{}
|
||||
for i := 0; i < numReady; i++ {
|
||||
subset.Addresses = append(subset.Addresses, v1.EndpointAddress{})
|
||||
}
|
||||
|
||||
numNotReady := tc.subsetsNotReady[subsetIndex]
|
||||
for i := 0; i < numNotReady; i++ {
|
||||
subset.NotReadyAddresses = append(subset.NotReadyAddresses, v1.EndpointAddress{})
|
||||
}
|
||||
subsets = append(subsets, subset)
|
||||
}
|
||||
|
||||
endpoints := &v1.Endpoints{Subsets: subsets}
|
||||
truncateEndpoints(endpoints)
|
||||
|
||||
for i, subset := range endpoints.Subsets {
|
||||
if len(subset.Addresses) != tc.expectedReady[i] {
|
||||
t.Errorf("Unexpected number of actual ready Endpoints for subset %d: got %d endpoints, want %d endpoints", i, len(subset.Addresses), tc.expectedReady[i])
|
||||
}
|
||||
if len(subset.NotReadyAddresses) != tc.expectedNotReady[i] {
|
||||
t.Errorf("Unexpected number of actual not ready Endpoints for subset %d: got %d endpoints, want %d endpoints", i, len(subset.NotReadyAddresses), tc.expectedNotReady[i])
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user