From 38966453ce106c2a5ac77816805fa8b086323b21 Mon Sep 17 00:00:00 2001 From: Quan Tian Date: Fri, 11 Feb 2022 23:43:36 +0800 Subject: [PATCH] Skip updating Endpoints and EndpointSlice if no relevant fields change When comparing EndpointSubsets and Endpoints, we ignore the difference in ResourceVersion of Pod to avoid unnecessary updates caused by Pod updates that we don't care about, e.g. annotation updates. Otherwise, the periodic Service resync would intensively update Endpoints or EndpointSlices whose Pods have irrelevant changes between two resyncs, leading to delays in processing newly created Services. In a large-scale cluster with thousands of such Endpoints, we observed a delay of 2 minutes when the resync happened. --- .../endpoint/endpoints_controller.go | 4 +- .../endpoint/endpoints_controller_test.go | 46 +++++ .../util/endpoint/controller_utils.go | 50 ++++-- .../util/endpoint/controller_utils_test.go | 164 ++++++++++++++++++ 4 files changed, 248 insertions(+), 16 deletions(-) diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 9751ed933c2..fbfe40b67ed 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -490,8 +490,10 @@ func (e *Controller) syncService(ctx context.Context, key string) error { if _, ok := currentEndpoints.Labels[v1.IsHeadlessService]; ok { compareLabels = utillabels.CloneAndRemoveLabel(currentEndpoints.Labels, v1.IsHeadlessService) } + // When comparing the subsets, we ignore the difference in ResourceVersion of Pod to avoid unnecessary Endpoints + // updates caused by Pod updates that we don't care, e.g. annotation update. 
if !createEndpoints && - apiequality.Semantic.DeepEqual(currentEndpoints.Subsets, subsets) && + endpointutil.EndpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) && apiequality.Semantic.DeepEqual(compareLabels, service.Labels) && capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) { klog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 0df46d5a434..a6a2fe5cf58 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -319,6 +319,52 @@ func TestSyncEndpointsExistingEmptySubsets(t *testing.T) { endpointsHandler.ValidateRequestCount(t, 0) } +func TestSyncEndpointsWithPodResourceVersionUpdateOnly(t *testing.T) { + ns := metav1.NamespaceDefault + testServer, endpointsHandler := makeTestServer(t, ns) + defer testServer.Close() + pod0 := testPod(ns, 0, 1, true, ipv4only) + pod1 := testPod(ns, 1, 1, false, ipv4only) + endpoints := newController(testServer.URL, 0*time.Second) + endpoints.endpointsStore.Add(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{ + { + IP: pod0.Status.PodIPs[0].IP, + NodeName: &emptyNodeName, + TargetRef: &v1.ObjectReference{Kind: "Pod", Name: pod0.Name, Namespace: ns, ResourceVersion: "1"}, + }, + }, + NotReadyAddresses: []v1.EndpointAddress{ + { + IP: pod1.Status.PodIPs[0].IP, + NodeName: &emptyNodeName, + TargetRef: &v1.ObjectReference{Kind: "Pod", Name: pod1.Name, Namespace: ns, ResourceVersion: "2"}, + }, + }, + Ports: []v1.EndpointPort{{Port: 8080, Protocol: "TCP"}}, + }}, + }) + endpoints.serviceStore.Add(&v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: v1.ServiceSpec{ + Selector: 
map[string]string{"foo": "bar"}, + Ports: []v1.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: intstr.FromInt(8080)}}, + }, + }) + pod0.ResourceVersion = "3" + pod1.ResourceVersion = "4" + endpoints.podStore.Add(pod0) + endpoints.podStore.Add(pod1) + endpoints.syncService(context.TODO(), ns+"/foo") + endpointsHandler.ValidateRequestCount(t, 0) +} + func TestSyncEndpointsNewNoSubsets(t *testing.T) { ns := metav1.NamespaceDefault testServer, endpointsHandler := makeTestServer(t, ns) diff --git a/pkg/controller/util/endpoint/controller_utils.go b/pkg/controller/util/endpoint/controller_utils.go index d3c9df88321..58c38b3c3c1 100644 --- a/pkg/controller/util/endpoint/controller_utils.go +++ b/pkg/controller/util/endpoint/controller_utils.go @@ -26,7 +26,7 @@ import ( v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" - apiequality "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/conversion" "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" @@ -37,6 +37,18 @@ import ( "k8s.io/kubernetes/pkg/util/hash" ) +// semanticIgnoreResourceVersion does semantic deep equality checks for objects +// but excludes ResourceVersion of ObjectReference. They are used when comparing +// endpoints in Endpoints and EndpointSlice objects to avoid unnecessary updates +// caused by Pod resourceVersion change. 
+var semanticIgnoreResourceVersion = conversion.EqualitiesOrDie( + func(a, b v1.ObjectReference) bool { + a.ResourceVersion = "" + b.ResourceVersion = "" + return a == b + }, +) + // ServiceSelectorCache is a cache of service selectors to avoid high CPU consumption caused by frequent calls to AsSelectorPreValidated (see #73527) type ServiceSelectorCache struct { lock sync.RWMutex @@ -279,7 +291,8 @@ func (sl portsInOrder) Less(i, j int) bool { // EndpointsEqualBeyondHash returns true if endpoints have equal attributes // but excludes equality checks that would have already been covered with -// endpoint hashing (see hashEndpoint func for more info). +// endpoint hashing (see hashEndpoint func for more info) and ignores difference +// in ResourceVersion of TargetRef. func EndpointsEqualBeyondHash(ep1, ep2 *discovery.Endpoint) bool { if stringPtrChanged(ep1.NodeName, ep2.NodeName) { return false @@ -293,7 +306,20 @@ func EndpointsEqualBeyondHash(ep1, ep2 *discovery.Endpoint) bool { return false } - if objectRefPtrChanged(ep1.TargetRef, ep2.TargetRef) { + // Serving and Terminating will only be set when the EndpointSliceTerminatingCondition feature is on. + // Ignore their difference if the expected or actual value is nil, which means the feature enablement is changed. + // Otherwise all EndpointSlices in the system would be updated on the first controller-manager restart even without + // actual changes, leading to delay in processing legitimate updates. + // Its value will be set to the expected one when there is an actual change triggering update of this EndpointSlice. 
+ if ep1.Conditions.Serving != nil && ep2.Conditions.Serving != nil && *ep1.Conditions.Serving != *ep2.Conditions.Serving { + return false + } + + if ep1.Conditions.Terminating != nil && ep2.Conditions.Terminating != nil && *ep1.Conditions.Terminating != *ep2.Conditions.Terminating { + return false + } + + if !semanticIgnoreResourceVersion.DeepEqual(ep1.TargetRef, ep2.TargetRef) { return false } @@ -311,18 +337,6 @@ func boolPtrChanged(ptr1, ptr2 *bool) bool { return false } -// objectRefPtrChanged returns true if a set of object ref pointers have -// different values. -func objectRefPtrChanged(ref1, ref2 *v1.ObjectReference) bool { - if (ref1 == nil) != (ref2 == nil) { - return true - } - if ref1 != nil && ref2 != nil && !apiequality.Semantic.DeepEqual(*ref1, *ref2) { - return true - } - return false -} - // stringPtrChanged returns true if a set of string pointers have different values. func stringPtrChanged(ptr1, ptr2 *string) bool { if (ptr1 == nil) != (ptr2 == nil) { @@ -333,3 +347,9 @@ func stringPtrChanged(ptr1, ptr2 *string) bool { } return false } + +// EndpointSubsetsEqualIgnoreResourceVersion returns true if EndpointSubsets +// have equal attributes but excludes ResourceVersion of Pod. 
+func EndpointSubsetsEqualIgnoreResourceVersion(subsets1, subsets2 []v1.EndpointSubset) bool { + return semanticIgnoreResourceVersion.DeepEqual(subsets1, subsets2) +} diff --git a/pkg/controller/util/endpoint/controller_utils_test.go b/pkg/controller/util/endpoint/controller_utils_test.go index a7c8ba265c7..acc50e9bd01 100644 --- a/pkg/controller/util/endpoint/controller_utils_test.go +++ b/pkg/controller/util/endpoint/controller_utils_test.go @@ -755,6 +755,58 @@ func TestEndpointsEqualBeyondHash(t *testing.T) { }, expected: false, }, + { + name: "Serving condition changed from nil to true", + ep1: &discovery.Endpoint{ + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: nil, + Terminating: nil, + }, + Addresses: []string{"10.0.0.1"}, + TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"}, + Zone: utilpointer.StringPtr("zone-1"), + NodeName: utilpointer.StringPtr("node-1"), + }, + ep2: &discovery.Endpoint{ + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + Terminating: utilpointer.BoolPtr(false), + }, + Addresses: []string{"10.0.0.1"}, + TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"}, + Zone: utilpointer.StringPtr("zone-1"), + NodeName: utilpointer.StringPtr("node-1"), + }, + expected: true, + }, + { + name: "Serving condition changed from false to true", + ep1: &discovery.Endpoint{ + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(false), + Terminating: utilpointer.BoolPtr(false), + }, + Addresses: []string{"10.0.0.1"}, + TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"}, + Zone: utilpointer.StringPtr("zone-1"), + NodeName: utilpointer.StringPtr("node-1"), + }, + ep2: &discovery.Endpoint{ + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + Serving: utilpointer.BoolPtr(true), + 
Terminating: utilpointer.BoolPtr(false), + }, + Addresses: []string{"10.0.0.1"}, + TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0"}, + Zone: utilpointer.StringPtr("zone-1"), + NodeName: utilpointer.StringPtr("node-1"), + }, + expected: false, + }, { name: "Pod name changed", ep1: &discovery.Endpoint{ @@ -777,6 +829,28 @@ func TestEndpointsEqualBeyondHash(t *testing.T) { }, expected: false, }, + { + name: "Pod resourceVersion changed", + ep1: &discovery.Endpoint{ + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + }, + Addresses: []string{"10.0.0.1"}, + TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0", ResourceVersion: "1"}, + Zone: utilpointer.StringPtr("zone-1"), + NodeName: utilpointer.StringPtr("node-1"), + }, + ep2: &discovery.Endpoint{ + Conditions: discovery.EndpointConditions{ + Ready: utilpointer.BoolPtr(true), + }, + Addresses: []string{"10.0.0.1"}, + TargetRef: &v1.ObjectReference{Kind: "Pod", Namespace: "default", Name: "pod0", ResourceVersion: "2"}, + Zone: utilpointer.StringPtr("zone-1"), + NodeName: utilpointer.StringPtr("node-1"), + }, + expected: true, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -786,3 +860,93 @@ func TestEndpointsEqualBeyondHash(t *testing.T) { }) } } + +func TestEndpointSubsetsEqualIgnoreResourceVersion(t *testing.T) { + copyAndMutateEndpointSubset := func(orig *v1.EndpointSubset, mutator func(*v1.EndpointSubset)) *v1.EndpointSubset { + newSubSet := orig.DeepCopy() + mutator(newSubSet) + return newSubSet + } + es1 := &v1.EndpointSubset{ + Addresses: []v1.EndpointAddress{ + { + IP: "1.1.1.1", + TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1-1", Namespace: "ns", ResourceVersion: "1"}, + }, + }, + NotReadyAddresses: []v1.EndpointAddress{ + { + IP: "1.1.1.2", + TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod1-2", Namespace: "ns2", ResourceVersion: "2"}, + }, + }, + Ports: []v1.EndpointPort{{Port: 
8081, Protocol: "TCP"}}, + } + es2 := &v1.EndpointSubset{ + Addresses: []v1.EndpointAddress{ + { + IP: "2.2.2.1", + TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2-1", Namespace: "ns", ResourceVersion: "3"}, + }, + }, + NotReadyAddresses: []v1.EndpointAddress{ + { + IP: "2.2.2.2", + TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod2-2", Namespace: "ns2", ResourceVersion: "4"}, + }, + }, + Ports: []v1.EndpointPort{{Port: 8082, Protocol: "TCP"}}, + } + tests := []struct { + name string + subsets1 []v1.EndpointSubset + subsets2 []v1.EndpointSubset + expected bool + }{ + { + name: "Subsets removed", + subsets1: []v1.EndpointSubset{*es1, *es2}, + subsets2: []v1.EndpointSubset{*es1}, + expected: false, + }, + { + name: "Ready Pod IP changed", + subsets1: []v1.EndpointSubset{*es1, *es2}, + subsets2: []v1.EndpointSubset{*copyAndMutateEndpointSubset(es1, func(es *v1.EndpointSubset) { + es.Addresses[0].IP = "1.1.1.10" + }), *es2}, + expected: false, + }, + { + name: "NotReady Pod IP changed", + subsets1: []v1.EndpointSubset{*es1, *es2}, + subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) { + es.NotReadyAddresses[0].IP = "2.2.2.10" + })}, + expected: false, + }, + { + name: "Pod ResourceVersion changed", + subsets1: []v1.EndpointSubset{*es1, *es2}, + subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es2, func(es *v1.EndpointSubset) { + es.Addresses[0].TargetRef.ResourceVersion = "100" + })}, + expected: true, + }, + { + name: "Ports changed", + subsets1: []v1.EndpointSubset{*es1, *es2}, + subsets2: []v1.EndpointSubset{*es1, *copyAndMutateEndpointSubset(es1, func(es *v1.EndpointSubset) { + es.Ports[0].Port = 8082 + })}, + expected: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := EndpointSubsetsEqualIgnoreResourceVersion(tt.subsets1, tt.subsets2); got != tt.expected { + t.Errorf("semanticIgnoreResourceVersion.DeepEqual() = %v, expected %v", got, 
tt.expected) + } + }) + } +}