diff --git a/pkg/controlplane/apiserver/samples/generic/server/admission_test.go b/pkg/controlplane/apiserver/samples/generic/server/admission_test.go index b85423d7c7f..a16a257840a 100644 --- a/pkg/controlplane/apiserver/samples/generic/server/admission_test.go +++ b/pkg/controlplane/apiserver/samples/generic/server/admission_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/plugin/pkg/admission/limitranger" "k8s.io/kubernetes/plugin/pkg/admission/network/defaultingressclass" "k8s.io/kubernetes/plugin/pkg/admission/nodetaint" + "k8s.io/kubernetes/plugin/pkg/admission/podtopologylabels" podpriority "k8s.io/kubernetes/plugin/pkg/admission/priority" "k8s.io/kubernetes/plugin/pkg/admission/runtimeclass" "k8s.io/kubernetes/plugin/pkg/admission/security/podsecurity" @@ -42,6 +43,7 @@ var intentionallyOffPlugins = sets.New[string]( runtimeclass.PluginName, // RuntimeClass defaultingressclass.PluginName, // DefaultIngressClass podsecurity.PluginName, // PodSecurity + podtopologylabels.PluginName, // PodTopologyLabels ) func TestDefaultOffAdmissionPlugins(t *testing.T) { diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index ec9a8d2fd68..ace72de0fea 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -964,6 +964,20 @@ const ( // restore the old behavior. Please file issues if you hit issues and have to use this Feature Gate. 
// The Feature Gate will be locked to true and then removed in +2 releases (1.35) if there are no bug reported DisableCPUQuotaWithExclusiveCPUs featuregate.Feature = "DisableCPUQuotaWithExclusiveCPUs" + + // owner: @munnerz + // kep: https://kep.k8s.io/4742 + // alpha: v1.33 + // + // Enables the PodTopologyLabels admission plugin that mutates `pods/binding` + // requests by copying the `topology.k8s.io/{zone,region}` labels from the assigned + // Node object (in the Binding being admitted) onto the Binding + // so that they can be persisted onto the Pod object when the Pod is being scheduled. + // This allows workloads running in pods to understand the topology information of their assigned node. + // Enabling this feature also permits external schedulers to set labels on pods in an atomic + // operation when scheduling a Pod by setting the `metadata.labels` field on the submitted Binding, + // similar to how `metadata.annotations` behaves. + PodTopologyLabelsAdmission featuregate.Feature = "PodTopologyLabelsAdmission" ) // defaultVersionedKubernetesFeatureGates consists of all known Kubernetes-specific feature keys with VersionedSpecs. 
@@ -1571,6 +1585,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate {Version: version.MustParse("1.30"), Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // GA in 1.30; remove in 1.32 }, + PodTopologyLabelsAdmission: { + {Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Alpha}, + }, + PortForwardWebsockets: { {Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Alpha}, {Version: version.MustParse("1.31"), Default: true, PreRelease: featuregate.Beta}, diff --git a/pkg/kubeapiserver/options/plugins.go b/pkg/kubeapiserver/options/plugins.go index e9f8c41b814..0dccb55e2a4 100644 --- a/pkg/kubeapiserver/options/plugins.go +++ b/pkg/kubeapiserver/options/plugins.go @@ -46,6 +46,7 @@ import ( "k8s.io/kubernetes/plugin/pkg/admission/nodetaint" "k8s.io/kubernetes/plugin/pkg/admission/podnodeselector" "k8s.io/kubernetes/plugin/pkg/admission/podtolerationrestriction" + "k8s.io/kubernetes/plugin/pkg/admission/podtopologylabels" podpriority "k8s.io/kubernetes/plugin/pkg/admission/priority" "k8s.io/kubernetes/plugin/pkg/admission/runtimeclass" "k8s.io/kubernetes/plugin/pkg/admission/security/podsecurity" @@ -93,6 +94,7 @@ var AllOrderedPlugins = []string{ certsubjectrestriction.PluginName, // CertificateSubjectRestriction defaultingressclass.PluginName, // DefaultIngressClass denyserviceexternalips.PluginName, // DenyServiceExternalIPs + podtopologylabels.PluginName, // PodTopologyLabels // new admission plugins should generally be inserted above here // webhook, resourcequota, and deny plugins must go at the end @@ -138,6 +140,7 @@ func RegisterAllAdmissionPlugins(plugins *admission.Plugins) { certsigning.Register(plugins) ctbattest.Register(plugins) certsubjectrestriction.Register(plugins) + podtopologylabels.Register(plugins) } // DefaultOffAdmissionPlugins get admission plugins off by default for kube-apiserver. 
@@ -162,6 +165,7 @@ func DefaultOffAdmissionPlugins() sets.Set[string] { certsubjectrestriction.PluginName, // CertificateSubjectRestriction defaultingressclass.PluginName, // DefaultIngressClass podsecurity.PluginName, // PodSecurity + podtopologylabels.PluginName, // PodTopologyLabels, only active when feature gate PodTopologyLabelsAdmission is enabled. mutatingadmissionpolicy.PluginName, // Mutatingadmissionpolicy, only active when feature gate MutatingAdmissionpolicy is enabled validatingadmissionpolicy.PluginName, // ValidatingAdmissionPolicy, only active when feature gate ValidatingAdmissionPolicy is enabled ) diff --git a/pkg/registry/core/pod/storage/storage.go b/pkg/registry/core/pod/storage/storage.go index 8aaa358d085..7323efe5faa 100644 --- a/pkg/registry/core/pod/storage/storage.go +++ b/pkg/registry/core/pod/storage/storage.go @@ -33,10 +33,12 @@ import ( "k8s.io/apiserver/pkg/storage" storeerr "k8s.io/apiserver/pkg/storage/errors" "k8s.io/apiserver/pkg/util/dryrun" + utilfeature "k8s.io/apiserver/pkg/util/feature" policyclient "k8s.io/client-go/kubernetes/typed/policy/v1" podutil "k8s.io/kubernetes/pkg/api/pod" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/apis/core/validation" + kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/client" "k8s.io/kubernetes/pkg/printers" printersinternal "k8s.io/kubernetes/pkg/printers/internalversion" @@ -191,7 +193,7 @@ func (r *BindingREST) Create(ctx context.Context, name string, obj runtime.Objec } } - err = r.assignPod(ctx, binding.UID, binding.ResourceVersion, binding.Name, binding.Target.Name, binding.Annotations, dryrun.IsDryRun(options.DryRun)) + err = r.assignPod(ctx, binding.UID, binding.ResourceVersion, binding.Name, binding.Target.Name, binding.Annotations, binding.Labels, dryrun.IsDryRun(options.DryRun)) out = &metav1.Status{Status: metav1.StatusSuccess} return } @@ -203,10 +205,10 @@ func (r *BindingREST) 
PreserveRequestObjectMetaSystemFieldsOnSubresourceCreate() return true } -// setPodHostAndAnnotations sets the given pod's host to 'machine' if and only if -// the pod is unassigned and merges the provided annotations with those of the pod. +// setPodNodeAndMetadata sets the given pod's nodeName to 'machine' if and only if +// the pod is unassigned, and merges the provided annotations and labels with those of the pod. // Returns the current state of the pod, or an error. -func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types.UID, podResourceVersion, podID, machine string, annotations map[string]string, dryRun bool) (finalPod *api.Pod, err error) { +func (r *BindingREST) setPodNodeAndMetadata(ctx context.Context, podUID types.UID, podResourceVersion, podID, machine string, annotations, labels map[string]string, dryRun bool) (finalPod *api.Pod, err error) { podKey, err := r.store.KeyFunc(ctx, podID) if err != nil { return nil, err @@ -245,6 +247,11 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types for k, v := range annotations { pod.Annotations[k] = v } + // Copy all labels from the Binding over to the Pod object, overwriting + // any existing labels set on the Pod. + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodTopologyLabelsAdmission) { + copyLabelsWithOverwriting(pod, labels) + } podutil.UpdatePodCondition(&pod.Status, &api.PodCondition{ Type: api.PodScheduled, Status: api.ConditionTrue, @@ -255,9 +262,23 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types return finalPod, err } +func copyLabelsWithOverwriting(pod *api.Pod, labels map[string]string) { + if len(labels) == 0 { + // nothing to do + return + } + if pod.Labels == nil { + pod.Labels = make(map[string]string) + } + // Iterate over the binding's labels and copy them across to the Pod. + for k, v := range labels { + pod.Labels[k] = v + } +} + // assignPod assigns the given pod to the given machine. 
-func (r *BindingREST) assignPod(ctx context.Context, podUID types.UID, podResourceVersion, podID string, machine string, annotations map[string]string, dryRun bool) (err error) { - if _, err = r.setPodHostAndAnnotations(ctx, podUID, podResourceVersion, podID, machine, annotations, dryRun); err != nil { +func (r *BindingREST) assignPod(ctx context.Context, podUID types.UID, podResourceVersion, podID string, machine string, annotations, labels map[string]string, dryRun bool) (err error) { + if _, err = r.setPodNodeAndMetadata(ctx, podUID, podResourceVersion, podID, machine, annotations, labels, dryRun); err != nil { err = storeerr.InterpretGetError(err, api.Resource("pods"), podID) err = storeerr.InterpretUpdateError(err, api.Resource("pods"), podID) if _, ok := err.(*errors.StatusError); !ok { diff --git a/pkg/registry/core/pod/storage/storage_test.go b/pkg/registry/core/pod/storage/storage_test.go index 39177e5ff96..a2a0ac6c0a1 100644 --- a/pkg/registry/core/pod/storage/storage_test.go +++ b/pkg/registry/core/pod/storage/storage_test.go @@ -42,8 +42,11 @@ import ( apiserverstorage "k8s.io/apiserver/pkg/storage" storeerr "k8s.io/apiserver/pkg/storage/errors" etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" + utilfeature "k8s.io/apiserver/pkg/util/feature" + featuregatetesting "k8s.io/component-base/featuregate/testing" podtest "k8s.io/kubernetes/pkg/api/pod/testing" api "k8s.io/kubernetes/pkg/apis/core" + kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/registry/registrytest" "k8s.io/kubernetes/pkg/securitycontext" ) @@ -676,12 +679,14 @@ func TestEtcdCreateWithContainersNotFound(t *testing.T) { t.Fatalf("unexpected error: %v", err) } + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.PodTopologyLabelsAdmission, true) // Suddenly, a wild scheduler appears: _, err = bindingStorage.Create(ctx, "foo", &api.Binding{ ObjectMeta: metav1.ObjectMeta{ Namespace: metav1.NamespaceDefault, Name: "foo", - 
Annotations: map[string]string{"label1": "value1"}, + Annotations: map[string]string{"annotation1": "value1"}, + Labels: map[string]string{"label1": "label-value1"}, }, Target: api.ObjectReference{Name: "machine"}, }, rest.ValidateAllObjectFunc, &metav1.CreateOptions{}) @@ -695,9 +700,12 @@ func TestEtcdCreateWithContainersNotFound(t *testing.T) { } pod := obj.(*api.Pod) - if !(pod.Annotations != nil && pod.Annotations["label1"] == "value1") { + if !(pod.Annotations != nil && pod.Annotations["annotation1"] == "value1") { t.Fatalf("Pod annotations don't match the expected: %v", pod.Annotations) } + if !(pod.Labels != nil && pod.Labels["label1"] == "label-value1") { + t.Fatalf("Pod labels don't match the expected: %v", pod.Labels) + } } func TestEtcdCreateWithConflict(t *testing.T) { diff --git a/plugin/pkg/admission/podtopologylabels/admission.go b/plugin/pkg/admission/podtopologylabels/admission.go new file mode 100644 index 00000000000..3b010df6fe6 --- /dev/null +++ b/plugin/pkg/admission/podtopologylabels/admission.go @@ -0,0 +1,243 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package podtopologylabels + +import ( + "context" + "fmt" + "io" + + "k8s.io/klog/v2" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/admission" + genericadmissioninitializer "k8s.io/apiserver/pkg/admission/initializer" + "k8s.io/client-go/informers" + corev1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/component-base/featuregate" + api "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/features" +) + +// PluginName is a string with the name of the plugin +const PluginName = "PodTopologyLabels" + +// defaultConfig is the configuration used for the default instantiation of the plugin. +// This configuration is used by kube-apiserver. +// It is not exported to avoid any chance of accidentally mutating the variable. +var defaultConfig = Config{ + Labels: []string{"topology.k8s.io/zone", "topology.k8s.io/region"}, +} + +// Register registers a plugin +func Register(plugins *admission.Plugins) { + plugins.Register(PluginName, func(_ io.Reader) (admission.Interface, error) { + plugin := NewPodTopologyPlugin(defaultConfig) + return plugin, nil + }) +} + +// Config contains configuration for instances of the podtopologylabels admission plugin. +// This is not exposed as user-facing APIServer configuration, however can be used by +// platform operators when building custom topology label plugins. +type Config struct { + // Labels is set of explicit label keys to be copied from the Node object onto + // pod Binding objects during admission. + Labels []string +} + +// NewPodTopologyPlugin initializes a Plugin +func NewPodTopologyPlugin(c Config) *Plugin { + return &Plugin{ + Handler: admission.NewHandler(admission.Create), + labels: sets.New(c.Labels...), + } +} + +type Plugin struct { + *admission.Handler + + nodeLister corev1listers.NodeLister + + // explicit labels to be copied to Pod objects being bound. 
+ labels sets.Set[string] + + enabled, inspectedFeatureGates bool +} + +var _ admission.MutationInterface = &Plugin{} +var _ genericadmissioninitializer.WantsExternalKubeInformerFactory = &Plugin{} +var _ genericadmissioninitializer.WantsFeatures = &Plugin{} + +// InspectFeatureGates implements WantsFeatures. +func (p *Plugin) InspectFeatureGates(featureGates featuregate.FeatureGate) { + p.enabled = featureGates.Enabled(features.PodTopologyLabelsAdmission) + p.inspectedFeatureGates = true +} + +func (p *Plugin) SetExternalKubeInformerFactory(factory informers.SharedInformerFactory) { + nodeInformer := factory.Core().V1().Nodes() + p.nodeLister = nodeInformer.Lister() + p.SetReadyFunc(nodeInformer.Informer().HasSynced) +} + +func (p *Plugin) ValidateInitialization() error { + if p.nodeLister == nil { + return fmt.Errorf("nodeLister not set") + } + if !p.inspectedFeatureGates { + return fmt.Errorf("feature gates not inspected") + } + return nil +} + +func (p *Plugin) Admit(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) (err error) { + if !p.enabled { + return nil + } + // check whether the request is for a Binding or a Pod spec update. + shouldAdmit, doAdmit, err := p.shouldAdmit(a) + if !shouldAdmit || err != nil { + // error is either nil and admit == false, or err is non-nil and should be returned. + return err + } + // we need to wait for our caches to warm + if !p.WaitForReady() { + return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request")) + } + // run type specific admission + return doAdmit(ctx, a, o) +} + +func (p *Plugin) admitBinding(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error { + binding := a.GetObject().(*api.Binding) + // other fields are not set by the default scheduler for the binding target, so only check the Kind. 
+ if binding.Target.Kind != "Node" { + klog.V(6).Info("Skipping Pod being bound to non-Node object type", "target", binding.Target.GroupVersionKind()) + return nil + } + + labelsToCopy, err := p.topologyLabelsForNodeName(binding.Target.Name) + if err != nil { + return err + } + if len(labelsToCopy) == 0 { + // fast-path/short circuit if the node has no topology labels + return nil + } + + // copy the topology labels into the Binding's labels, as these are copied from the Binding + // to the Pod object being bound within the podBinding registry/store. + if binding.Labels == nil { + binding.Labels = make(map[string]string) + } + mergeLabels(binding.Labels, labelsToCopy) + + return nil +} + +func (p *Plugin) admitPod(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error { + pod := a.GetObject().(*api.Pod) + if pod.Spec.NodeName == "" { + // pod has not been scheduled yet + return nil + } + + // Determine the topology labels from the assigned node to be copied. + labelsToCopy, err := p.topologyLabelsForNodeName(pod.Spec.NodeName) + if err != nil { + return err + } + if len(labelsToCopy) == 0 { + // fast-path/short circuit if the node has no topology labels + return nil + } + + // copy the topology labels into the Pod's labels. + if pod.Labels == nil { + pod.Labels = make(map[string]string) + } + // overwrite any existing labels on the pod + mergeLabels(pod.Labels, labelsToCopy) + + return nil +} + +func (p *Plugin) topologyLabelsForNodeName(nodeName string) (map[string]string, error) { + labels := make(map[string]string) + node, err := p.nodeLister.Get(nodeName) + if err != nil { + // Ignore NotFound errors to avoid risking breaking compatibility/behaviour. + if apierrors.IsNotFound(err) { + return labels, nil + } + return nil, err + } + + for k, v := range node.Labels { + if !p.isTopologyLabel(k) { + continue + } + labels[k] = v + } + + return labels, nil +} + +// mergeLabels merges new into existing, overwriting existing keys. 
+func mergeLabels(existing, new map[string]string) { + for k, v := range new { + existing[k] = v + } +} + +func (p *Plugin) isTopologyLabel(key string) bool { + // First check explicit label keys. + if p.labels.Has(key) { + return true + } + return false +} + +type admitFunc func(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) (err error) + +// shouldAdmit inspects the provided admission attributes to determine whether the request +// requires admittance through this plugin. +func (p *Plugin) shouldAdmit(a admission.Attributes) (bool, admitFunc, error) { + if a.GetResource().GroupResource() != api.Resource("pods") { + return false, nil, nil + } + + switch a.GetSubresource() { + case "": // regular Pod endpoint + _, ok := a.GetObject().(*api.Pod) + if !ok { + return false, nil, fmt.Errorf("expected Pod but got %T", a.GetObject()) + } + return true, p.admitPod, nil + case "binding": + _, ok := a.GetObject().(*api.Binding) + if !ok { + return false, nil, fmt.Errorf("expected Binding but got %s", a.GetKind().Kind) + } + return true, p.admitBinding, nil + default: + // Ignore all other sub-resources. + return false, nil, nil + } +} diff --git a/plugin/pkg/admission/podtopologylabels/admission_test.go b/plugin/pkg/admission/podtopologylabels/admission_test.go new file mode 100644 index 00000000000..956c2a934c2 --- /dev/null +++ b/plugin/pkg/admission/podtopologylabels/admission_test.go @@ -0,0 +1,232 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package podtopologylabels + +import ( + "context" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + + corev1 "k8s.io/api/core/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apiserver/pkg/admission" + genericadmissioninitializer "k8s.io/apiserver/pkg/admission/initializer" + admissiontesting "k8s.io/apiserver/pkg/admission/testing" + "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/fake" + featuregatetesting "k8s.io/component-base/featuregate/testing" + api "k8s.io/kubernetes/pkg/apis/core" + kubefeatures "k8s.io/kubernetes/pkg/features" +) + +// TestPodTopology verifies the pod topology admission plugin works as expected. +func TestPodTopology(t *testing.T) { + tests := []struct { + name string // name of the test case. + bindingTarget *api.ObjectReference // target being bound to. Defaults to a valid node with the provided labels. + targetNodeLabels map[string]string // list of labels set on the node being bound to. + existingBindingLabels map[string]string // list of labels that are set on the Binding prior to admission (i.e. by the client/scheduler) + expectedBindingLabels map[string]string // list of labels that we expect to be set on the Binding after admission. + featureDisabled bool // configure whether the PodTopologyLabelsAdmission feature gate should be disabled. + }{ + { + name: "copies topology.k8s.io/zone and region labels to binding labels", + targetNodeLabels: map[string]string{ + "topology.k8s.io/zone": "zone1", + "topology.k8s.io/region": "region1", + "topology.k8s.io/arbitrary": "something", + "non-topology.k8s.io/label": "something", // verify we don't unexpectedly copy non topology.k8s.io labels. 
+ }, + expectedBindingLabels: map[string]string{ + "topology.k8s.io/zone": "zone1", + "topology.k8s.io/region": "region1", + }, + }, + { + name: "does not copy arbitrary topology labels", + targetNodeLabels: map[string]string{ + "topology.k8s.io/zone": "zone1", + "topology.k8s.io/arbitrary": "something", + }, + expectedBindingLabels: map[string]string{ + "topology.k8s.io/zone": "zone1", + }, + }, + { + name: "does not copy topology labels that use a subdomain", + targetNodeLabels: map[string]string{ + "topology.k8s.io/region": "region1", + "sub.topology.k8s.io/zone": "value", + }, + expectedBindingLabels: map[string]string{ + "topology.k8s.io/region": "region1", + }, + }, + { + name: "does not copy label keys that don't contain a / character", + targetNodeLabels: map[string]string{ + "topology.k8s.io": "value", + }, + existingBindingLabels: map[string]string{}, + }, + { + name: "overwrites existing topology labels", + existingBindingLabels: map[string]string{ + "topology.k8s.io/zone": "oldValue", + }, + targetNodeLabels: map[string]string{ + "topology.k8s.io/zone": "newValue", + }, + expectedBindingLabels: map[string]string{ + "topology.k8s.io/zone": "newValue", + }, + }, + { + name: "does nothing if the SetPodTopologyLabels feature gate is disabled", + targetNodeLabels: map[string]string{ + "topology.k8s.io/zone": "zone1", + "topology.k8s.io/region": "region1", + }, + expectedBindingLabels: map[string]string{}, + featureDisabled: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + namespace := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: "test-ns"}, + } + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "valid-test-node", + Labels: test.targetNodeLabels, + }, + } + + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, kubefeatures.PodTopologyLabelsAdmission, !test.featureDisabled) + + t.Run("using Pod directly (update)", func(t *testing.T) { + // set up the client and informers 
+ mockClient := fake.NewSimpleClientset(namespace, node) + handler, informerFactory, err := newHandlerForTest(mockClient) + if err != nil { + t.Fatalf("unexpected error initializing handler: %v", err) + } + stopCh := make(chan struct{}) + defer close(stopCh) + informerFactory.Start(stopCh) + + oldPod := &api.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "testPod", Namespace: namespace.Name, Labels: test.existingBindingLabels}, + Spec: api.PodSpec{}, + } + pod := oldPod.DeepCopy() + pod.Spec.NodeName = node.Name + + if err := admissiontesting.WithReinvocationTesting(t, handler). + Admit(context.TODO(), admission.NewAttributesRecord(pod, oldPod, + api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, + api.Resource("pods").WithVersion("version"), "", admission.Update, &metav1.UpdateOptions{}, + false, nil), nil); err != nil { + t.Errorf("failed running admission plugin: %v", err) + } + }) + + t.Run("using Pod directly (create)", func(t *testing.T) { + // set up the client and informers + mockClient := fake.NewSimpleClientset(namespace, node) + handler, informerFactory, err := newHandlerForTest(mockClient) + if err != nil { + t.Fatalf("unexpected error initializing handler: %v", err) + } + stopCh := make(chan struct{}) + defer close(stopCh) + informerFactory.Start(stopCh) + + pod := &api.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "testPod", Namespace: namespace.Name, Labels: test.existingBindingLabels}, + Spec: api.PodSpec{ + NodeName: node.Name, + }, + } + if err := admissiontesting.WithReinvocationTesting(t, handler). + Admit(context.TODO(), admission.NewAttributesRecord(pod, nil, + api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, + api.Resource("pods").WithVersion("version"), "", admission.Create, &metav1.UpdateOptions{}, + false, nil), nil); err != nil { + t.Errorf("failed running admission plugin: %v", err) + } + }) + + t.Run("using Binding subresource", func(t *testing.T) { + // Pod we bind during test cases. 
+ existingPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "testPod", Namespace: namespace.Name}, + Spec: corev1.PodSpec{}, + } + mockClient := fake.NewSimpleClientset(namespace, node, existingPod) + handler, informerFactory, err := newHandlerForTest(mockClient) + if err != nil { + t.Fatalf("unexpected error initializing handler: %v", err) + } + stopCh := make(chan struct{}) + defer close(stopCh) + informerFactory.Start(stopCh) + + // create and submit a Binding object + target := test.bindingTarget + if target == nil { + target = &api.ObjectReference{ + Kind: "Node", + Name: node.Name, + } + } + binding := &api.Binding{ + ObjectMeta: metav1.ObjectMeta{ + Name: existingPod.Name, + Namespace: existingPod.Namespace, + Labels: test.existingBindingLabels, + }, + Target: *target, + } + if err := admissiontesting.WithReinvocationTesting(t, handler). + Admit(context.TODO(), admission.NewAttributesRecord(binding, nil, api.Kind("Binding").WithVersion("version"), existingPod.Namespace, existingPod.Name, api.Resource("pods").WithVersion("version"), "binding", admission.Create, &metav1.CreateOptions{}, false, nil), nil); err != nil { + t.Errorf("failed running admission plugin: %v", err) + } + updatedBindingLabels := binding.Labels + if !apiequality.Semantic.DeepEqual(updatedBindingLabels, test.expectedBindingLabels) { + t.Errorf("Unexpected label values: %v", cmp.Diff(updatedBindingLabels, test.expectedBindingLabels)) + } + }) + }) + } +} + +// newHandlerForTest returns the admission controller configured for testing. +func newHandlerForTest(c kubernetes.Interface) (*Plugin, informers.SharedInformerFactory, error) { + factory := informers.NewSharedInformerFactory(c, 5*time.Minute) + handler := NewPodTopologyPlugin(defaultConfig) // todo: write additional test cases with non-default config. 
+ pluginInitializer := genericadmissioninitializer.New(c, nil, factory, nil, feature.DefaultFeatureGate, nil, nil) + pluginInitializer.Initialize(handler) + return handler, factory, admission.ValidateInitialization(handler) +} diff --git a/plugin/pkg/admission/podtopologylabels/doc.go b/plugin/pkg/admission/podtopologylabels/doc.go new file mode 100644 index 00000000000..56131f0b70a --- /dev/null +++ b/plugin/pkg/admission/podtopologylabels/doc.go @@ -0,0 +1,25 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package podtopologylabels is a plugin that mutates `pods/binding` requests +// by copying the `topology.k8s.io/{zone,region}` labels from the assigned Node +// object (in the Binding being admitted) onto the Binding so that they can be +// persisted onto the Pod object when the Pod is being scheduled. +// Requests for the regular `pods` resource that set the `spec.nodeName` will +// also trigger the plugin to copy the labels as described. +// If the binding target is NOT a Node object, no action is taken. +// If the referenced Node object does not exist, no action is taken. 
+package podtopologylabels // import "k8s.io/kubernetes/plugin/pkg/admission/podtopologylabels" diff --git a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml index dd0ad06e1fc..658daa9c738 100644 --- a/test/compatibility_lifecycle/reference/versioned_feature_list.yaml +++ b/test/compatibility_lifecycle/reference/versioned_feature_list.yaml @@ -1075,6 +1075,12 @@ lockToDefault: true preRelease: GA version: "1.30" +- name: PodTopologyLabelsAdmission + versionedSpecs: + - default: false + lockToDefault: false + preRelease: Alpha + version: "1.33" - name: PortForwardWebsockets versionedSpecs: - default: false diff --git a/test/integration/pods/pods_test.go b/test/integration/pods/pods_test.go index c02d3890d6d..bc77f0b04aa 100644 --- a/test/integration/pods/pods_test.go +++ b/test/integration/pods/pods_test.go @@ -22,11 +22,15 @@ import ( "strings" "testing" + "github.com/google/go-cmp/cmp" + v1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" + apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/version" utilfeature "k8s.io/apiserver/pkg/util/feature" clientset "k8s.io/client-go/kubernetes" typedv1 "k8s.io/client-go/kubernetes/typed/core/v1" @@ -40,6 +44,168 @@ import ( "k8s.io/kubernetes/test/integration/framework" ) +func TestPodTopologyLabels(t *testing.T) { + tests := []podTopologyTestCase{ + { + name: "zone and region topology labels copied from assigned Node", + targetNodeLabels: map[string]string{ + "topology.k8s.io/zone": "zone", + "topology.k8s.io/region": "region", + }, + expectedPodLabels: map[string]string{ + "topology.k8s.io/zone": "zone", + "topology.k8s.io/region": "region", + }, + }, + { + name: "subdomains of topology.k8s.io are not copied", + targetNodeLabels: map[string]string{ + "sub.topology.k8s.io/zone": "zone", + 
"topology.k8s.io/region": "region", + }, + expectedPodLabels: map[string]string{ + "topology.k8s.io/region": "region", + }, + }, + { + name: "custom topology.k8s.io labels are not copied", + targetNodeLabels: map[string]string{ + "topology.k8s.io/custom": "thing", + "topology.k8s.io/zone": "zone", + "topology.k8s.io/region": "region", + }, + expectedPodLabels: map[string]string{ + "topology.k8s.io/zone": "zone", + "topology.k8s.io/region": "region", + }, + }, + { + name: "labels from Bindings overwriting existing labels on Pod", + existingPodLabels: map[string]string{ + "topology.k8s.io/zone": "bad-zone", + "topology.k8s.io/region": "bad-region", + "topology.k8s.io/abc": "123", + }, + targetNodeLabels: map[string]string{ + "topology.k8s.io/zone": "zone", + "topology.k8s.io/region": "region", + "topology.k8s.io/abc": "456", // this label isn't in (zone, region) so isn't copied + }, + expectedPodLabels: map[string]string{ + "topology.k8s.io/zone": "zone", + "topology.k8s.io/region": "region", + "topology.k8s.io/abc": "123", + }, + }, + } + // Enable the feature BEFORE starting the test server, as the admission plugin only checks feature gates + // on start up and not on each invocation at runtime. 
+ featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.33")) + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodTopologyLabelsAdmission, true) + testPodTopologyLabels(t, tests) +} + +func TestPodTopologyLabels_FeatureDisabled(t *testing.T) { + tests := []podTopologyTestCase{ + { + name: "does nothing when the feature is not enabled", + targetNodeLabels: map[string]string{ + "topology.k8s.io/zone": "zone", + "topology.k8s.io/region": "region", + "topology.k8s.io/custom": "thing", + "sub.topology.k8s.io/zone": "zone", + }, + expectedPodLabels: map[string]string{}, + }, + } + // Disable the feature BEFORE starting the test server, as the admission plugin only checks feature gates + // on start up and not on each invocation at runtime. + featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.33")) + featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodTopologyLabelsAdmission, false) + testPodTopologyLabels(t, tests) +} + +// podTopologyTestCase is defined outside of TestPodTopologyLabels to allow us to re-use the test implementation logic +// between the feature enabled and feature disabled tests. +// This will no longer be required once the feature gate graduates to GA/locked to being enabled. 
+type podTopologyTestCase struct {
+	// name is used as the subtest name.
+	name string
+	// targetNodeLabels are the labels set on the Node the Pod is bound to.
+	targetNodeLabels map[string]string
+	// existingPodLabels are the labels set on the Pod when it is created, before binding.
+	existingPodLabels map[string]string
+	// expectedPodLabels is the complete label set the Pod must carry after binding.
+	expectedPodLabels map[string]string
+}
+
+// testPodTopologyLabels starts a test API server and, for every case, creates a Node and a
+// Pod, binds the Pod to the Node through the pods/binding subresource, re-fetches the Pod and
+// compares its labels with the case's expectedPodLabels.
+//
+// Callers must configure feature gates before calling this function, because the server is
+// started here.
+func testPodTopologyLabels(t *testing.T, tests []podTopologyTestCase) {
+	server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
+	defer server.TearDownFn()
+	client := clientset.NewForConfigOrDie(server.ClientConfig)
+	ns := framework.CreateNamespaceOrDie(client, "pod-topology-labels", t)
+	defer framework.DeleteNamespaceOrDie(client, ns, t)
+
+	// The prototypes use GenerateName so every subtest gets its own Pod and Node.
+	prototypePod := func() *v1.Pod {
+		return &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				GenerateName: "pod-topology-test-",
+			},
+			Spec: v1.PodSpec{
+				Containers: []v1.Container{
+					{
+						Name:  "fake-name",
+						Image: "fakeimage",
+					},
+				},
+			},
+		}
+	}
+	prototypeNode := func() *v1.Node {
+		return &v1.Node{
+			ObjectMeta: metav1.ObjectMeta{
+				GenerateName: "podtopology-test-node-",
+			},
+		}
+	}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			ctx := context.Background()
+
+			// Create the Node we are going to bind to, carrying the labels under test.
+			node := prototypeNode()
+			node.Labels = test.targetNodeLabels
+
+			// Each step below needs the result of the previous one, and the object returned
+			// alongside an error is not meaningful. Stop the subtest on the first failure
+			// (Fatalf) instead of recording it (Errorf) and carrying on with unusable objects.
+			var err error
+			if node, err = client.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
+				t.Fatalf("Failed to create node: %v", err)
+			}
+
+			pod := prototypePod()
+			pod.Labels = test.existingPodLabels
+			if pod, err = client.CoreV1().Pods(ns.Name).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
+				t.Fatalf("Failed to create pod: %v", err)
+			}
+
+			// The Binding carries no labels of its own; any labels that reach the Pod must
+			// have been added on the server side.
+			binding := &v1.Binding{
+				ObjectMeta: metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
+				Target: v1.ObjectReference{
+					Kind: "Node",
+					Name: node.Name,
+				},
+			}
+			if err := client.CoreV1().Pods(pod.Namespace).Bind(ctx, binding, metav1.CreateOptions{}); err != nil {
+				t.Fatalf("Failed to bind pod to node: %v", err)
+			}
+
+			if pod, err = client.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}); err != nil {
+				t.Fatalf("Failed to fetch bound Pod: %v", err)
+			}
+
+			// NOTE(review): apiequality.Semantic is used rather than reflect.DeepEqual,
+			// presumably so that an unset (nil) label map matches an empty expectation - confirm.
+			if !apiequality.Semantic.DeepEqual(pod.Labels, test.expectedPodLabels) {
+				t.Errorf("Unexpected label values (-want +got):\n%s", cmp.Diff(test.expectedPodLabels, pod.Labels))
+			}
+		})
+	}
+}
+
 func TestPodUpdateActiveDeadlineSeconds(t *testing.T) {
 	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
 	server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())