From 8a3f72074e6390f8f14a89a7b78cef4379737216 Mon Sep 17 00:00:00 2001 From: Rob Scott Date: Mon, 8 Mar 2021 17:54:18 -0800 Subject: [PATCH] Adding new EndpointsOverCapacity annotation for Endpoints controller Now that the EndpointSlice API and controllers are GA, the Endpoints controller will use this annotation to warn when Endpoints are over capacity. In a future release, this warning will be replaced with truncation. --- pkg/apis/core/annotation_key_constants.go | 8 ++ .../endpoint/endpoints_controller.go | 35 ++++- .../endpoint/endpoints_controller_test.go | 125 ++++++++++++++++++ .../api/core/v1/annotation_key_constants.go | 8 ++ 4 files changed, 175 insertions(+), 1 deletion(-) diff --git a/pkg/apis/core/annotation_key_constants.go b/pkg/apis/core/annotation_key_constants.go index 3bb1d1bdd05..baacf3f10cd 100644 --- a/pkg/apis/core/annotation_key_constants.go +++ b/pkg/apis/core/annotation_key_constants.go @@ -101,6 +101,14 @@ const ( // https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md EndpointsLastChangeTriggerTime = "endpoints.kubernetes.io/last-change-trigger-time" + // EndpointsOverCapacity will be set on an Endpoints resource when it + // exceeds the maximum capacity of 1000 addresses. Initially the Endpoints + // controller will set this annotation with a value of "warning". In a + // future release, the controller may set this annotation with a value of + // "truncated" to indicate that any addresses exceeding the limit of 1000 + // have been truncated from the Endpoints resource. + EndpointsOverCapacity = "endpoints.kubernetes.io/over-capacity" + // MigratedPluginsAnnotationKey is the annotation key, set for CSINode objects, that is a comma-separated // list of in-tree plugins that will be serviced by the CSI backend on the Node represented by CSINode. 
// This annotation is used by the Attach Detach Controller to determine whether to use the in-tree or diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 8399404203c..676bd2eebae 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -60,6 +60,11 @@ const ( // 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s maxRetries = 15 + // maxCapacity represents the maximum number of addresses that should be + // stored in an Endpoints resource. In a future release, this controller + // may truncate endpoints exceeding this length. + maxCapacity = 1000 + // TolerateUnreadyEndpointsAnnotation is an annotation on the Service denoting if the endpoints // controller should go ahead and create endpoints for unready pods. This annotation is // currently only used by StatefulSets, where we need the pod to be DNS @@ -510,7 +515,8 @@ func (e *Controller) syncService(key string) error { } if !createEndpoints && apiequality.Semantic.DeepEqual(currentEndpoints.Subsets, subsets) && - apiequality.Semantic.DeepEqual(compareLabels, service.Labels) { + apiequality.Semantic.DeepEqual(compareLabels, service.Labels) && + capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) { klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) return nil } @@ -528,6 +534,12 @@ func (e *Controller) syncService(key string) error { delete(newEndpoints.Annotations, v1.EndpointsLastChangeTriggerTime) } + if overCapacity(newEndpoints.Subsets) { + newEndpoints.Annotations[v1.EndpointsOverCapacity] = "warning" + } else { + delete(newEndpoints.Annotations, v1.EndpointsOverCapacity) + } + if newEndpoints.Labels == nil { newEndpoints.Labels = make(map[string]string) } @@ -646,3 +658,24 @@ func endpointPortFromServicePort(servicePort *v1.ServicePort, portNum int) *v1.E } return 
epp } + +// overCapacity returns true if there are more addresses in the provided subsets +// than the maxCapacity. +func overCapacity(subsets []v1.EndpointSubset) bool { + numEndpoints := 0 + for _, subset := range subsets { + numEndpoints += len(subset.Addresses) + len(subset.NotReadyAddresses) + } + return numEndpoints > maxCapacity +} + +// capacityAnnotationSetCorrectly returns true if overCapacity() is true and the +// EndpointsOverCapacity annotation is set to "warning" or if overCapacity() +// is false and the annotation is not set. +func capacityAnnotationSetCorrectly(annotations map[string]string, subsets []v1.EndpointSubset) bool { + val, ok := annotations[v1.EndpointsOverCapacity] + if overCapacity(subsets) { + return ok && val == "warning" + } + return !ok +} diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 19fb2c5e7a4..3af2fcf9a47 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -17,6 +17,7 @@ limitations under the License. 
package endpoint import ( + "context" "fmt" "net/http" "net/http/httptest" @@ -35,6 +36,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" clientscheme "k8s.io/client-go/kubernetes/scheme" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" @@ -225,6 +227,29 @@ func newController(url string, batchPeriod time.Duration) *endpointController { } } +func newFakeController(batchPeriod time.Duration) (*fake.Clientset, *endpointController) { + client := fake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(client, controllerpkg.NoResyncPeriodFunc()) + + eController := NewEndpointController( + informerFactory.Core().V1().Pods(), + informerFactory.Core().V1().Services(), + informerFactory.Core().V1().Endpoints(), + client, + batchPeriod) + + eController.podsSynced = alwaysReady + eController.servicesSynced = alwaysReady + eController.endpointsSynced = alwaysReady + + return client, &endpointController{ + eController, + informerFactory.Core().V1().Pods().Informer().GetStore(), + informerFactory.Core().V1().Services().Informer().GetStore(), + informerFactory.Core().V1().Endpoints().Informer().GetStore(), + } +} + func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { ns := metav1.NamespaceDefault testServer, endpointsHandler := makeTestServer(t, ns) @@ -1981,6 +2006,106 @@ func TestSyncEndpointsServiceNotFound(t *testing.T) { endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "DELETE", nil) } +func TestSyncServiceOverCapacity(t *testing.T) { + testCases := []struct { + name string + startingAnnotation *string + numExisting int + numDesired int + expectedAnnotation bool + }{{ + name: "empty", + startingAnnotation: nil, + numExisting: 0, + numDesired: 0, + expectedAnnotation: false, + }, { + name: "annotation added past capacity", + startingAnnotation: nil, + numExisting: maxCapacity - 1, 
+ numDesired: maxCapacity + 1, + expectedAnnotation: true, + }, { + name: "annotation removed below capacity", + startingAnnotation: utilpointer.StringPtr("warning"), + numExisting: maxCapacity - 1, + numDesired: maxCapacity - 1, + expectedAnnotation: false, + }, { + name: "annotation removed at capacity", + startingAnnotation: utilpointer.StringPtr("warning"), + numExisting: maxCapacity, + numDesired: maxCapacity, + expectedAnnotation: false, + }, { + name: "no endpoints change, annotation value corrected", + startingAnnotation: utilpointer.StringPtr("invalid"), + numExisting: maxCapacity + 1, + numDesired: maxCapacity + 1, + expectedAnnotation: true, + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ns := "test" + client, c := newFakeController(0 * time.Second) + + addPods(c.podStore, ns, tc.numDesired, 1, 0, ipv4only) + pods := c.podStore.List() + + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: v1.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{{Port: 80}}, + }, + } + c.serviceStore.Add(svc) + + subset := v1.EndpointSubset{} + for i := 0; i < tc.numExisting; i++ { + pod := pods[i].(*v1.Pod) + epa, _ := podToEndpointAddressForService(svc, pod) + subset.Addresses = append(subset.Addresses, *epa) + } + endpoints := &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: svc.Name, + Namespace: ns, + ResourceVersion: "1", + Annotations: map[string]string{}, + }, + Subsets: []v1.EndpointSubset{subset}, + } + if tc.startingAnnotation != nil { + endpoints.Annotations[v1.EndpointsOverCapacity] = *tc.startingAnnotation + } + c.endpointsStore.Add(endpoints) + client.CoreV1().Endpoints(ns).Create(context.TODO(), endpoints, metav1.CreateOptions{}) + + c.syncService(fmt.Sprintf("%s/%s", ns, svc.Name)) + + actualEndpoints, err := client.CoreV1().Endpoints(ns).Get(context.TODO(), endpoints.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("unexpected error getting 
endpoints: %v", err) + } + + actualAnnotation, ok := actualEndpoints.Annotations[v1.EndpointsOverCapacity] + if tc.expectedAnnotation { + if !ok { + t.Errorf("Expected EndpointsOverCapacity annotation to be set") + } else if actualAnnotation != "warning" { + t.Errorf("Expected EndpointsOverCapacity annotation to be 'warning', got %s", actualAnnotation) + } + } else { + if ok { + t.Errorf("Expected EndpointsOverCapacity annotation not to be set, got %s", actualAnnotation) + } + } + }) + } +} + func TestEndpointPortFromServicePort(t *testing.T) { http := utilpointer.StringPtr("http") testCases := map[string]struct { diff --git a/staging/src/k8s.io/api/core/v1/annotation_key_constants.go b/staging/src/k8s.io/api/core/v1/annotation_key_constants.go index 2b486415a60..612f6aa747a 100644 --- a/staging/src/k8s.io/api/core/v1/annotation_key_constants.go +++ b/staging/src/k8s.io/api/core/v1/annotation_key_constants.go @@ -123,6 +123,14 @@ const ( // https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md EndpointsLastChangeTriggerTime = "endpoints.kubernetes.io/last-change-trigger-time" + // EndpointsOverCapacity will be set on an Endpoints resource when it + // exceeds the maximum capacity of 1000 addresses. Initially the Endpoints + // controller will set this annotation with a value of "warning". In a + // future release, the controller may set this annotation with a value of + // "truncated" to indicate that any addresses exceeding the limit of 1000 + // have been truncated from the Endpoints resource. + EndpointsOverCapacity = "endpoints.kubernetes.io/over-capacity" + // MigratedPluginsAnnotationKey is the annotation key, set for CSINode objects, that is a comma-separated // list of in-tree plugins that will be serviced by the CSI backend on the Node represented by CSINode. // This annotation is used by the Attach Detach Controller to determine whether to use the in-tree or