Allow updating node affinity, selector and tolerations for suspended jobs that never started

This commit is contained in:
Abdullah Gharaibeh 2021-10-04 22:04:34 -04:00
parent d9896a23bc
commit 335817cbce
6 changed files with 420 additions and 14 deletions

View File

@ -214,7 +214,7 @@ func validateJobStatus(status *batch.JobStatus, fldPath *field.Path) field.Error
}
// ValidateJobUpdate validates an update to a Job and returns an ErrorList with any errors.
func ValidateJobUpdate(job, oldJob *batch.Job, opts apivalidation.PodValidationOptions) field.ErrorList {
func ValidateJobUpdate(job, oldJob *batch.Job, opts JobValidationOptions) field.ErrorList {
allErrs := apivalidation.ValidateObjectMetaUpdate(&job.ObjectMeta, &oldJob.ObjectMeta, field.NewPath("metadata"))
allErrs = append(allErrs, ValidateJobSpecUpdate(job.Spec, oldJob.Spec, field.NewPath("spec"), opts)...)
return allErrs
@ -228,16 +228,43 @@ func ValidateJobUpdateStatus(job, oldJob *batch.Job) field.ErrorList {
}
// ValidateJobSpecUpdate validates an update to a JobSpec and returns an ErrorList with any errors.
func ValidateJobSpecUpdate(spec, oldSpec batch.JobSpec, fldPath *field.Path, opts apivalidation.PodValidationOptions) field.ErrorList {
func ValidateJobSpecUpdate(spec, oldSpec batch.JobSpec, fldPath *field.Path, opts JobValidationOptions) field.ErrorList {
allErrs := field.ErrorList{}
allErrs = append(allErrs, ValidateJobSpec(&spec, fldPath, opts)...)
allErrs = append(allErrs, ValidateJobSpec(&spec, fldPath, opts.PodValidationOptions)...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.Completions, oldSpec.Completions, fldPath.Child("completions"))...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.Selector, oldSpec.Selector, fldPath.Child("selector"))...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.Template, oldSpec.Template, fldPath.Child("template"))...)
allErrs = append(allErrs, validatePodTemplateUpdate(spec, oldSpec, fldPath, opts)...)
allErrs = append(allErrs, apivalidation.ValidateImmutableField(spec.CompletionMode, oldSpec.CompletionMode, fldPath.Child("completionMode"))...)
return allErrs
}
func validatePodTemplateUpdate(spec, oldSpec batch.JobSpec, fldPath *field.Path, opts JobValidationOptions) field.ErrorList {
allErrs := field.ErrorList{}
template := &spec.Template
oldTemplate := &oldSpec.Template
if opts.AllowMutableSchedulingDirectives {
oldTemplate = oldSpec.Template.DeepCopy() // +k8s:verify-mutation:reason=clone
switch {
case template.Spec.Affinity == nil && oldTemplate.Spec.Affinity != nil:
// allow the Affinity field to be cleared if the old template had no affinity directives other than NodeAffinity
oldTemplate.Spec.Affinity.NodeAffinity = nil // +k8s:verify-mutation:reason=clone
if (*oldTemplate.Spec.Affinity) == (api.Affinity{}) {
oldTemplate.Spec.Affinity = nil // +k8s:verify-mutation:reason=clone
}
case template.Spec.Affinity != nil && oldTemplate.Spec.Affinity == nil:
// allow the NodeAffinity field to skip immutability checking
oldTemplate.Spec.Affinity = &api.Affinity{NodeAffinity: template.Spec.Affinity.NodeAffinity} // +k8s:verify-mutation:reason=clone
case template.Spec.Affinity != nil && oldTemplate.Spec.Affinity != nil:
// allow the NodeAffinity field to skip immutability checking
oldTemplate.Spec.Affinity.NodeAffinity = template.Spec.Affinity.NodeAffinity // +k8s:verify-mutation:reason=clone
}
oldTemplate.Spec.NodeSelector = template.Spec.NodeSelector // +k8s:verify-mutation:reason=clone
oldTemplate.Spec.Tolerations = template.Spec.Tolerations // +k8s:verify-mutation:reason=clone
}
allErrs = append(allErrs, apivalidation.ValidateImmutableField(template, oldTemplate, fldPath.Child("template"))...)
return allErrs
}
// ValidateJobStatusUpdate validates an update to a JobStatus and returns an ErrorList with any errors.
func ValidateJobStatusUpdate(status, oldStatus batch.JobStatus) field.ErrorList {
allErrs := field.ErrorList{}
@ -346,4 +373,6 @@ type JobValidationOptions struct {
apivalidation.PodValidationOptions
// Allow Job to have the annotation batch.kubernetes.io/job-tracking
AllowTrackingAnnotation bool
// Allow mutable node affinity, selector and tolerations of the template
AllowMutableSchedulingDirectives bool
}

View File

@ -372,9 +372,57 @@ func TestValidateJob(t *testing.T) {
func TestValidateJobUpdate(t *testing.T) {
validGeneratedSelector := getValidGeneratedSelector()
validPodTemplateSpecForGenerated := getValidPodTemplateSpecForGenerated(validGeneratedSelector)
validNodeAffinity := &api.Affinity{
NodeAffinity: &api.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &api.NodeSelector{
NodeSelectorTerms: []api.NodeSelectorTerm{
{
MatchExpressions: []api.NodeSelectorRequirement{
{
Key: "foo",
Operator: api.NodeSelectorOpIn,
Values: []string{"bar", "value2"},
},
},
},
},
},
},
}
validPodTemplateWithAffinity := getValidPodTemplateSpecForGenerated(validGeneratedSelector)
validPodTemplateWithAffinity.Spec.Affinity = &api.Affinity{
NodeAffinity: &api.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &api.NodeSelector{
NodeSelectorTerms: []api.NodeSelectorTerm{
{
MatchExpressions: []api.NodeSelectorRequirement{
{
Key: "foo",
Operator: api.NodeSelectorOpIn,
Values: []string{"bar", "value"},
},
},
},
},
},
},
}
// This is to test immutability of the selector, both the new and old
// selector should match the labels in the template, which is immutable
// on its own; therfore, the only way to test selector immutability is
// when the new selector is changed but still matches the existing labels.
newSelector := getValidGeneratedSelector()
newSelector.MatchLabels["foo"] = "bar"
validTolerations := []api.Toleration{{
Key: "foo",
Operator: api.TolerationOpEqual,
Value: "bar",
Effect: api.TaintEffectPreferNoSchedule,
}}
cases := map[string]struct {
old batch.Job
update func(*batch.Job)
opts JobValidationOptions
err *field.Error
}{
"mutable fields": {
@ -416,11 +464,11 @@ func TestValidateJobUpdate(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
Template: getValidPodTemplateSpecForGenerated(newSelector),
},
},
update: func(job *batch.Job) {
job.Spec.Selector.MatchLabels["foo"] = "bar"
job.Spec.Selector = newSelector
},
err: &field.Error{
Type: field.ErrorTypeInvalid,
@ -436,7 +484,7 @@ func TestValidateJobUpdate(t *testing.T) {
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.Containers = nil
job.Spec.Template.Spec.DNSPolicy = api.DNSClusterFirstWithHostNet
},
err: &field.Error{
Type: field.ErrorTypeInvalid,
@ -461,6 +509,144 @@ func TestValidateJobUpdate(t *testing.T) {
Field: "spec.completionMode",
},
},
"immutable node affinity": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.Affinity = validNodeAffinity
},
err: &field.Error{
Type: field.ErrorTypeInvalid,
Field: "spec.template",
},
},
"add node affinity": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.Affinity = validNodeAffinity
},
opts: JobValidationOptions{
AllowMutableSchedulingDirectives: true,
},
},
"update node affinity": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateWithAffinity,
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.Affinity = validNodeAffinity
},
opts: JobValidationOptions{
AllowMutableSchedulingDirectives: true,
},
},
"remove node affinity": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateWithAffinity,
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.Affinity.NodeAffinity = nil
},
opts: JobValidationOptions{
AllowMutableSchedulingDirectives: true,
},
},
"remove affinity": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateWithAffinity,
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.Affinity = nil
},
opts: JobValidationOptions{
AllowMutableSchedulingDirectives: true,
},
},
"immutable tolerations": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.Tolerations = validTolerations
},
err: &field.Error{
Type: field.ErrorTypeInvalid,
Field: "spec.template",
},
},
"mutable tolerations": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.Tolerations = validTolerations
},
opts: JobValidationOptions{
AllowMutableSchedulingDirectives: true,
},
},
"immutable node selector": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.NodeSelector = map[string]string{"foo": "bar"}
},
err: &field.Error{
Type: field.ErrorTypeInvalid,
Field: "spec.template",
},
},
"mutable node selector": {
old: batch.Job{
ObjectMeta: metav1.ObjectMeta{Name: "abc", Namespace: metav1.NamespaceDefault},
Spec: batch.JobSpec{
Selector: validGeneratedSelector,
Template: validPodTemplateSpecForGenerated,
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.NodeSelector = map[string]string{"foo": "bar"}
},
opts: JobValidationOptions{
AllowMutableSchedulingDirectives: true,
},
},
}
ignoreValueAndDetail := cmpopts.IgnoreFields(field.Error{}, "BadValue", "Detail")
for k, tc := range cases {
@ -468,7 +654,7 @@ func TestValidateJobUpdate(t *testing.T) {
tc.old.ResourceVersion = "1"
update := tc.old.DeepCopy()
tc.update(update)
errs := ValidateJobUpdate(&tc.old, update, corevalidation.PodValidationOptions{})
errs := ValidateJobUpdate(update, &tc.old, tc.opts)
var wantErrs field.ErrorList
if tc.err != nil {
wantErrs = append(wantErrs, tc.err)

View File

@ -759,6 +759,14 @@ const (
// See https://groups.google.com/g/kubernetes-sig-architecture/c/Nxsc7pfe5rw/m/vF2djJh0BAAJ
// for details about the removal of this feature gate.
CPUManagerPolicyBetaOptions featuregate.Feature = "CPUManagerPolicyBetaOptions"
// owner: @ahg
// beta: v1.23
//
// Allow updating node scheduling directives in the pod template of jobs. Specifically,
// node affinity, selector and tolerations. This is allowed only for suspended jobs
// that have never been unsuspended before.
JobMutableNodeSchedulingDirectives featuregate.Feature = "JobMutableNodeSchedulingDirectives"
)
func init() {
@ -871,6 +879,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
ControllerManagerLeaderMigration: {Default: true, PreRelease: featuregate.Beta},
CPUManagerPolicyAlphaOptions: {Default: false, PreRelease: featuregate.Alpha},
CPUManagerPolicyBetaOptions: {Default: true, PreRelease: featuregate.Beta},
JobMutableNodeSchedulingDirectives: {Default: true, PreRelease: featuregate.Beta},
// inherited features from generic apiserver, relisted here to get a conflict if it is changed
// unintentionally on either side:

View File

@ -191,7 +191,16 @@ func validationOptionsForJob(newJob, oldJob *batch.Job) validation.JobValidation
// existing jobs, we allow the annotation only if the Job already had it,
// regardless of the feature gate.
opts.AllowTrackingAnnotation = hasJobTrackingAnnotation(oldJob)
// Updating node affinity, node selector and tolerations is allowed
// only for suspended jobs that never started before.
suspended := oldJob.Spec.Suspend != nil && *oldJob.Spec.Suspend
notStarted := oldJob.Status.StartTime == nil
opts.AllowMutableSchedulingDirectives = utilfeature.DefaultFeatureGate.Enabled(features.JobMutableNodeSchedulingDirectives) &&
suspended && notStarted
}
return opts
}
@ -271,7 +280,7 @@ func (jobStrategy) ValidateUpdate(ctx context.Context, obj, old runtime.Object)
opts := validationOptionsForJob(job, oldJob)
validationErrorList := validation.ValidateJob(job, opts)
updateErrorList := validation.ValidateJobUpdate(job, oldJob, opts.PodValidationOptions)
updateErrorList := validation.ValidateJobUpdate(job, oldJob, opts)
return append(validationErrorList, updateErrorList...)
}

View File

@ -253,11 +253,13 @@ func TestJobStrategyValidateUpdate(t *testing.T) {
Containers: []api.Container{{Name: "abc", Image: "image", ImagePullPolicy: "IfNotPresent", TerminationMessagePolicy: api.TerminationMessageReadFile}},
},
}
now := metav1.Now()
cases := map[string]struct {
job *batch.Job
update func(*batch.Job)
wantErrs field.ErrorList
trackingWithFinalizersEnabled bool
job *batch.Job
update func(*batch.Job)
wantErrs field.ErrorList
trackingWithFinalizersEnabled bool
mutableSchedulingDirectivesEnabled bool
}{
"update parallelism": {
job: &batch.Job{
@ -366,10 +368,106 @@ func TestJobStrategyValidateUpdate(t *testing.T) {
job.Annotations["foo"] = "bar"
},
},
"updating node selector for unsuspended job disallowed": {
job: &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
ResourceVersion: "0",
Annotations: map[string]string{"foo": "bar"},
},
Spec: batch.JobSpec{
Selector: validSelector,
Template: validPodTemplateSpec,
ManualSelector: pointer.BoolPtr(true),
Parallelism: pointer.Int32Ptr(1),
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.NodeSelector = map[string]string{"foo": "bar"}
},
wantErrs: field.ErrorList{
{Type: field.ErrorTypeInvalid, Field: "spec.template"},
},
mutableSchedulingDirectivesEnabled: true,
},
"updating node selector for suspended but previously started job disallowed": {
job: &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
ResourceVersion: "0",
Annotations: map[string]string{"foo": "bar"},
},
Spec: batch.JobSpec{
Selector: validSelector,
Template: validPodTemplateSpec,
ManualSelector: pointer.BoolPtr(true),
Parallelism: pointer.Int32Ptr(1),
Suspend: pointer.BoolPtr(true),
},
Status: batch.JobStatus{
StartTime: &now,
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.NodeSelector = map[string]string{"foo": "bar"}
},
wantErrs: field.ErrorList{
{Type: field.ErrorTypeInvalid, Field: "spec.template"},
},
mutableSchedulingDirectivesEnabled: true,
},
"updating node selector for suspended and not previously started job allowed": {
job: &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
ResourceVersion: "0",
Annotations: map[string]string{"foo": "bar"},
},
Spec: batch.JobSpec{
Selector: validSelector,
Template: validPodTemplateSpec,
ManualSelector: pointer.BoolPtr(true),
Parallelism: pointer.Int32Ptr(1),
Suspend: pointer.BoolPtr(true),
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.NodeSelector = map[string]string{"foo": "bar"}
},
mutableSchedulingDirectivesEnabled: true,
},
"updating node selector whilte gate disabled disallowed": {
job: &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Name: "myjob",
Namespace: metav1.NamespaceDefault,
ResourceVersion: "0",
Annotations: map[string]string{"foo": "bar"},
},
Spec: batch.JobSpec{
Selector: validSelector,
Template: validPodTemplateSpec,
ManualSelector: pointer.BoolPtr(true),
Parallelism: pointer.Int32Ptr(1),
Suspend: pointer.BoolPtr(true),
},
},
update: func(job *batch.Job) {
job.Spec.Template.Spec.NodeSelector = map[string]string{"foo": "bar"}
},
wantErrs: field.ErrorList{
{Type: field.ErrorTypeInvalid, Field: "spec.template"},
},
mutableSchedulingDirectivesEnabled: false,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.JobTrackingWithFinalizers, tc.trackingWithFinalizersEnabled)()
defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.JobMutableNodeSchedulingDirectives, tc.mutableSchedulingDirectivesEnabled)()
newJob := tc.job.DeepCopy()
tc.update(newJob)
errs := Strategy.ValidateUpdate(ctx, newJob, tc.job)
@ -378,7 +476,6 @@ func TestJobStrategyValidateUpdate(t *testing.T) {
}
})
}
}
func TestJobStrategyWithGeneration(t *testing.T) {

View File

@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"testing"
"time"
@ -629,6 +630,81 @@ func TestSuspendJobControllerRestart(t *testing.T) {
}, false)
}
func TestNodeSelectorUpdate(t *testing.T) {
for name, featureGate := range map[string]bool{
"feature gate disabled": false,
"feature gate enabled": true,
} {
t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobMutableNodeSchedulingDirectives, featureGate)()
closeFn, restConfig, clientSet, ns := setup(t, "suspend")
defer closeFn()
ctx, cancel := startJobController(restConfig, clientSet)
defer cancel()
job, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{Spec: batchv1.JobSpec{
Parallelism: pointer.Int32Ptr(1),
Suspend: pointer.BoolPtr(true),
}})
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
jobName := job.Name
jobNamespace := job.Namespace
// (1) Unsuspend and set node selector in the same update.
nodeSelector := map[string]string{"foo": "bar"}
job.Spec.Template.Spec.NodeSelector = nodeSelector
job.Spec.Suspend = pointer.BoolPtr(false)
_, err = clientSet.BatchV1().Jobs(jobNamespace).Update(ctx, job, metav1.UpdateOptions{})
if !featureGate {
if err == nil || !strings.Contains(err.Error(), "spec.template: Invalid value") {
t.Errorf("Expected \"spec.template: Invalid value\" error, got: %v", err)
}
} else if featureGate && err != nil {
t.Errorf("Unexpected error: %v", err)
}
// (2) Check that the pod was created using the expected node selector.
if featureGate {
var pod *v1.Pod
if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
pods, err := clientSet.CoreV1().Pods(jobNamespace).List(ctx, metav1.ListOptions{})
if err != nil {
t.Fatalf("Failed to list Job Pods: %v", err)
}
if len(pods.Items) == 0 {
return false, nil
}
pod = &pods.Items[0]
return true, nil
}); err != nil || pod == nil {
t.Fatalf("pod not found: %v", err)
}
// if the feature gate is enabled, then the job should now be unsuspended and
// the pod has the node selector.
if diff := cmp.Diff(nodeSelector, pod.Spec.NodeSelector); diff != "" {
t.Errorf("Unexpected nodeSelector (-want,+got):\n%s", diff)
}
}
// (3) Update node selector again. It should fail since the job is unsuspended.
var updatedJob *batchv1.Job
if updatedJob, err = clientSet.BatchV1().Jobs(jobNamespace).Get(ctx, jobName, metav1.GetOptions{}); err != nil {
t.Fatalf("can't find the job: %v", err)
}
updatedJob.Spec.Template.Spec.NodeSelector = map[string]string{"foo": "baz"}
_, err = clientSet.BatchV1().Jobs(jobNamespace).Update(ctx, updatedJob, metav1.UpdateOptions{})
if err == nil || !strings.Contains(err.Error(), "spec.template: Invalid value") {
t.Errorf("Expected \"spec.template: Invalid value\" error, got: %v", err)
}
})
}
}
type podsByStatus struct {
Active int
Failed int