Merge pull request #99975 from robscott/endpoints-over-capacity

Adding new EndpointsOverCapacity annotation for Endpoints controller
This commit is contained in:
Kubernetes Prow Robot 2021-03-09 16:08:48 -08:00 committed by GitHub
commit 05c4febbe7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 175 additions and 1 deletions

View File

@ -101,6 +101,14 @@ const (
// https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md
EndpointsLastChangeTriggerTime = "endpoints.kubernetes.io/last-change-trigger-time"
// EndpointsOverCapacity will be set on an Endpoints resource when it
// exceeds the maximum capacity of 1000 addresses. Inititially the Endpoints
// controller will set this annotation with a value of "warning". In a
// future release, the controller may set this annotation with a value of
// "truncated" to indicate that any addresses exceeding the limit of 1000
// have been truncated from the Endpoints resource.
EndpointsOverCapacity = "endpoints.kubernetes.io/over-capacity"
// MigratedPluginsAnnotationKey is the annotation key, set for CSINode objects, that is a comma-separated
// list of in-tree plugins that will be serviced by the CSI backend on the Node represented by CSINode.
// This annotation is used by the Attach Detach Controller to determine whether to use the in-tree or

View File

@ -60,6 +60,11 @@ const (
// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
maxRetries = 15
// maxCapacity represents the maximum number of addresses that should be
// stored in an Endpoints resource. In a future release, this controller
// may truncate endpoints exceeding this length.
maxCapacity = 1000
// TolerateUnreadyEndpointsAnnotation is an annotation on the Service denoting if the endpoints
// controller should go ahead and create endpoints for unready pods. This annotation is
// currently only used by StatefulSets, where we need the pod to be DNS
@ -510,7 +515,8 @@ func (e *Controller) syncService(key string) error {
}
if !createEndpoints &&
apiequality.Semantic.DeepEqual(currentEndpoints.Subsets, subsets) &&
apiequality.Semantic.DeepEqual(compareLabels, service.Labels) {
apiequality.Semantic.DeepEqual(compareLabels, service.Labels) &&
capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) {
klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
return nil
}
@ -528,6 +534,12 @@ func (e *Controller) syncService(key string) error {
delete(newEndpoints.Annotations, v1.EndpointsLastChangeTriggerTime)
}
if overCapacity(newEndpoints.Subsets) {
newEndpoints.Annotations[v1.EndpointsOverCapacity] = "warning"
} else {
delete(newEndpoints.Annotations, v1.EndpointsOverCapacity)
}
if newEndpoints.Labels == nil {
newEndpoints.Labels = make(map[string]string)
}
@ -646,3 +658,24 @@ func endpointPortFromServicePort(servicePort *v1.ServicePort, portNum int) *v1.E
}
return epp
}
// overCapacity returns true if there are more addresses in the provided subsets
// than the maxCapacity.
func overCapacity(subsets []v1.EndpointSubset) bool {
numEndpoints := 0
for _, subset := range subsets {
numEndpoints += len(subset.Addresses) + len(subset.NotReadyAddresses)
}
return numEndpoints > maxCapacity
}
// capacityAnnotationSetCorrectly returns true if overCapacity() is true and the
// EndpointsOverCapacity annotation is set to "warning" or if overCapacity()
// is false and the annotation is not set.
func capacityAnnotationSetCorrectly(annotations map[string]string, subsets []v1.EndpointSubset) bool {
val, ok := annotations[v1.EndpointsOverCapacity]
if overCapacity(subsets) {
return ok && val == "warning"
}
return !ok
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package endpoint
import (
"context"
"fmt"
"net/http"
"net/http/httptest"
@ -35,6 +36,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
clientscheme "k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
@ -225,6 +227,29 @@ func newController(url string, batchPeriod time.Duration) *endpointController {
}
}
func newFakeController(batchPeriod time.Duration) (*fake.Clientset, *endpointController) {
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, controllerpkg.NoResyncPeriodFunc())
eController := NewEndpointController(
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().Services(),
informerFactory.Core().V1().Endpoints(),
client,
batchPeriod)
eController.podsSynced = alwaysReady
eController.servicesSynced = alwaysReady
eController.endpointsSynced = alwaysReady
return client, &endpointController{
eController,
informerFactory.Core().V1().Pods().Informer().GetStore(),
informerFactory.Core().V1().Services().Informer().GetStore(),
informerFactory.Core().V1().Endpoints().Informer().GetStore(),
}
}
func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, endpointsHandler := makeTestServer(t, ns)
@ -1981,6 +2006,106 @@ func TestSyncEndpointsServiceNotFound(t *testing.T) {
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "DELETE", nil)
}
func TestSyncServiceOverCapacity(t *testing.T) {
testCases := []struct {
name string
startingAnnotation *string
numExisting int
numDesired int
expectedAnnotation bool
}{{
name: "empty",
startingAnnotation: nil,
numExisting: 0,
numDesired: 0,
expectedAnnotation: false,
}, {
name: "annotation added past capacity",
startingAnnotation: nil,
numExisting: maxCapacity - 1,
numDesired: maxCapacity + 1,
expectedAnnotation: true,
}, {
name: "annotation removed below capacity",
startingAnnotation: utilpointer.StringPtr("warning"),
numExisting: maxCapacity - 1,
numDesired: maxCapacity - 1,
expectedAnnotation: false,
}, {
name: "annotation removed at capacity",
startingAnnotation: utilpointer.StringPtr("warning"),
numExisting: maxCapacity,
numDesired: maxCapacity,
expectedAnnotation: false,
}, {
name: "no endpoints change, annotation value corrected",
startingAnnotation: utilpointer.StringPtr("invalid"),
numExisting: maxCapacity + 1,
numDesired: maxCapacity + 1,
expectedAnnotation: true,
}}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ns := "test"
client, c := newFakeController(0 * time.Second)
addPods(c.podStore, ns, tc.numDesired, 1, 0, ipv4only)
pods := c.podStore.List()
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []v1.ServicePort{{Port: 80}},
},
}
c.serviceStore.Add(svc)
subset := v1.EndpointSubset{}
for i := 0; i < tc.numExisting; i++ {
pod := pods[i].(*v1.Pod)
epa, _ := podToEndpointAddressForService(svc, pod)
subset.Addresses = append(subset.Addresses, *epa)
}
endpoints := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: svc.Name,
Namespace: ns,
ResourceVersion: "1",
Annotations: map[string]string{},
},
Subsets: []v1.EndpointSubset{subset},
}
if tc.startingAnnotation != nil {
endpoints.Annotations[v1.EndpointsOverCapacity] = *tc.startingAnnotation
}
c.endpointsStore.Add(endpoints)
client.CoreV1().Endpoints(ns).Create(context.TODO(), endpoints, metav1.CreateOptions{})
c.syncService(fmt.Sprintf("%s/%s", ns, svc.Name))
actualEndpoints, err := client.CoreV1().Endpoints(ns).Get(context.TODO(), endpoints.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("unexpected error getting endpoints: %v", err)
}
actualAnnotation, ok := actualEndpoints.Annotations[v1.EndpointsOverCapacity]
if tc.expectedAnnotation {
if !ok {
t.Errorf("Expected EndpointsOverCapacity annotation to be set")
} else if actualAnnotation != "warning" {
t.Errorf("Expected EndpointsOverCapacity annotation to be 'warning', got %s", actualAnnotation)
}
} else {
if ok {
t.Errorf("Expected EndpointsOverCapacity annotation not to be set, got %s", actualAnnotation)
}
}
})
}
}
func TestEndpointPortFromServicePort(t *testing.T) {
http := utilpointer.StringPtr("http")
testCases := map[string]struct {

View File

@ -123,6 +123,14 @@ const (
// https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md
EndpointsLastChangeTriggerTime = "endpoints.kubernetes.io/last-change-trigger-time"
// EndpointsOverCapacity will be set on an Endpoints resource when it
// exceeds the maximum capacity of 1000 addresses. Inititially the Endpoints
// controller will set this annotation with a value of "warning". In a
// future release, the controller may set this annotation with a value of
// "truncated" to indicate that any addresses exceeding the limit of 1000
// have been truncated from the Endpoints resource.
EndpointsOverCapacity = "endpoints.kubernetes.io/over-capacity"
// MigratedPluginsAnnotationKey is the annotation key, set for CSINode objects, that is a comma-separated
// list of in-tree plugins that will be serviced by the CSI backend on the Node represented by CSINode.
// This annotation is used by the Attach Detach Controller to determine whether to use the in-tree or