Add "endpoints.kubernetes.io/managed-by" label to Endpoints

Add a label to allow us to recognize endpoint-controller-generated
Endpoints in the future. (In particular, to allow us to recognize
stale Endpoints whose Service gets deleted while the Endpoints
controller is not running.)

Unlike the corresponding EndpointSlice label, this is not defined as
part of the public API, because we have no interest in getting other
controllers to use it. (They should switch to creating EndpointSlices
instead.)
This commit is contained in:
Dan Winship 2025-03-04 11:06:10 -05:00
parent 58704903c5
commit c4ea350ef6
2 changed files with 66 additions and 12 deletions

View File

@ -23,7 +23,6 @@ import (
"time"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
@ -67,13 +66,19 @@ const (
// endpoint resource and indicates that the number of endpoints have been truncated to
// maxCapacity
truncated = "truncated"
// labelManagedBy is a label for recognizing Endpoints managed by this controller.
labelManagedBy = "endpoints.kubernetes.io/managed-by"
// controllerName is the name of this controller
controllerName = "endpoint-controller"
)
// NewEndpointController returns a new *Controller.
func NewEndpointController(ctx context.Context, podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer,
endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *Controller {
broadcaster := record.NewBroadcaster(record.WithContext(ctx))
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-controller"})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})
e := &Controller{
client: client,
@ -460,19 +465,11 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
createEndpoints := len(currentEndpoints.ResourceVersion) == 0
// Compare the sorted subsets and labels
// Remove the HeadlessService label from the endpoints if it exists,
// as this won't be set on the service itself
// and will cause a false negative in this diff check.
// But first check if it has that label to avoid expensive copies.
compareLabels := currentEndpoints.Labels
if _, ok := currentEndpoints.Labels[v1.IsHeadlessService]; ok {
compareLabels = utillabels.CloneAndRemoveLabel(currentEndpoints.Labels, v1.IsHeadlessService)
}
// When comparing the subsets, we ignore the difference in ResourceVersion of Pod to avoid unnecessary Endpoints
// updates caused by Pod updates that we don't care, e.g. annotation update.
if !createEndpoints &&
endpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) &&
apiequality.Semantic.DeepEqual(compareLabels, service.Labels) &&
labelsCorrectForEndpoints(currentEndpoints.Labels, service.Labels) &&
capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) {
logger.V(5).Info("endpoints are equal, skipping update", "service", klog.KObj(service))
return nil
@ -506,6 +503,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
} else {
newEndpoints.Labels = utillabels.CloneAndRemoveLabel(newEndpoints.Labels, v1.IsHeadlessService)
}
newEndpoints.Labels[labelManagedBy] = controllerName
logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps)
var updatedEndpoints *v1.Endpoints
@ -718,3 +716,24 @@ var semanticIgnoreResourceVersion = conversion.EqualitiesOrDie(
func endpointSubsetsEqualIgnoreResourceVersion(subsets1, subsets2 []v1.EndpointSubset) bool {
return semanticIgnoreResourceVersion.DeepEqual(subsets1, subsets2)
}
// labelsCorrectForEndpoints tests that epLabels is correctly derived from svcLabels
// (ignoring the v1.IsHeadlessService label).
func labelsCorrectForEndpoints(epLabels, svcLabels map[string]string) bool {
if epLabels[labelManagedBy] != controllerName {
return false
}
// Every label in epLabels except v1.IsHeadlessService and labelManagedBy should
// correspond to a label in svcLabels, and svcLabels should not have any other
// labels that aren't in epLabels.
skipped := 0
for k, v := range epLabels {
if k == v1.IsHeadlessService || k == labelManagedBy {
skipped++
} else if sv, exists := svcLabels[k]; !exists || sv != v {
return false
}
}
return len(svcLabels) == len(epLabels)-skipped
}

View File

@ -317,6 +317,9 @@ func TestSyncEndpointsExistingNilSubsets(t *testing.T) {
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
labelManagedBy: controllerName,
},
},
Subsets: nil,
})
@ -346,6 +349,9 @@ func TestSyncEndpointsExistingEmptySubsets(t *testing.T) {
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
labelManagedBy: controllerName,
},
},
Subsets: []v1.EndpointSubset{},
})
@ -376,6 +382,9 @@ func TestSyncEndpointsWithPodResourceVersionUpdateOnly(t *testing.T) {
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
labelManagedBy: controllerName,
},
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{
@ -501,6 +510,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
labelManagedBy: controllerName,
v1.IsHeadlessService: "",
},
},
@ -524,6 +534,7 @@ func TestSyncEndpointsHeadlessServiceLabel(t *testing.T) {
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
labelManagedBy: controllerName,
v1.IsHeadlessService: "",
},
},
@ -652,6 +663,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
labelManagedBy: controllerName,
v1.IsHeadlessService: "",
},
},
@ -701,6 +713,7 @@ func TestSyncEndpointsProtocolSCTP(t *testing.T) {
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
labelManagedBy: controllerName,
v1.IsHeadlessService: "",
},
},
@ -746,6 +759,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
labelManagedBy: controllerName,
v1.IsHeadlessService: "",
},
},
@ -792,6 +806,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
labelManagedBy: controllerName,
v1.IsHeadlessService: "",
},
},
@ -838,6 +853,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
labelManagedBy: controllerName,
v1.IsHeadlessService: "",
},
},
@ -861,6 +877,9 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
Name: "foo",
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
labelManagedBy: controllerName,
},
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
@ -887,6 +906,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
labelManagedBy: controllerName,
v1.IsHeadlessService: "",
},
},
@ -909,6 +929,9 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
ResourceVersion: "1",
Name: "foo",
Namespace: ns,
Labels: map[string]string{
labelManagedBy: controllerName,
},
},
Subsets: []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
@ -972,6 +995,7 @@ func TestSyncEndpointsItems(t *testing.T) {
ResourceVersion: "",
Name: "foo",
Labels: map[string]string{
labelManagedBy: controllerName,
v1.IsHeadlessService: "",
},
},
@ -1022,6 +1046,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
}}
serviceLabels[v1.IsHeadlessService] = ""
serviceLabels[labelManagedBy] = controllerName
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
ResourceVersion: "",
@ -1074,6 +1099,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
}
serviceLabels[v1.IsHeadlessService] = ""
serviceLabels[labelManagedBy] = controllerName
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
@ -1183,6 +1209,7 @@ func TestSyncEndpointsHeadlessService(t *testing.T) {
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
labelManagedBy: controllerName,
"a": "b",
v1.IsHeadlessService: "",
},
@ -1212,7 +1239,8 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFail
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
"foo": "bar",
labelManagedBy: controllerName,
"foo": "bar",
},
},
Subsets: []v1.EndpointSubset{},
@ -1236,6 +1264,7 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFail
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
labelManagedBy: controllerName,
v1.IsHeadlessService: "",
},
},
@ -1281,6 +1310,7 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseSucc
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
labelManagedBy: controllerName,
v1.IsHeadlessService: "",
},
},
@ -1327,6 +1357,7 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhase
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
labelManagedBy: controllerName,
v1.IsHeadlessService: "",
},
},
@ -1361,6 +1392,7 @@ func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Labels: map[string]string{
labelManagedBy: controllerName,
v1.IsHeadlessService: "",
},
},
@ -1580,6 +1612,7 @@ func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
v1.EndpointsLastChangeTriggerTime: triggerTimeString,
},
Labels: map[string]string{
labelManagedBy: controllerName,
v1.IsHeadlessService: "",
},
},
@ -1636,6 +1669,7 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) {
v1.EndpointsLastChangeTriggerTime: triggerTimeString,
},
Labels: map[string]string{
labelManagedBy: controllerName,
v1.IsHeadlessService: "",
},
},
@ -1690,6 +1724,7 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) {
Namespace: ns,
ResourceVersion: "1",
Labels: map[string]string{
labelManagedBy: controllerName,
v1.IsHeadlessService: "",
}, // Annotation not set anymore.
},