From 5e7e1e7cf168c8565ffbab2fff3fe65e823770fe Mon Sep 17 00:00:00 2001 From: James Munnelly Date: Mon, 3 Mar 2025 13:17:29 +0000 Subject: [PATCH] KEP-4742: Node Topology Labels via Downward API --- .../samples/generic/server/admission.go | 3 + pkg/features/kube_features.go | 10 + pkg/kubeapiserver/options/plugins.go | 4 + pkg/registry/core/pod/storage/storage.go | 25 ++- pkg/registry/core/pod/storage/storage_test.go | 14 +- .../admission/podtopologylabels/admission.go | 205 ++++++++++++++++++ .../podtopologylabels/admission_test.go | 174 +++++++++++++++ plugin/pkg/admission/podtopologylabels/doc.go | 23 ++ .../reference/versioned_feature_list.yaml | 6 + test/integration/pods/pods_test.go | 122 +++++++++++ 10 files changed, 580 insertions(+), 6 deletions(-) create mode 100644 plugin/pkg/admission/podtopologylabels/admission.go create mode 100644 plugin/pkg/admission/podtopologylabels/admission_test.go create mode 100644 plugin/pkg/admission/podtopologylabels/doc.go diff --git a/pkg/controlplane/apiserver/samples/generic/server/admission.go b/pkg/controlplane/apiserver/samples/generic/server/admission.go index eba588534c7..998fd4286bc 100644 --- a/pkg/controlplane/apiserver/samples/generic/server/admission.go +++ b/pkg/controlplane/apiserver/samples/generic/server/admission.go @@ -30,6 +30,7 @@ import ( certsigning "k8s.io/kubernetes/plugin/pkg/admission/certificates/signing" certsubjectrestriction "k8s.io/kubernetes/plugin/pkg/admission/certificates/subjectrestriction" "k8s.io/kubernetes/plugin/pkg/admission/defaulttolerationseconds" + "k8s.io/kubernetes/plugin/pkg/admission/podtopologylabels" "k8s.io/kubernetes/plugin/pkg/admission/serviceaccount" ) @@ -48,6 +49,8 @@ func DefaultOffAdmissionPlugins() sets.Set[string] { certsubjectrestriction.PluginName, // CertificateSubjectRestriction validatingadmissionpolicy.PluginName, // ValidatingAdmissionPolicy mutatingadmissionpolicy.PluginName, // MutatingAdmissionPolicy + podtopologylabels.PluginName, // 
PodTopologyLabels, only active when feature gate PodTopologyLabelsAdmission is enabled. + validatingadmissionpolicy.PluginName, // ValidatingAdmissionPolicy, only active when feature gate ValidatingAdmissionPolicy is enabled ) return sets.New(options.AllOrderedPlugins...).Difference(defaultOnPlugins) diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index ec9a8d2fd68..9e00230d383 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -964,6 +964,16 @@ const ( // restore the old behavior. Please file issues if you hit issues and have to use this Feature Gate. // The Feature Gate will be locked to true and then removed in +2 releases (1.35) if there are no bug reported DisableCPUQuotaWithExclusiveCPUs featuregate.Feature = "DisableCPUQuotaWithExclusiveCPUs" + + // owner: @munnerz + // kep: https://kep.k8s.io/4742 + // alpha: v1.33 + // + // Enables the PodTopologyLabelsAdmission admission plugin to automatically set node topology labels + // (i.e. those with the 'topology.k8s.io/' prefix on Node objects) onto Pod objects when they are + // bound/scheduled to a node. + // This allows workloads running in pods to understand the topology information of their assigned node. + PodTopologyLabelsAdmission featuregate.Feature = "PodTopologyLabelsAdmission" ) // defaultVersionedKubernetesFeatureGates consists of all known Kubernetes-specific feature keys with VersionedSpecs. 
diff --git a/pkg/kubeapiserver/options/plugins.go b/pkg/kubeapiserver/options/plugins.go index e9f8c41b814..0dccb55e2a4 100644 --- a/pkg/kubeapiserver/options/plugins.go +++ b/pkg/kubeapiserver/options/plugins.go @@ -46,6 +46,7 @@ import ( "k8s.io/kubernetes/plugin/pkg/admission/nodetaint" "k8s.io/kubernetes/plugin/pkg/admission/podnodeselector" "k8s.io/kubernetes/plugin/pkg/admission/podtolerationrestriction" + "k8s.io/kubernetes/plugin/pkg/admission/podtopologylabels" podpriority "k8s.io/kubernetes/plugin/pkg/admission/priority" "k8s.io/kubernetes/plugin/pkg/admission/runtimeclass" "k8s.io/kubernetes/plugin/pkg/admission/security/podsecurity" @@ -93,6 +94,7 @@ var AllOrderedPlugins = []string{ certsubjectrestriction.PluginName, // CertificateSubjectRestriction defaultingressclass.PluginName, // DefaultIngressClass denyserviceexternalips.PluginName, // DenyServiceExternalIPs + podtopologylabels.PluginName, // PodTopologyLabels // new admission plugins should generally be inserted above here // webhook, resourcequota, and deny plugins must go at the end @@ -138,6 +140,7 @@ func RegisterAllAdmissionPlugins(plugins *admission.Plugins) { certsigning.Register(plugins) ctbattest.Register(plugins) certsubjectrestriction.Register(plugins) + podtopologylabels.Register(plugins) } // DefaultOffAdmissionPlugins get admission plugins off by default for kube-apiserver. @@ -162,6 +165,7 @@ func DefaultOffAdmissionPlugins() sets.Set[string] { certsubjectrestriction.PluginName, // CertificateSubjectRestriction defaultingressclass.PluginName, // DefaultIngressClass podsecurity.PluginName, // PodSecurity + podtopologylabels.PluginName, // PodTopologyLabels, only active when feature gate PodTopologyLabelsAdmission is enabled. 
mutatingadmissionpolicy.PluginName, // Mutatingadmissionpolicy, only active when feature gate MutatingAdmissionpolicy is enabled validatingadmissionpolicy.PluginName, // ValidatingAdmissionPolicy, only active when feature gate ValidatingAdmissionPolicy is enabled ) diff --git a/pkg/registry/core/pod/storage/storage.go b/pkg/registry/core/pod/storage/storage.go index 8aaa358d085..0066e6bcfa7 100644 --- a/pkg/registry/core/pod/storage/storage.go +++ b/pkg/registry/core/pod/storage/storage.go @@ -33,10 +33,12 @@ import ( "k8s.io/apiserver/pkg/storage" storeerr "k8s.io/apiserver/pkg/storage/errors" "k8s.io/apiserver/pkg/util/dryrun" + utilfeature "k8s.io/apiserver/pkg/util/feature" policyclient "k8s.io/client-go/kubernetes/typed/policy/v1" podutil "k8s.io/kubernetes/pkg/api/pod" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/apis/core/validation" + kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/client" "k8s.io/kubernetes/pkg/printers" printersinternal "k8s.io/kubernetes/pkg/printers/internalversion" @@ -191,7 +193,7 @@ func (r *BindingREST) Create(ctx context.Context, name string, obj runtime.Objec } } - err = r.assignPod(ctx, binding.UID, binding.ResourceVersion, binding.Name, binding.Target.Name, binding.Annotations, dryrun.IsDryRun(options.DryRun)) + err = r.assignPod(ctx, binding.UID, binding.ResourceVersion, binding.Name, binding.Target.Name, binding.Annotations, binding.Labels, dryrun.IsDryRun(options.DryRun)) out = &metav1.Status{Status: metav1.StatusSuccess} return } @@ -206,7 +208,7 @@ func (r *BindingREST) PreserveRequestObjectMetaSystemFieldsOnSubresourceCreate() // setPodHostAndAnnotations sets the given pod's host to 'machine' if and only if // the pod is unassigned and merges the provided annotations with those of the pod. // Returns the current state of the pod, or an error. 
-func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types.UID, podResourceVersion, podID, machine string, annotations map[string]string, dryRun bool) (finalPod *api.Pod, err error) { +func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types.UID, podResourceVersion, podID, machine string, annotations, labels map[string]string, dryRun bool) (finalPod *api.Pod, err error) { podKey, err := r.store.KeyFunc(ctx, podID) if err != nil { return nil, err @@ -245,6 +247,21 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types for k, v := range annotations { pod.Annotations[k] = v } + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodTopologyLabelsAdmission) { + // If any labels are present on the Binding, copy them across to the Pod. + if len(labels) > 0 { + if pod.Labels == nil { + pod.Labels = make(map[string]string) + } + for k, v := range labels { + if _, ok := pod.Labels[k]; ok { + // don't overwrite labels that are already present on the Pod + continue + } + pod.Labels[k] = v + } + } + } podutil.UpdatePodCondition(&pod.Status, &api.PodCondition{ Type: api.PodScheduled, Status: api.ConditionTrue, @@ -256,8 +273,8 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types } // assignPod assigns the given pod to the given machine. 
-func (r *BindingREST) assignPod(ctx context.Context, podUID types.UID, podResourceVersion, podID string, machine string, annotations map[string]string, dryRun bool) (err error) { - if _, err = r.setPodHostAndAnnotations(ctx, podUID, podResourceVersion, podID, machine, annotations, dryRun); err != nil { +func (r *BindingREST) assignPod(ctx context.Context, podUID types.UID, podResourceVersion, podID string, machine string, annotations, labels map[string]string, dryRun bool) (err error) { + if _, err = r.setPodHostAndAnnotations(ctx, podUID, podResourceVersion, podID, machine, annotations, labels, dryRun); err != nil { err = storeerr.InterpretGetError(err, api.Resource("pods"), podID) err = storeerr.InterpretUpdateError(err, api.Resource("pods"), podID) if _, ok := err.(*errors.StatusError); !ok { diff --git a/pkg/registry/core/pod/storage/storage_test.go b/pkg/registry/core/pod/storage/storage_test.go index 39177e5ff96..17e0de0a633 100644 --- a/pkg/registry/core/pod/storage/storage_test.go +++ b/pkg/registry/core/pod/storage/storage_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/version" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/generic" genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" @@ -42,8 +43,11 @@ import ( apiserverstorage "k8s.io/apiserver/pkg/storage" storeerr "k8s.io/apiserver/pkg/storage/errors" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" podtest "k8s.io/kubernetes/pkg/api/pod/testing" api "k8s.io/kubernetes/pkg/apis/core" + kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/registry/registrytest" "k8s.io/kubernetes/pkg/securitycontext" ) @@ -676,12 +680,15 @@ func TestEtcdCreateWithContainersNotFound(t *testing.T) { 
t.Fatalf("unexpected error: %v", err) } + featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.33")) + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.PodTopologyLabelsAdmission, true) // Suddenly, a wild scheduler appears: _, err = bindingStorage.Create(ctx, "foo", &api.Binding{ ObjectMeta: metav1.ObjectMeta{ Namespace: metav1.NamespaceDefault, Name: "foo", - Annotations: map[string]string{"label1": "value1"}, + Annotations: map[string]string{"annotation1": "value1"}, + Labels: map[string]string{"label1": "label-value1"}, }, Target: api.ObjectReference{Name: "machine"}, }, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}) @@ -695,9 +702,12 @@ func TestEtcdCreateWithContainersNotFound(t *testing.T) { } pod := obj.(*api.Pod) - if !(pod.Annotations != nil && pod.Annotations["label1"] == "value1") { + if !(pod.Annotations != nil && pod.Annotations["annotation1"] == "value1") { t.Fatalf("Pod annotations don't match the expected: %v", pod.Annotations) } + if !(pod.Labels != nil && pod.Labels["label1"] == "label-value1") { + t.Fatalf("Pod labels don't match the expected: %v", pod.Labels) + } } func TestEtcdCreateWithConflict(t *testing.T) { diff --git a/plugin/pkg/admission/podtopologylabels/admission.go b/plugin/pkg/admission/podtopologylabels/admission.go new file mode 100644 index 00000000000..8467f1614ef --- /dev/null +++ b/plugin/pkg/admission/podtopologylabels/admission.go @@ -0,0 +1,205 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podtopologylabels + +import ( + "context" + "fmt" + "io" + "strings" + + "k8s.io/klog/v2" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/admission" + genericadmissioninitializer "k8s.io/apiserver/pkg/admission/initializer" + "k8s.io/client-go/informers" + corev1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/component-base/featuregate" + api "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/features" +) + +// PluginName is a string with the name of the plugin +const PluginName = "PodTopologyLabels" + +// Register registers a plugin +func Register(plugins *admission.Plugins) { + plugins.Register(PluginName, func(_ io.Reader) (admission.Interface, error) { + plugin := NewPodTopologyPlugin() + return plugin, nil + }) +} + +// NewPodTopologyPlugin initializes a Plugin +func NewPodTopologyPlugin() *Plugin { + return &Plugin{ + Handler: admission.NewHandler(admission.Create), + // Always copy zone and region labels. + labels: sets.New("topology.k8s.io/zone", "topology.k8s.io/region"), + // Also support copying arbitrary custom topology labels. + domains: sets.New("topology.k8s.io"), + // Copy any sub-domains of topology.k8s.io as well. + suffixes: sets.New(".topology.k8s.io"), + } +} + +type Plugin struct { + *admission.Handler + + nodeLister corev1listers.NodeLister + + // explicit labels, list of domains or a list of domain + // suffixes to be copied to Pod objects being bound. + labels, domains, suffixes sets.Set[string] + + enabled, inspectedFeatureGates bool +} + +var _ admission.MutationInterface = &Plugin{} +var _ genericadmissioninitializer.WantsExternalKubeInformerFactory = &Plugin{} +var _ genericadmissioninitializer.WantsFeatures = &Plugin{} + +// InspectFeatureGates implements WantsFeatures. 
+func (p *Plugin) InspectFeatureGates(featureGates featuregate.FeatureGate) { + p.enabled = featureGates.Enabled(features.PodTopologyLabelsAdmission) + p.inspectedFeatureGates = true +} + +func (p *Plugin) SetExternalKubeInformerFactory(factory informers.SharedInformerFactory) { + nodeInformer := factory.Core().V1().Nodes() + p.nodeLister = nodeInformer.Lister() + p.SetReadyFunc(nodeInformer.Informer().HasSynced) +} + +func (p *Plugin) ValidateInitialization() error { + if p.nodeLister == nil { + return fmt.Errorf("nodeLister not set") + } + if !p.inspectedFeatureGates { + return fmt.Errorf("feature gates not inspected") + } + return nil +} + +func (p *Plugin) Admit(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) (err error) { + if !p.enabled { + return nil + } + if shouldIgnore(a) { + return nil + } + // we need to wait for our caches to warm + if !p.WaitForReady() { + return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request")) + } + + binding := a.GetObject().(*api.Binding) + // other fields are not set by the default scheduler for the binding target, so only check the Kind. + if binding.Target.Kind != "Node" { + klog.V(6).InfoS("Skipping Pod being bound to non-Node object type", "target", binding.Target.GroupVersionKind()) + return nil + } + + node, err := p.nodeLister.Get(binding.Target.Name) + if err != nil { + // Ignore NotFound errors to avoid risking breaking compatibility/behaviour. 
+ if apierrors.IsNotFound(err) { + return nil + } + return err + } + + // fast-path/short circuit if the node has no labels + if node.Labels == nil { + return nil + } + + labelsToCopy := make(map[string]string) + for k, v := range node.Labels { + if !p.isTopologyLabel(k) { + continue + } + labelsToCopy[k] = v + } + + if len(labelsToCopy) == 0 { + // fast-path/short circuit if the node has no topology labels + return nil + } + + // copy the topology labels into the Binding's labels, as these are copied from the Binding + // to the Pod object being bound within the podBinding registry/store. + if binding.Labels == nil { + binding.Labels = make(map[string]string) + } + for k, v := range labelsToCopy { + if _, exists := binding.Labels[k]; exists { + // Don't overwrite labels on Binding resources as this could lead to unexpected + // behaviour if any schedulers rely on being able to explicitly set values themselves. + continue + } + binding.Labels[k] = v + } + + return nil +} + +func (p *Plugin) isTopologyLabel(key string) bool { + // First check explicit label keys. 
+ if p.labels.Has(key) { + return true + } + // Check the domain portion of the label key, if present + domain, _, hasDomain := strings.Cut(key, "/") + if !hasDomain { + // fast-path if there is no / separator + return false + } + if p.domains.Has(domain) { + // check for explicit domains to copy + return true + } + for suffix := range p.suffixes { + // check if the domain has one of the suffixes that are to be copied + if strings.HasSuffix(domain, suffix) { + return true + } + } + return false +} + +func shouldIgnore(a admission.Attributes) bool { + resource := a.GetResource().GroupResource() + if resource != api.Resource("pods") { + return true + } + if a.GetSubresource() != "binding" { + // only run the checks below on the binding subresource + return true + } + + obj := a.GetObject() + _, ok := obj.(*api.Binding) + if !ok { + klog.Errorf("expected Binding but got %s", a.GetKind().Kind) + return true + } + + return false +} diff --git a/plugin/pkg/admission/podtopologylabels/admission_test.go b/plugin/pkg/admission/podtopologylabels/admission_test.go new file mode 100644 index 00000000000..ca0ecefb62e --- /dev/null +++ b/plugin/pkg/admission/podtopologylabels/admission_test.go @@ -0,0 +1,174 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package podtopologylabels + +import ( + "context" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + + corev1 "k8s.io/api/core/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/version" + "k8s.io/apiserver/pkg/admission" + genericadmissioninitializer "k8s.io/apiserver/pkg/admission/initializer" + admissiontesting "k8s.io/apiserver/pkg/admission/testing" + "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + featuregatetesting "k8s.io/component-base/featuregate/testing" + api "k8s.io/kubernetes/pkg/apis/core" + kubefeatures "k8s.io/kubernetes/pkg/features" +) + +// TestPodTopology verifies the pod topology admission plugin works as expected. +func TestPodTopology(t *testing.T) { + tests := []struct { + name string // name of the test case. + bindingTarget *api.ObjectReference // target being bound to. Defaults to a valid node with the provided labels. + targetNodeLabels map[string]string // list of labels set on the node being bound to. + existingBindingLabels map[string]string // list of labels that are set on the Binding prior to admission (aka by the client/scheduler) + expectedBindingLabels map[string]string // list of labels that we expect to be set on the Binding after admission. + featureDisabled bool // configure whether the PodTopologyLabelsAdmission feature gate should be disabled. + }{ + { + name: "copies topology.k8s.io/zone and region labels to binding labels", + targetNodeLabels: map[string]string{ + "topology.k8s.io/zone": "zone1", + "topology.k8s.io/region": "region1", + "non-topology.k8s.io/label": "something", // verify we don't unexpectedly copy non topology.k8s.io labels. 
+ }, + expectedBindingLabels: map[string]string{ + "topology.k8s.io/zone": "zone1", + "topology.k8s.io/region": "region1", + }, + }, + { + name: "copies arbitrary topology labels", + targetNodeLabels: map[string]string{ + "topology.k8s.io/arbitrary": "something", + }, + expectedBindingLabels: map[string]string{ + "topology.k8s.io/arbitrary": "something", + }, + }, + { + name: "copies topology labels that use a subdomain", + targetNodeLabels: map[string]string{ + "something.topology.k8s.io/a-thing": "value", + }, + expectedBindingLabels: map[string]string{ + "something.topology.k8s.io/a-thing": "value", + }, + }, + { + name: "does not copy label keys that don't contain a / character", + targetNodeLabels: map[string]string{ + "topology.k8s.io": "value", + }, + existingBindingLabels: map[string]string{}, + }, + { + name: "does not overwrite existing topology labels on Binding objects", + existingBindingLabels: map[string]string{ + "topology.k8s.io/zone": "oldValue", + }, + targetNodeLabels: map[string]string{ + "topology.k8s.io/zone": "newValue", + }, + expectedBindingLabels: map[string]string{ + "topology.k8s.io/zone": "oldValue", + }, + }, + { + name: "does nothing if the PodTopologyLabelsAdmission feature gate is disabled", + targetNodeLabels: map[string]string{ + "topology.k8s.io/zone": "zone1", + "topology.k8s.io/region": "region1", + }, + expectedBindingLabels: map[string]string{}, + featureDisabled: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + namespace := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: "test-ns"}, + } + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "valid-test-node", + Labels: test.targetNodeLabels, + }, + } + // Pod we bind during test cases. 
+ pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "testPod", Namespace: namespace.Name}, + Spec: corev1.PodSpec{}, + } + featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, feature.DefaultFeatureGate, version.MustParse("1.33")) + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, kubefeatures.PodTopologyLabelsAdmission, + !test.featureDisabled) + mockClient := fake.NewSimpleClientset(namespace, node, pod) + handler, informerFactory, err := newHandlerForTest(mockClient) + if err != nil { + t.Fatalf("unexpected error initializing handler: %v", err) + } + stopCh := make(chan struct{}) + defer close(stopCh) + informerFactory.Start(stopCh) + + target := test.bindingTarget + if target == nil { + target = &api.ObjectReference{ + Kind: "Node", + Name: node.Name, + } + } + binding := &api.Binding{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + Labels: test.existingBindingLabels, + }, + Target: *target, + } + if err := admissiontesting.WithReinvocationTesting(t, handler). + Admit(context.TODO(), admission.NewAttributesRecord(binding, nil, api.Kind("Binding").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "binding", admission.Create, &metav1.CreateOptions{}, false, nil), nil); err != nil { + t.Errorf("failed running admission plugin: %v", err) + } + updatedBindingLabels := binding.Labels + if !apiequality.Semantic.DeepEqual(updatedBindingLabels, test.expectedBindingLabels) { + t.Errorf("Unexpected label values: %v", cmp.Diff(updatedBindingLabels, test.expectedBindingLabels)) + } + }) + } +} + +// newHandlerForTest returns the admission controller configured for testing. 
+func newHandlerForTest(c kubernetes.Interface) (*Plugin, informers.SharedInformerFactory, error) { + factory := informers.NewSharedInformerFactory(c, 5*time.Minute) + handler := NewPodTopologyPlugin() + pluginInitializer := genericadmissioninitializer.New(c, nil, factory, nil, feature.DefaultFeatureGate, nil, nil) + pluginInitializer.Initialize(handler) + return handler, factory, admission.ValidateInitialization(handler) +} diff --git a/plugin/pkg/admission/podtopologylabels/doc.go b/plugin/pkg/admission/podtopologylabels/doc.go new file mode 100644 index 00000000000..99271759ddf --- /dev/null +++ b/plugin/pkg/admission/podtopologylabels/doc.go @@ -0,0 +1,23 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package podtopologylabels is a plugin that mutates `pods/binding` requests +// to copy `topology.k8s.io` labels (including subdomains) from the Node object +// referenced in the Binding onto the Binding, which causes the Pod to also +// have these values set. +// If the binding target is NOT a Node object, no action is taken. +// If the referenced Node object does not exist, no action is taken. 
+package podtopologylabels // import "k8s.io/kubernetes/plugin/pkg/admission/podtopologylabels" diff --git a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml index dd0ad06e1fc..658daa9c738 100644 --- a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml +++ b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml @@ -1075,6 +1075,12 @@ lockToDefault: true preRelease: GA version: "1.30" +- name: PodTopologyLabelsAdmission + versionedSpecs: + - default: false + lockToDefault: false + preRelease: Alpha + version: "1.33" - name: PortForwardWebsockets versionedSpecs: - default: false diff --git a/test/integration/pods/pods_test.go b/test/integration/pods/pods_test.go index c02d3890d6d..33b23235f85 100644 --- a/test/integration/pods/pods_test.go +++ b/test/integration/pods/pods_test.go @@ -22,11 +22,15 @@ import ( "strings" "testing" + "github.com/google/go-cmp/cmp" + v1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/version" utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" typedv1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -40,6 +44,124 @@ import ( "k8s.io/kubernetes/test/integration/framework" ) +func TestPodTopologyLabels(t *testing.T) { + tests := []podTopologyTestCase{ + { + name: "zone and region topology labels copied from assigned Node", + targetNodeLabels: map[string]string{ + "topology.k8s.io/zone": "zone", + "topology.k8s.io/region": "region", + }, + expectedPodLabels: map[string]string{ + "topology.k8s.io/zone": "zone", + "topology.k8s.io/region": "region", + }, + }, + } + // Enable the feature BEFORE starting the test server, as the admission plugin only checks feature gates + // on start up and not on 
each invocation at runtime. + featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.33")) + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodTopologyLabelsAdmission, true) + testPodTopologyLabels(t, tests) +} + +func TestPodTopologyLabels_FeatureDisabled(t *testing.T) { + tests := []podTopologyTestCase{ + { + name: "does nothing when the feature is not enabled", + targetNodeLabels: map[string]string{ + "topology.k8s.io/zone": "zone", + "topology.k8s.io/region": "region", + }, + expectedPodLabels: map[string]string{}, + }, + } + // Disable the feature BEFORE starting the test server, as the admission plugin only checks feature gates + // on start up and not on each invocation at runtime. + featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.33")) + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodTopologyLabelsAdmission, false) + testPodTopologyLabels(t, tests) +} + +// podTopologyTestCase is defined outside of TestPodTopologyLabels to allow us to re-use the test implementation logic +// between the feature enabled and feature disabled tests. +// This will no longer be required once the feature gate graduates to GA/locked to being enabled. 
+type podTopologyTestCase struct { + name string + targetNodeLabels map[string]string + expectedPodLabels map[string]string +} + +func testPodTopologyLabels(t *testing.T, tests []podTopologyTestCase) { + server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd()) + defer server.TearDownFn() + client := clientset.NewForConfigOrDie(server.ClientConfig) + ns := framework.CreateNamespaceOrDie(client, "pod-topology-labels", t) + defer framework.DeleteNamespaceOrDie(client, ns, t) + + prototypePod := func() *v1.Pod { + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "pod-topology-test-", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "fake-name", + Image: "fakeimage", + }, + }, + }, + } + } + prototypeNode := func() *v1.Node { + return &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + GenerateName: "podtopology-test-node-", + }, + } + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // Create the Node we are going to bind to. + node := prototypeNode() + // Set the labels on the Node we are going to create. 
+ node.Labels = test.targetNodeLabels + ctx := context.Background() + + var err error + if node, err = client.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create node: %v", err) + } + + pod := prototypePod() + if pod, err = client.CoreV1().Pods(ns.Name).Create(ctx, pod, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to create pod: %v", err) + } + + binding := &v1.Binding{ + ObjectMeta: metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace}, + Target: v1.ObjectReference{ + Kind: "Node", + Name: node.Name, + }, + } + if err := client.CoreV1().Pods(pod.Namespace).Bind(ctx, binding, metav1.CreateOptions{}); err != nil { + t.Fatalf("Failed to bind pod to node: %v", err) + } + + if pod, err = client.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}); err != nil { + t.Fatalf("Failed to fetch bound Pod: %v", err) + } + + if !apiequality.Semantic.DeepEqual(pod.Labels, test.expectedPodLabels) { + t.Errorf("Unexpected label values: %v", cmp.Diff(pod.Labels, test.expectedPodLabels)) + } + }) + } +} + func TestPodUpdateActiveDeadlineSeconds(t *testing.T) { // Disable ServiceAccount admission plugin as we don't have serviceaccount controller running. server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())