From c4ea350ef6610d83148046129e28d0b362171b45 Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Tue, 4 Mar 2025 11:06:10 -0500 Subject: [PATCH] Add "endpoints.kubernetes.io/managed-by" label to Endpoints Add a label to allow us to recognize endpoint-controller-generated Endpoints in the future. (In particular, to allow us to recognize stale Endpoints whose Service gets deleted while the Endpoints controller is not running.) Unlike the corresponding EndpointSlice label, this is not defined as part of the public API, because we have no interest in getting other controllers to use it. (They should switch to creating EndpointSlices instead.) --- .../endpoint/endpoints_controller.go | 41 ++++++++++++++----- .../endpoint/endpoints_controller_test.go | 37 ++++++++++++++++- 2 files changed, 66 insertions(+), 12 deletions(-) diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 15404321e24..bfc052504b4 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -23,7 +23,6 @@ import ( "time" v1 "k8s.io/api/core/v1" - apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/conversion" @@ -67,13 +66,19 @@ const ( // endpoint resource and indicates that the number of endpoints have been truncated to // maxCapacity truncated = "truncated" + + // labelManagedBy is a label for recognizing Endpoints managed by this controller. + labelManagedBy = "endpoints.kubernetes.io/managed-by" + + // controllerName is the name of this controller + controllerName = "endpoint-controller" ) // NewEndpointController returns a new *Controller. func NewEndpointController(ctx context.Context, podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer, endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *Controller { broadcaster := record.NewBroadcaster(record.WithContext(ctx)) - recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-controller"}) + recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName}) e := &Controller{ client: client, @@ -460,19 +465,11 @@ func (e *Controller) syncService(ctx context.Context, key string) error { createEndpoints := len(currentEndpoints.ResourceVersion) == 0 // Compare the sorted subsets and labels - // Remove the HeadlessService label from the endpoints if it exists, - // as this won't be set on the service itself - // and will cause a false negative in this diff check. - // But first check if it has that label to avoid expensive copies. - compareLabels := currentEndpoints.Labels - if _, ok := currentEndpoints.Labels[v1.IsHeadlessService]; ok { - compareLabels = utillabels.CloneAndRemoveLabel(currentEndpoints.Labels, v1.IsHeadlessService) - } // When comparing the subsets, we ignore the difference in ResourceVersion of Pod to avoid unnecessary Endpoints // updates caused by Pod updates that we don't care, e.g. annotation update. if !createEndpoints && endpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) && - apiequality.Semantic.DeepEqual(compareLabels, service.Labels) && + labelsCorrectForEndpoints(currentEndpoints.Labels, service.Labels) && capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) { logger.V(5).Info("endpoints are equal, skipping update", "service", klog.KObj(service)) return nil @@ -506,6 +503,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { } else { newEndpoints.Labels = utillabels.CloneAndRemoveLabel(newEndpoints.Labels, v1.IsHeadlessService) } + newEndpoints.Labels[labelManagedBy] = controllerName logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps) var updatedEndpoints *v1.Endpoints @@ -718,3 +716,24 @@ var semanticIgnoreResourceVersion = conversion.EqualitiesOrDie( func endpointSubsetsEqualIgnoreResourceVersion(subsets1, subsets2 []v1.EndpointSubset) bool { return semanticIgnoreResourceVersion.DeepEqual(subsets1, subsets2) } + +// labelsCorrectForEndpoints tests that epLabels is correctly derived from svcLabels +// (ignoring the v1.IsHeadlessService label). +func labelsCorrectForEndpoints(epLabels, svcLabels map[string]string) bool { + if epLabels[labelManagedBy] != controllerName { + return false + } + + // Every label in epLabels except v1.IsHeadlessService and labelManagedBy should + // correspond to a label in svcLabels, and svcLabels should not have any other + // labels that aren't in epLabels. + skipped := 0 + for k, v := range epLabels { + if k == v1.IsHeadlessService || k == labelManagedBy { + skipped++ + } else if sv, exists := svcLabels[k]; !exists || sv != v { + return false + } + } + return len(svcLabels) == len(epLabels)-skipped +} diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 4188c93c263..2ccc848864b 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -317,6 +317,9 @@ func TestSyncEndpointsExistingNilSubsets(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + labelManagedBy: controllerName, + }, }, Subsets: nil, }) @@ -346,6 +349,9 @@ func TestSyncEndpointsExistingEmptySubsets(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + labelManagedBy: controllerName, + }, }, Subsets: []v1.EndpointSubset{}, }) @@ -376,6 +382,9 @@ func TestSyncEndpointsWithPodResourceVersionUpdateOnly(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + labelManagedBy: controllerName, + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{ @@ -501,6 +510,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -524,6 +534,7 @@ func TestSyncEndpointsHeadlessServiceLabel(t *testing.T) { Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -652,6 +663,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) { Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -701,6 +713,7 @@ func TestSyncEndpointsProtocolSCTP(t *testing.T) { Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -746,6 +759,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -792,6 +806,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) { Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -838,6 +853,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) { Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -861,6 +877,9 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + labelManagedBy: controllerName, + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, @@ -887,6 +906,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -909,6 +929,9 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { ResourceVersion: "1", Name: "foo", Namespace: ns, + Labels: map[string]string{ + labelManagedBy: controllerName, + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -972,6 +995,7 @@ func TestSyncEndpointsItems(t *testing.T) { ResourceVersion: "", Name: "foo", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -1022,6 +1046,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) { }} serviceLabels[v1.IsHeadlessService] = "" + serviceLabels[labelManagedBy] = controllerName data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ ResourceVersion: "", @@ -1074,6 +1099,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { } serviceLabels[v1.IsHeadlessService] = "" + serviceLabels[labelManagedBy] = controllerName data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -1183,6 +1209,7 @@ func TestSyncEndpointsHeadlessService(t *testing.T) { Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, "a": "b", v1.IsHeadlessService: "", }, @@ -1212,7 +1239,8 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFail Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ - "foo": "bar", + labelManagedBy: controllerName, + "foo": "bar", }, }, Subsets: []v1.EndpointSubset{}, @@ -1236,6 +1264,7 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFail Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -1281,6 +1310,7 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseSucc Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -1327,6 +1357,7 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhase Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -1361,6 +1392,7 @@ func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "foo", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -1580,6 +1612,7 @@ func TestLastTriggerChangeTimeAnnotation(t *testing.T) { v1.EndpointsLastChangeTriggerTime: triggerTimeString, }, Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -1636,6 +1669,7 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) { v1.EndpointsLastChangeTriggerTime: triggerTimeString, }, Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -1690,6 +1724,7 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) { Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, // Annotation not set anymore. },