diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 15404321e24..bfc052504b4 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -23,7 +23,6 @@ import ( "time" v1 "k8s.io/api/core/v1" - apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/conversion" @@ -67,13 +66,19 @@ const ( // endpoint resource and indicates that the number of endpoints have been truncated to // maxCapacity truncated = "truncated" + + // labelManagedBy is a label for recognizing Endpoints managed by this controller. + labelManagedBy = "endpoints.kubernetes.io/managed-by" + + // controllerName is the name of this controller + controllerName = "endpoint-controller" ) // NewEndpointController returns a new *Controller. func NewEndpointController(ctx context.Context, podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer, endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *Controller { broadcaster := record.NewBroadcaster(record.WithContext(ctx)) - recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-controller"}) + recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName}) e := &Controller{ client: client, @@ -460,19 +465,11 @@ func (e *Controller) syncService(ctx context.Context, key string) error { createEndpoints := len(currentEndpoints.ResourceVersion) == 0 // Compare the sorted subsets and labels - // Remove the HeadlessService label from the endpoints if it exists, - // as this won't be set on the service itself - // and will cause a false negative in this diff check. - // But first check if it has that label to avoid expensive copies. - compareLabels := currentEndpoints.Labels - if _, ok := currentEndpoints.Labels[v1.IsHeadlessService]; ok { - compareLabels = utillabels.CloneAndRemoveLabel(currentEndpoints.Labels, v1.IsHeadlessService) - } // When comparing the subsets, we ignore the difference in ResourceVersion of Pod to avoid unnecessary Endpoints // updates caused by Pod updates that we don't care, e.g. annotation update. if !createEndpoints && endpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) && - apiequality.Semantic.DeepEqual(compareLabels, service.Labels) && + labelsCorrectForEndpoints(currentEndpoints.Labels, service.Labels) && capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) { logger.V(5).Info("endpoints are equal, skipping update", "service", klog.KObj(service)) return nil @@ -506,6 +503,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { } else { newEndpoints.Labels = utillabels.CloneAndRemoveLabel(newEndpoints.Labels, v1.IsHeadlessService) } + newEndpoints.Labels[labelManagedBy] = controllerName logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps) var updatedEndpoints *v1.Endpoints @@ -718,3 +716,24 @@ var semanticIgnoreResourceVersion = conversion.EqualitiesOrDie( func endpointSubsetsEqualIgnoreResourceVersion(subsets1, subsets2 []v1.EndpointSubset) bool { return semanticIgnoreResourceVersion.DeepEqual(subsets1, subsets2) } + +// labelsCorrectForEndpoints tests that epLabels is correctly derived from svcLabels +// (ignoring the v1.IsHeadlessService label). +func labelsCorrectForEndpoints(epLabels, svcLabels map[string]string) bool { + if epLabels[labelManagedBy] != controllerName { + return false + } + + // Every label in epLabels except v1.IsHeadlessService and labelManagedBy should + // correspond to a label in svcLabels, and svcLabels should not have any other + // labels that aren't in epLabels. + skipped := 0 + for k, v := range epLabels { + if k == v1.IsHeadlessService || k == labelManagedBy { + skipped++ + } else if sv, exists := svcLabels[k]; !exists || sv != v { + return false + } + } + return len(svcLabels) == len(epLabels)-skipped +} diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 4188c93c263..2ccc848864b 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -317,6 +317,9 @@ func TestSyncEndpointsExistingNilSubsets(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + labelManagedBy: controllerName, + }, }, Subsets: nil, }) @@ -346,6 +349,9 @@ func TestSyncEndpointsExistingEmptySubsets(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + labelManagedBy: controllerName, + }, }, Subsets: []v1.EndpointSubset{}, }) @@ -376,6 +382,9 @@ func TestSyncEndpointsWithPodResourceVersionUpdateOnly(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + labelManagedBy: controllerName, + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{ @@ -501,6 +510,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -524,6 +534,7 @@ func TestSyncEndpointsHeadlessServiceLabel(t *testing.T) { Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -652,6 +663,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) { Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -701,6 +713,7 @@ func TestSyncEndpointsProtocolSCTP(t *testing.T) { Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -746,6 +759,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -792,6 +806,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) { Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -838,6 +853,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) { Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -861,6 +877,9 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { Name: "foo", Namespace: ns, ResourceVersion: "1", + Labels: map[string]string{ + labelManagedBy: controllerName, + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}}, @@ -887,6 +906,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -909,6 +929,9 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { ResourceVersion: "1", Name: "foo", Namespace: ns, + Labels: map[string]string{ + labelManagedBy: controllerName, + }, }, Subsets: []v1.EndpointSubset{{ Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, @@ -972,6 +995,7 @@ func TestSyncEndpointsItems(t *testing.T) { ResourceVersion: "", Name: "foo", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -1022,6 +1046,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) { }} serviceLabels[v1.IsHeadlessService] = "" + serviceLabels[labelManagedBy] = controllerName data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ ResourceVersion: "", @@ -1074,6 +1099,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { } serviceLabels[v1.IsHeadlessService] = "" + serviceLabels[labelManagedBy] = controllerName data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: "foo", @@ -1183,6 +1209,7 @@ func TestSyncEndpointsHeadlessService(t *testing.T) { Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, "a": "b", v1.IsHeadlessService: "", }, @@ -1212,7 +1239,8 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFail Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ - "foo": "bar", + labelManagedBy: controllerName, + "foo": "bar", }, }, Subsets: []v1.EndpointSubset{}, @@ -1236,6 +1264,7 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFail Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -1281,6 +1310,7 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseSucc Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -1327,6 +1357,7 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhase Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -1361,6 +1392,7 @@ func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "foo", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -1580,6 +1612,7 @@ func TestLastTriggerChangeTimeAnnotation(t *testing.T) { v1.EndpointsLastChangeTriggerTime: triggerTimeString, }, Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -1636,6 +1669,7 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) { v1.EndpointsLastChangeTriggerTime: triggerTimeString, }, Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, }, @@ -1690,6 +1724,7 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) { Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ + labelManagedBy: controllerName, v1.IsHeadlessService: "", }, // Annotation not set anymore. },