mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 02:09:56 +00:00
Merge pull request #130564 from danwinship/label-endpoints
Add "endpoints.kubernetes.io/managed-by" label to Endpoints
This commit is contained in:
commit
8873c7e875
@ -23,7 +23,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/conversion"
|
"k8s.io/apimachinery/pkg/conversion"
|
||||||
@ -67,13 +66,19 @@ const (
|
|||||||
// endpoint resource and indicates that the number of endpoints have been truncated to
|
// endpoint resource and indicates that the number of endpoints have been truncated to
|
||||||
// maxCapacity
|
// maxCapacity
|
||||||
truncated = "truncated"
|
truncated = "truncated"
|
||||||
|
|
||||||
|
// labelManagedBy is a label for recognizing Endpoints managed by this controller.
|
||||||
|
labelManagedBy = "endpoints.kubernetes.io/managed-by"
|
||||||
|
|
||||||
|
// controllerName is the name of this controller
|
||||||
|
controllerName = "endpoint-controller"
|
||||||
)
|
)
|
||||||
|
|
||||||
// NewEndpointController returns a new *Controller.
|
// NewEndpointController returns a new *Controller.
|
||||||
func NewEndpointController(ctx context.Context, podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer,
|
func NewEndpointController(ctx context.Context, podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer,
|
||||||
endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *Controller {
|
endpointsInformer coreinformers.EndpointsInformer, client clientset.Interface, endpointUpdatesBatchPeriod time.Duration) *Controller {
|
||||||
broadcaster := record.NewBroadcaster(record.WithContext(ctx))
|
broadcaster := record.NewBroadcaster(record.WithContext(ctx))
|
||||||
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "endpoint-controller"})
|
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})
|
||||||
|
|
||||||
e := &Controller{
|
e := &Controller{
|
||||||
client: client,
|
client: client,
|
||||||
@ -460,19 +465,11 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
|
|||||||
createEndpoints := len(currentEndpoints.ResourceVersion) == 0
|
createEndpoints := len(currentEndpoints.ResourceVersion) == 0
|
||||||
|
|
||||||
// Compare the sorted subsets and labels
|
// Compare the sorted subsets and labels
|
||||||
// Remove the HeadlessService label from the endpoints if it exists,
|
|
||||||
// as this won't be set on the service itself
|
|
||||||
// and will cause a false negative in this diff check.
|
|
||||||
// But first check if it has that label to avoid expensive copies.
|
|
||||||
compareLabels := currentEndpoints.Labels
|
|
||||||
if _, ok := currentEndpoints.Labels[v1.IsHeadlessService]; ok {
|
|
||||||
compareLabels = utillabels.CloneAndRemoveLabel(currentEndpoints.Labels, v1.IsHeadlessService)
|
|
||||||
}
|
|
||||||
// When comparing the subsets, we ignore the difference in ResourceVersion of Pod to avoid unnecessary Endpoints
|
// When comparing the subsets, we ignore the difference in ResourceVersion of Pod to avoid unnecessary Endpoints
|
||||||
// updates caused by Pod updates that we don't care, e.g. annotation update.
|
// updates caused by Pod updates that we don't care, e.g. annotation update.
|
||||||
if !createEndpoints &&
|
if !createEndpoints &&
|
||||||
endpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) &&
|
endpointSubsetsEqualIgnoreResourceVersion(currentEndpoints.Subsets, subsets) &&
|
||||||
apiequality.Semantic.DeepEqual(compareLabels, service.Labels) &&
|
labelsCorrectForEndpoints(currentEndpoints.Labels, service.Labels) &&
|
||||||
capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) {
|
capacityAnnotationSetCorrectly(currentEndpoints.Annotations, currentEndpoints.Subsets) {
|
||||||
logger.V(5).Info("endpoints are equal, skipping update", "service", klog.KObj(service))
|
logger.V(5).Info("endpoints are equal, skipping update", "service", klog.KObj(service))
|
||||||
return nil
|
return nil
|
||||||
@ -506,6 +503,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
|
|||||||
} else {
|
} else {
|
||||||
newEndpoints.Labels = utillabels.CloneAndRemoveLabel(newEndpoints.Labels, v1.IsHeadlessService)
|
newEndpoints.Labels = utillabels.CloneAndRemoveLabel(newEndpoints.Labels, v1.IsHeadlessService)
|
||||||
}
|
}
|
||||||
|
newEndpoints.Labels[labelManagedBy] = controllerName
|
||||||
|
|
||||||
logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps)
|
logger.V(4).Info("Update endpoints", "service", klog.KObj(service), "readyEndpoints", totalReadyEps, "notreadyEndpoints", totalNotReadyEps)
|
||||||
var updatedEndpoints *v1.Endpoints
|
var updatedEndpoints *v1.Endpoints
|
||||||
@ -718,3 +716,24 @@ var semanticIgnoreResourceVersion = conversion.EqualitiesOrDie(
|
|||||||
func endpointSubsetsEqualIgnoreResourceVersion(subsets1, subsets2 []v1.EndpointSubset) bool {
|
func endpointSubsetsEqualIgnoreResourceVersion(subsets1, subsets2 []v1.EndpointSubset) bool {
|
||||||
return semanticIgnoreResourceVersion.DeepEqual(subsets1, subsets2)
|
return semanticIgnoreResourceVersion.DeepEqual(subsets1, subsets2)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// labelsCorrectForEndpoints tests that epLabels is correctly derived from svcLabels
|
||||||
|
// (ignoring the v1.IsHeadlessService label).
|
||||||
|
func labelsCorrectForEndpoints(epLabels, svcLabels map[string]string) bool {
|
||||||
|
if epLabels[labelManagedBy] != controllerName {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Every label in epLabels except v1.IsHeadlessService and labelManagedBy should
|
||||||
|
// correspond to a label in svcLabels, and svcLabels should not have any other
|
||||||
|
// labels that aren't in epLabels.
|
||||||
|
skipped := 0
|
||||||
|
for k, v := range epLabels {
|
||||||
|
if k == v1.IsHeadlessService || k == labelManagedBy {
|
||||||
|
skipped++
|
||||||
|
} else if sv, exists := svcLabels[k]; !exists || sv != v {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return len(svcLabels) == len(epLabels)-skipped
|
||||||
|
}
|
||||||
|
@ -317,6 +317,9 @@ func TestSyncEndpointsExistingNilSubsets(t *testing.T) {
|
|||||||
Name: "foo",
|
Name: "foo",
|
||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
|
Labels: map[string]string{
|
||||||
|
labelManagedBy: controllerName,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
Subsets: nil,
|
Subsets: nil,
|
||||||
})
|
})
|
||||||
@ -346,6 +349,9 @@ func TestSyncEndpointsExistingEmptySubsets(t *testing.T) {
|
|||||||
Name: "foo",
|
Name: "foo",
|
||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
|
Labels: map[string]string{
|
||||||
|
labelManagedBy: controllerName,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
Subsets: []v1.EndpointSubset{},
|
Subsets: []v1.EndpointSubset{},
|
||||||
})
|
})
|
||||||
@ -376,6 +382,9 @@ func TestSyncEndpointsWithPodResourceVersionUpdateOnly(t *testing.T) {
|
|||||||
Name: "foo",
|
Name: "foo",
|
||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
|
Labels: map[string]string{
|
||||||
|
labelManagedBy: controllerName,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
Subsets: []v1.EndpointSubset{{
|
Subsets: []v1.EndpointSubset{{
|
||||||
Addresses: []v1.EndpointAddress{
|
Addresses: []v1.EndpointAddress{
|
||||||
@ -501,6 +510,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
|
|||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
Labels: map[string]string{
|
Labels: map[string]string{
|
||||||
|
labelManagedBy: controllerName,
|
||||||
v1.IsHeadlessService: "",
|
v1.IsHeadlessService: "",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -524,6 +534,7 @@ func TestSyncEndpointsHeadlessServiceLabel(t *testing.T) {
|
|||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
Labels: map[string]string{
|
Labels: map[string]string{
|
||||||
|
labelManagedBy: controllerName,
|
||||||
v1.IsHeadlessService: "",
|
v1.IsHeadlessService: "",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -652,6 +663,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
|
|||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
Labels: map[string]string{
|
Labels: map[string]string{
|
||||||
|
labelManagedBy: controllerName,
|
||||||
v1.IsHeadlessService: "",
|
v1.IsHeadlessService: "",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -701,6 +713,7 @@ func TestSyncEndpointsProtocolSCTP(t *testing.T) {
|
|||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
Labels: map[string]string{
|
Labels: map[string]string{
|
||||||
|
labelManagedBy: controllerName,
|
||||||
v1.IsHeadlessService: "",
|
v1.IsHeadlessService: "",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -746,6 +759,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
|
|||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
Labels: map[string]string{
|
Labels: map[string]string{
|
||||||
|
labelManagedBy: controllerName,
|
||||||
v1.IsHeadlessService: "",
|
v1.IsHeadlessService: "",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -792,6 +806,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
|
|||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
Labels: map[string]string{
|
Labels: map[string]string{
|
||||||
|
labelManagedBy: controllerName,
|
||||||
v1.IsHeadlessService: "",
|
v1.IsHeadlessService: "",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -838,6 +853,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
|
|||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
Labels: map[string]string{
|
Labels: map[string]string{
|
||||||
|
labelManagedBy: controllerName,
|
||||||
v1.IsHeadlessService: "",
|
v1.IsHeadlessService: "",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -861,6 +877,9 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
|
|||||||
Name: "foo",
|
Name: "foo",
|
||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
|
Labels: map[string]string{
|
||||||
|
labelManagedBy: controllerName,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
Subsets: []v1.EndpointSubset{{
|
Subsets: []v1.EndpointSubset{{
|
||||||
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
|
Addresses: []v1.EndpointAddress{{IP: "6.7.8.9", NodeName: &emptyNodeName}},
|
||||||
@ -887,6 +906,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
|
|||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
Labels: map[string]string{
|
Labels: map[string]string{
|
||||||
|
labelManagedBy: controllerName,
|
||||||
v1.IsHeadlessService: "",
|
v1.IsHeadlessService: "",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -909,6 +929,9 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
|
|||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
Name: "foo",
|
Name: "foo",
|
||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
|
Labels: map[string]string{
|
||||||
|
labelManagedBy: controllerName,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
Subsets: []v1.EndpointSubset{{
|
Subsets: []v1.EndpointSubset{{
|
||||||
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
|
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
|
||||||
@ -972,6 +995,7 @@ func TestSyncEndpointsItems(t *testing.T) {
|
|||||||
ResourceVersion: "",
|
ResourceVersion: "",
|
||||||
Name: "foo",
|
Name: "foo",
|
||||||
Labels: map[string]string{
|
Labels: map[string]string{
|
||||||
|
labelManagedBy: controllerName,
|
||||||
v1.IsHeadlessService: "",
|
v1.IsHeadlessService: "",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -1022,6 +1046,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
|
|||||||
}}
|
}}
|
||||||
|
|
||||||
serviceLabels[v1.IsHeadlessService] = ""
|
serviceLabels[v1.IsHeadlessService] = ""
|
||||||
|
serviceLabels[labelManagedBy] = controllerName
|
||||||
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
|
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
ResourceVersion: "",
|
ResourceVersion: "",
|
||||||
@ -1074,6 +1099,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
serviceLabels[v1.IsHeadlessService] = ""
|
serviceLabels[v1.IsHeadlessService] = ""
|
||||||
|
serviceLabels[labelManagedBy] = controllerName
|
||||||
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
|
data := runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Name: "foo",
|
Name: "foo",
|
||||||
@ -1183,6 +1209,7 @@ func TestSyncEndpointsHeadlessService(t *testing.T) {
|
|||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
Labels: map[string]string{
|
Labels: map[string]string{
|
||||||
|
labelManagedBy: controllerName,
|
||||||
"a": "b",
|
"a": "b",
|
||||||
v1.IsHeadlessService: "",
|
v1.IsHeadlessService: "",
|
||||||
},
|
},
|
||||||
@ -1212,6 +1239,7 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFail
|
|||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
Labels: map[string]string{
|
Labels: map[string]string{
|
||||||
|
labelManagedBy: controllerName,
|
||||||
"foo": "bar",
|
"foo": "bar",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -1236,6 +1264,7 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseFail
|
|||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
Labels: map[string]string{
|
Labels: map[string]string{
|
||||||
|
labelManagedBy: controllerName,
|
||||||
v1.IsHeadlessService: "",
|
v1.IsHeadlessService: "",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -1281,6 +1310,7 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyNeverAndPhaseSucc
|
|||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
Labels: map[string]string{
|
Labels: map[string]string{
|
||||||
|
labelManagedBy: controllerName,
|
||||||
v1.IsHeadlessService: "",
|
v1.IsHeadlessService: "",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -1327,6 +1357,7 @@ func TestSyncEndpointsItemsExcludeNotReadyPodsWithRestartPolicyOnFailureAndPhase
|
|||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
Labels: map[string]string{
|
Labels: map[string]string{
|
||||||
|
labelManagedBy: controllerName,
|
||||||
v1.IsHeadlessService: "",
|
v1.IsHeadlessService: "",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -1361,6 +1392,7 @@ func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) {
|
|||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Name: "foo",
|
Name: "foo",
|
||||||
Labels: map[string]string{
|
Labels: map[string]string{
|
||||||
|
labelManagedBy: controllerName,
|
||||||
v1.IsHeadlessService: "",
|
v1.IsHeadlessService: "",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -1580,6 +1612,7 @@ func TestLastTriggerChangeTimeAnnotation(t *testing.T) {
|
|||||||
v1.EndpointsLastChangeTriggerTime: triggerTimeString,
|
v1.EndpointsLastChangeTriggerTime: triggerTimeString,
|
||||||
},
|
},
|
||||||
Labels: map[string]string{
|
Labels: map[string]string{
|
||||||
|
labelManagedBy: controllerName,
|
||||||
v1.IsHeadlessService: "",
|
v1.IsHeadlessService: "",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -1636,6 +1669,7 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationOverridden(t *testing.T) {
|
|||||||
v1.EndpointsLastChangeTriggerTime: triggerTimeString,
|
v1.EndpointsLastChangeTriggerTime: triggerTimeString,
|
||||||
},
|
},
|
||||||
Labels: map[string]string{
|
Labels: map[string]string{
|
||||||
|
labelManagedBy: controllerName,
|
||||||
v1.IsHeadlessService: "",
|
v1.IsHeadlessService: "",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@ -1690,6 +1724,7 @@ func TestLastTriggerChangeTimeAnnotation_AnnotationCleared(t *testing.T) {
|
|||||||
Namespace: ns,
|
Namespace: ns,
|
||||||
ResourceVersion: "1",
|
ResourceVersion: "1",
|
||||||
Labels: map[string]string{
|
Labels: map[string]string{
|
||||||
|
labelManagedBy: controllerName,
|
||||||
v1.IsHeadlessService: "",
|
v1.IsHeadlessService: "",
|
||||||
}, // Annotation not set anymore.
|
}, // Annotation not set anymore.
|
||||||
},
|
},
|
||||||
|
Loading…
Reference in New Issue
Block a user