From 5e7e1e7cf168c8565ffbab2fff3fe65e823770fe Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Mon, 3 Mar 2025 13:17:29 +0000 Subject: [PATCH 1/6] KEP-4742: Node Topology Labels via Downward API --- .../samples/generic/server/admission.go | 3 + pkg/features/kube_features.go | 10 + pkg/kubeapiserver/options/plugins.go | 4 + pkg/registry/core/pod/storage/storage.go | 25 ++- pkg/registry/core/pod/storage/storage_test.go | 14 +- .../admission/podtopologylabels/admission.go | 205 ++++++++++++++++++ .../podtopologylabels/admission_test.go | 174 +++++++++++++++ plugin/pkg/admission/podtopologylabels/doc.go | 23 ++ .../reference/versioned_feature_list.yaml | 6 + test/integration/pods/pods_test.go | 122 +++++++++++ 10 files changed, 580 insertions(+), 6 deletions(-) create mode 100644 plugin/pkg/admission/podtopologylabels/admission.go create mode 100644 plugin/pkg/admission/podtopologylabels/admission_test.go create mode 100644 plugin/pkg/admission/podtopologylabels/doc.go diff --git a/pkg/controlplane/apiserver/samples/generic/server/admission.go b/pkg/controlplane/apiserver/samples/generic/server/admission.go index eba588534c7..998fd4286bc 100644 --- a/pkg/controlplane/apiserver/samples/generic/server/admission.go +++ b/pkg/controlplane/apiserver/samples/generic/server/admission.go @@ -30,6 +30,7 @@ import ( certsigning "k8s.io/kubernetes/plugin/pkg/admission/certificates/signing" certsubjectrestriction "k8s.io/kubernetes/plugin/pkg/admission/certificates/subjectrestriction" "k8s.io/kubernetes/plugin/pkg/admission/defaulttolerationseconds" + "k8s.io/kubernetes/plugin/pkg/admission/podtopologylabels" "k8s.io/kubernetes/plugin/pkg/admission/serviceaccount" ) @@ -48,6 +49,8 @@ func DefaultOffAdmissionPlugins() sets.Set[string] { certsubjectrestriction.PluginName, // CertificateSubjectRestriction validatingadmissionpolicy.PluginName, // ValidatingAdmissionPolicy mutatingadmissionpolicy.PluginName, // MutatingAdmissionPolicy + podtopologylabels.PluginName, // PodTopologyLabels, only active when feature gate PodTopologyLabelsAdmission is enabled. + validatingadmissionpolicy.PluginName, // ValidatingAdmissionPolicy, only active when feature gate ValidatingAdmissionPolicy is enabled ) return sets.New(options.AllOrderedPlugins...).Difference(defaultOnPlugins) diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index ec9a8d2fd68..9e00230d383 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -964,6 +964,16 @@ const ( // restore the old behavior. Please file issues if you hit issues and have to use this Feature Gate. // The Feature Gate will be locked to true and then removed in +2 releases (1.35) if there are no bug reported DisableCPUQuotaWithExclusiveCPUs featuregate.Feature = "DisableCPUQuotaWithExclusiveCPUs" + + // owner: @munnerz + // kep: https://kep.k8s.io/4742 + // alpha: v1.33 + // + // Enables the PodTopologyLabelsAdmission admission plugin to automatically set node topology labels + // (i.e. those with the 'topology.k8s.io/' prefix on Node objects) onto Pod objects when they are + // bound/scheduled to a node. + // This allows workloads running in pods to understand the topology information of their assigned node. + PodTopologyLabelsAdmission featuregate.Feature = "PodTopologyLabelsAdmission" ) // defaultVersionedKubernetesFeatureGates consists of all known Kubernetes-specific feature keys with VersionedSpecs. diff --git a/pkg/kubeapiserver/options/plugins.go b/pkg/kubeapiserver/options/plugins.go index e9f8c41b814..0dccb55e2a4 100644 --- a/pkg/kubeapiserver/options/plugins.go +++ b/pkg/kubeapiserver/options/plugins.go @@ -46,6 +46,7 @@ import ( "k8s.io/kubernetes/plugin/pkg/admission/nodetaint" "k8s.io/kubernetes/plugin/pkg/admission/podnodeselector" "k8s.io/kubernetes/plugin/pkg/admission/podtolerationrestriction" + "k8s.io/kubernetes/plugin/pkg/admission/podtopologylabels" podpriority "k8s.io/kubernetes/plugin/pkg/admission/priority" "k8s.io/kubernetes/plugin/pkg/admission/runtimeclass" "k8s.io/kubernetes/plugin/pkg/admission/security/podsecurity" @@ -93,6 +94,7 @@ var AllOrderedPlugins = []string{ certsubjectrestriction.PluginName, // CertificateSubjectRestriction defaultingressclass.PluginName, // DefaultIngressClass denyserviceexternalips.PluginName, // DenyServiceExternalIPs + podtopologylabels.PluginName, // PodTopologyLabels // new admission plugins should generally be inserted above here // webhook, resourcequota, and deny plugins must go at the end @@ -138,6 +140,7 @@ func RegisterAllAdmissionPlugins(plugins *admission.Plugins) { certsigning.Register(plugins) ctbattest.Register(plugins) certsubjectrestriction.Register(plugins) + podtopologylabels.Register(plugins) } // DefaultOffAdmissionPlugins get admission plugins off by default for kube-apiserver. @@ -162,6 +165,7 @@ func DefaultOffAdmissionPlugins() sets.Set[string] { certsubjectrestriction.PluginName, // CertificateSubjectRestriction defaultingressclass.PluginName, // DefaultIngressClass podsecurity.PluginName, // PodSecurity + podtopologylabels.PluginName, // PodTopologyLabels, only active when feature gate PodTopologyLabelsAdmission is enabled. mutatingadmissionpolicy.PluginName, // Mutatingadmissionpolicy, only active when feature gate MutatingAdmissionpolicy is enabled validatingadmissionpolicy.PluginName, // ValidatingAdmissionPolicy, only active when feature gate ValidatingAdmissionPolicy is enabled ) diff --git a/pkg/registry/core/pod/storage/storage.go b/pkg/registry/core/pod/storage/storage.go index 8aaa358d085..0066e6bcfa7 100644 --- a/pkg/registry/core/pod/storage/storage.go +++ b/pkg/registry/core/pod/storage/storage.go @@ -33,10 +33,12 @@ import ( "k8s.io/apiserver/pkg/storage" storeerr "k8s.io/apiserver/pkg/storage/errors" "k8s.io/apiserver/pkg/util/dryrun" + utilfeature "k8s.io/apiserver/pkg/util/feature" policyclient "k8s.io/client-go/kubernetes/typed/policy/v1" podutil "k8s.io/kubernetes/pkg/api/pod" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/apis/core/validation" + kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/client" "k8s.io/kubernetes/pkg/printers" printersinternal "k8s.io/kubernetes/pkg/printers/internalversion" @@ -191,7 +193,7 @@ func (r *BindingREST) Create(ctx context.Context, name string, obj runtime.Objec } } - err = r.assignPod(ctx, binding.UID, binding.ResourceVersion, binding.Name, binding.Target.Name, binding.Annotations, dryrun.IsDryRun(options.DryRun)) + err = r.assignPod(ctx, binding.UID, binding.ResourceVersion, binding.Name, binding.Target.Name, binding.Annotations, binding.Labels, dryrun.IsDryRun(options.DryRun)) out = &metav1.Status{Status: metav1.StatusSuccess} return } @@ -206,7 +208,7 @@ func (r *BindingREST) PreserveRequestObjectMetaSystemFieldsOnSubresourceCreate() // setPodHostAndAnnotations sets the given pod's host to 'machine' if and only if // the pod is unassigned and merges the provided annotations with those of the pod. // Returns the current state of the pod, or an error. -func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types.UID, podResourceVersion, podID, machine string, annotations map[string]string, dryRun bool) (finalPod *api.Pod, err error) { +func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types.UID, podResourceVersion, podID, machine string, annotations, labels map[string]string, dryRun bool) (finalPod *api.Pod, err error) { podKey, err := r.store.KeyFunc(ctx, podID) if err != nil { return nil, err @@ -245,6 +247,21 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types for k, v := range annotations { pod.Annotations[k] = v } + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodTopologyLabelsAdmission) { + // If any labels are present on the Binding, copy them across to the Pod. + if len(labels) > 0 { + if pod.Labels == nil { + pod.Labels = make(map[string]string) + } + for k, v := range labels { + if _, ok := pod.Labels[k]; ok { + // don't overwrite labels that are already present on the Pod + continue + } + pod.Labels[k] = v + } + } + } podutil.UpdatePodCondition(&pod.Status, &api.PodCondition{ Type: api.PodScheduled, Status: api.ConditionTrue, @@ -256,8 +273,8 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types } // assignPod assigns the given pod to the given machine. -func (r *BindingREST) assignPod(ctx context.Context, podUID types.UID, podResourceVersion, podID string, machine string, annotations map[string]string, dryRun bool) (err error) { - if _, err = r.setPodHostAndAnnotations(ctx, podUID, podResourceVersion, podID, machine, annotations, dryRun); err != nil { +func (r *BindingREST) assignPod(ctx context.Context, podUID types.UID, podResourceVersion, podID string, machine string, annotations, labels map[string]string, dryRun bool) (err error) { + if _, err = r.setPodHostAndAnnotations(ctx, podUID, podResourceVersion, podID, machine, annotations, labels, dryRun); err != nil { err = storeerr.InterpretGetError(err, api.Resource("pods"), podID) err = storeerr.InterpretUpdateError(err, api.Resource("pods"), podID) if _, ok := err.(*errors.StatusError); !ok { diff --git a/pkg/registry/core/pod/storage/storage_test.go b/pkg/registry/core/pod/storage/storage_test.go index 39177e5ff96..17e0de0a633 100644 --- a/pkg/registry/core/pod/storage/storage_test.go +++ b/pkg/registry/core/pod/storage/storage_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/version" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/generic" genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" @@ -42,8 +43,11 @@ import ( apiserverstorage "k8s.io/apiserver/pkg/storage" storeerr "k8s.io/apiserver/pkg/storage/errors" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" podtest "k8s.io/kubernetes/pkg/api/pod/testing" api "k8s.io/kubernetes/pkg/apis/core" + kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/registry/registrytest" "k8s.io/kubernetes/pkg/securitycontext" ) @@ -676,12 +680,15 @@ func TestEtcdCreateWithContainersNotFound(t *testing.T) { t.Fatalf("unexpected error: %v", err) } + featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.32")) + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.SetPodTopologyLabels, true) // Suddenly, a wild scheduler appears: _, err = bindingStorage.Create(ctx, "foo", &api.Binding{ ObjectMeta: metav1.ObjectMeta{ Namespace: metav1.NamespaceDefault, Name: "foo", - Annotations: map[string]string{"label1": "value1"}, + Annotations: map[string]string{"annotation1": "value1"}, + Labels: map[string]string{"label1": "label-value1"}, }, Target: api.ObjectReference{Name: "machine"}, }, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}) @@ -695,9 +702,12 @@ func TestEtcdCreateWithContainersNotFound(t *testing.T) { } pod := obj.(*api.Pod) - if !(pod.Annotations != nil && pod.Annotations["label1"] == "value1") { + if !(pod.Annotations != nil && pod.Annotations["annotation1"] == "value1") { t.Fatalf("Pod annotations don't match the expected: %v", pod.Annotations) } + if !(pod.Labels != nil && pod.Labels["label1"] == "label-value1") { + t.Fatalf("Pod labels don't match the expected: %v", pod.Labels) + } } func TestEtcdCreateWithConflict(t *testing.T) { diff --git a/plugin/pkg/admission/podtopologylabels/admission.go b/plugin/pkg/admission/podtopologylabels/admission.go new file mode 100644 index 00000000000..8467f1614ef --- /dev/null +++ b/plugin/pkg/admission/podtopologylabels/admission.go @@ -0,0 +1,205 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podtopologylabels + +import ( + "context" + "fmt" + "io" + "strings" + + "k8s.io/klog/v2" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/admission" + genericadmissioninitializer "k8s.io/apiserver/pkg/admission/initializer" + "k8s.io/client-go/informers" + corev1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/component-base/featuregate" + api "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/features" +) + +// PluginName is a string with the name of the plugin +const PluginName = "PodTopologyLabels" + +// Register registers a plugin +func Register(plugins *admission.Plugins) { + plugins.Register(PluginName, func(_ io.Reader) (admission.Interface, error) { + plugin := NewPodTopologyPlugin() + return plugin, nil + }) +} + +// NewPodTopologyPlugin initializes a Plugin +func NewPodTopologyPlugin() *Plugin { + return &Plugin{ + Handler: admission.NewHandler(admission.Create), + // Always copy zone and region labels. + labels: sets.New("topology.k8s.io/zone", "topology.k8s.io/region"), + // Also support copying arbitrary custom topology labels. + domains: sets.New("topology.k8s.io"), + // Copy any sub-domains of topology.k8s.io as well. + suffixes: sets.New(".topology.k8s.io"), + } +} + +type Plugin struct { + *admission.Handler + + nodeLister corev1listers.NodeLister + + // explicit labels, list of domains or a list of domain + // suffixes to be copies to Pod objects being bound. + labels, domains, suffixes sets.Set[string] + + enabled, inspectedFeatureGates bool +} + +var _ admission.MutationInterface = &Plugin{} +var _ genericadmissioninitializer.WantsExternalKubeInformerFactory = &Plugin{} +var _ genericadmissioninitializer.WantsFeatures = &Plugin{} + +// InspectFeatureGates implements WantsFeatures. +func (p *Plugin) InspectFeatureGates(featureGates featuregate.FeatureGate) { + p.enabled = featureGates.Enabled(features.PodTopologyLabelsAdmission) + p.inspectedFeatureGates = true +} + +func (p *Plugin) SetExternalKubeInformerFactory(factory informers.SharedInformerFactory) { + nodeInformer := factory.Core().V1().Nodes() + p.nodeLister = nodeInformer.Lister() + p.SetReadyFunc(nodeInformer.Informer().HasSynced) +} + +func (p *Plugin) ValidateInitialization() error { + if p.nodeLister == nil { + return fmt.Errorf("nodeLister not set") + } + if !p.inspectedFeatureGates { + return fmt.Errorf("feature gates not inspected") + } + return nil +} + +func (p *Plugin) Admit(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) (err error) { + if !p.enabled { + return nil + } + if shouldIgnore(a) { + return nil + } + // we need to wait for our caches to warm + if !p.WaitForReady() { + return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request")) + } + + binding := a.GetObject().(*api.Binding) + // other fields are not set by the default scheduler for the binding target, so only check the Kind. + if binding.Target.Kind != "Node" { + klog.V(6).Info("Skipping Pod being bound to non-Node object type", "target", binding.Target.GroupVersionKind()) + return nil + } + + node, err := p.nodeLister.Get(binding.Target.Name) + if err != nil { + // Ignore NotFound errors to avoid risking breaking compatibility/behaviour. + if apierrors.IsNotFound(err) { + return nil + } + return err + } + + // fast-path/short circuit if the node has no labels + if node.Labels == nil { + return nil + } + + labelsToCopy := make(map[string]string) + for k, v := range node.Labels { + if !p.isTopologyLabel(k) { + continue + } + labelsToCopy[k] = v + } + + if len(labelsToCopy) == 0 { + // fast-path/short circuit if the node has no topology labels + return nil + } + + // copy the topology labels into the Binding's labels, as these are copied from the Binding + // to the Pod object being bound within the podBinding registry/store. + if binding.Labels == nil { + binding.Labels = make(map[string]string) + } + for k, v := range labelsToCopy { + if _, exists := binding.Labels[k]; exists { + // Don't overwrite labels on Binding resources as this could lead to unexpected + // behaviour if any schedulers rely on being able to explicitly set values themselves. + continue + } + binding.Labels[k] = v + } + + return nil +} + +func (p *Plugin) isTopologyLabel(key string) bool { + // First check explicit label keys. + if p.labels.Has(key) { + return true + } + // Check the domain portion of the label key, if present + domain, _, hasDomain := strings.Cut(key, "/") + if !hasDomain { + // fast-path if there is no / separator + return false + } + if p.domains.Has(domain) { + // check for explicit domains to copy + return true + } + for _, suffix := range p.suffixes.UnsortedList() { + // check if the domain has one of the suffixes that are to be copied + if strings.HasSuffix(domain, suffix) { + return true + } + } + return false +} + +func shouldIgnore(a admission.Attributes) bool { + resource := a.GetResource().GroupResource() + if resource != api.Resource("pods") { + return true + } + if a.GetSubresource() != "binding" { + // only run the checks below on the binding subresource + return true + } + + obj := a.GetObject() + _, ok := obj.(*api.Binding) + if !ok { + klog.Errorf("expected Binding but got %s", a.GetKind().Kind) + return true + } + + return false +} diff --git a/plugin/pkg/admission/podtopologylabels/admission_test.go b/plugin/pkg/admission/podtopologylabels/admission_test.go new file mode 100644 index 00000000000..ca0ecefb62e --- /dev/null +++ b/plugin/pkg/admission/podtopologylabels/admission_test.go @@ -0,0 +1,174 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podtopologylabels + +import ( + "context" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + + corev1 "k8s.io/api/core/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/version" + "k8s.io/apiserver/pkg/admission" + genericadmissioninitializer "k8s.io/apiserver/pkg/admission/initializer" + admissiontesting "k8s.io/apiserver/pkg/admission/testing" + "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + featuregatetesting "k8s.io/component-base/featuregate/testing" + api "k8s.io/kubernetes/pkg/apis/core" + kubefeatures "k8s.io/kubernetes/pkg/features" +) + +// TestPodTopology verifies the pod topology admission plugin works as expected. +func TestPodTopology(t *testing.T) { + tests := []struct { + name string // name of the test case. + bindingTarget *api.ObjectReference // target being bound to. Defaults to a valid node with the provided labels. + targetNodeLabels map[string]string // list of labels set on the node being bound to. + existingBindingLabels map[string]string // list of labels that are set on the Binding prior to admission (aka by the client/scheduler) + expectedBindingLabels map[string]string // list of labels that we expect to be set on the Binding after admission. + featureDisabled bool // configure whether the SetPodTopologyLabels feature gate should be disabled. + }{ + { + name: "copies topology.k8s.io/zone and region labels to binding annotations", + targetNodeLabels: map[string]string{ + "topology.k8s.io/zone": "zone1", + "topology.k8s.io/region": "region1", + "non-topology.k8s.io/label": "something", // verify we don't unexpectedly copy non topology.k8s.io labels. + }, + expectedBindingLabels: map[string]string{ + "topology.k8s.io/zone": "zone1", + "topology.k8s.io/region": "region1", + }, + }, + { + name: "copies arbitrary topology labels", + targetNodeLabels: map[string]string{ + "topology.k8s.io/arbitrary": "something", + }, + expectedBindingLabels: map[string]string{ + "topology.k8s.io/arbitrary": "something", + }, + }, + { + name: "copies topology labels that use a subdomain", + targetNodeLabels: map[string]string{ + "something.topology.k8s.io/a-thing": "value", + }, + expectedBindingLabels: map[string]string{ + "something.topology.k8s.io/a-thing": "value", + }, + }, + { + name: "does not copy label keys that don't contain a / character", + targetNodeLabels: map[string]string{ + "topology.k8s.io": "value", + }, + existingBindingLabels: map[string]string{}, + }, + { + name: "does not overwrite existing topology labels on Binding objects", + existingBindingLabels: map[string]string{ + "topology.k8s.io/zone": "oldValue", + }, + targetNodeLabels: map[string]string{ + "topology.k8s.io/zone": "newValue", + }, + expectedBindingLabels: map[string]string{ + "topology.k8s.io/zone": "oldValue", + }, + }, + { + name: "does nothing if the SetPodTopologyLabels feature gate is disabled", + targetNodeLabels: map[string]string{ + "topology.k8s.io/zone": "zone1", + "topology.k8s.io/region": "region1", + }, + expectedBindingLabels: map[string]string{}, + featureDisabled: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + namespace := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: "test-ns"}, + } + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "valid-test-node", + Labels: test.targetNodeLabels, + }, + } + // Pod we bind during test cases. + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "testPod", Namespace: namespace.Name}, + Spec: corev1.PodSpec{}, + } + featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, feature.DefaultFeatureGate, version.MustParse("1.33")) + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, kubefeatures.PodTopologyLabelsAdmission, + !test.featureDisabled) + mockClient := fake.NewSimpleClientset(namespace, node, pod) + handler, informerFactory, err := newHandlerForTest(mockClient) + if err != nil { + t.Fatalf("unexpected error initializing handler: %v", err) + } + stopCh := make(chan struct{}) + defer close(stopCh) + informerFactory.Start(stopCh) + + target := test.bindingTarget + if target == nil { + target = &api.ObjectReference{ + Kind: "Node", + Name: node.Name, + } + } + binding := &api.Binding{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + Labels: test.existingBindingLabels, + }, + Target: *target, + } + if err := admissiontesting.WithReinvocationTesting(t, handler). + Admit(context.TODO(), admission.NewAttributesRecord(binding, nil, api.Kind("Binding").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "binding", admission.Create, &metav1.CreateOptions{}, false, nil), nil); err != nil { + t.Errorf("failed running admission plugin: %v", err) + } + updatedBindingLabels := binding.Labels + if !apiequality.Semantic.DeepEqual(updatedBindingLabels, test.expectedBindingLabels) { + t.Errorf("Unexpected label values: %v", cmp.Diff(updatedBindingLabels, test.expectedBindingLabels)) + } + }) + } +} + +// newHandlerForTest returns the admission controller configured for testing. +func newHandlerForTest(c kubernetes.Interface) (*Plugin, informers.SharedInformerFactory, error) { + factory := informers.NewSharedInformerFactory(c, 5*time.Minute) + handler := NewPodTopologyPlugin() + pluginInitializer := genericadmissioninitializer.New(c, nil, factory, nil, feature.DefaultFeatureGate, nil, nil) + pluginInitializer.Initialize(handler) + return handler, factory, admission.ValidateInitialization(handler) +} diff --git a/plugin/pkg/admission/podtopologylabels/doc.go b/plugin/pkg/admission/podtopologylabels/doc.go new file mode 100644 index 00000000000..99271759ddf --- /dev/null +++ b/plugin/pkg/admission/podtopologylabels/doc.go @@ -0,0 +1,23 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package podtopologylabels is a plugin that mutates `pod/binding` requests +// to set `topology.k8s.io` labels (including subdomains) from the Node object +// referenced in the Binding to the Binding, which causes the Pod to also +// have these values set. +// If the binding target is NOT a Node object, no action is taken. +// If the referenced Node object does not exist, no action is taken. +package podtopologylabels // import "k8s.io/kubernetes/plugin/pkg/admission/podtopologylabels" diff --git a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml index dd0ad06e1fc..658daa9c738 100644 --- a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml +++ b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml @@ -1075,6 +1075,12 @@ lockToDefault: true preRelease: GA version: "1.30" +- name: PodTopologyLabelsAdmission + versionedSpecs: + - default: false + lockToDefault: false + preRelease: Alpha + version: "1.33" - name: PortForwardWebsockets versionedSpecs: - default: false diff --git a/test/integration/pods/pods_test.go b/test/integration/pods/pods_test.go index c02d3890d6d..33b23235f85 100644 --- a/test/integration/pods/pods_test.go +++ b/test/integration/pods/pods_test.go @@ -22,11 +22,15 @@ import ( "strings" "testing" + "github.com/google/go-cmp/cmp" + v1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/version" utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" typedv1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -40,6 +44,124 @@ import ( "k8s.io/kubernetes/test/integration/framework" ) +func TestPodTopologyLabels(t *testing.T) { + tests := []podTopologyTestCase{ + { + name: "zone and region topology labels copied from assigned Node", + targetNodeLabels: map[string]string{ + "topology.k8s.io/zone": "zone", + "topology.k8s.io/region": "region", + }, + expectedPodLabels: map[string]string{ + "topology.k8s.io/zone": "zone", + "topology.k8s.io/region": "region", + }, + }, + } + // Enable the feature BEFORE starting the test server, as the admission plugin only checks feature gates + // on start up and not on each invocation at runtime. + featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.33")) + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodTopologyLabelsAdmission, true) + testPodTopologyLabels(t, tests) +} + +func TestPodTopologyLabels_FeatureDisabled(t *testing.T) { + tests := []podTopologyTestCase{ + { + name: "does nothing when the feature is not enabled", + targetNodeLabels: map[string]string{ + "topology.k8s.io/zone": "zone", + "topology.k8s.io/region": "region", + }, + expectedPodLabels: map[string]string{}, + }, + } + // Disable the feature BEFORE starting the test server, as the admission plugin only checks feature gates + // on start up and not on each invocation at runtime. + featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.33")) + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodTopologyLabelsAdmission, false) + testPodTopologyLabels(t, tests) +} + +// podTopologyTestCase is defined outside of TestPodTopologyLabels to allow us to re-use the test implementation logic +// between the feature enabled and feature disabled tests. +// This will no longer be required once the feature gate graduates to GA/locked to being enabled. +type podTopologyTestCase struct { + name string + targetNodeLabels map[string]string + expectedPodLabels map[string]string +} + +func testPodTopologyLabels(t *testing.T, tests []podTopologyTestCase) { + server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd()) + defer server.TearDownFn() + client := clientset.NewForConfigOrDie(server.ClientConfig) + ns := framework.CreateNamespaceOrDie(client, "pod-topology-labels", t) + defer framework.DeleteNamespaceOrDie(client, ns, t) + + prototypePod := func() *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "pod-topology-test-", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "fake-name", + Image: "fakeimage", + }, + }, + }, + } + } + prototypeNode := func() *v1.Node { + return &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "podtopology-test-node-", + }, + } + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // Create the Node we are going to bind to. + node := prototypeNode() + // Set the labels on the Node we are going to create. + node.Labels = test.targetNodeLabels + ctx := context.Background() + + var err error + if node, err = client.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil { + t.Errorf("Failed to create node: %v", err) + } + + pod := prototypePod() + if pod, err = client.CoreV1().Pods(ns.Name).Create(ctx, pod, metav1.CreateOptions{}); err != nil { + t.Errorf("Failed to create pod: %v", err) + } + + binding := &v1.Binding{ + ObjectMeta: metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace}, + Target: v1.ObjectReference{ + Kind: "Node", + Name: node.Name, + }, + } + if err := client.CoreV1().Pods(pod.Namespace).Bind(ctx, binding, metav1.CreateOptions{}); err != nil { + t.Errorf("Failed to bind pod to node: %v", err) + } + + if pod, err = client.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}); err != nil { + t.Errorf("Failed to fetch bound Pod: %v", err) + } + + if !apiequality.Semantic.DeepEqual(pod.Labels, test.expectedPodLabels) { + t.Errorf("Unexpected label values: %v", cmp.Diff(pod.Labels, test.expectedPodLabels)) + } + }) + } +} + func TestPodUpdateActiveDeadlineSeconds(t *testing.T) { // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd()) From 6ddabb6ee64bba9fa71b98af6ebd0987f6e9d432 Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Tue, 4 Mar 2025 17:58:27 +0000 Subject: [PATCH 2/6] updating to reflect latest KEP design --- .../samples/generic/server/admission.go | 3 -- .../samples/generic/server/admission_test.go | 2 + pkg/features/kube_features.go | 10 +++-- pkg/registry/core/pod/storage/storage.go | 36 ++++++++++------ pkg/registry/core/pod/storage/storage_test.go | 4 +- .../admission/podtopologylabels/admission.go | 43 +++++++++++++++---- .../podtopologylabels/admission_test.go | 15 ++++--- plugin/pkg/admission/podtopologylabels/doc.go | 7 +-- test/integration/pods/pods_test.go | 28 +++++++++++- 9 files changed, 107 insertions(+), 41 deletions(-) diff --git a/pkg/controlplane/apiserver/samples/generic/server/admission.go b/pkg/controlplane/apiserver/samples/generic/server/admission.go index 998fd4286bc..eba588534c7 100644 --- a/pkg/controlplane/apiserver/samples/generic/server/admission.go +++ b/pkg/controlplane/apiserver/samples/generic/server/admission.go @@ -30,7 +30,6 @@ import ( certsigning "k8s.io/kubernetes/plugin/pkg/admission/certificates/signing" certsubjectrestriction "k8s.io/kubernetes/plugin/pkg/admission/certificates/subjectrestriction" "k8s.io/kubernetes/plugin/pkg/admission/defaulttolerationseconds" - "k8s.io/kubernetes/plugin/pkg/admission/podtopologylabels" "k8s.io/kubernetes/plugin/pkg/admission/serviceaccount" ) @@ -49,8 +48,6 @@ func DefaultOffAdmissionPlugins() sets.Set[string] { certsubjectrestriction.PluginName, // CertificateSubjectRestriction validatingadmissionpolicy.PluginName, // ValidatingAdmissionPolicy mutatingadmissionpolicy.PluginName, // MutatingAdmissionPolicy - podtopologylabels.PluginName, // PodTopologyLabels, only active when feature gate PodTopologyLabelsAdmission is enabled. - validatingadmissionpolicy.PluginName, // ValidatingAdmissionPolicy, only active when feature gate ValidatingAdmissionPolicy is enabled ) return sets.New(options.AllOrderedPlugins...).Difference(defaultOnPlugins) diff --git a/pkg/controlplane/apiserver/samples/generic/server/admission_test.go b/pkg/controlplane/apiserver/samples/generic/server/admission_test.go index b85423d7c7f..a16a257840a 100644 --- a/pkg/controlplane/apiserver/samples/generic/server/admission_test.go +++ b/pkg/controlplane/apiserver/samples/generic/server/admission_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/plugin/pkg/admission/limitranger" "k8s.io/kubernetes/plugin/pkg/admission/network/defaultingressclass" "k8s.io/kubernetes/plugin/pkg/admission/nodetaint" + "k8s.io/kubernetes/plugin/pkg/admission/podtopologylabels" podpriority "k8s.io/kubernetes/plugin/pkg/admission/priority" "k8s.io/kubernetes/plugin/pkg/admission/runtimeclass" "k8s.io/kubernetes/plugin/pkg/admission/security/podsecurity" @@ -42,6 +43,7 @@ var intentionallyOffPlugins = sets.New[string]( runtimeclass.PluginName, // RuntimeClass defaultingressclass.PluginName, // DefaultIngressClass podsecurity.PluginName, // PodSecurity + podtopologylabels.PluginName, // PodTopologyLabels ) func TestDefaultOffAdmissionPlugins(t *testing.T) { diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 9e00230d383..11dd7106c83 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -969,10 +969,14 @@ const ( // kep: https://kep.k8s.io/4742 // alpha: v1.33 // - // Enables the PodTopologyLabelsAdmission admission plugin to automatically set node topology labels - // (i.e. those with the 'topology.k8s.io/' prefix on Node objects) onto Pod objects when they are - // bound/scheduled to a node. + // Enables the PodTopologyLabelsAdmission admission plugin that mutates `pod/binding` + // requests by copying the `topology.k8s.io/{zone,region}` and `kubernetes.io/hostname` + // labels from the assigned Node object (in the Binding being admitted) onto the Binding + // so that it can be persisted onto the Pod object when the Pod is being scheduled. // This allows workloads running in pods to understand the topology information of their assigned node. + // Enabling this feature also permits external schedulers to set labels on pods in an atomic + // operation when scheduling a Pod by setting the `metadata.labels` field on the submitted Binding, + // similar to how `metadata.annotations` behaves. PodTopologyLabelsAdmission featuregate.Feature = "PodTopologyLabelsAdmission" ) diff --git a/pkg/registry/core/pod/storage/storage.go b/pkg/registry/core/pod/storage/storage.go index 0066e6bcfa7..27f7c7fcfff 100644 --- a/pkg/registry/core/pod/storage/storage.go +++ b/pkg/registry/core/pod/storage/storage.go @@ -247,20 +247,12 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types for k, v := range annotations { pod.Annotations[k] = v } + // Copy all labels from the Binding over to the Pod object, but do not + // overwrite any existing labels that have been previously set, to avoid + // changing any existing behaviour for pods that may be defined with a + // template by users and created by higher-level controllers. if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodTopologyLabelsAdmission) { - // If any labels are present on the Binding, copy them across to the Pod. - if len(labels) > 0 { - if pod.Labels == nil { - pod.Labels = make(map[string]string) - } - for k, v := range labels { - if _, ok := pod.Labels[k]; ok { - // don't overwrite labels that are already present on the Pod - continue - } - pod.Labels[k] = v - } - } + copyLabelsWithoutOverwriting(pod, labels) } podutil.UpdatePodCondition(&pod.Status, &api.PodCondition{ Type: api.PodScheduled, @@ -272,6 +264,24 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types return finalPod, err } +func copyLabelsWithoutOverwriting(pod *api.Pod, labels map[string]string) { + if len(labels) == 0 { + // nothing to do + return + } + if pod.Labels == nil { + pod.Labels = make(map[string]string) + } + // Iterate over the binding's labels and copy them across to the Pod. + for k, v := range labels { + if _, ok := pod.Labels[k]; ok { + // don't overwrite labels that are already present on the Pod + continue + } + pod.Labels[k] = v + } +} + // assignPod assigns the given pod to the given machine. func (r *BindingREST) assignPod(ctx context.Context, podUID types.UID, podResourceVersion, podID string, machine string, annotations, labels map[string]string, dryRun bool) (err error) { if _, err = r.setPodHostAndAnnotations(ctx, podUID, podResourceVersion, podID, machine, annotations, labels, dryRun); err != nil { diff --git a/pkg/registry/core/pod/storage/storage_test.go b/pkg/registry/core/pod/storage/storage_test.go index 17e0de0a633..753bb970b8a 100644 --- a/pkg/registry/core/pod/storage/storage_test.go +++ b/pkg/registry/core/pod/storage/storage_test.go @@ -680,8 +680,8 @@ func TestEtcdCreateWithContainersNotFound(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.32")) - featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.SetPodTopologyLabels, true) + featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.33")) + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.PodTopologyLabelsAdmission, true) // Suddenly, a wild scheduler appears: _, err = bindingStorage.Create(ctx, "foo", &api.Binding{ ObjectMeta: metav1.ObjectMeta{ diff --git a/plugin/pkg/admission/podtopologylabels/admission.go b/plugin/pkg/admission/podtopologylabels/admission.go index 8467f1614ef..c4189269592 100644 --- a/plugin/pkg/admission/podtopologylabels/admission.go +++ b/plugin/pkg/admission/podtopologylabels/admission.go @@ -38,24 +38,49 @@ import ( // PluginName is a string with the name of the plugin const PluginName = "PodTopologyLabels" +// defaultConfig is the configuration used for the default instantiation of the plugin. +// This is the configured used by kube-apiserver. +// It is not exported to avoid any chance of accidentally mutating the variable. +var defaultConfig = Config{ + Labels: []string{"topology.k8s.io/zone", "topology.k8s.io/region", "kubernetes.io/hostname"}, +} + // Register registers a plugin func Register(plugins *admission.Plugins) { plugins.Register(PluginName, func(_ io.Reader) (admission.Interface, error) { - plugin := NewPodTopologyPlugin() + plugin := NewPodTopologyPlugin(defaultConfig) return plugin, nil }) } +// Config contains configuration for instances of the podtopologylabels admission plugin. +// This is not exposed as user-facing APIServer configuration, however can be used by +// platform operators when building custom topology label plugins. +type Config struct { + // Labels is set of explicit label keys to be copied from the Node object onto + // pod Binding objects during admission. + Labels []string + // Domains is a set of label key prefixes used to copy across label values + // for all labels with a given domain prefix. + // For example, `example.com` would match all labels matching `example.com/*`. + // Keys without a domain portion (i.e. those not containing a /) will never match. + Domains []string + // Suffixes is a set of label key domain suffixes used to copy label values for + // all labels of a given subdomain. + // This acts as a suffix match on the domain portion of label keys. + // If a suffix does not have a leading '.', one will be prepended to avoid + // programmer errors with values like `example.com` matching `notexample.com`. + // Keys without a domain portion (i.e. those not containing a /) will never match. + Suffixes []string +} + // NewPodTopologyPlugin initializes a Plugin -func NewPodTopologyPlugin() *Plugin { +func NewPodTopologyPlugin(c Config) *Plugin { return &Plugin{ - Handler: admission.NewHandler(admission.Create), - // Always copy zone and region labels. - labels: sets.New("topology.k8s.io/zone", "topology.k8s.io/region"), - // Also support copying arbitrary custom topology labels. - domains: sets.New("topology.k8s.io"), - // Copy any sub-domains of topology.k8s.io as well. - suffixes: sets.New(".topology.k8s.io"), + Handler: admission.NewHandler(admission.Create), + labels: sets.New(c.Labels...), + domains: sets.New(c.Domains...), + suffixes: sets.New(c.Suffixes...), } } diff --git a/plugin/pkg/admission/podtopologylabels/admission_test.go b/plugin/pkg/admission/podtopologylabels/admission_test.go index ca0ecefb62e..1b79203bbf3 100644 --- a/plugin/pkg/admission/podtopologylabels/admission_test.go +++ b/plugin/pkg/admission/podtopologylabels/admission_test.go @@ -54,6 +54,7 @@ func TestPodTopology(t *testing.T) { targetNodeLabels: map[string]string{ "topology.k8s.io/zone": "zone1", "topology.k8s.io/region": "region1", + "topology.k8s.io/arbitrary": "something", "non-topology.k8s.io/label": "something", // verify we don't unexpectedly copy non topology.k8s.io labels. }, expectedBindingLabels: map[string]string{ @@ -62,21 +63,23 @@ func TestPodTopology(t *testing.T) { }, }, { - name: "copies arbitrary topology labels", + name: "does not copy arbitrary topology labels", targetNodeLabels: map[string]string{ + "topology.k8s.io/zone": "zone1", "topology.k8s.io/arbitrary": "something", }, expectedBindingLabels: map[string]string{ - "topology.k8s.io/arbitrary": "something", + "topology.k8s.io/zone": "zone1", }, }, { - name: "copies topology labels that use a subdomain", + name: "does not copy topology labels that use a subdomain", targetNodeLabels: map[string]string{ - "something.topology.k8s.io/a-thing": "value", + "topology.k8s.io/region": "region1", + "sub.topology.k8s.io/zone": "value", }, expectedBindingLabels: map[string]string{ - "something.topology.k8s.io/a-thing": "value", + "topology.k8s.io/region": "region1", }, }, { @@ -167,7 +170,7 @@ func TestPodTopology(t *testing.T) { // newHandlerForTest returns the admission controller configured for testing. func newHandlerForTest(c kubernetes.Interface) (*Plugin, informers.SharedInformerFactory, error) { factory := informers.NewSharedInformerFactory(c, 5*time.Minute) - handler := NewPodTopologyPlugin() + handler := NewPodTopologyPlugin(defaultConfig) // todo: write additional test cases with non-default config. pluginInitializer := genericadmissioninitializer.New(c, nil, factory, nil, feature.DefaultFeatureGate, nil, nil) pluginInitializer.Initialize(handler) return handler, factory, admission.ValidateInitialization(handler) diff --git a/plugin/pkg/admission/podtopologylabels/doc.go b/plugin/pkg/admission/podtopologylabels/doc.go index 99271759ddf..2ae51a8e03f 100644 --- a/plugin/pkg/admission/podtopologylabels/doc.go +++ b/plugin/pkg/admission/podtopologylabels/doc.go @@ -15,9 +15,10 @@ limitations under the License. */ // Package podtopologylabels is a plugin that mutates `pod/binding` requests -// to set `topology.k8s.io` labels (including subdomains) from the Node object -// referenced in the Binding to the Binding, which causes the Pod to also -// have these values set. +// by copying the `topology.k8s.io/{zone,region}` and `kubernetes.io/hostname` +// labels from the assigned Node object (in the Binding being admitted) onto +// the Binding so that it can be persisted onto the Pod object when the Pod +// is being scheduled. // If the binding target is NOT a Node object, no action is taken. // If the referenced Node object does not exist, no action is taken. package podtopologylabels // import "k8s.io/kubernetes/plugin/pkg/admission/podtopologylabels" diff --git a/test/integration/pods/pods_test.go b/test/integration/pods/pods_test.go index 33b23235f85..c1c88dc2b71 100644 --- a/test/integration/pods/pods_test.go +++ b/test/integration/pods/pods_test.go @@ -57,6 +57,28 @@ func TestPodTopologyLabels(t *testing.T) { "topology.k8s.io/region": "region", }, }, + { + name: "subdomains of topology.k8s.io are not copied", + targetNodeLabels: map[string]string{ + "sub.topology.k8s.io/zone": "zone", + "topology.k8s.io/region": "region", + }, + expectedPodLabels: map[string]string{ + "topology.k8s.io/region": "region", + }, + }, + { + name: "custom topology.k8s.io labels are not copied", + targetNodeLabels: map[string]string{ + "topology.k8s.io/custom": "thing", + "topology.k8s.io/zone": "zone", + "topology.k8s.io/region": "region", + }, + expectedPodLabels: map[string]string{ + "topology.k8s.io/zone": "zone", + "topology.k8s.io/region": "region", + }, + }, } // Enable the feature BEFORE starting the test server, as the admission plugin only checks feature gates // on start up and not on each invocation at runtime. @@ -70,8 +92,10 @@ func TestPodTopologyLabels_FeatureDisabled(t *testing.T) { { name: "does nothing when the feature is not enabled", targetNodeLabels: map[string]string{ - "topology.k8s.io/zone": "zone", - "topology.k8s.io/region": "region", + "topology.k8s.io/zone": "zone", + "topology.k8s.io/region": "region", + "topology.k8s.io/custom": "thing", + "sub.topology.k8s.io/zone": "zone", }, expectedPodLabels: map[string]string{}, }, From 934e247030bb100cda3f43c5fe89dc669d36d59e Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Wed, 19 Mar 2025 16:44:28 +0000 Subject: [PATCH 3/6] Remove kubernetes.io/hostname label copying, skip overriding, and support direct spec.nodeName changes. --- pkg/registry/core/pod/storage/storage.go | 8 +- pkg/registry/core/pod/storage/storage_test.go | 2 - .../admission/podtopologylabels/admission.go | 184 ++++++++++-------- .../podtopologylabels/admission_test.go | 135 +++++++++---- plugin/pkg/admission/podtopologylabels/doc.go | 9 +- 5 files changed, 204 insertions(+), 134 deletions(-) diff --git a/pkg/registry/core/pod/storage/storage.go b/pkg/registry/core/pod/storage/storage.go index 27f7c7fcfff..b93a38cd192 100644 --- a/pkg/registry/core/pod/storage/storage.go +++ b/pkg/registry/core/pod/storage/storage.go @@ -205,10 +205,10 @@ func (r *BindingREST) PreserveRequestObjectMetaSystemFieldsOnSubresourceCreate() return true } -// setPodHostAndAnnotations sets the given pod's host to 'machine' if and only if -// the pod is unassigned and merges the provided annotations with those of the pod. +// setPodNodeAndMetadata sets the given pod's nodeName to 'machine' if and only if +// the pod is unassigned, and merges the provided annotations and labels with those of the pod. // Returns the current state of the pod, or an error. -func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types.UID, podResourceVersion, podID, machine string, annotations, labels map[string]string, dryRun bool) (finalPod *api.Pod, err error) { +func (r *BindingREST) setPodNodeAndMetadata(ctx context.Context, podUID types.UID, podResourceVersion, podID, machine string, annotations, labels map[string]string, dryRun bool) (finalPod *api.Pod, err error) { podKey, err := r.store.KeyFunc(ctx, podID) if err != nil { return nil, err @@ -284,7 +284,7 @@ func copyLabelsWithoutOverwriting(pod *api.Pod, labels map[string]string) { // assignPod assigns the given pod to the given machine. func (r *BindingREST) assignPod(ctx context.Context, podUID types.UID, podResourceVersion, podID string, machine string, annotations, labels map[string]string, dryRun bool) (err error) { - if _, err = r.setPodHostAndAnnotations(ctx, podUID, podResourceVersion, podID, machine, annotations, labels, dryRun); err != nil { + if _, err = r.setPodNodeAndMetadata(ctx, podUID, podResourceVersion, podID, machine, annotations, labels, dryRun); err != nil { err = storeerr.InterpretGetError(err, api.Resource("pods"), podID) err = storeerr.InterpretUpdateError(err, api.Resource("pods"), podID) if _, ok := err.(*errors.StatusError); !ok { diff --git a/pkg/registry/core/pod/storage/storage_test.go b/pkg/registry/core/pod/storage/storage_test.go index 753bb970b8a..a2a0ac6c0a1 100644 --- a/pkg/registry/core/pod/storage/storage_test.go +++ b/pkg/registry/core/pod/storage/storage_test.go @@ -34,7 +34,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/version" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/generic" genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" @@ -680,7 +679,6 @@ func TestEtcdCreateWithContainersNotFound(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.33")) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.PodTopologyLabelsAdmission, true) // Suddenly, a wild scheduler appears: _, err = bindingStorage.Create(ctx, "foo", &api.Binding{ diff --git a/plugin/pkg/admission/podtopologylabels/admission.go b/plugin/pkg/admission/podtopologylabels/admission.go index c4189269592..d0d6d2066f9 100644 --- a/plugin/pkg/admission/podtopologylabels/admission.go +++ b/plugin/pkg/admission/podtopologylabels/admission.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "io" - "strings" "k8s.io/klog/v2" @@ -39,7 +38,7 @@ import ( const PluginName = "PodTopologyLabels" // defaultConfig is the configuration used for the default instantiation of the plugin. -// This is the configured used by kube-apiserver. +// This configuration is used by kube-apiserver. // It is not exported to avoid any chance of accidentally mutating the variable. var defaultConfig = Config{ Labels: []string{"topology.k8s.io/zone", "topology.k8s.io/region", "kubernetes.io/hostname"}, @@ -60,27 +59,13 @@ type Config struct { // Labels is set of explicit label keys to be copied from the Node object onto // pod Binding objects during admission. Labels []string - // Domains is a set of label key prefixes used to copy across label values - // for all labels with a given domain prefix. - // For example, `example.com` would match all labels matching `example.com/*`. - // Keys without a domain portion (i.e. those not containing a /) will never match. - Domains []string - // Suffixes is a set of label key domain suffixes used to copy label values for - // all labels of a given subdomain. - // This acts as a suffix match on the domain portion of label keys. - // If a suffix does not have a leading '.', one will be prepended to avoid - // programmer errors with values like `example.com` matching `notexample.com`. - // Keys without a domain portion (i.e. those not containing a /) will never match. - Suffixes []string } // NewPodTopologyPlugin initializes a Plugin func NewPodTopologyPlugin(c Config) *Plugin { return &Plugin{ - Handler: admission.NewHandler(admission.Create), - labels: sets.New(c.Labels...), - domains: sets.New(c.Domains...), - suffixes: sets.New(c.Suffixes...), + Handler: admission.NewHandler(admission.Create), + labels: sets.New(c.Labels...), } } @@ -89,9 +74,8 @@ type Plugin struct { nodeLister corev1listers.NodeLister - // explicit labels, list of domains or a list of domain - // suffixes to be copies to Pod objects being bound. - labels, domains, suffixes sets.Set[string] + // explicit labels to be copied to Pod objects being bound. + labels sets.Set[string] enabled, inspectedFeatureGates bool } @@ -126,14 +110,21 @@ func (p *Plugin) Admit(ctx context.Context, a admission.Attributes, o admission. if !p.enabled { return nil } - if shouldIgnore(a) { - return nil + // check whether the request is for a Binding or a Pod spec update. + shouldAdmit, doAdmit, err := p.shouldAdmit(a) + if !shouldAdmit || err != nil { + // error is either nil and admit == false, or err is non-nil and should be returned. + return err } // we need to wait for our caches to warm if !p.WaitForReady() { return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request")) } + // run type specific admission + return doAdmit(ctx, a, o) +} +func (p *Plugin) admitBinding(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error { binding := a.GetObject().(*api.Binding) // other fields are not set by the default scheduler for the binding target, so only check the Kind. if binding.Target.Kind != "Node" { @@ -141,28 +132,10 @@ func (p *Plugin) Admit(ctx context.Context, a admission.Attributes, o admission. return nil } - node, err := p.nodeLister.Get(binding.Target.Name) + labelsToCopy, err := p.topologyLabelsForNodeName(binding.Target.Name) if err != nil { - // Ignore NotFound errors to avoid risking breaking compatibility/behaviour. - if apierrors.IsNotFound(err) { - return nil - } return err } - - // fast-path/short circuit if the node has no labels - if node.Labels == nil { - return nil - } - - labelsToCopy := make(map[string]string) - for k, v := range node.Labels { - if !p.isTopologyLabel(k) { - continue - } - labelsToCopy[k] = v - } - if len(labelsToCopy) == 0 { // fast-path/short circuit if the node has no topology labels return nil @@ -173,58 +146,101 @@ func (p *Plugin) Admit(ctx context.Context, a admission.Attributes, o admission. if binding.Labels == nil { binding.Labels = make(map[string]string) } - for k, v := range labelsToCopy { - if _, exists := binding.Labels[k]; exists { - // Don't overwrite labels on Binding resources as this could lead to unexpected - // behaviour if any schedulers rely on being able to explicitly set values themselves. - continue - } - binding.Labels[k] = v - } + mergeLabels(binding.Labels, labelsToCopy) return nil } +func (p *Plugin) admitPod(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error { + pod := a.GetObject().(*api.Pod) + if pod.Spec.NodeName == "" { + // pod has not been scheduled yet + return nil + } + + // Determine the topology labels from the assigned node to be copied. + labelsToCopy, err := p.topologyLabelsForNodeName(pod.Spec.NodeName) + if err != nil { + return err + } + if len(labelsToCopy) == 0 { + // fast-path/short circuit if the node has no topology labels + return nil + } + + // copy the topology labels into the Pod's labels. + if pod.Labels == nil { + pod.Labels = make(map[string]string) + } + // avoid overwriting any existing labels on the Pod. + mergeLabels(pod.Labels, labelsToCopy) + + return nil +} + +func (p *Plugin) topologyLabelsForNodeName(nodeName string) (map[string]string, error) { + labels := make(map[string]string) + node, err := p.nodeLister.Get(nodeName) + if err != nil { + // Ignore NotFound errors to avoid risking breaking compatibility/behaviour. + if apierrors.IsNotFound(err) { + return labels, nil + } + return nil, err + } + + for k, v := range node.Labels { + if !p.isTopologyLabel(k) { + continue + } + labels[k] = v + } + + return labels, nil +} + +// mergeLabels merges new into existing, without overwriting existing keys. +func mergeLabels(existing, new map[string]string) { + for k, v := range new { + if _, exists := existing[k]; exists { + continue + } + existing[k] = v + } +} + func (p *Plugin) isTopologyLabel(key string) bool { // First check explicit label keys. if p.labels.Has(key) { return true } - // Check the domain portion of the label key, if present - domain, _, hasDomain := strings.Cut(key, "/") - if !hasDomain { - // fast-path if there is no / separator - return false + return false +} + +type admitFunc func(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) (err error) + +// shouldAdmit inspects the provided adminssion attributes to determine whether the request +// requires admittance through this plugin. +func (p *Plugin) shouldAdmit(a admission.Attributes) (bool, admitFunc, error) { + if a.GetResource().GroupResource() != api.Resource("pods") { + return false, nil, nil } - if p.domains.Has(domain) { - // check for explicit domains to copy - return true - } - for _, suffix := range p.suffixes.UnsortedList() { - // check if the domain has one of the suffixes that are to be copied - if strings.HasSuffix(domain, suffix) { - return true + + switch a.GetSubresource() { + case "": // regular Pod endpoint + _, ok := a.GetObject().(*api.Pod) + if !ok { + return false, nil, fmt.Errorf("expected Pod but got %T", a.GetObject()) } + return true, p.admitPod, nil + case "binding": + _, ok := a.GetObject().(*api.Binding) + if !ok { + return false, nil, fmt.Errorf("expected Binding but got %s", a.GetKind().Kind) + } + return true, p.admitBinding, nil + default: + // Ignore all other sub-resources. + return false, nil, nil } - return false -} - -func shouldIgnore(a admission.Attributes) bool { - resource := a.GetResource().GroupResource() - if resource != api.Resource("pods") { - return true - } - if a.GetSubresource() != "binding" { - // only run the checks below on the binding subresource - return true - } - - obj := a.GetObject() - _, ok := obj.(*api.Binding) - if !ok { - klog.Errorf("expected Binding but got %s", a.GetKind().Kind) - return true - } - - return false } diff --git a/plugin/pkg/admission/podtopologylabels/admission_test.go b/plugin/pkg/admission/podtopologylabels/admission_test.go index 1b79203bbf3..f85aba70c54 100644 --- a/plugin/pkg/admission/podtopologylabels/admission_test.go +++ b/plugin/pkg/admission/podtopologylabels/admission_test.go @@ -26,7 +26,6 @@ import ( corev1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/version" "k8s.io/apiserver/pkg/admission" genericadmissioninitializer "k8s.io/apiserver/pkg/admission/initializer" admissiontesting "k8s.io/apiserver/pkg/admission/testing" @@ -50,7 +49,7 @@ func TestPodTopology(t *testing.T) { featureDisabled bool // configure whether the SetPodTopologyLabels feature gate should be disabled. }{ { - name: "copies topology.k8s.io/zone and region labels to binding annotations", + name: "copies topology.k8s.io/zone and region labels to binding labels", targetNodeLabels: map[string]string{ "topology.k8s.io/zone": "zone1", "topology.k8s.io/region": "region1", @@ -123,46 +122,102 @@ func TestPodTopology(t *testing.T) { Labels: test.targetNodeLabels, }, } - // Pod we bind during test cases. - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "testPod", Namespace: namespace.Name}, - Spec: corev1.PodSpec{}, - } - featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, feature.DefaultFeatureGate, version.MustParse("1.33")) - featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, kubefeatures.PodTopologyLabelsAdmission, - !test.featureDisabled) - mockClient := fake.NewSimpleClientset(namespace, node, pod) - handler, informerFactory, err := newHandlerForTest(mockClient) - if err != nil { - t.Fatalf("unexpected error initializing handler: %v", err) - } - stopCh := make(chan struct{}) - defer close(stopCh) - informerFactory.Start(stopCh) - target := test.bindingTarget - if target == nil { - target = &api.ObjectReference{ - Kind: "Node", - Name: node.Name, + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, kubefeatures.PodTopologyLabelsAdmission, !test.featureDisabled) + + t.Run("using Pod directly (update)", func(t *testing.T) { + // set up the client and informers + mockClient := fake.NewSimpleClientset(namespace, node) + handler, informerFactory, err := newHandlerForTest(mockClient) + if err != nil { + t.Fatalf("unexpected error initializing handler: %v", err) } - } - binding := &api.Binding{ - ObjectMeta: metav1.ObjectMeta{ - Name: pod.Name, - Namespace: pod.Namespace, - Labels: test.existingBindingLabels, - }, - Target: *target, - } - if err := admissiontesting.WithReinvocationTesting(t, handler). - Admit(context.TODO(), admission.NewAttributesRecord(binding, nil, api.Kind("Binding").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "binding", admission.Create, &metav1.CreateOptions{}, false, nil), nil); err != nil { - t.Errorf("failed running admission plugin: %v", err) - } - updatedBindingLabels := binding.Labels - if !apiequality.Semantic.DeepEqual(updatedBindingLabels, test.expectedBindingLabels) { - t.Errorf("Unexpected label values: %v", cmp.Diff(updatedBindingLabels, test.expectedBindingLabels)) - } + stopCh := make(chan struct{}) + defer close(stopCh) + informerFactory.Start(stopCh) + + oldPod := &api.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "testPod", Namespace: namespace.Name, Labels: test.existingBindingLabels}, + Spec: api.PodSpec{}, + } + pod := oldPod.DeepCopy() + pod.Spec.NodeName = node.Name + + if err := admissiontesting.WithReinvocationTesting(t, handler). + Admit(context.TODO(), admission.NewAttributesRecord(pod, oldPod, + api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, + api.Resource("pods").WithVersion("version"), "", admission.Update, &metav1.UpdateOptions{}, + false, nil), nil); err != nil { + t.Errorf("failed running admission plugin: %v", err) + } + }) + + t.Run("using Pod directly (create)", func(t *testing.T) { + // set up the client and informers + mockClient := fake.NewSimpleClientset(namespace, node) + handler, informerFactory, err := newHandlerForTest(mockClient) + if err != nil { + t.Fatalf("unexpected error initializing handler: %v", err) + } + stopCh := make(chan struct{}) + defer close(stopCh) + informerFactory.Start(stopCh) + + pod := &api.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "testPod", Namespace: namespace.Name, Labels: test.existingBindingLabels}, + Spec: api.PodSpec{ + NodeName: node.Name, + }, + } + if err := admissiontesting.WithReinvocationTesting(t, handler). + Admit(context.TODO(), admission.NewAttributesRecord(pod, nil, + api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, + api.Resource("pods").WithVersion("version"), "", admission.Create, &metav1.UpdateOptions{}, + false, nil), nil); err != nil { + t.Errorf("failed running admission plugin: %v", err) + } + }) + + t.Run("using Binding subresource", func(t *testing.T) { + // Pod we bind during test cases. + existingPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "testPod", Namespace: namespace.Name}, + Spec: corev1.PodSpec{}, + } + mockClient := fake.NewSimpleClientset(namespace, node, existingPod) + handler, informerFactory, err := newHandlerForTest(mockClient) + if err != nil { + t.Fatalf("unexpected error initializing handler: %v", err) + } + stopCh := make(chan struct{}) + defer close(stopCh) + informerFactory.Start(stopCh) + + // create and submit a Binding object + target := test.bindingTarget + if target == nil { + target = &api.ObjectReference{ + Kind: "Node", + Name: node.Name, + } + } + binding := &api.Binding{ + ObjectMeta: metav1.ObjectMeta{ + Name: existingPod.Name, + Namespace: existingPod.Namespace, + Labels: test.existingBindingLabels, + }, + Target: *target, + } + if err := admissiontesting.WithReinvocationTesting(t, handler). + Admit(context.TODO(), admission.NewAttributesRecord(binding, nil, api.Kind("Binding").WithVersion("version"), existingPod.Namespace, existingPod.Name, api.Resource("pods").WithVersion("version"), "binding", admission.Create, &metav1.CreateOptions{}, false, nil), nil); err != nil { + t.Errorf("failed running admission plugin: %v", err) + } + updatedBindingLabels := binding.Labels + if !apiequality.Semantic.DeepEqual(updatedBindingLabels, test.expectedBindingLabels) { + t.Errorf("Unexpected label values: %v", cmp.Diff(updatedBindingLabels, test.expectedBindingLabels)) + } + }) }) } } diff --git a/plugin/pkg/admission/podtopologylabels/doc.go b/plugin/pkg/admission/podtopologylabels/doc.go index 2ae51a8e03f..56131f0b70a 100644 --- a/plugin/pkg/admission/podtopologylabels/doc.go +++ b/plugin/pkg/admission/podtopologylabels/doc.go @@ -15,10 +15,11 @@ limitations under the License. */ // Package podtopologylabels is a plugin that mutates `pod/binding` requests -// by copying the `topology.k8s.io/{zone,region}` and `kubernetes.io/hostname` -// labels from the assigned Node object (in the Binding being admitted) onto -// the Binding so that it can be persisted onto the Pod object when the Pod -// is being scheduled. +// by copying the `topology.k8s.io/{zone,region}` labels from the assigned Node +// object (in the Binding being admitted) onto the Binding so that it can be +// persisted onto the Pod object when the Pod is being scheduled. +// Requests for the regular `pods` resource that set the `spec.nodeName` will +// also trigger the plugin to copy the labels as described. // If the binding target is NOT a Node object, no action is taken. // If the referenced Node object does not exist, no action is taken. package podtopologylabels // import "k8s.io/kubernetes/plugin/pkg/admission/podtopologylabels" From 01ae1b1b5a5e2210d1aeebc5e955e1aacf8e7e62 Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Thu, 20 Mar 2025 20:12:02 +0000 Subject: [PATCH 4/6] remove kubernetes.io/hostname label copying --- pkg/features/kube_features.go | 4 ++-- plugin/pkg/admission/podtopologylabels/admission.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 11dd7106c83..a6e058523ce 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -970,8 +970,8 @@ const ( // alpha: v1.33 // // Enables the PodTopologyLabelsAdmission admission plugin that mutates `pod/binding` - // requests by copying the `topology.k8s.io/{zone,region}` and `kubernetes.io/hostname` - // labels from the assigned Node object (in the Binding being admitted) onto the Binding + // requests by copying the `topology.k8s.io/{zone,region}` labels from the assigned + // Node object (in the Binding being admitted) onto the Binding // so that it can be persisted onto the Pod object when the Pod is being scheduled. // This allows workloads running in pods to understand the topology information of their assigned node. // Enabling this feature also permits external schedulers to set labels on pods in an atomic diff --git a/plugin/pkg/admission/podtopologylabels/admission.go b/plugin/pkg/admission/podtopologylabels/admission.go index d0d6d2066f9..a2c22e786a9 100644 --- a/plugin/pkg/admission/podtopologylabels/admission.go +++ b/plugin/pkg/admission/podtopologylabels/admission.go @@ -41,7 +41,7 @@ const PluginName = "PodTopologyLabels" // This configuration is used by kube-apiserver. // It is not exported to avoid any chance of accidentally mutating the variable. var defaultConfig = Config{ - Labels: []string{"topology.k8s.io/zone", "topology.k8s.io/region", "kubernetes.io/hostname"}, + Labels: []string{"topology.k8s.io/zone", "topology.k8s.io/region"}, } // Register registers a plugin From a490960c92bdae770fe3532943dcc6df582ef529 Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Thu, 20 Mar 2025 20:23:19 +0000 Subject: [PATCH 5/6] fixup! KEP-4742: Node Topology Labels via Downward API --- pkg/features/kube_features.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index a6e058523ce..ace72de0fea 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -1585,6 +1585,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.30"), Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // GA in 1.30; remove in 1.32 }, + PodTopologyLabelsAdmission: { + {Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Alpha}, + }, + PortForwardWebsockets: { {Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Alpha}, {Version: version.MustParse("1.31"), Default: true, PreRelease: featuregate.Beta}, From 8cfb9adbf60397fd1264ea7b0900419884a09719 Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Thu, 20 Mar 2025 22:02:40 +0000 Subject: [PATCH 6/6] overwrite existing labels during pod Binding storage --- pkg/registry/core/pod/storage/storage.go | 14 ++++--------- .../admission/podtopologylabels/admission.go | 7 ++----- .../podtopologylabels/admission_test.go | 4 ++-- test/integration/pods/pods_test.go | 20 +++++++++++++++++++ 4 files changed, 28 insertions(+), 17 deletions(-) diff --git a/pkg/registry/core/pod/storage/storage.go b/pkg/registry/core/pod/storage/storage.go index b93a38cd192..7323efe5faa 100644 --- a/pkg/registry/core/pod/storage/storage.go +++ b/pkg/registry/core/pod/storage/storage.go @@ -247,12 +247,10 @@ func (r *BindingREST) setPodNodeAndMetadata(ctx context.Context, podUID types.UI for k, v := range annotations { pod.Annotations[k] = v } - // Copy all labels from the Binding over to the Pod object, but do not - // overwrite any existing labels that have been previously set, to avoid - // changing any existing behaviour for pods that may be defined with a - // template by users and created by higher-level controllers. + // Copy all labels from the Binding over to the Pod object, overwriting + // any existing labels set on the Pod. if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodTopologyLabelsAdmission) { - copyLabelsWithoutOverwriting(pod, labels) + copyLabelsWithOverwriting(pod, labels) } podutil.UpdatePodCondition(&pod.Status, &api.PodCondition{ Type: api.PodScheduled, @@ -264,7 +262,7 @@ func (r *BindingREST) setPodNodeAndMetadata(ctx context.Context, podUID types.UI return finalPod, err } -func copyLabelsWithoutOverwriting(pod *api.Pod, labels map[string]string) { +func copyLabelsWithOverwriting(pod *api.Pod, labels map[string]string) { if len(labels) == 0 { // nothing to do return @@ -274,10 +272,6 @@ func copyLabelsWithoutOverwriting(pod *api.Pod, labels map[string]string) { } // Iterate over the binding's labels and copy them across to the Pod. for k, v := range labels { - if _, ok := pod.Labels[k]; ok { - // don't overwrite labels that are already present on the Pod - continue - } pod.Labels[k] = v } } diff --git a/plugin/pkg/admission/podtopologylabels/admission.go b/plugin/pkg/admission/podtopologylabels/admission.go index a2c22e786a9..3b010df6fe6 100644 --- a/plugin/pkg/admission/podtopologylabels/admission.go +++ b/plugin/pkg/admission/podtopologylabels/admission.go @@ -172,7 +172,7 @@ func (p *Plugin) admitPod(ctx context.Context, a admission.Attributes, o admissi if pod.Labels == nil { pod.Labels = make(map[string]string) } - // avoid overwriting any existing labels on the Pod. + // overwrite any existing labels on the pod mergeLabels(pod.Labels, labelsToCopy) return nil @@ -199,12 +199,9 @@ func (p *Plugin) topologyLabelsForNodeName(nodeName string) (map[string]string, return labels, nil } -// mergeLabels merges new into existing, without overwriting existing keys. +// mergeLabels merges new into existing, overwriting existing keys. func mergeLabels(existing, new map[string]string) { for k, v := range new { - if _, exists := existing[k]; exists { - continue - } existing[k] = v } } diff --git a/plugin/pkg/admission/podtopologylabels/admission_test.go b/plugin/pkg/admission/podtopologylabels/admission_test.go index f85aba70c54..956c2a934c2 100644 --- a/plugin/pkg/admission/podtopologylabels/admission_test.go +++ b/plugin/pkg/admission/podtopologylabels/admission_test.go @@ -89,7 +89,7 @@ func TestPodTopology(t *testing.T) { existingBindingLabels: map[string]string{}, }, { - name: "does not overwrite existing topology labels on Binding objects", + name: "overwrites existing topology labels", existingBindingLabels: map[string]string{ "topology.k8s.io/zone": "oldValue", }, @@ -97,7 +97,7 @@ func TestPodTopology(t *testing.T) { "topology.k8s.io/zone": "newValue", }, expectedBindingLabels: map[string]string{ - "topology.k8s.io/zone": "oldValue", + "topology.k8s.io/zone": "newValue", }, }, { diff --git a/test/integration/pods/pods_test.go b/test/integration/pods/pods_test.go index c1c88dc2b71..bc77f0b04aa 100644 --- a/test/integration/pods/pods_test.go +++ b/test/integration/pods/pods_test.go @@ -79,6 +79,24 @@ func TestPodTopologyLabels(t *testing.T) { "topology.k8s.io/region": "region", }, }, + { + name: "labels from Bindings overwriting existing labels on Pod", + existingPodLabels: map[string]string{ + "topology.k8s.io/zone": "bad-zone", + "topology.k8s.io/region": "bad-region", + "topology.k8s.io/abc": "123", + }, + targetNodeLabels: map[string]string{ + "topology.k8s.io/zone": "zone", + "topology.k8s.io/region": "region", + "topology.k8s.io/abc": "456", // this label isn't in (zone, region) so isn't copied + }, + expectedPodLabels: map[string]string{ + "topology.k8s.io/zone": "zone", + "topology.k8s.io/region": "region", + "topology.k8s.io/abc": "123", + }, + }, } // Enable the feature BEFORE starting the test server, as the admission plugin only checks feature gates // on start up and not on each invocation at runtime. @@ -113,6 +131,7 @@ func TestPodTopologyLabels_FeatureDisabled(t *testing.T) { type podTopologyTestCase struct { name string targetNodeLabels map[string]string + existingPodLabels map[string]string expectedPodLabels map[string]string } @@ -160,6 +179,7 @@ func testPodTopologyLabels(t *testing.T, tests []podTopologyTestCase) { } pod := prototypePod() + pod.Labels = test.existingPodLabels if pod, err = client.CoreV1().Pods(ns.Name).Create(ctx, pod, metav1.CreateOptions{}); err != nil { t.Errorf("Failed to create pod: %v", err) }