diff --git a/pkg/apis/core/annotation_key_constants.go b/pkg/apis/core/annotation_key_constants.go index 3bb1d1bdd05..baacf3f10cd 100644 --- a/pkg/apis/core/annotation_key_constants.go +++ b/pkg/apis/core/annotation_key_constants.go @@ -101,6 +101,14 @@ const ( // https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md EndpointsLastChangeTriggerTime = "endpoints.kubernetes.io/last-change-trigger-time" + // EndpointsOverCapacity will be set on an Endpoints resource when it + // exceeds the maximum capacity of 1000 addresses. Initially the Endpoints + // controller will set this annotation with a value of "warning". In a + // future release, the controller may set this annotation with a value of + // "truncated" to indicate that any addresses exceeding the limit of 1000 + // have been truncated from the Endpoints resource. + EndpointsOverCapacity = "endpoints.kubernetes.io/over-capacity" + // MigratedPluginsAnnotationKey is the annotation key, set for CSINode objects, that is a comma-separated // list of in-tree plugins that will be serviced by the CSI backend on the Node represented by CSINode. // This annotation is used by the Attach Detach Controller to determine whether to use the in-tree or diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 8399404203c..676bd2eebae 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -60,6 +60,11 @@ const ( // 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s maxRetries = 15 + // maxCapacity represents the maximum number of addresses that should be + // stored in an Endpoints resource. In a future release, this controller + // may truncate endpoints exceeding this length. 
+ maxCapacity = 1000 + // TolerateUnreadyEndpointsAnnotation is an annotation on the Service denoting if the endpoints // controller should go ahead and create endpoints for unready pods. This annotation is // currently only used by StatefulSets, where we need the pod to be DNS @@ -510,7 +515,8 @@ func (e *Controller) syncService(key string) error { } if !createEndpoints && apiequality.Semantic.DeepEqual(currentEndpoints.Subsets, subsets) && - apiequality.Semantic.DeepEqual(compareLabels, service.Labels) { + apiequality.Semantic.DeepEqual(compareLabels, service.Labels) && + capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) { klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) return nil } @@ -528,6 +534,12 @@ func (e *Controller) syncService(key string) error { delete(newEndpoints.Annotations, v1.EndpointsLastChangeTriggerTime) } + if overCapacity(newEndpoints.Subsets) { + newEndpoints.Annotations[v1.EndpointsOverCapacity] = "warning" + } else { + delete(newEndpoints.Annotations, v1.EndpointsOverCapacity) + } + if newEndpoints.Labels == nil { newEndpoints.Labels = make(map[string]string) } @@ -646,3 +658,24 @@ func endpointPortFromServicePort(servicePort *v1.ServicePort, portNum int) *v1.E } return epp } + +// overCapacity returns true if there are more addresses in the provided subsets +// than the maxCapacity. +func overCapacity(subsets []v1.EndpointSubset) bool { + numEndpoints := 0 + for _, subset := range subsets { + numEndpoints += len(subset.Addresses) + len(subset.NotReadyAddresses) + } + return numEndpoints > maxCapacity +} + +// capacityAnnotationSetCorrectly returns true if overCapacity() is true and the +// EndpointsOverCapacity annotation is set to "warning" or if overCapacity() +// is false and the annotation is not set. 
+func capacityAnnotationSetCorrectly(annotations map[string]string, subsets []v1.EndpointSubset) bool { + val, ok := annotations[v1.EndpointsOverCapacity] + if overCapacity(subsets) { + return ok && val == "warning" + } + return !ok +} diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 19fb2c5e7a4..3af2fcf9a47 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -17,6 +17,7 @@ limitations under the License. package endpoint import ( + "context" "fmt" "net/http" "net/http/httptest" @@ -35,6 +36,7 @@ import ( utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" clientscheme "k8s.io/client-go/kubernetes/scheme" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" @@ -225,6 +227,29 @@ func newController(url string, batchPeriod time.Duration) *endpointController { } } +func newFakeController(batchPeriod time.Duration) (*fake.Clientset, *endpointController) { + client := fake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(client, controllerpkg.NoResyncPeriodFunc()) + + eController := NewEndpointController( + informerFactory.Core().V1().Pods(), + informerFactory.Core().V1().Services(), + informerFactory.Core().V1().Endpoints(), + client, + batchPeriod) + + eController.podsSynced = alwaysReady + eController.servicesSynced = alwaysReady + eController.endpointsSynced = alwaysReady + + return client, &endpointController{ + eController, + informerFactory.Core().V1().Pods().Informer().GetStore(), + informerFactory.Core().V1().Services().Informer().GetStore(), + informerFactory.Core().V1().Endpoints().Informer().GetStore(), + } +} + func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { ns := metav1.NamespaceDefault testServer, endpointsHandler := makeTestServer(t, ns) @@ -1981,6 
+2006,106 @@ func TestSyncEndpointsServiceNotFound(t *testing.T) { endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints/foo", "DELETE", nil) } +func TestSyncServiceOverCapacity(t *testing.T) { + testCases := []struct { + name string + startingAnnotation *string + numExisting int + numDesired int + expectedAnnotation bool + }{{ + name: "empty", + startingAnnotation: nil, + numExisting: 0, + numDesired: 0, + expectedAnnotation: false, + }, { + name: "annotation added past capacity", + startingAnnotation: nil, + numExisting: maxCapacity - 1, + numDesired: maxCapacity + 1, + expectedAnnotation: true, + }, { + name: "annotation removed below capacity", + startingAnnotation: utilpointer.StringPtr("warning"), + numExisting: maxCapacity - 1, + numDesired: maxCapacity - 1, + expectedAnnotation: false, + }, { + name: "annotation removed at capacity", + startingAnnotation: utilpointer.StringPtr("warning"), + numExisting: maxCapacity, + numDesired: maxCapacity, + expectedAnnotation: false, + }, { + name: "no endpoints change, annotation value corrected", + startingAnnotation: utilpointer.StringPtr("invalid"), + numExisting: maxCapacity + 1, + numDesired: maxCapacity + 1, + expectedAnnotation: true, + }} + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ns := "test" + client, c := newFakeController(0 * time.Second) + + addPods(c.podStore, ns, tc.numDesired, 1, 0, ipv4only) + pods := c.podStore.List() + + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: v1.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{{Port: 80}}, + }, + } + c.serviceStore.Add(svc) + + subset := v1.EndpointSubset{} + for i := 0; i < tc.numExisting; i++ { + pod := pods[i].(*v1.Pod) + epa, _ := podToEndpointAddressForService(svc, pod) + subset.Addresses = append(subset.Addresses, *epa) + } + endpoints := &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: svc.Name, + Namespace: ns, + 
ResourceVersion: "1", + Annotations: map[string]string{}, + }, + Subsets: []v1.EndpointSubset{subset}, + } + if tc.startingAnnotation != nil { + endpoints.Annotations[v1.EndpointsOverCapacity] = *tc.startingAnnotation + } + c.endpointsStore.Add(endpoints) + client.CoreV1().Endpoints(ns).Create(context.TODO(), endpoints, metav1.CreateOptions{}) + + c.syncService(fmt.Sprintf("%s/%s", ns, svc.Name)) + + actualEndpoints, err := client.CoreV1().Endpoints(ns).Get(context.TODO(), endpoints.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("unexpected error getting endpoints: %v", err) + } + + actualAnnotation, ok := actualEndpoints.Annotations[v1.EndpointsOverCapacity] + if tc.expectedAnnotation { + if !ok { + t.Errorf("Expected EndpointsOverCapacity annotation to be set") + } else if actualAnnotation != "warning" { + t.Errorf("Expected EndpointsOverCapacity annotation to be 'warning', got %s", actualAnnotation) + } + } else { + if ok { + t.Errorf("Expected EndpointsOverCapacity annotation not to be set, got %s", actualAnnotation) + } + } + }) + } +} + func TestEndpointPortFromServicePort(t *testing.T) { http := utilpointer.StringPtr("http") testCases := map[string]struct { diff --git a/staging/src/k8s.io/api/core/v1/annotation_key_constants.go b/staging/src/k8s.io/api/core/v1/annotation_key_constants.go index 2b486415a60..612f6aa747a 100644 --- a/staging/src/k8s.io/api/core/v1/annotation_key_constants.go +++ b/staging/src/k8s.io/api/core/v1/annotation_key_constants.go @@ -123,6 +123,14 @@ const ( // https://github.com/kubernetes/community/blob/master/sig-scalability/slos/network_programming_latency.md EndpointsLastChangeTriggerTime = "endpoints.kubernetes.io/last-change-trigger-time" + // EndpointsOverCapacity will be set on an Endpoints resource when it + // exceeds the maximum capacity of 1000 addresses. Initially the Endpoints + // controller will set this annotation with a value of "warning". 
In a + // future release, the controller may set this annotation with a value of + // "truncated" to indicate that any addresses exceeding the limit of 1000 + // have been truncated from the Endpoints resource. + EndpointsOverCapacity = "endpoints.kubernetes.io/over-capacity" + // MigratedPluginsAnnotationKey is the annotation key, set for CSINode objects, that is a comma-separated // list of in-tree plugins that will be serviced by the CSI backend on the Node represented by CSINode. // This annotation is used by the Attach Detach Controller to determine whether to use the in-tree or