diff --git a/pkg/registry/core/pod/storage/storage.go b/pkg/registry/core/pod/storage/storage.go index 27f7c7fcfff..b93a38cd192 100644 --- a/pkg/registry/core/pod/storage/storage.go +++ b/pkg/registry/core/pod/storage/storage.go @@ -205,10 +205,10 @@ func (r *BindingREST) PreserveRequestObjectMetaSystemFieldsOnSubresourceCreate() return true } -// setPodHostAndAnnotations sets the given pod's host to 'machine' if and only if -// the pod is unassigned and merges the provided annotations with those of the pod. +// setPodNodeAndMetadata sets the given pod's nodeName to 'machine' if and only if +// the pod is unassigned, and merges the provided annotations and labels with those of the pod. // Returns the current state of the pod, or an error. -func (r *BindingREST) setPodHostAndAnnotations(ctx context.Context, podUID types.UID, podResourceVersion, podID, machine string, annotations, labels map[string]string, dryRun bool) (finalPod *api.Pod, err error) { +func (r *BindingREST) setPodNodeAndMetadata(ctx context.Context, podUID types.UID, podResourceVersion, podID, machine string, annotations, labels map[string]string, dryRun bool) (finalPod *api.Pod, err error) { podKey, err := r.store.KeyFunc(ctx, podID) if err != nil { return nil, err @@ -284,7 +284,7 @@ func copyLabelsWithoutOverwriting(pod *api.Pod, labels map[string]string) { // assignPod assigns the given pod to the given machine. func (r *BindingREST) assignPod(ctx context.Context, podUID types.UID, podResourceVersion, podID string, machine string, annotations, labels map[string]string, dryRun bool) (err error) { - if _, err = r.setPodHostAndAnnotations(ctx, podUID, podResourceVersion, podID, machine, annotations, labels, dryRun); err != nil { + if _, err = r.setPodNodeAndMetadata(ctx, podUID, podResourceVersion, podID, machine, annotations, labels, dryRun); err != nil { err = storeerr.InterpretGetError(err, api.Resource("pods"), podID) err = storeerr.InterpretUpdateError(err, api.Resource("pods"), podID) if _, ok := err.(*errors.StatusError); !ok { diff --git a/pkg/registry/core/pod/storage/storage_test.go b/pkg/registry/core/pod/storage/storage_test.go index 753bb970b8a..a2a0ac6c0a1 100644 --- a/pkg/registry/core/pod/storage/storage_test.go +++ b/pkg/registry/core/pod/storage/storage_test.go @@ -34,7 +34,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/version" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/generic" genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" @@ -680,7 +679,6 @@ func TestEtcdCreateWithContainersNotFound(t *testing.T) { t.Fatalf("unexpected error: %v", err) } - featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, utilfeature.DefaultFeatureGate, version.MustParse("1.33")) featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.PodTopologyLabelsAdmission, true) // Suddenly, a wild scheduler appears: _, err = bindingStorage.Create(ctx, "foo", &api.Binding{ diff --git a/plugin/pkg/admission/podtopologylabels/admission.go b/plugin/pkg/admission/podtopologylabels/admission.go index c4189269592..d0d6d2066f9 100644 --- a/plugin/pkg/admission/podtopologylabels/admission.go +++ b/plugin/pkg/admission/podtopologylabels/admission.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "io" - "strings" "k8s.io/klog/v2" @@ -39,7 +38,7 @@ import ( const PluginName = "PodTopologyLabels" // defaultConfig is the configuration used for the default instantiation of the plugin. -// This is the configured used by kube-apiserver. +// This configuration is used by kube-apiserver. // It is not exported to avoid any chance of accidentally mutating the variable. var defaultConfig = Config{ Labels: []string{"topology.k8s.io/zone", "topology.k8s.io/region", "kubernetes.io/hostname"}, @@ -60,27 +59,13 @@ type Config struct { // Labels is set of explicit label keys to be copied from the Node object onto // pod Binding objects during admission. Labels []string - // Domains is a set of label key prefixes used to copy across label values - // for all labels with a given domain prefix. - // For example, `example.com` would match all labels matching `example.com/*`. - // Keys without a domain portion (i.e. those not containing a /) will never match. - Domains []string - // Suffixes is a set of label key domain suffixes used to copy label values for - // all labels of a given subdomain. - // This acts as a suffix match on the domain portion of label keys. - // If a suffix does not have a leading '.', one will be prepended to avoid - // programmer errors with values like `example.com` matching `notexample.com`. - // Keys without a domain portion (i.e. those not containing a /) will never match. - Suffixes []string } // NewPodTopologyPlugin initializes a Plugin func NewPodTopologyPlugin(c Config) *Plugin { return &Plugin{ - Handler: admission.NewHandler(admission.Create), - labels: sets.New(c.Labels...), - domains: sets.New(c.Domains...), - suffixes: sets.New(c.Suffixes...), + Handler: admission.NewHandler(admission.Create), + labels: sets.New(c.Labels...), } } @@ -89,9 +74,8 @@ type Plugin struct { nodeLister corev1listers.NodeLister - // explicit labels, list of domains or a list of domain - // suffixes to be copies to Pod objects being bound. - labels, domains, suffixes sets.Set[string] + // explicit labels to be copied to Pod objects being bound. + labels sets.Set[string] enabled, inspectedFeatureGates bool } @@ -126,14 +110,21 @@ func (p *Plugin) Admit(ctx context.Context, a admission.Attributes, o admission. if !p.enabled { return nil } - if shouldIgnore(a) { - return nil + // check whether the request is for a Binding or a Pod spec update. + shouldAdmit, doAdmit, err := p.shouldAdmit(a) + if !shouldAdmit || err != nil { + // error is either nil and admit == false, or err is non-nil and should be returned. + return err } // we need to wait for our caches to warm if !p.WaitForReady() { return admission.NewForbidden(a, fmt.Errorf("not yet ready to handle request")) } + // run type specific admission + return doAdmit(ctx, a, o) +} +func (p *Plugin) admitBinding(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error { binding := a.GetObject().(*api.Binding) // other fields are not set by the default scheduler for the binding target, so only check the Kind. if binding.Target.Kind != "Node" { @@ -141,28 +132,10 @@ func (p *Plugin) Admit(ctx context.Context, a admission.Attributes, o admission. return nil } - node, err := p.nodeLister.Get(binding.Target.Name) + labelsToCopy, err := p.topologyLabelsForNodeName(binding.Target.Name) if err != nil { - // Ignore NotFound errors to avoid risking breaking compatibility/behaviour. - if apierrors.IsNotFound(err) { - return nil - } return err } - - // fast-path/short circuit if the node has no labels - if node.Labels == nil { - return nil - } - - labelsToCopy := make(map[string]string) - for k, v := range node.Labels { - if !p.isTopologyLabel(k) { - continue - } - labelsToCopy[k] = v - } - if len(labelsToCopy) == 0 { // fast-path/short circuit if the node has no topology labels return nil @@ -173,58 +146,101 @@ func (p *Plugin) Admit(ctx context.Context, a admission.Attributes, o admission. if binding.Labels == nil { binding.Labels = make(map[string]string) } - for k, v := range labelsToCopy { - if _, exists := binding.Labels[k]; exists { - // Don't overwrite labels on Binding resources as this could lead to unexpected - // behaviour if any schedulers rely on being able to explicitly set values themselves. - continue - } - binding.Labels[k] = v - } + mergeLabels(binding.Labels, labelsToCopy) return nil } +func (p *Plugin) admitPod(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) error { + pod := a.GetObject().(*api.Pod) + if pod.Spec.NodeName == "" { + // pod has not been scheduled yet + return nil + } + + // Determine the topology labels from the assigned node to be copied. + labelsToCopy, err := p.topologyLabelsForNodeName(pod.Spec.NodeName) + if err != nil { + return err + } + if len(labelsToCopy) == 0 { + // fast-path/short circuit if the node has no topology labels + return nil + } + + // copy the topology labels into the Pod's labels. + if pod.Labels == nil { + pod.Labels = make(map[string]string) + } + // avoid overwriting any existing labels on the Pod. + mergeLabels(pod.Labels, labelsToCopy) + + return nil +} + +func (p *Plugin) topologyLabelsForNodeName(nodeName string) (map[string]string, error) { + labels := make(map[string]string) + node, err := p.nodeLister.Get(nodeName) + if err != nil { + // Ignore NotFound errors to avoid risking breaking compatibility/behaviour. + if apierrors.IsNotFound(err) { + return labels, nil + } + return nil, err + } + + for k, v := range node.Labels { + if !p.isTopologyLabel(k) { + continue + } + labels[k] = v + } + + return labels, nil +} + +// mergeLabels merges new into existing, without overwriting existing keys. +func mergeLabels(existing, new map[string]string) { + for k, v := range new { + if _, exists := existing[k]; exists { + continue + } + existing[k] = v + } +} + func (p *Plugin) isTopologyLabel(key string) bool { // First check explicit label keys. if p.labels.Has(key) { return true } - // Check the domain portion of the label key, if present - domain, _, hasDomain := strings.Cut(key, "/") - if !hasDomain { - // fast-path if there is no / separator - return false + return false +} + +type admitFunc func(ctx context.Context, a admission.Attributes, o admission.ObjectInterfaces) (err error) + +// shouldAdmit inspects the provided adminssion attributes to determine whether the request +// requires admittance through this plugin. +func (p *Plugin) shouldAdmit(a admission.Attributes) (bool, admitFunc, error) { + if a.GetResource().GroupResource() != api.Resource("pods") { + return false, nil, nil } - if p.domains.Has(domain) { - // check for explicit domains to copy - return true - } - for _, suffix := range p.suffixes.UnsortedList() { - // check if the domain has one of the suffixes that are to be copied - if strings.HasSuffix(domain, suffix) { - return true + + switch a.GetSubresource() { + case "": // regular Pod endpoint + _, ok := a.GetObject().(*api.Pod) + if !ok { + return false, nil, fmt.Errorf("expected Pod but got %T", a.GetObject()) } + return true, p.admitPod, nil + case "binding": + _, ok := a.GetObject().(*api.Binding) + if !ok { + return false, nil, fmt.Errorf("expected Binding but got %s", a.GetKind().Kind) + } + return true, p.admitBinding, nil + default: + // Ignore all other sub-resources. + return false, nil, nil } - return false -} - -func shouldIgnore(a admission.Attributes) bool { - resource := a.GetResource().GroupResource() - if resource != api.Resource("pods") { - return true - } - if a.GetSubresource() != "binding" { - // only run the checks below on the binding subresource - return true - } - - obj := a.GetObject() - _, ok := obj.(*api.Binding) - if !ok { - klog.Errorf("expected Binding but got %s", a.GetKind().Kind) - return true - } - - return false } diff --git a/plugin/pkg/admission/podtopologylabels/admission_test.go b/plugin/pkg/admission/podtopologylabels/admission_test.go index 1b79203bbf3..f85aba70c54 100644 --- a/plugin/pkg/admission/podtopologylabels/admission_test.go +++ b/plugin/pkg/admission/podtopologylabels/admission_test.go @@ -26,7 +26,6 @@ import ( corev1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/version" "k8s.io/apiserver/pkg/admission" genericadmissioninitializer "k8s.io/apiserver/pkg/admission/initializer" admissiontesting "k8s.io/apiserver/pkg/admission/testing" @@ -50,7 +49,7 @@ func TestPodTopology(t *testing.T) { featureDisabled bool // configure whether the SetPodTopologyLabels feature gate should be disabled. }{ { - name: "copies topology.k8s.io/zone and region labels to binding annotations", + name: "copies topology.k8s.io/zone and region labels to binding labels", targetNodeLabels: map[string]string{ "topology.k8s.io/zone": "zone1", "topology.k8s.io/region": "region1", @@ -123,46 +122,102 @@ func TestPodTopology(t *testing.T) { Labels: test.targetNodeLabels, }, } - // Pod we bind during test cases. - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{Name: "testPod", Namespace: namespace.Name}, - Spec: corev1.PodSpec{}, - } - featuregatetesting.SetFeatureGateEmulationVersionDuringTest(t, feature.DefaultFeatureGate, version.MustParse("1.33")) - featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, kubefeatures.PodTopologyLabelsAdmission, - !test.featureDisabled) - mockClient := fake.NewSimpleClientset(namespace, node, pod) - handler, informerFactory, err := newHandlerForTest(mockClient) - if err != nil { - t.Fatalf("unexpected error initializing handler: %v", err) - } - stopCh := make(chan struct{}) - defer close(stopCh) - informerFactory.Start(stopCh) - target := test.bindingTarget - if target == nil { - target = &api.ObjectReference{ - Kind: "Node", - Name: node.Name, + featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, kubefeatures.PodTopologyLabelsAdmission, !test.featureDisabled) + + t.Run("using Pod directly (update)", func(t *testing.T) { + // set up the client and informers + mockClient := fake.NewSimpleClientset(namespace, node) + handler, informerFactory, err := newHandlerForTest(mockClient) + if err != nil { + t.Fatalf("unexpected error initializing handler: %v", err) } - } - binding := &api.Binding{ - ObjectMeta: metav1.ObjectMeta{ - Name: pod.Name, - Namespace: pod.Namespace, - Labels: test.existingBindingLabels, - }, - Target: *target, - } - if err := admissiontesting.WithReinvocationTesting(t, handler). - Admit(context.TODO(), admission.NewAttributesRecord(binding, nil, api.Kind("Binding").WithVersion("version"), pod.Namespace, pod.Name, api.Resource("pods").WithVersion("version"), "binding", admission.Create, &metav1.CreateOptions{}, false, nil), nil); err != nil { - t.Errorf("failed running admission plugin: %v", err) - } - updatedBindingLabels := binding.Labels - if !apiequality.Semantic.DeepEqual(updatedBindingLabels, test.expectedBindingLabels) { - t.Errorf("Unexpected label values: %v", cmp.Diff(updatedBindingLabels, test.expectedBindingLabels)) - } + stopCh := make(chan struct{}) + defer close(stopCh) + informerFactory.Start(stopCh) + + oldPod := &api.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "testPod", Namespace: namespace.Name, Labels: test.existingBindingLabels}, + Spec: api.PodSpec{}, + } + pod := oldPod.DeepCopy() + pod.Spec.NodeName = node.Name + + if err := admissiontesting.WithReinvocationTesting(t, handler). + Admit(context.TODO(), admission.NewAttributesRecord(pod, oldPod, + api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, + api.Resource("pods").WithVersion("version"), "", admission.Update, &metav1.UpdateOptions{}, + false, nil), nil); err != nil { + t.Errorf("failed running admission plugin: %v", err) + } + }) + + t.Run("using Pod directly (create)", func(t *testing.T) { + // set up the client and informers + mockClient := fake.NewSimpleClientset(namespace, node) + handler, informerFactory, err := newHandlerForTest(mockClient) + if err != nil { + t.Fatalf("unexpected error initializing handler: %v", err) + } + stopCh := make(chan struct{}) + defer close(stopCh) + informerFactory.Start(stopCh) + + pod := &api.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "testPod", Namespace: namespace.Name, Labels: test.existingBindingLabels}, + Spec: api.PodSpec{ + NodeName: node.Name, + }, + } + if err := admissiontesting.WithReinvocationTesting(t, handler). + Admit(context.TODO(), admission.NewAttributesRecord(pod, nil, + api.Kind("Pod").WithVersion("version"), pod.Namespace, pod.Name, + api.Resource("pods").WithVersion("version"), "", admission.Create, &metav1.UpdateOptions{}, + false, nil), nil); err != nil { + t.Errorf("failed running admission plugin: %v", err) + } + }) + + t.Run("using Binding subresource", func(t *testing.T) { + // Pod we bind during test cases. + existingPod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{Name: "testPod", Namespace: namespace.Name}, + Spec: corev1.PodSpec{}, + } + mockClient := fake.NewSimpleClientset(namespace, node, existingPod) + handler, informerFactory, err := newHandlerForTest(mockClient) + if err != nil { + t.Fatalf("unexpected error initializing handler: %v", err) + } + stopCh := make(chan struct{}) + defer close(stopCh) + informerFactory.Start(stopCh) + + // create and submit a Binding object + target := test.bindingTarget + if target == nil { + target = &api.ObjectReference{ + Kind: "Node", + Name: node.Name, + } + } + binding := &api.Binding{ + ObjectMeta: metav1.ObjectMeta{ + Name: existingPod.Name, + Namespace: existingPod.Namespace, + Labels: test.existingBindingLabels, + }, + Target: *target, + } + if err := admissiontesting.WithReinvocationTesting(t, handler). + Admit(context.TODO(), admission.NewAttributesRecord(binding, nil, api.Kind("Binding").WithVersion("version"), existingPod.Namespace, existingPod.Name, api.Resource("pods").WithVersion("version"), "binding", admission.Create, &metav1.CreateOptions{}, false, nil), nil); err != nil { + t.Errorf("failed running admission plugin: %v", err) + } + updatedBindingLabels := binding.Labels + if !apiequality.Semantic.DeepEqual(updatedBindingLabels, test.expectedBindingLabels) { + t.Errorf("Unexpected label values: %v", cmp.Diff(updatedBindingLabels, test.expectedBindingLabels)) + } + }) }) } } diff --git a/plugin/pkg/admission/podtopologylabels/doc.go b/plugin/pkg/admission/podtopologylabels/doc.go index 2ae51a8e03f..56131f0b70a 100644 --- a/plugin/pkg/admission/podtopologylabels/doc.go +++ b/plugin/pkg/admission/podtopologylabels/doc.go @@ -15,10 +15,11 @@ limitations under the License. */ // Package podtopologylabels is a plugin that mutates `pod/binding` requests -// by copying the `topology.k8s.io/{zone,region}` and `kubernetes.io/hostname` -// labels from the assigned Node object (in the Binding being admitted) onto -// the Binding so that it can be persisted onto the Pod object when the Pod -// is being scheduled. +// by copying the `topology.k8s.io/{zone,region}` labels from the assigned Node +// object (in the Binding being admitted) onto the Binding so that it can be +// persisted onto the Pod object when the Pod is being scheduled. +// Requests for the regular `pods` resource that set the `spec.nodeName` will +// also trigger the plugin to copy the labels as described. // If the binding target is NOT a Node object, no action is taken. // If the referenced Node object does not exist, no action is taken. package podtopologylabels // import "k8s.io/kubernetes/plugin/pkg/admission/podtopologylabels"