Merge pull request #127092 from munnerz/pod-topology

KEP-4742: Copy topology labels from Node objects to Pods upon binding/scheduling
This commit is contained in:
Kubernetes Prow Robot 2025-03-20 18:18:30 -07:00 committed by GitHub
commit 507eee87e5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 733 additions and 8 deletions

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/plugin/pkg/admission/limitranger"
"k8s.io/kubernetes/plugin/pkg/admission/network/defaultingressclass"
"k8s.io/kubernetes/plugin/pkg/admission/nodetaint"
"k8s.io/kubernetes/plugin/pkg/admission/podtopologylabels"
podpriority "k8s.io/kubernetes/plugin/pkg/admission/priority"
"k8s.io/kubernetes/plugin/pkg/admission/runtimeclass"
"k8s.io/kubernetes/plugin/pkg/admission/security/podsecurity"
@ -42,6 +43,7 @@ var intentionallyOffPlugins = sets.New[string](
runtimeclass.PluginName, // RuntimeClass
defaultingressclass.PluginName, // DefaultIngressClass
podsecurity.PluginName, // PodSecurity
podtopologylabels.PluginName, // PodTopologyLabels
)
func TestDefaultOffAdmissionPlugins(t *testing.T) {

View File

@ -964,6 +964,20 @@ const (
// restore the old behavior. Please file issues if you hit issues and have to use this Feature Gate.
// The Feature Gate will be locked to true and then removed in +2 releases (1.35) if there are no bug reported
DisableCPUQuotaWithExclusiveCPUs featuregate.Feature = "DisableCPUQuotaWithExclusiveCPUs"
// owner: @munnerz
// kep: https://kep.k8s.io/4742
// alpha: v1.33
//
// Enables the PodTopologyLabelsAdmission admission plugin that mutates `pod/binding`
// requests by copying the `topology.k8s.io/{zone,region}` labels from the assigned
// Node object (in the Binding being admitted) onto the Binding
// so that it can be persisted onto the Pod object when the Pod is being scheduled.
// This allows workloads running in pods to understand the topology information of their assigned node.
// Enabling this feature also permits external schedulers to set labels on pods in an atomic
// operation when scheduling a Pod by setting the `metadata.labels` field on the submitted Binding,
// similar to how `metadata.annotations` behaves.
PodTopologyLabelsAdmission featuregate.Feature = "PodTopologyLabelsAdmission"
)
// defaultVersionedKubernetesFeatureGates consists of all known Kubernetes-specific feature keys with VersionedSpecs.
@ -1571,6 +1585,10 @@ var defaultVersionedKubernetesFeatureGates = map[featuregate.Feature]featuregate
{Version: version.MustParse("1.30"), Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // GA in 1.30; remove in 1.32
},
PodTopologyLabelsAdmission: {
{Version: version.MustParse("1.33"), Default: false, PreRelease: featuregate.Alpha},
},
PortForwardWebsockets: {
{Version: version.MustParse("1.30"), Default: false, PreRelease: featuregate.Alpha},
{Version: version.MustParse("1.31"), Default: true, PreRelease: featuregate.Beta},

View File

@ -46,6 +46,7 @@ import (
"k8s.io/kubernetes/plugin/pkg/admission/nodetaint"
"k8s.io/kubernetes/plugin/pkg/admission/podnodeselector"
"k8s.io/kubernetes/plugin/pkg/admission/podtolerationrestriction"
"k8s.io/kubernetes/plugin/pkg/admission/podtopologylabels"
podpriority "k8s.io/kubernetes/plugin/pkg/admission/priority"
"k8s.io/kubernetes/plugin/pkg/admission/runtimeclass"
"k8s.io/kubernetes/plugin/pkg/admission/security/podsecurity"
@ -93,6 +94,7 @@ var AllOrderedPlugins = []string{
certsubjectrestriction.PluginName, // CertificateSubjectRestriction
defaultingressclass.PluginName, // DefaultIngressClass
denyserviceexternalips.PluginName, // DenyServiceExternalIPs
podtopologylabels.PluginName, // PodTopologyLabels
// new admission plugins should generally be inserted above here
// webhook, resourcequota, and deny plugins must go at the end
@ -138,6 +140,7 @@ func RegisterAllAdmissionPlugins(plugins *admission.Plugins) {
certsigning.Register(plugins)
ctbattest.Register(plugins)
certsubjectrestriction.Register(plugins)
podtopologylabels.Register(plugins)
}
// DefaultOffAdmissionPlugins get admission plugins off by default for kube-apiserver.
@ -162,6 +165,7 @@ func DefaultOffAdmissionPlugins() sets.Set[string] {
certsubjectrestriction.PluginName, // CertificateSubjectRestriction
defaultingressclass.PluginName, // DefaultIngressClass
podsecurity.PluginName, // PodSecurity
podtopologylabels.PluginName, // PodTopologyLabels, only active when feature gate PodTopologyLabelsAdmission is enabled.
mutatingadmissionpolicy.PluginName, // Mutatingadmissionpolicy, only active when feature gate MutatingAdmissionpolicy is enabled
validatingadmissionpolicy.PluginName, // ValidatingAdmissionPolicy, only active when feature gate ValidatingAdmissionPolicy is enabled
)

View File

@ -33,10 +33,12 @@ import (
"k8s.io/apiserver/pkg/storage"
storeerr "k8s.io/apiserver/pkg/storage/errors"
"k8s.io/apiserver/pkg/util/dryrun"
utilfeature "k8s.io/apiserver/pkg/util/feature"
policyclient "k8s.io/client-go/kubernetes/typed/policy/v1"
podutil "k8s.io/kubernetes/pkg/api/pod"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/apis/core/validation"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/client"
"k8s.io/kubernetes/pkg/printers"
printersinternal "k8s.io/kubernetes/pkg/printers/internalversion"
@ -191,7 +193,7 @@ func (r *BindingREST) Create(ctx context.Context, name string, obj runtime.Objec
}
}
err = r.assignPod(ctx, binding.UID, binding.ResourceVersion, binding.Name, binding.Target.Name, binding.Annotations, dryrun.IsDryRun(options.DryRun))
err = r.assignPod(ctx, binding.UID, binding.ResourceVersion, binding.Name, binding.Target.Name, binding.Annotations, binding.Labels, dryrun.IsDryRun(options.DryRun))
out = &metav1.Status{Status: metav1.StatusSuccess}
return
}
@ -203,10 +205,10 @@ func (r *BindingREST) PreserveRequestObjectMetaSystemFieldsOnSubresourceCreate()
return true
}
// setPodHostAndAnnotations sets the given pod's host to 'machine' if and only if
// the pod is unassigned and merges the provided annotations with those of the pod.
// setPodNodeAndMetadata sets the given pod's nodeName to 'machine' if and only if
// the pod is unassigned, and merges the provided annotations and labels with those of the pod.
// Returns the current state of the pod, or an error.
func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types.UID, podResourceVersion, podID, machine string, annotations map[string]string, dryRun bool) (finalPod *api.Pod, err error) {
func (r *BindingREST) setPodNodeAndMetadata(ctx context.Context, podUID types.UID, podResourceVersion, podID, machine string, annotations, labels map[string]string, dryRun bool) (finalPod *api.Pod, err error) {
podKey, err := r.store.KeyFunc(ctx, podID)
if err != nil {
return nil, err
@ -245,6 +247,11 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types
for k, v := range annotations {
pod.Annotations[k] = v
}
// Copy all labels from the Binding over to the Pod object, overwriting
// any existing labels set on the Pod.
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.PodTopologyLabelsAdmission) {
copyLabelsWithOverwriting(pod, labels)
}
podutil.UpdatePodCondition(&pod.Status, &api.PodCondition{
Type: api.PodScheduled,
Status: api.ConditionTrue,
@ -255,9 +262,23 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types
return finalPod, err
}
func copyLabelsWithOverwriting(pod *api.Pod, labels map[string]string) {
if len(labels) == 0 {
// nothing to do
return
}
if pod.Labels == nil {
pod.Labels = make(map[string]string)
}
// Iterate over the binding's labels and copy them across to the Pod.
for k, v := range labels {
pod.Labels[k] = v
}
}
// assignPod assigns the given pod to the given machine.
func (r *BindingREST) assignPod(ctx context.Context, podUID types.UID, podResourceVersion, podID string, machine string, annotations map[string]string, dryRun bool) (err error) {
if _, err = r.setPodHostAndAnnotations(ctx, podUID, podResourceVersion, podID, machine, annotations, dryRun); err != nil {
func (r *BindingREST) assignPod(ctx context.Context, podUID types.UID, podResourceVersion, podID string, machine string, annotations, labels map[string]string, dryRun bool) (err error) {
if _, err = r.setPodNodeAndMetadata(ctx, podUID, podResourceVersion, podID, machine, annotations, labels, dryRun); err != nil {
err = storeerr.InterpretGetError(err, api.Resource("pods"), podID)
err = storeerr.InterpretUpdateError(err, api.Resource("pods"), podID)
if _, ok := err.(*errors.StatusError); !ok {

View File

@ -42,8 +42,11 @@ import (
apiserverstorage "k8s.io/apiserver/pkg/storage"
storeerr "k8s.io/apiserver/pkg/storage/errors"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
utilfeature "k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
podtest "k8s.io/kubernetes/pkg/api/pod/testing"
api "k8s.io/kubernetes/pkg/apis/core"
kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/registry/registrytest"
"k8s.io/kubernetes/pkg/securitycontext"
)
@ -676,12 +679,14 @@ func TestEtcdCreateWithContainersNotFound(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.PodTopologyLabelsAdmission, true)
// Suddenly, a wild scheduler appears:
_, err = bindingStorage.Create(ctx, "foo", &api.Binding{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "foo",
Annotations: map[string]string{"label1": "value1"},
Annotations: map[string]string{"annotation1": "value1"},
Labels: map[string]string{"label1": "label-value1"},
},
Target: api.ObjectReference{Name: "machine"},
}, rest.ValidateAllObjectFunc, &metav1.CreateOptions{})
@ -695,9 +700,12 @@ func TestEtcdCreateWithContainersNotFound(t *testing.T) {
}
pod := obj.(*api.Pod)
if !(pod.Annotations != nil && pod.Annotations["label1"] == "value1") {
if !(pod.Annotations != nil && pod.Annotations["annotation1"] == "value1") {
t.Fatalf("Pod annotations don't match the expected: %v", pod.Annotations)
}
if !(pod.Labels != nil && pod.Labels["label1"] == "label-value1") {
t.Fatalf("Pod labels don't match the expected: %v", pod.Labels)
}
}
func TestEtcdCreateWithConflict(t *testing.T) {

View File

@ -0,0 +1,243 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podtopologylabels
import (
"context"
"fmt"
"io"
"k8s.io/klog/v2"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/admission"
genericadmissioninitializer "k8s.io/apiserver/pkg/admission/initializer"
"k8s.io/client-go/informers"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/component-base/featuregate"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/features"
)
// PluginName is a string with the name of the plugin
const PluginName = "PodTopologyLabels"
// defaultConfig is the configuration used for the default instantiation of the plugin.
// This configuration is used by kube-apiserver.
// It is not exported to avoid any chance of accidentally mutating the variable.
var defaultConfig = Config{
Labels: []string{"topology.k8s.io/zone", "topology.k8s.io/region"},
}
// Register registers a plugin
func Register(plugins *admission.Plugins) {
plugins.Register(PluginName, func(_ io.Reader) (admission.Interface, error) {
plugin := NewPodTopologyPlugin(defaultConfig)
return plugin, nil
})
}
// Config contains configuration for instances of the podtopologylabels admission plugin.
// This is not exposed as user-facing APIServer configuration, however can be used by
// platform operators when building custom topology label plugins.
type Config struct {
// Labels is set of explicit label keys to be copied from the Node object onto
// pod Binding objects during admission.
Labels []string
}
// NewPodTopologyPlugin initializes a Plugin
func NewPodTopologyPlugin(c Config) *Plugin {
return &Plugin{
Handler: admission.NewHandler(admission.Create),
labels: sets.New(c.Labels...),
}
}
type Plugin struct {
*admission.Handler
nodeLister corev1listers.NodeLister
// explicit labels to be copied to Pod objects being bound.
labels sets.Set[string]
enabled, inspectedFeatureGates bool
}
var _ admission.MutationInterface = &Plugin{}
var _ genericadmissioninitializer.WantsExternalKubeInformerFactory = &Plugin{}
var _ genericadmissioninitializer.WantsFeatures = &Plugin{}
// InspectFeatureGates implements WantsFeatures.
func (p *Plugin) InspectFeatureGates(featureGates featuregate.FeatureGate) {
p.enabled = featureGates.Enabled(features.PodTopologyLabelsAdmission)
p.inspectedFeatureGates = true
}
func (p *Plugin) SetExternalKubeInformerFactory(factory informers.SharedInformerFactory) {
nodeInformer := factory.Core().V1().Nodes()
p.nodeLister = nodeInformer.Lister()
p.SetReadyFunc(nodeInformer.Informer().HasSynced)
}
func (p *Plugin) ValidateInitialization() error {
if p.nodeLister == nil {
return fmt.Errorf("nodeLister not set")
}
if !p.inspectedFeatureGates {
return fmt.Errorf("feature gates not inspected")
}
return nil
}
func (p *Plugin) Admit(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) (err error) {
if !p.enabled {
return nil
}
// check whether the request is for a Binding or a Pod spec update.
shouldAdmit, doAdmit, err := p.shouldAdmit(a)
if !shouldAdmit || err != nil {
// error is either nil and admit == false, or err is non-nil and should be returned.
return err
}
// we need to wait for our caches to warm
if !p.WaitForReady() {
return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request"))
}
// run type specific admission
return doAdmit(ctx, a, o)
}
func (p *Plugin) admitBinding(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error {
binding := a.GetObject().(*api.Binding)
// other fields are not set by the default scheduler for the binding target, so only check the Kind.
if binding.Target.Kind != "Node" {
klog.V(6).Info("Skipping Pod being bound to non-Node object type", "target", binding.Target.GroupVersionKind())
return nil
}
labelsToCopy, err := p.topologyLabelsForNodeName(binding.Target.Name)
if err != nil {
return err
}
if len(labelsToCopy) == 0 {
// fast-path/short circuit if the node has no topology labels
return nil
}
// copy the topology labels into the Binding's labels, as these are copied from the Binding
// to the Pod object being bound within the podBinding registry/store.
if binding.Labels == nil {
binding.Labels = make(map[string]string)
}
mergeLabels(binding.Labels, labelsToCopy)
return nil
}
func (p *Plugin) admitPod(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error {
pod := a.GetObject().(*api.Pod)
if pod.Spec.NodeName == "" {
// pod has not been scheduled yet
return nil
}
// Determine the topology labels from the assigned node to be copied.
labelsToCopy, err := p.topologyLabelsForNodeName(pod.Spec.NodeName)
if err != nil {
return err
}
if len(labelsToCopy) == 0 {
// fast-path/short circuit if the node has no topology labels
return nil
}
// copy the topology labels into the Pod's labels.
if pod.Labels == nil {
pod.Labels = make(map[string]string)
}
// overwrite any existing labels on the pod
mergeLabels(pod.Labels, labelsToCopy)
return nil
}
func (p *Plugin) topologyLabelsForNodeName(nodeName string) (map[string]string, error) {
labels := make(map[string]string)
node, err := p.nodeLister.Get(nodeName)
if err != nil {
// Ignore NotFound errors to avoid risking breaking compatibility/behaviour.
if apierrors.IsNotFound(err) {
return labels, nil
}
return nil, err
}
for k, v := range node.Labels {
if !p.isTopologyLabel(k) {
continue
}
labels[k] = v
}
return labels, nil
}
// mergeLabels merges new into existing, overwriting existing keys.
func mergeLabels(existing, new map[string]string) {
for k, v := range new {
existing[k] = v
}
}
func (p *Plugin) isTopologyLabel(key string) bool {
// First check explicit label keys.
if p.labels.Has(key) {
return true
}
return false
}
type admitFunc func(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) (err error)
// shouldAdmit inspects the provided adminssion attributes to determine whether the request
// requires admittance through this plugin.
func (p *Plugin) shouldAdmit(a admission.Attributes) (bool, admitFunc, error) {
if a.GetResource().GroupResource() != api.Resource("pods") {
return false, nil, nil
}
switch a.GetSubresource() {
case "": // regular Pod endpoint
_, ok := a.GetObject().(*api.Pod)
if !ok {
return false, nil, fmt.Errorf("expected Pod but got %T", a.GetObject())
}
return true, p.admitPod, nil
case "binding":
_, ok := a.GetObject().(*api.Binding)
if !ok {
return false, nil, fmt.Errorf("expected Binding but got %s", a.GetKind().Kind)
}
return true, p.admitBinding, nil
default:
// Ignore all other sub-resources.
return false, nil, nil
}
}

View File

@ -0,0 +1,232 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package podtopologylabels
import (
"context"
"testing"
"time"
"github.com/google/go-cmp/cmp"
corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/admission"
genericadmissioninitializer "k8s.io/apiserver/pkg/admission/initializer"
admissiontesting "k8s.io/apiserver/pkg/admission/testing"
"k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
featuregatetesting "k8s.io/component-base/featuregate/testing"
api "k8s.io/kubernetes/pkg/apis/core"
kubefeatures "k8s.io/kubernetes/pkg/features"
)
// TestPodTopology verifies the pod topology admission plugin works as expected.
func TestPodTopology(t *testing.T) {
tests := []struct {
name string // name of the test case.
bindingTarget *api.ObjectReference // target being bound to. Defaults to a valid node with the provided labels.
targetNodeLabels map[string]string // list of labels set on the node being bound to.
existingBindingLabels map[string]string // list of labels that are set on the Binding prior to admission (aka by the client/scheduler)
expectedBindingLabels map[string]string // list of labels that we expect to be set on the Binding after admission.
featureDisabled bool // configure whether the SetPodTopologyLabels feature gate should be disabled.
}{
{
name: "copies topology.k8s.io/zone and region labels to binding labels",
targetNodeLabels: map[string]string{
"topology.k8s.io/zone": "zone1",
"topology.k8s.io/region": "region1",
"topology.k8s.io/arbitrary": "something",
"non-topology.k8s.io/label": "something", // verify we don't unexpectedly copy non topology.k8s.io labels.
},
expectedBindingLabels: map[string]string{
"topology.k8s.io/zone": "zone1",
"topology.k8s.io/region": "region1",
},
},
{
name: "does not copy arbitrary topology labels",
targetNodeLabels: map[string]string{
"topology.k8s.io/zone": "zone1",
"topology.k8s.io/arbitrary": "something",
},
expectedBindingLabels: map[string]string{
"topology.k8s.io/zone": "zone1",
},
},
{
name: "does not copy topology labels that use a subdomain",
targetNodeLabels: map[string]string{
"topology.k8s.io/region": "region1",
"sub.topology.k8s.io/zone": "value",
},
expectedBindingLabels: map[string]string{
"topology.k8s.io/region": "region1",
},
},
{
name: "does not copy label keys that don't contain a / character",
targetNodeLabels: map[string]string{
"topology.k8s.io": "value",
},
existingBindingLabels: map[string]string{},
},
{
name: "overwrites existing topology labels",
existingBindingLabels: map[string]string{
"topology.k8s.io/zone": "oldValue",
},
targetNodeLabels: map[string]string{
"topology.k8s.io/zone": "newValue",
},
expectedBindingLabels: map[string]string{
"topology.k8s.io/zone": "newValue",
},
},
{
name: "does nothing if the SetPodTopologyLabels feature gate is disabled",
targetNodeLabels: map[string]string{
"topology.k8s.io/zone": "zone1",
"topology.k8s.io/region": "region1",
},
expectedBindingLabels: map[string]string{},
featureDisabled: true,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
namespace := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{Name: "test-ns"},
}
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "valid-test-node",
Labels: test.targetNodeLabels,
},
}
featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, kubefeatures.PodTopologyLabelsAdmission, !test.featureDisabled)
t.Run("using Pod directly (update)", func(t *testing.T) {
// set up the client and informers
mockClient := fake.NewSimpleClientset(namespace, node)
handler, informerFactory, err := newHandlerForTest(mockClient)
if err != nil {
t.Fatalf("unexpected error initializing handler: %v", err)
}
stopCh := make(chan struct{})
defer close(stopCh)
informerFactory.Start(stopCh)
oldPod := &api.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "testPod", Namespace: namespace.Name, Labels: test.existingBindingLabels},
Spec: api.PodSpec{},
}
pod := oldPod.DeepCopy()
pod.Spec.NodeName = node.Name
if err := admissiontesting.WithReinvocationTesting(t, handler).
Admit(context.TODO(), admission.NewAttributesRecord(pod, oldPod,
api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name,
api.Resource("pods").WithVersion("version"), "", admission.Update, &metav1.UpdateOptions{},
false, nil), nil); err != nil {
t.Errorf("failed running admission plugin: %v", err)
}
})
t.Run("using Pod directly (create)", func(t *testing.T) {
// set up the client and informers
mockClient := fake.NewSimpleClientset(namespace, node)
handler, informerFactory, err := newHandlerForTest(mockClient)
if err != nil {
t.Fatalf("unexpected error initializing handler: %v", err)
}
stopCh := make(chan struct{})
defer close(stopCh)
informerFactory.Start(stopCh)
pod := &api.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "testPod", Namespace: namespace.Name, Labels: test.existingBindingLabels},
Spec: api.PodSpec{
NodeName: node.Name,
},
}
if err := admissiontesting.WithReinvocationTesting(t, handler).
Admit(context.TODO(), admission.NewAttributesRecord(pod, nil,
api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name,
api.Resource("pods").WithVersion("version"), "", admission.Create, &metav1.UpdateOptions{},
false, nil), nil); err != nil {
t.Errorf("failed running admission plugin: %v", err)
}
})
t.Run("using Binding subresource", func(t *testing.T) {
// Pod we bind during test cases.
existingPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{Name: "testPod", Namespace: namespace.Name},
Spec: corev1.PodSpec{},
}
mockClient := fake.NewSimpleClientset(namespace, node, existingPod)
handler, informerFactory, err := newHandlerForTest(mockClient)
if err != nil {
t.Fatalf("unexpected error initializing handler: %v", err)
}
stopCh := make(chan struct{})
defer close(stopCh)
informerFactory.Start(stopCh)
// create and submit a Binding object
target := test.bindingTarget
if target == nil {
target = &api.ObjectReference{
Kind: "Node",
Name: node.Name,
}
}
binding := &api.Binding{
ObjectMeta: metav1.ObjectMeta{
Name: existingPod.Name,
Namespace: existingPod.Namespace,
Labels: test.existingBindingLabels,
},
Target: *target,
}
if err := admissiontesting.WithReinvocationTesting(t, handler).
Admit(context.TODO(), admission.NewAttributesRecord(binding, nil, api.Kind("Binding").WithVersion("version"), existingPod.Namespace, existingPod.Name, api.Resource("pods").WithVersion("version"), "binding", admission.Create, &metav1.CreateOptions{}, false, nil), nil); err != nil {
t.Errorf("failed running admission plugin: %v", err)
}
updatedBindingLabels := binding.Labels
if !apiequality.Semantic.DeepEqual(updatedBindingLabels, test.expectedBindingLabels) {
t.Errorf("Unexpected label values: %v", cmp.Diff(updatedBindingLabels, test.expectedBindingLabels))
}
})
})
}
}
// newHandlerForTest returns the admission controller configured for testing.
func newHandlerForTest(c kubernetes.Interface) (*Plugin, informers.SharedInformerFactory, error) {
factory := informers.NewSharedInformerFactory(c, 5*time.Minute)
handler := NewPodTopologyPlugin(defaultConfig) // todo: write additional test cases with non-default config.
pluginInitializer := genericadmissioninitializer.New(c, nil, factory, nil, feature.DefaultFeatureGate, nil, nil)
pluginInitializer.Initialize(handler)
return handler, factory, admission.ValidateInitialization(handler)
}

View File

@ -0,0 +1,25 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package podtopologylabels is a plugin that mutates `pod/binding` requests
// by copying the `topology.k8s.io/{zone,region}` labels from the assigned Node
// object (in the Binding being admitted) onto the Binding so that it can be
// persisted onto the Pod object when the Pod is being scheduled.
// Requests for the regular `pods` resource that set the `spec.nodeName` will
// also trigger the plugin to copy the labels as described.
// If the binding target is NOT a Node object, no action is taken.
// If the referenced Node object does not exist, no action is taken.
package podtopologylabels // import "k8s.io/kubernetes/plugin/pkg/admission/podtopologylabels"

View File

@ -1075,6 +1075,12 @@
lockToDefault: true
preRelease: GA
version: "1.30"
- name: PodTopologyLabelsAdmission
versionedSpecs:
- default: false
lockToDefault: false
preRelease: Alpha
version: "1.33"
- name: PortForwardWebsockets
versionedSpecs:
- default: false

View File

@ -22,11 +22,15 @@ import (
"strings"
"testing"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/version"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
typedv1 "k8s.io/client-go/kubernetes/typed/core/v1"
@ -40,6 +44,168 @@ import (
"k8s.io/kubernetes/test/integration/framework"
)
func TestPodTopologyLabels(t *testing.T) {
tests := []podTopologyTestCase{
{
name: "zone and region topology labels copied from assigned Node",
targetNodeLabels: map[string]string{
"topology.k8s.io/zone": "zone",
"topology.k8s.io/region": "region",
},
expectedPodLabels: map[string]string{
"topology.k8s.io/zone": "zone",
"topology.k8s.io/region": "region",
},
},
{
name: "subdomains of topology.k8s.io are not copied",
targetNodeLabels: map[string]string{
"sub.topology.k8s.io/zone": "zone",
"topology.k8s.io/region": "region",
},
expectedPodLabels: map[string]string{
"topology.k8s.io/region": "region",
},
},
{
name: "custom topology.k8s.io labels are not copied",
targetNodeLabels: map[string]string{
"topology.k8s.io/custom": "thing",
"topology.k8s.io/zone": "zone",
"topology.k8s.io/region": "region",
},
expectedPodLabels: map[string]string{
"topology.k8s.io/zone": "zone",
"topology.k8s.io/region": "region",
},
},
{
name: "labels from Bindings overwriting existing labels on Pod",
existingPodLabels: map[string]string{
"topology.k8s.io/zone": "bad-zone",
"topology.k8s.io/region": "bad-region",
"topology.k8s.io/abc": "123",
},
targetNodeLabels: map[string]string{
"topology.k8s.io/zone": "zone",
"topology.k8s.io/region": "region",
"topology.k8s.io/abc": "456", // this label isn't in (zone, region) so isn't copied
},
expectedPodLabels: map[string]string{
"topology.k8s.io/zone": "zone",
"topology.k8s.io/region": "region",
"topology.k8s.io/abc": "123",
},
},
}
// Enable the feature BEFORE starting the test server, as the admission plugin only checks feature gates
// on start up and not on each invocation at runtime.
featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.33"))
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodTopologyLabelsAdmission, true)
testPodTopologyLabels(t, tests)
}
func TestPodTopologyLabels_FeatureDisabled(t *testing.T) {
tests := []podTopologyTestCase{
{
name: "does nothing when the feature is not enabled",
targetNodeLabels: map[string]string{
"topology.k8s.io/zone": "zone",
"topology.k8s.io/region": "region",
"topology.k8s.io/custom": "thing",
"sub.topology.k8s.io/zone": "zone",
},
expectedPodLabels: map[string]string{},
},
}
// Disable the feature BEFORE starting the test server, as the admission plugin only checks feature gates
// on start up and not on each invocation at runtime.
featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.33"))
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodTopologyLabelsAdmission, false)
testPodTopologyLabels(t, tests)
}
// podTopologyTestCase is defined outside of TestPodTopologyLabels to allow us to re-use the test implementation logic
// between the feature enabled and feature disabled tests.
// This will no longer be required once the feature gate graduates to GA/locked to being enabled.
type podTopologyTestCase struct {
name string
targetNodeLabels map[string]string
existingPodLabels map[string]string
expectedPodLabels map[string]string
}
func testPodTopologyLabels(t *testing.T, tests []podTopologyTestCase) {
server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())
defer server.TearDownFn()
client := clientset.NewForConfigOrDie(server.ClientConfig)
ns := framework.CreateNamespaceOrDie(client, "pod-topology-labels", t)
defer framework.DeleteNamespaceOrDie(client, ns, t)
prototypePod := func() *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "pod-topology-test-",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "fake-name",
Image: "fakeimage",
},
},
},
}
}
prototypeNode := func() *v1.Node {
return &v1.Node{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "podtopology-test-node-",
},
}
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// Create the Node we are going to bind to.
node := prototypeNode()
// Set the labels on the Node we are going to create.
node.Labels = test.targetNodeLabels
ctx := context.Background()
var err error
if node, err = client.CoreV1().Nodes().Create(ctx, node, metav1.CreateOptions{}); err != nil {
t.Errorf("Failed to create node: %v", err)
}
pod := prototypePod()
pod.Labels = test.existingPodLabels
if pod, err = client.CoreV1().Pods(ns.Name).Create(ctx, pod, metav1.CreateOptions{}); err != nil {
t.Errorf("Failed to create pod: %v", err)
}
binding := &v1.Binding{
ObjectMeta: metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
Target: v1.ObjectReference{
Kind: "Node",
Name: node.Name,
},
}
if err := client.CoreV1().Pods(pod.Namespace).Bind(ctx, binding, metav1.CreateOptions{}); err != nil {
t.Errorf("Failed to bind pod to node: %v", err)
}
if pod, err = client.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}); err != nil {
t.Errorf("Failed to fetch bound Pod: %v", err)
}
if !apiequality.Semantic.DeepEqual(pod.Labels, test.expectedPodLabels) {
t.Errorf("Unexpected label values: %v", cmp.Diff(pod.Labels, test.expectedPodLabels))
}
})
}
}
func TestPodUpdateActiveDeadlineSeconds(t *testing.T) {
// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
server := kubeapiservertesting.StartTestServerOrDie(t, nil, framework.DefaultTestServerFlags(), framework.SharedEtcd())