Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-19 09:52:49 +00:00)

Merge pull request #113510 from alculquicondor/finalizers-stable

Graduate JobTrackingWithFinalizers to stable

Commit: ac95e5b701

api/openapi-spec/swagger.json (generated)
@@ -3235,7 +3235,7 @@
         },
         "uncountedTerminatedPods": {
           "$ref": "#/definitions/io.k8s.api.batch.v1.UncountedTerminatedPods",
-          "description": "UncountedTerminatedPods holds the UIDs of Pods that have terminated but the job controller hasn't yet accounted for in the status counters.\n\nThe job controller creates pods with a finalizer. When a pod terminates (succeeded or failed), the controller does three steps to account for it in the job status: (1) Add the pod UID to the arrays in this field. (2) Remove the pod finalizer. (3) Remove the pod UID from the arrays while increasing the corresponding\n counter.\n\nThis field is beta-level. The job controller only makes use of this field when the feature gate JobTrackingWithFinalizers is enabled (enabled by default). Old jobs might not be tracked using this field, in which case the field remains null."
+          "description": "UncountedTerminatedPods holds the UIDs of Pods that have terminated but the job controller hasn't yet accounted for in the status counters.\n\nThe job controller creates pods with a finalizer. When a pod terminates (succeeded or failed), the controller does three steps to account for it in the job status: (1) Add the pod UID to the arrays in this field. (2) Remove the pod finalizer. (3) Remove the pod UID from the arrays while increasing the corresponding\n counter.\n\nOld jobs might not be tracked using this field, in which case the field remains null."
         }
       },
       "type": "object"
@@ -454,7 +454,7 @@
           "$ref": "#/components/schemas/io.k8s.api.batch.v1.UncountedTerminatedPods"
         }
       ],
-      "description": "UncountedTerminatedPods holds the UIDs of Pods that have terminated but the job controller hasn't yet accounted for in the status counters.\n\nThe job controller creates pods with a finalizer. When a pod terminates (succeeded or failed), the controller does three steps to account for it in the job status: (1) Add the pod UID to the arrays in this field. (2) Remove the pod finalizer. (3) Remove the pod UID from the arrays while increasing the corresponding\n counter.\n\nThis field is beta-level. The job controller only makes use of this field when the feature gate JobTrackingWithFinalizers is enabled (enabled by default). Old jobs might not be tracked using this field, in which case the field remains null."
+      "description": "UncountedTerminatedPods holds the UIDs of Pods that have terminated but the job controller hasn't yet accounted for in the status counters.\n\nThe job controller creates pods with a finalizer. When a pod terminates (succeeded or failed), the controller does three steps to account for it in the job status: (1) Add the pod UID to the arrays in this field. (2) Remove the pod finalizer. (3) Remove the pod UID from the arrays while increasing the corresponding\n counter.\n\nOld jobs might not be tracked using this field, in which case the field remains null."
     }
   },
   "type": "object"
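The description above spells out a three-step accounting flow. A minimal runnable Go sketch of that flow, with simplified stand-in types (my illustration, not the controller's actual code):

package main

import "fmt"

// jobStatus is a simplified stand-in for the Job status fields involved.
type jobStatus struct {
	uncountedSucceeded []string // pod UIDs recorded but not yet counted
	succeeded          int      // the status counter
}

// Step (1): remember the terminated pod's UID in the uncounted array.
func (s *jobStatus) recordTermination(uid string) {
	s.uncountedSucceeded = append(s.uncountedSucceeded, uid)
}

// Step (2): in the real controller this is a Pod patch removing the finalizer.
func removeFinalizer(uid string) {
	fmt.Printf("pod %s: finalizer removed\n", uid)
}

// Step (3): move UIDs out of the uncounted array while bumping the counter.
func (s *jobStatus) flush() {
	s.succeeded += len(s.uncountedSucceeded)
	s.uncountedSucceeded = nil
}

func main() {
	var st jobStatus
	st.recordTermination("uid-1") // (1)
	removeFinalizer("uid-1")      // (2)
	st.flush()                    // (3)
	fmt.Println("succeeded:", st.succeeded) // succeeded: 1
}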
@@ -24,10 +24,13 @@ import (
 
 // JobTrackingFinalizer is a finalizer for Job's pods. It prevents them from
 // being deleted before being accounted in the Job status.
-// The apiserver and job controller use this string as a Job annotation, to
-// mark Jobs that are being tracked using pod finalizers. Two releases after
-// the JobTrackingWithFinalizers graduates to GA, JobTrackingFinalizer will
-// no longer be used as a Job annotation.
+//
+// Additionally, the apiserver and job controller use this string as a Job
+// annotation, to mark Jobs that are being tracked using pod finalizers.
+// However, this behavior is deprecated in kubernetes 1.26. This means that, in
+// 1.27+, one release after JobTrackingWithFinalizers graduates to GA, the
+// apiserver and job controller will ignore this annotation and they will
+// always track jobs using finalizers.
 const JobTrackingFinalizer = "batch.kubernetes.io/job-tracking"
 
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
@@ -405,9 +408,6 @@ type JobStatus struct {
 	// (3) Remove the pod UID from the array while increasing the corresponding
 	// counter.
 	//
-	// This field is beta-level. The job controller only makes use of this field
-	// when the feature gate JobTrackingWithFinalizers is enabled (enabled
-	// by default).
 	// Old jobs might not be tracked using this field, in which case the field
 	// remains null.
 	// +optional
@@ -53,7 +53,7 @@ type orderedIntervals []interval
 // the indexes that succeeded since the last sync.
 func calculateSucceededIndexes(job *batch.Job, pods []*v1.Pod) (orderedIntervals, orderedIntervals) {
 	var prevIntervals orderedIntervals
-	withFinalizers := trackingUncountedPods(job)
+	withFinalizers := hasJobTrackingAnnotation(job)
 	if withFinalizers {
 		prevIntervals = succeededIndexesFromJob(job)
 	}
@@ -22,10 +22,6 @@ import (
 	"github.com/google/go-cmp/cmp"
 	batch "k8s.io/api/batch/v1"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apiserver/pkg/util/feature"
-	featuregatetesting "k8s.io/component-base/featuregate/testing"
-	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/utils/pointer"
 )
 
@@ -219,13 +215,7 @@ func TestCalculateSucceededIndexes(t *testing.T) {
 	}
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
-			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, tc.trackingWithFinalizers)()
 			job := &batch.Job{
-				ObjectMeta: metav1.ObjectMeta{
-					Annotations: map[string]string{
-						batch.JobTrackingFinalizer: "",
-					},
-				},
 				Status: batch.JobStatus{
 					CompletedIndexes: tc.prevSucceeded,
 				},
@@ -233,6 +223,11 @@ func TestCalculateSucceededIndexes(t *testing.T) {
 					Completions: pointer.Int32Ptr(tc.completions),
 				},
 			}
+			if tc.trackingWithFinalizers {
+				job.Annotations = map[string]string{
+					batch.JobTrackingFinalizer: "",
+				}
+			}
 			pods := hollowPodsWithIndexPhase(tc.pods)
 			for _, p := range pods {
 				p.Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer)
@@ -714,17 +714,13 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (forget bool, rEr
 
 	var expectedRmFinalizers sets.String
 	var uncounted *uncountedTerminatedPods
-	if trackingUncountedPods(&job) {
+	if hasJobTrackingAnnotation(&job) {
 		klog.V(4).InfoS("Tracking uncounted Pods with pod finalizers", "job", klog.KObj(&job))
 		if job.Status.UncountedTerminatedPods == nil {
 			job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
 		}
 		uncounted = newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
 		expectedRmFinalizers = jm.finalizerExpectations.getExpectedUIDs(key)
-	} else if patch := removeTrackingAnnotationPatch(&job); patch != nil {
-		if err := jm.patchJobHandler(ctx, &job, patch); err != nil {
-			return false, fmt.Errorf("removing tracking finalizer from job %s: %w", key, err)
-		}
 	}
 
 	// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
@@ -1476,7 +1472,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods
 	if isIndexedJob(job) {
 		addCompletionIndexEnvVariables(podTemplate)
 	}
-	if trackingUncountedPods(job) {
+	if hasJobTrackingAnnotation(job) {
 		podTemplate.Finalizers = appendJobCompletionFinalizerIfNotFound(podTemplate.Finalizers)
 	}
 
@@ -1635,10 +1631,6 @@ func getCompletionMode(job *batch.Job) string {
 	return string(batch.NonIndexedCompletion)
 }
 
-func trackingUncountedPods(job *batch.Job) bool {
-	return feature.DefaultFeatureGate.Enabled(features.JobTrackingWithFinalizers) && hasJobTrackingAnnotation(job)
-}
-
 func hasJobTrackingAnnotation(job *batch.Job) bool {
 	if job.Annotations == nil {
 		return false
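With trackingUncountedPods gone, the remaining check is a plain annotation lookup; a simplified stand-alone sketch of the pattern (hypothetical helper, not the exact source):

package main

import "fmt"

// hasTrackingAnnotation mirrors the check above: presence of the key is what
// matters; the value the apiserver writes is the empty string.
func hasTrackingAnnotation(annotations map[string]string) bool {
	if annotations == nil {
		return false
	}
	_, ok := annotations["batch.kubernetes.io/job-tracking"]
	return ok
}

func main() {
	fmt.Println(hasTrackingAnnotation(nil)) // false
	fmt.Println(hasTrackingAnnotation(map[string]string{
		"batch.kubernetes.io/job-tracking": "",
	})) // true
}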
@@ -1669,21 +1661,6 @@ func removeTrackingFinalizerPatch(pod *v1.Pod) []byte {
 	return patchBytes
 }
 
-func removeTrackingAnnotationPatch(job *batch.Job) []byte {
-	if !hasJobTrackingAnnotation(job) {
-		return nil
-	}
-	patch := map[string]interface{}{
-		"metadata": map[string]interface{}{
-			"annotations": map[string]interface{}{
-				batch.JobTrackingFinalizer: nil,
-			},
-		},
-	}
-	patchBytes, _ := json.Marshal(patch)
-	return patchBytes
-}
-
 type uncountedTerminatedPods struct {
 	succeeded sets.String
 	failed    sets.String
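For context on the deleted helper: a merge patch that sets a key to nil deletes it server-side. A self-contained sketch producing the same payload (standard library only):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// In a JSON merge patch, a null value deletes the key on the server.
	patch := map[string]interface{}{
		"metadata": map[string]interface{}{
			"annotations": map[string]interface{}{
				"batch.kubernetes.io/job-tracking": nil,
			},
		},
	}
	b, err := json.Marshal(patch)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
	// {"metadata":{"annotations":{"batch.kubernetes.io/job-tracking":null}}}
}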
@@ -135,7 +135,7 @@ func newPodList(count int, status v1.PodPhase, job *batch.Job) []*v1.Pod {
 	for i := 0; i < count; i++ {
 		newPod := newPod(fmt.Sprintf("pod-%v", rand.String(10)), job)
 		newPod.Status = v1.PodStatus{Phase: status}
-		if trackingUncountedPods(job) {
+		if hasJobTrackingAnnotation(job) {
 			newPod.Finalizers = append(newPod.Finalizers, batch.JobTrackingFinalizer)
 		}
 		pods = append(pods, newPod)
@@ -178,7 +178,7 @@ func setPodsStatusesWithIndexes(podIndexer cache.Indexer, job *batch.Job, status
 		}
 		p.Spec.Hostname = fmt.Sprintf("%s-%s", job.Name, s.Index)
 	}
-	if trackingUncountedPods(job) {
+	if hasJobTrackingAnnotation(job) {
 		p.Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer)
 	}
 	podIndexer.Add(p)
@@ -343,15 +343,15 @@ func TestControllerSyncJob(t *testing.T) {
 			parallelism:        2,
 			completions:        5,
 			backoffLimit:       6,
 			podControllerError: fmt.Errorf("fake error"),
 			jobKeyForget:       true,
 			activePods:         1,
 			succeededPods:      1,
 			failedPods:         1,
 			expectedCreations:  1,
-			expectedActive:     1,
+			expectedActive:     2,
 			expectedSucceeded:  1,
 			expectedFailed:     1,
 			expectedPodPatches: 2,
 		},
 		"new failed pod": {
 			parallelism: 2,
@@ -728,7 +728,6 @@ func TestControllerSyncJob(t *testing.T) {
 				t.Skipf("Test is exclusive for wFinalizers=%t", *tc.wFinalizersExclusive)
 			}
 			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.jobReadyPodsEnabled)()
-			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
 
 			// job manager setup
 			clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
@@ -918,13 +917,12 @@ func checkIndexedJobPods(t *testing.T, control *controller.FakePodControl, wantI
 }
 
 // TestSyncJobLegacyTracking makes sure that a Job is only tracked with
-// finalizers only when the feature is enabled and the job has the finalizer.
+// finalizers when the job has the annotation.
 func TestSyncJobLegacyTracking(t *testing.T) {
 	cases := map[string]struct {
-		job                           batch.Job
-		trackingWithFinalizersEnabled bool
-		wantUncounted                 bool
-		wantPatches                   int
+		job           batch.Job
+		wantUncounted bool
+		wantPatches   int
 	}{
 		"no annotation": {
 			job: batch.Job{
@@ -937,19 +935,7 @@ func TestSyncJobLegacyTracking(t *testing.T) {
 				},
 			},
 		},
-		"no annotation, feature enabled": {
-			job: batch.Job{
-				ObjectMeta: metav1.ObjectMeta{
-					Name:      "foo",
-					Namespace: "ns",
-				},
-				Spec: batch.JobSpec{
-					Parallelism: pointer.Int32Ptr(1),
-				},
-			},
-			trackingWithFinalizersEnabled: true,
-		},
-		"tracking annotation, feature disabled": {
+		"tracking annotation": {
 			job: batch.Job{
 				ObjectMeta: metav1.ObjectMeta{
 					Name: "foo",
@@ -962,26 +948,9 @@ func TestSyncJobLegacyTracking(t *testing.T) {
 				Parallelism: pointer.Int32Ptr(1),
 			},
 		},
-			// Finalizer removed.
-			wantPatches: 1,
+			wantUncounted: true,
 		},
-		"tracking annotation, feature enabled": {
-			job: batch.Job{
-				ObjectMeta: metav1.ObjectMeta{
-					Name:      "foo",
-					Namespace: "ns",
-					Annotations: map[string]string{
-						batch.JobTrackingFinalizer: "",
-					},
-				},
-				Spec: batch.JobSpec{
-					Parallelism: pointer.Int32Ptr(1),
-				},
-			},
-			trackingWithFinalizersEnabled: true,
-			wantUncounted:                 true,
-		},
-		"different annotation, feature enabled": {
+		"different annotation": {
 			job: batch.Job{
 				ObjectMeta: metav1.ObjectMeta{
 					Name: "foo",
@@ -994,13 +963,10 @@ func TestSyncJobLegacyTracking(t *testing.T) {
 				Parallelism: pointer.Int32Ptr(1),
 			},
 		},
-			trackingWithFinalizersEnabled: true,
 		},
 	}
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
-			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, tc.trackingWithFinalizersEnabled)()
 
 			// Job manager setup.
 			clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
 			manager, sharedInformerFactory := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc)
@@ -2909,7 +2875,6 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
 	for _, wFinalizers := range []bool{false, true} {
 		for name, tc := range testCases {
 			t.Run(fmt.Sprintf("%s; finalizers=%t", name, wFinalizers), func(t *testing.T) {
-				defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
 				defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)()
 				clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
 				manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
@@ -2992,15 +2957,9 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
 			updateErr:   apierrors.NewConflict(schema.GroupResource{}, "", nil),
 			wantRequeue: true,
 		},
-		"conflict error, with finalizers": {
-			withFinalizers: true,
-			updateErr:      apierrors.NewConflict(schema.GroupResource{}, "", nil),
-			wantRequeue:    true,
-		},
 	}
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
-			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, tc.withFinalizers)()
 			manager, sharedInformerFactory := newControllerFromClient(clientset, controller.NoResyncPeriodFunc)
 			fakePodControl := controller.FakePodControl{}
 			manager.podControl = &fakePodControl
@@ -4204,7 +4163,6 @@ func TestEnsureJobConditions(t *testing.T) {
 }
 
 func TestFinalizersRemovedExpectations(t *testing.T) {
-	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
 	clientset := fake.NewSimpleClientset()
 	sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
 	manager := NewController(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset)
@@ -450,6 +450,7 @@ const (
 	// owner: @alculquicondor
 	// alpha: v1.22
 	// beta: v1.23
+	// stable: v1.26
 	//
 	// Track Job completion without relying on Pod remaining in the cluster
 	// indefinitely. Pod finalizers, in addition to a field in the Job status
@@ -951,7 +952,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
 
 	JobReadyPods: {Default: true, PreRelease: featuregate.Beta},
 
-	JobTrackingWithFinalizers: {Default: true, PreRelease: featuregate.Beta},
+	JobTrackingWithFinalizers: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.28
 
 	KubeletCredentialProviders: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // remove in 1.28
 
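For reference, GA plus LockToDefault pins the gate on so overrides fail. A minimal sketch against the k8s.io/component-base/featuregate API (behavior as I understand the library; verify before relying on it):

package main

import (
	"fmt"

	"k8s.io/component-base/featuregate"
)

const JobTrackingWithFinalizers featuregate.Feature = "JobTrackingWithFinalizers"

func main() {
	gate := featuregate.NewFeatureGate()
	// GA + LockToDefault: the gate reports enabled and cannot be turned off.
	_ = gate.Add(map[featuregate.Feature]featuregate.FeatureSpec{
		JobTrackingWithFinalizers: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
	})
	fmt.Println(gate.Enabled(JobTrackingWithFinalizers)) // true
	err := gate.Set("JobTrackingWithFinalizers=false")
	fmt.Println(err != nil) // true: setting a locked gate to a non-default errors
}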
pkg/generated/openapi/zz_generated.openapi.go (generated)
@@ -13605,7 +13605,7 @@ func schema_k8sio_api_batch_v1_JobStatus(ref common.ReferenceCallback) common.Op
 			},
 			"uncountedTerminatedPods": {
 				SchemaProps: spec.SchemaProps{
-					Description: "UncountedTerminatedPods holds the UIDs of Pods that have terminated but the job controller hasn't yet accounted for in the status counters.\n\nThe job controller creates pods with a finalizer. When a pod terminates (succeeded or failed), the controller does three steps to account for it in the job status: (1) Add the pod UID to the arrays in this field. (2) Remove the pod finalizer. (3) Remove the pod UID from the arrays while increasing the corresponding\n counter.\n\nThis field is beta-level. The job controller only makes use of this field when the feature gate JobTrackingWithFinalizers is enabled (enabled by default). Old jobs might not be tracked using this field, in which case the field remains null.",
+					Description: "UncountedTerminatedPods holds the UIDs of Pods that have terminated but the job controller hasn't yet accounted for in the status counters.\n\nThe job controller creates pods with a finalizer. When a pod terminates (succeeded or failed), the controller does three steps to account for it in the job status: (1) Add the pod UID to the arrays in this field. (2) Remove the pod finalizer. (3) Remove the pod UID from the arrays while increasing the corresponding\n counter.\n\nOld jobs might not be tracked using this field, in which case the field remains null.",
 					Ref:         ref("k8s.io/api/batch/v1.UncountedTerminatedPods"),
 				},
 			},
@@ -93,13 +93,10 @@ func (jobStrategy) PrepareForCreate(ctx context.Context, obj runtime.Object) {
 
 	job.Generation = 1
 
-	if utilfeature.DefaultFeatureGate.Enabled(features.JobTrackingWithFinalizers) {
-		// Until this feature graduates to GA and soaks in clusters, we use an
-		// annotation to mark whether jobs are tracked with it.
-		addJobTrackingAnnotation(job)
-	} else {
-		dropJobTrackingAnnotation(job)
-	}
+	// While legacy tracking is supported, we use an annotation to mark whether
+	// jobs are tracked with finalizers.
+	addJobTrackingAnnotation(job)
 
 	if !utilfeature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) {
 		job.Spec.PodFailurePolicy = nil
 	}
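PrepareForCreate now stamps the tracking annotation unconditionally. A hypothetical stand-alone sketch of what an addJobTrackingAnnotation-style step amounts to:

package main

import "fmt"

// withTrackingAnnotation: ensure the annotations map exists, then set the
// tracking key to the empty string (the value used by the apiserver).
func withTrackingAnnotation(annotations map[string]string) map[string]string {
	if annotations == nil {
		annotations = make(map[string]string)
	}
	annotations["batch.kubernetes.io/job-tracking"] = ""
	return annotations
}

func main() {
	a := withTrackingAnnotation(nil)
	fmt.Println(a) // map[batch.kubernetes.io/job-tracking:]
}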
@@ -122,20 +119,12 @@ func hasJobTrackingAnnotation(job *batch.Job) bool {
 	return ok
 }
 
-func dropJobTrackingAnnotation(job *batch.Job) {
-	delete(job.Annotations, batchv1.JobTrackingFinalizer)
-}
-
 // PrepareForUpdate clears fields that are not allowed to be set by end users on update.
 func (jobStrategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object) {
 	newJob := obj.(*batch.Job)
 	oldJob := old.(*batch.Job)
 	newJob.Status = oldJob.Status
 
-	if !utilfeature.DefaultFeatureGate.Enabled(features.JobTrackingWithFinalizers) && !hasJobTrackingAnnotation(oldJob) {
-		dropJobTrackingAnnotation(newJob)
-	}
-
 	if !utilfeature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) && oldJob.Spec.PodFailurePolicy == nil {
 		newJob.Spec.PodFailurePolicy = nil
 	}
@@ -147,6 +136,13 @@ func (jobStrategy) PrepareForUpdate(ctx context.Context, obj, old runtime.Object
 	if !apiequality.Semantic.DeepEqual(newJob.Spec, oldJob.Spec) {
 		newJob.Generation = oldJob.Generation + 1
 	}
 
+	// While legacy tracking is supported, we use an annotation to mark whether
+	// jobs are tracked with finalizers. This annotation cannot be removed by
+	// users.
+	if hasJobTrackingAnnotation(oldJob) {
+		addJobTrackingAnnotation(newJob)
+	}
 }
 
 // Validate validates a new job.
@@ -170,7 +166,7 @@ func validationOptionsForJob(newJob, oldJob *batch.Job) validation.JobValidation
 	}
 	opts := validation.JobValidationOptions{
 		PodValidationOptions:    pod.GetValidationOptionsFromPodTemplate(newPodTemplate, oldPodTemplate),
-		AllowTrackingAnnotation: utilfeature.DefaultFeatureGate.Enabled(features.JobTrackingWithFinalizers),
+		AllowTrackingAnnotation: true,
 	}
 	if oldJob != nil {
 		// Because we don't support the tracking with finalizers for already
@@ -184,6 +184,30 @@ func TestJobStrategy_PrepareForUpdate(t *testing.T) {
 			},
 		},
 	},
+	"add tracking annotation back": {
+		job: batch.Job{
+			ObjectMeta: getValidObjectMetaWithAnnotations(0, map[string]string{batchv1.JobTrackingFinalizer: ""}),
+			Spec: batch.JobSpec{
+				Selector:         validSelector,
+				Template:         validPodTemplateSpec,
+				PodFailurePolicy: podFailurePolicy,
+			},
+		},
+		updatedJob: batch.Job{
+			ObjectMeta: getValidObjectMeta(0),
+			Spec: batch.JobSpec{
+				Selector: validSelector,
+				Template: validPodTemplateSpec,
+			},
+		},
+		wantJob: batch.Job{
+			ObjectMeta: getValidObjectMetaWithAnnotations(1, map[string]string{batchv1.JobTrackingFinalizer: ""}),
+			Spec: batch.JobSpec{
+				Selector: validSelector,
+				Template: validPodTemplateSpec,
+			},
+		},
+	},
 	}
 
 	for name, tc := range cases {
@@ -279,24 +303,6 @@ func TestJobStrategy_PrepareForCreate(t *testing.T) {
 
 // TODO(#111514): refactor by spliting into dedicated test functions
 func TestJobStrategy(t *testing.T) {
-	cases := map[string]struct {
-		trackingWithFinalizersEnabled bool
-	}{
-		"features disabled": {},
-		"new job tracking enabled": {
-			trackingWithFinalizersEnabled: true,
-		},
-	}
-	for name, tc := range cases {
-		t.Run(name, func(t *testing.T) {
-			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.JobTrackingWithFinalizers, tc.trackingWithFinalizersEnabled)()
-			testJobStrategy(t)
-		})
-	}
-}
-
-func testJobStrategy(t *testing.T) {
-	trackingWithFinalizersEnabled := utilfeature.DefaultFeatureGate.Enabled(features.JobTrackingWithFinalizers)
 	ctx := genericapirequest.NewDefaultContext()
 	if !Strategy.NamespaceScoped() {
 		t.Errorf("Job must be namespace scoped")
@@ -356,9 +362,9 @@ func testJobStrategy(t *testing.T) {
 	if job.Spec.CompletionMode == nil {
 		t.Errorf("Job should allow setting .spec.completionMode")
 	}
-	wantAnnotations := map[string]string{"foo": "bar"}
-	if trackingWithFinalizersEnabled {
-		wantAnnotations[batchv1.JobTrackingFinalizer] = ""
+	wantAnnotations := map[string]string{
+		"foo":                        "bar",
+		batchv1.JobTrackingFinalizer: "",
 	}
 	if diff := cmp.Diff(wantAnnotations, job.Annotations); diff != "" {
 		t.Errorf("Job has annotations (-want,+got):\n%s", diff)
@@ -405,9 +411,8 @@ func testJobStrategy(t *testing.T) {
 	if updatedJob.Generation != 2 {
 		t.Errorf("expected Generation=2, got %d", updatedJob.Generation)
 	}
-	wantAnnotations = make(map[string]string)
-	if trackingWithFinalizersEnabled {
-		wantAnnotations[batchv1.JobTrackingFinalizer] = ""
+	wantAnnotations = map[string]string{
+		batchv1.JobTrackingFinalizer: "",
 	}
 	if diff := cmp.Diff(wantAnnotations, updatedJob.Annotations); diff != "" {
 		t.Errorf("Job has annotations (-want,+got):\n%s", diff)
@@ -418,17 +423,6 @@ func testJobStrategy(t *testing.T) {
 		t.Errorf("Expected a validation error")
 	}
 
-	// Ensure going from legacy tracking Job to tracking with finalizers is
-	// disallowed.
-	job = job.DeepCopy()
-	job.Annotations = nil
-	updatedJob = job.DeepCopy()
-	updatedJob.Annotations = map[string]string{batch.JobTrackingFinalizer: ""}
-	errs = Strategy.ValidateUpdate(ctx, updatedJob, job)
-	if len(errs) != 1 {
-		t.Errorf("Expected update validation error")
-	}
-
 	// Test updating suspend false->true and nil-> true when the feature gate is
 	// disabled. We don't care about other combinations.
 	job.Spec.Suspend, updatedJob.Spec.Suspend = pointer.BoolPtr(false), pointer.BoolPtr(true)
@@ -475,7 +469,6 @@ func TestJobStrategyValidateUpdate(t *testing.T) {
 		job      *batch.Job
 		update   func(*batch.Job)
 		wantErrs field.ErrorList
-		trackingWithFinalizersEnabled      bool
 		mutableSchedulingDirectivesEnabled bool
 	}{
 		"update parallelism": {
@@ -518,7 +511,7 @@ func TestJobStrategyValidateUpdate(t *testing.T) {
 				{Type: field.ErrorTypeInvalid, Field: "spec.completions"},
 			},
 		},
-		"adding tracking annotation disallowed, gate disabled": {
+		"adding tracking annotation disallowed": {
 			job: &batch.Job{
 				ObjectMeta: metav1.ObjectMeta{
 					Name: "myjob",
@@ -540,30 +533,7 @@ func TestJobStrategyValidateUpdate(t *testing.T) {
 				{Type: field.ErrorTypeForbidden, Field: "metadata.annotations[batch.kubernetes.io/job-tracking]"},
 			},
 		},
-		"adding tracking annotation disallowed, gate enabled": {
-			job: &batch.Job{
-				ObjectMeta: metav1.ObjectMeta{
-					Name:            "myjob",
-					Namespace:       metav1.NamespaceDefault,
-					ResourceVersion: "0",
-					Annotations:     map[string]string{"foo": "bar"},
-				},
-				Spec: batch.JobSpec{
-					Selector:       validSelector,
-					Template:       validPodTemplateSpec,
-					ManualSelector: pointer.BoolPtr(true),
-					Parallelism:    pointer.Int32Ptr(1),
-				},
-			},
-			update: func(job *batch.Job) {
-				job.Annotations[batch.JobTrackingFinalizer] = ""
-			},
-			wantErrs: field.ErrorList{
-				{Type: field.ErrorTypeForbidden, Field: "metadata.annotations[batch.kubernetes.io/job-tracking]"},
-			},
-			trackingWithFinalizersEnabled: true,
-		},
-		"preserving tracking annotation, feature disabled": {
+		"preserving tracking annotation": {
 			job: &batch.Job{
 				ObjectMeta: metav1.ObjectMeta{
 					Name: "myjob",
@@ -683,7 +653,6 @@ func TestJobStrategyValidateUpdate(t *testing.T) {
 	}
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
-			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.JobTrackingWithFinalizers, tc.trackingWithFinalizersEnabled)()
 			defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.JobMutableNodeSchedulingDirectives, tc.mutableSchedulingDirectivesEnabled)()
 			newJob := tc.job.DeepCopy()
 			tc.update(newJob)
@@ -355,9 +355,6 @@ message JobStatus {
   // (3) Remove the pod UID from the arrays while increasing the corresponding
   // counter.
   //
-  // This field is beta-level. The job controller only makes use of this field
-  // when the feature gate JobTrackingWithFinalizers is enabled (enabled
-  // by default).
   // Old jobs might not be tracked using this field, in which case the field
   // remains null.
   // +optional
@@ -27,10 +27,13 @@ const (
 
 	// JobTrackingFinalizer is a finalizer for Job's pods. It prevents them from
 	// being deleted before being accounted in the Job status.
-	// The apiserver and job controller use this string as a Job annotation, to
-	// mark Jobs that are being tracked using pod finalizers. Two releases after
-	// the JobTrackingWithFinalizers graduates to GA, JobTrackingFinalizer will
-	// no longer be used as a Job annotation.
+	//
+	// Additionally, the apiserver and job controller use this string as a Job
+	// annotation, to mark Jobs that are being tracked using pod finalizers.
+	// However, this behavior is deprecated in kubernetes 1.26. This means that, in
+	// 1.27+, one release after JobTrackingWithFinalizers graduates to GA, the
+	// apiserver and job controller will ignore this annotation and they will
+	// always track jobs using finalizers.
 	JobTrackingFinalizer = "batch.kubernetes.io/job-tracking"
 )
 
@@ -384,9 +387,6 @@ type JobStatus struct {
 	// (3) Remove the pod UID from the arrays while increasing the corresponding
 	// counter.
 	//
-	// This field is beta-level. The job controller only makes use of this field
-	// when the feature gate JobTrackingWithFinalizers is enabled (enabled
-	// by default).
 	// Old jobs might not be tracked using this field, in which case the field
 	// remains null.
 	// +optional
@@ -138,7 +138,7 @@ var map_JobStatus = map[string]string{
 	"succeeded":        "The number of pods which reached phase Succeeded.",
 	"failed":           "The number of pods which reached phase Failed.",
 	"completedIndexes": "CompletedIndexes holds the completed indexes when .spec.completionMode = \"Indexed\" in a text format. The indexes are represented as decimal integers separated by commas. The numbers are listed in increasing order. Three or more consecutive numbers are compressed and represented by the first and last element of the series, separated by a hyphen. For example, if the completed indexes are 1, 3, 4, 5 and 7, they are represented as \"1,3-5,7\".",
-	"uncountedTerminatedPods": "UncountedTerminatedPods holds the UIDs of Pods that have terminated but the job controller hasn't yet accounted for in the status counters.\n\nThe job controller creates pods with a finalizer. When a pod terminates (succeeded or failed), the controller does three steps to account for it in the job status: (1) Add the pod UID to the arrays in this field. (2) Remove the pod finalizer. (3) Remove the pod UID from the arrays while increasing the corresponding\n counter.\n\nThis field is beta-level. The job controller only makes use of this field when the feature gate JobTrackingWithFinalizers is enabled (enabled by default). Old jobs might not be tracked using this field, in which case the field remains null.",
+	"uncountedTerminatedPods": "UncountedTerminatedPods holds the UIDs of Pods that have terminated but the job controller hasn't yet accounted for in the status counters.\n\nThe job controller creates pods with a finalizer. When a pod terminates (succeeded or failed), the controller does three steps to account for it in the job status: (1) Add the pod UID to the arrays in this field. (2) Remove the pod finalizer. (3) Remove the pod UID from the arrays while increasing the corresponding\n counter.\n\nOld jobs might not be tracked using this field, in which case the field remains null.",
 	"ready":            "The number of pods which have a Ready condition.\n\nThis field is beta-level. The job controller populates the field when the feature gate JobReadyPods is enabled (enabled by default).",
 }
 
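The completedIndexes format quoted above compresses runs of three or more consecutive indexes. A small formatter sketch for that format (my own illustration, assuming sorted, duplicate-free input):

package main

import (
	"fmt"
	"strings"
)

// formatIndexes renders sorted indexes in the documented text format:
// runs of three or more consecutive numbers become "first-last".
func formatIndexes(ix []int) string {
	var parts []string
	for i := 0; i < len(ix); {
		j := i
		for j+1 < len(ix) && ix[j+1] == ix[j]+1 {
			j++
		}
		if j-i >= 2 { // three or more consecutive numbers: compress
			parts = append(parts, fmt.Sprintf("%d-%d", ix[i], ix[j]))
		} else { // one or two: list individually
			for k := i; k <= j; k++ {
				parts = append(parts, fmt.Sprintf("%d", ix[k]))
			}
		}
		i = j + 1
	}
	return strings.Join(parts, ",")
}

func main() {
	fmt.Println(formatIndexes([]int{1, 3, 4, 5, 7})) // "1,3-5,7"
}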
@@ -71,8 +71,6 @@ type metricLabelsWithValue struct {
 func TestMetricsOnSuccesses(t *testing.T) {
 	nonIndexedCompletion := batchv1.NonIndexedCompletion
 	indexedCompletion := batchv1.IndexedCompletion
-	wFinalizers := true
-	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
 
 	// setup the job controller
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
@@ -135,7 +133,7 @@ func TestMetricsOnSuccesses(t *testing.T) {
 	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
 		Active: int(*jobObj.Spec.Parallelism),
 		Ready:  pointer.Int32(0),
-	}, wFinalizers)
+	})
 	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, int(*jobObj.Spec.Parallelism)); err != nil {
 		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
 	}
@@ -150,7 +148,6 @@ func TestMetricsOnSuccesses(t *testing.T) {
 }
 
 func TestJobFinishedNumReasonMetric(t *testing.T) {
-	wFinalizers := true
 	// setup the job controller
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
 	defer closeFn()
@@ -257,7 +254,6 @@ func TestJobFinishedNumReasonMetric(t *testing.T) {
 	job_index := 0 // job index to avoid collisions between job names created by different test cases
 	for name, tc := range testCases {
 		t.Run(name, func(t *testing.T) {
-			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
 			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, tc.enableJobPodFailurePolicy)()
 			resetMetrics()
 			// create a single job and wait for its completion
@@ -271,7 +267,7 @@ func TestJobFinishedNumReasonMetric(t *testing.T) {
 			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
 				Active: int(*jobObj.Spec.Parallelism),
 				Ready:  pointer.Int32(0),
-			}, wFinalizers)
+			})
 
 			op := func(p *v1.Pod) bool {
 				p.Status = tc.podStatus
@@ -375,8 +371,6 @@ func TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart(t *testi
 			},
 		},
 	}
-	wFinalizers := true
-	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
 	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, true)()
 	closeFn, restConfig, cs, ns := setup(t, "simple")
 	defer closeFn()
||||
@ -400,7 +394,7 @@ func TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart(t *testi
|
||||
validateJobPodsStatus(ctx, t, cs, jobObj, podsByStatus{
|
||||
Active: count,
|
||||
Ready: pointer.Int32(0),
|
||||
}, wFinalizers)
|
||||
})
|
||||
|
||||
jobPods, err := getJobPods(ctx, t, cs, jobObj)
|
||||
if err != nil {
|
||||
@@ -624,70 +618,9 @@ func TestJobPodFailurePolicy(t *testing.T) {
 		},
 	}
 	for name, test := range testCases {
-		for _, wFinalizers := range []bool{false, true} {
-			t.Run(fmt.Sprintf("%s; finalizers=%t", name, wFinalizers), func(t *testing.T) {
-				resetMetrics()
-				defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
-				defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, test.enableJobPodFailurePolicy)()
-
-				closeFn, restConfig, clientSet, ns := setup(t, "simple")
-				defer closeFn()
-				ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
-				defer func() {
-					cancel()
-				}()
-
-				jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &test.job)
-				if err != nil {
-					t.Fatalf("Error %q while creating the job %q", err, jobObj.Name)
-				}
-				validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
-					Active: 1,
-					Ready:  pointer.Int32(0),
-				}, wFinalizers)
-
-				op := func(p *v1.Pod) bool {
-					p.Status = test.podStatus
-					return true
-				}
-
-				if err, _ := updateJobPodsStatus(ctx, clientSet, jobObj, op, 1); err != nil {
-					t.Fatalf("Error %q while updating pod status for Job: %q", err, jobObj.Name)
-				}
-
-				if test.restartController {
-					cancel()
-					ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
-				}
-
-				validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
-					Active: test.wantActive,
-					Failed: test.wantFailed,
-					Ready:  pointer.Int32(0),
-				}, wFinalizers)
-
-				if test.wantJobConditionType == batchv1.JobComplete {
-					if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
-						t.Fatalf("Failed setting phase %q on Job Pod: %q", v1.PodSucceeded, err)
-					}
-				}
-				validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType)
-				if wFinalizers && test.wantPodFailuresHandledByPolicyRuleMetric != nil {
-					validateCounterMetric(t, metrics.PodFailuresHandledByFailurePolicy, *test.wantPodFailuresHandledByPolicyRuleMetric)
-				}
-				validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
-			})
-		}
-	}
-}
-
-// TestNonParallelJob tests that a Job that only executes one Pod. The test
-// recreates the Job controller at some points to make sure a new controller
-// is able to pickup.
-func TestNonParallelJob(t *testing.T) {
-	for _, wFinalizers := range []bool{false, true} {
-		t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) {
-			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
+		t.Run(name, func(t *testing.T) {
+			resetMetrics()
+			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobPodFailurePolicy, test.enableJobPodFailurePolicy)()
 
+			closeFn, restConfig, clientSet, ns := setup(t, "simple")
+			defer closeFn()
@@ -696,71 +629,115 @@ func TestNonParallelJob(t *testing.T) {
 				cancel()
 			}()
 
-			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{})
+			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &test.job)
 			if err != nil {
-				t.Fatalf("Failed to create Job: %v", err)
-			}
-			if got := hasJobTrackingAnnotation(jobObj); got != wFinalizers {
-				t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers)
+				t.Fatalf("Error %q while creating the job %q", err, jobObj.Name)
 			}
 			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
 				Active: 1,
 				Ready:  pointer.Int32(0),
-			}, wFinalizers)
+			})
 
-			// Restarting controller.
-			cancel()
-			ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
-
-			// Failed Pod is replaced.
-			if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
-				t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
+			op := func(p *v1.Pod) bool {
+				p.Status = test.podStatus
+				return true
 			}
+
+			if err, _ := updateJobPodsStatus(ctx, clientSet, jobObj, op, 1); err != nil {
+				t.Fatalf("Error %q while updating pod status for Job: %q", err, jobObj.Name)
+			}
+
+			if test.restartController {
+				cancel()
+				ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
+			}
+
 			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
-				Active: 1,
-				Failed: 1,
+				Active: test.wantActive,
+				Failed: test.wantFailed,
 				Ready:  pointer.Int32(0),
-			}, wFinalizers)
+			})
 
-			// Restarting controller.
-			cancel()
-			ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
-
-			// No more Pods are created after the Pod succeeds.
-			if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
-				t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
+			if test.wantJobConditionType == batchv1.JobComplete {
+				if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
+					t.Fatalf("Failed setting phase %q on Job Pod: %q", v1.PodSucceeded, err)
+				}
 			}
-			validateJobSucceeded(ctx, t, clientSet, jobObj)
-			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
-				Failed:    1,
-				Succeeded: 1,
-				Ready:     pointer.Int32(0),
-			}, false)
+			validateJobCondition(ctx, t, clientSet, jobObj, test.wantJobConditionType)
+			if test.wantPodFailuresHandledByPolicyRuleMetric != nil {
+				validateCounterMetric(t, metrics.PodFailuresHandledByFailurePolicy, *test.wantPodFailuresHandledByPolicyRuleMetric)
+			}
 			validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
 		})
 	}
 }
+
+// TestNonParallelJob tests that a Job that only executes one Pod. The test
+// recreates the Job controller at some points to make sure a new controller
+// is able to pickup.
+func TestNonParallelJob(t *testing.T) {
+	closeFn, restConfig, clientSet, ns := setup(t, "simple")
+	defer closeFn()
+	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	defer func() {
+		cancel()
+	}()
+
+	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{})
+	if err != nil {
+		t.Fatalf("Failed to create Job: %v", err)
+	}
+	if !hasJobTrackingAnnotation(jobObj) {
+		t.Error("apiserver created job without tracking annotation")
+	}
+	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
+		Active: 1,
+		Ready:  pointer.Int32(0),
+	})
+
+	// Restarting controller.
+	cancel()
+	ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
+
+	// Failed Pod is replaced.
+	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
+		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
+	}
+	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
+		Active: 1,
+		Failed: 1,
+		Ready:  pointer.Int32(0),
+	})
+
+	// Restarting controller.
+	cancel()
+	ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
+
+	// No more Pods are created after the Pod succeeds.
+	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
+		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
+	}
+	validateJobSucceeded(ctx, t, clientSet, jobObj)
+	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
+		Failed:    1,
+		Succeeded: 1,
+		Ready:     pointer.Int32(0),
+	})
+	validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
+}
 
 func TestParallelJob(t *testing.T) {
 	cases := map[string]struct {
-		trackWithFinalizers bool
-		enableReadyPods     bool
+		enableReadyPods bool
 	}{
 		"none": {},
-		"with finalizers": {
-			trackWithFinalizers: true,
-		},
 		"ready pods": {
 			enableReadyPods: true,
 		},
-		"all": {
-			trackWithFinalizers: true,
-			enableReadyPods:     true,
-		},
 	}
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
-			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, tc.trackWithFinalizers)()
 			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.enableReadyPods)()
 
 			closeFn, restConfig, clientSet, ns := setup(t, "parallel")
@@ -781,7 +758,7 @@ func TestParallelJob(t *testing.T) {
 			if tc.enableReadyPods {
 				want.Ready = pointer.Int32Ptr(0)
 			}
-			validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
+			validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
 
 			// Tracks ready pods, if enabled.
 			if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 2); err != nil {
@@ -790,7 +767,7 @@ func TestParallelJob(t *testing.T) {
 			if tc.enableReadyPods {
 				*want.Ready = 2
 			}
-			validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
+			validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
 
 			// Failed Pods are replaced.
 			if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
@@ -803,7 +780,7 @@ func TestParallelJob(t *testing.T) {
 			if tc.enableReadyPods {
 				want.Ready = pointer.Int32(0)
 			}
-			validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
+			validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
 			// Once one Pod succeeds, no more Pods are created, even if some fail.
 			if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
 				t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
@@ -816,7 +793,7 @@ func TestParallelJob(t *testing.T) {
 			if tc.enableReadyPods {
 				want.Ready = pointer.Int32(0)
 			}
-			validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
+			validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
 			if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
 				t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
 			}
@@ -828,7 +805,7 @@ func TestParallelJob(t *testing.T) {
 			if tc.enableReadyPods {
 				want.Ready = pointer.Int32(0)
 			}
-			validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
+			validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
 			// No more Pods are created after remaining Pods succeed.
 			if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
 				t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
@@ -841,7 +818,7 @@ func TestParallelJob(t *testing.T) {
 			if tc.enableReadyPods {
 				want.Ready = pointer.Int32(0)
 			}
-			validateJobPodsStatus(ctx, t, clientSet, jobObj, want, false)
+			validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
 			validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
 			if tc.trackWithFinalizers {
 				validateTerminatedPodsTrackingFinalizerMetric(t, 7)
@@ -851,63 +828,57 @@ func TestParallelJob(t *testing.T) {
 }
 
 func TestParallelJobParallelism(t *testing.T) {
-	for _, wFinalizers := range []bool{false, true} {
-		t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) {
-			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
-			closeFn, restConfig, clientSet, ns := setup(t, "parallel")
-			defer closeFn()
-			ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
-			defer cancel()
+	closeFn, restConfig, clientSet, ns := setup(t, "parallel")
+	defer closeFn()
+	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	defer cancel()
 
-			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
-				Spec: batchv1.JobSpec{
-					BackoffLimit: pointer.Int32(2),
-					Parallelism:  pointer.Int32Ptr(5),
-				},
-			})
-			if err != nil {
-				t.Fatalf("Failed to create Job: %v", err)
-			}
-			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
-				Active: 5,
-				Ready:  pointer.Int32(0),
-			}, wFinalizers)
-
-			// Reduce parallelism by a number greater than backoffLimit.
-			patch := []byte(`{"spec":{"parallelism":2}}`)
-			jobObj, err = clientSet.BatchV1().Jobs(ns.Name).Patch(ctx, jobObj.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
-			if err != nil {
-				t.Fatalf("Updating Job: %v", err)
-			}
-			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
-				Active: 2,
-				Ready:  pointer.Int32(0),
-			}, wFinalizers)
-
-			// Increase parallelism again.
-			patch = []byte(`{"spec":{"parallelism":4}}`)
-			jobObj, err = clientSet.BatchV1().Jobs(ns.Name).Patch(ctx, jobObj.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
-			if err != nil {
-				t.Fatalf("Updating Job: %v", err)
-			}
-			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
-				Active: 4,
-				Ready:  pointer.Int32(0),
-			}, wFinalizers)
-
-			// Succeed Job
-			if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 4); err != nil {
-				t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
-			}
-			validateJobSucceeded(ctx, t, clientSet, jobObj)
-			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
-				Succeeded: 4,
-				Ready:     pointer.Int32(0),
-			}, false)
-			validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
-		})
-	}
+	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
+		Spec: batchv1.JobSpec{
+			BackoffLimit: pointer.Int32(2),
+			Parallelism:  pointer.Int32Ptr(5),
+		},
+	})
+	if err != nil {
+		t.Fatalf("Failed to create Job: %v", err)
+	}
+	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
+		Active: 5,
+		Ready:  pointer.Int32(0),
+	})
+
+	// Reduce parallelism by a number greater than backoffLimit.
+	patch := []byte(`{"spec":{"parallelism":2}}`)
+	jobObj, err = clientSet.BatchV1().Jobs(ns.Name).Patch(ctx, jobObj.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
+	if err != nil {
+		t.Fatalf("Updating Job: %v", err)
+	}
+	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
+		Active: 2,
+		Ready:  pointer.Int32(0),
+	})
+
+	// Increase parallelism again.
+	patch = []byte(`{"spec":{"parallelism":4}}`)
+	jobObj, err = clientSet.BatchV1().Jobs(ns.Name).Patch(ctx, jobObj.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{})
+	if err != nil {
+		t.Fatalf("Updating Job: %v", err)
+	}
+	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
+		Active: 4,
+		Ready:  pointer.Int32(0),
+	})
+
+	// Succeed Job
+	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 4); err != nil {
+		t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
+	}
+	validateJobSucceeded(ctx, t, clientSet, jobObj)
+	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
+		Succeeded: 4,
+		Ready:     pointer.Int32(0),
+	})
+	validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
 }
@@ -916,24 +887,15 @@ func TestParallelJobWithCompletions(t *testing.T) {
 	t.Cleanup(setDuringTest(&jobcontroller.MaxUncountedPods, 10))
 	t.Cleanup(setDuringTest(&jobcontroller.MaxPodCreateDeletePerSync, 10))
 	cases := map[string]struct {
-		trackWithFinalizers bool
-		enableReadyPods     bool
+		enableReadyPods bool
 	}{
 		"none": {},
-		"with finalizers": {
-			trackWithFinalizers: true,
-		},
 		"ready pods": {
 			enableReadyPods: true,
 		},
-		"all": {
-			trackWithFinalizers: true,
-			enableReadyPods:     true,
-		},
 	}
 	for name, tc := range cases {
 		t.Run(name, func(t *testing.T) {
-			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, tc.trackWithFinalizers)()
 			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobReadyPods, tc.enableReadyPods)()
 			closeFn, restConfig, clientSet, ns := setup(t, "completions")
 			defer closeFn()
@@ -949,14 +911,14 @@ func TestParallelJobWithCompletions(t *testing.T) {
 			if err != nil {
 				t.Fatalf("Failed to create Job: %v", err)
 			}
-			if got := hasJobTrackingAnnotation(jobObj); got != tc.trackWithFinalizers {
-				t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, tc.trackWithFinalizers)
+			if !hasJobTrackingAnnotation(jobObj) {
+				t.Error("apiserver created job without tracking annotation")
 			}
 			want := podsByStatus{Active: 54}
 			if tc.enableReadyPods {
 				want.Ready = pointer.Int32Ptr(0)
 			}
-			validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
+			validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
 
 			// Tracks ready pods, if enabled.
 			if err, _ := setJobPodsReady(ctx, clientSet, jobObj, 52); err != nil {
@@ -965,7 +927,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
 			if tc.enableReadyPods {
 				want.Ready = pointer.Int32(52)
 			}
-			validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
+			validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
 
 			// Failed Pods are replaced.
 			if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
@@ -978,7 +940,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
 			if tc.enableReadyPods {
 				want.Ready = pointer.Int32(50)
 			}
-			validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
+			validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
 			// Pods are created until the number of succeeded Pods equals completions.
 			if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil {
 				t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
@@ -991,7 +953,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
 			if tc.enableReadyPods {
 				want.Ready = pointer.Int32(0)
 			}
-			validateJobPodsStatus(ctx, t, clientSet, jobObj, want, tc.trackWithFinalizers)
+			validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
 			// No more Pods are created after the Job completes.
 			if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
 				t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
@@ -1004,84 +966,76 @@ func TestParallelJobWithCompletions(t *testing.T) {
 			if tc.enableReadyPods {
 				want.Ready = pointer.Int32(0)
 			}
-			validateJobPodsStatus(ctx, t, clientSet, jobObj, want, false)
+			validateJobPodsStatus(ctx, t, clientSet, jobObj, want)
 			validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
 		})
 	}
 }

 func TestIndexedJob(t *testing.T) {
-	for _, wFinalizers := range []bool{false, true} {
-		t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) {
-			defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
-			closeFn, restConfig, clientSet, ns := setup(t, "indexed")
-			defer closeFn()
-			ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
-			defer cancel()
-			resetMetrics()
+	closeFn, restConfig, clientSet, ns := setup(t, "indexed")
+	defer closeFn()
+	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
+	defer cancel()
+	resetMetrics()

-			mode := batchv1.IndexedCompletion
-			jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
-				Spec: batchv1.JobSpec{
-					Parallelism:    pointer.Int32Ptr(3),
-					Completions:    pointer.Int32Ptr(4),
-					CompletionMode: &mode,
-				},
-			})
-			if err != nil {
-				t.Fatalf("Failed to create Job: %v", err)
-			}
-			if got := hasJobTrackingAnnotation(jobObj); got != wFinalizers {
-				t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers)
-			}
-			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
-				Active: 3,
-				Ready:  pointer.Int32(0),
-			}, wFinalizers)
-			validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 1, 2), "")
-
-			// One Pod succeeds.
-			if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
-				t.Fatal("Failed trying to succeed pod with index 1")
-			}
-			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
-				Active:    3,
-				Succeeded: 1,
-				Ready:     pointer.Int32(0),
-			}, wFinalizers)
-			validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")
-
-			// One Pod fails, which should be recreated.
-			if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
-				t.Fatal("Failed trying to succeed pod with index 2")
-			}
-			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
-				Active:    3,
-				Failed:    1,
-				Succeeded: 1,
-				Ready:     pointer.Int32(0),
-			}, wFinalizers)
-			validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")
-
-			// Remaining Pods succeed.
-			if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
-				t.Fatal("Failed trying to succeed remaining pods")
-			}
-			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
-				Active:    0,
-				Failed:    1,
-				Succeeded: 4,
-				Ready:     pointer.Int32(0),
-			}, false)
-			validateIndexedJobPods(ctx, t, clientSet, jobObj, nil, "0-3")
-			validateJobSucceeded(ctx, t, clientSet, jobObj)
-			validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
-			if wFinalizers {
-				validateTerminatedPodsTrackingFinalizerMetric(t, 5)
-			}
-		})
-	}
+	mode := batchv1.IndexedCompletion
+	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
+		Spec: batchv1.JobSpec{
+			Parallelism:    pointer.Int32Ptr(3),
+			Completions:    pointer.Int32Ptr(4),
+			CompletionMode: &mode,
+		},
+	})
+	if err != nil {
+		t.Fatalf("Failed to create Job: %v", err)
+	}
+	if !hasJobTrackingAnnotation(jobObj) {
+		t.Error("apiserver created job without tracking annotation")
+	}
+	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
+		Active: 3,
+		Ready:  pointer.Int32(0),
+	})
+	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 1, 2), "")
+
+	// One Pod succeeds.
+	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
+		t.Fatal("Failed trying to succeed pod with index 1")
+	}
+	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
+		Active:    3,
+		Succeeded: 1,
+		Ready:     pointer.Int32(0),
+	})
+	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")
+
+	// One Pod fails, which should be recreated.
+	if err := setJobPhaseForIndex(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
+		t.Fatal("Failed trying to succeed pod with index 2")
+	}
+	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
+		Active:    3,
+		Failed:    1,
+		Succeeded: 1,
+		Ready:     pointer.Int32(0),
+	})
+	validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")
+
+	// Remaining Pods succeed.
+	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
+		t.Fatal("Failed trying to succeed remaining pods")
+	}
+	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
+		Active:    0,
+		Failed:    1,
+		Succeeded: 4,
+		Ready:     pointer.Int32(0),
+	})
+	validateIndexedJobPods(ctx, t, clientSet, jobObj, nil, "0-3")
+	validateJobSucceeded(ctx, t, clientSet, jobObj)
+	validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
+	validateTerminatedPodsTrackingFinalizerMetric(t, 5)
 }

 // BenchmarkLargeIndexedJob benchmarks the completion of an Indexed Job.
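The rewritten TestIndexedJob asserts the tracking annotation unconditionally. That helper is also not shown in this excerpt; a plausible sketch, assuming the annotation key is the same JobTrackingFinalizer string that batch/v1 documents as being reused as a Job annotation (illustrative, not necessarily the exact upstream code):

	// Assumed import: batchv1 "k8s.io/api/batch/v1".
	// hasJobTrackingAnnotation reports whether the apiserver marked the Job
	// as tracked with pod finalizers; after graduation this should always
	// be true for newly created Jobs.
	func hasJobTrackingAnnotation(job *batchv1.Job) bool {
		_, ok := job.Annotations[batchv1.JobTrackingFinalizer]
		return ok
	}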
@@ -1163,87 +1117,7 @@ func BenchmarkLargeIndexedJob(b *testing.B) {
 	}
 }

-// TestDisableJobTrackingWithFinalizers ensures that when the
-// JobTrackingWithFinalizers feature is disabled, tracking finalizers are
-// removed from all pods, but Job continues to be tracked.
-// This test can be removed once the feature graduates to GA.
-func TestDisableJobTrackingWithFinalizers(t *testing.T) {
-	// Step 1: job created while feature is enabled.
-	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
-
-	closeFn, restConfig, clientSet, ns := setup(t, "simple")
-	defer closeFn()
-	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
-	defer func() {
-		cancel()
-	}()
-
-	jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
-		Spec: batchv1.JobSpec{
-			Parallelism: pointer.Int32Ptr(2),
-		},
-	})
-	if err != nil {
-		t.Fatalf("Failed to create Job: %v", err)
-	}
-	if !hasJobTrackingAnnotation(jobObj) {
-		t.Error("apiserver didn't add the tracking annotation")
-	}
-	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
-		Active: 2,
-		Ready:  pointer.Int32(0),
-	}, true)
-
-	// Step 2: Disable tracking with finalizers.
-	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, false)()
-	cancel()
-
-	// Fail a pod while Job controller is stopped.
-	if err, _ := setJobPodsPhase(context.Background(), clientSet, jobObj, v1.PodFailed, 1); err != nil {
-		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
-	}
-
-	// Restart controller.
-	ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
-
-	// Ensure Job continues to be tracked and finalizers are removed.
-	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
-		Active: 2,
-		Failed: 1,
-		Ready:  pointer.Int32(0),
-	}, false)
-
-	jobObj, err = clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
-	if err != nil {
-		t.Fatalf("Obtaining updated Job object: %v", err)
-	}
-	if hasJobTrackingAnnotation(jobObj) {
-		t.Error("controller didn't remove the tracking annotation")
-	}
-
-	// Step 3: Reenable tracking with finalizers.
-	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
-	cancel()
-
-	// Succeed a pod while Job controller is stopped.
-	if err, _ := setJobPodsPhase(context.Background(), clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
-		t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
-	}
-
-	// Restart controller.
-	ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
-
-	// Ensure Job continues to be tracked and finalizers are removed.
-	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
-		Active:    1,
-		Failed:    1,
-		Succeeded: 1,
-		Ready:     pointer.Int32(0),
-	}, false)
-}
-
 func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
-	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
 	for _, policy := range []metav1.DeletionPropagation{metav1.DeletePropagationOrphan, metav1.DeletePropagationBackground, metav1.DeletePropagationForeground} {
 		t.Run(string(policy), func(t *testing.T) {
 			closeFn, restConfig, clientSet, ns := setup(t, "simple")
@@ -1276,7 +1150,7 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
 			validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
 				Active: 2,
 				Ready:  pointer.Int32(0),
-			}, true)
+			})

 			// Delete Job. The GC should delete the pods in cascade.
 			err = clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(ctx, jobObj.Name, metav1.DeleteOptions{
@@ -1293,8 +1167,6 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
 }

 func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) {
-	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
-
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
 	defer closeFn()
 	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
@@ -1335,8 +1207,6 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) {
 // TestJobFailedWithInterrupts tests that a job where one pod fails and the rest
 // succeed is marked as Failed, even if the controller fails in the middle.
 func TestJobFailedWithInterrupts(t *testing.T) {
-	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
-
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
 	defer closeFn()
 	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
@@ -1361,7 +1231,7 @@ func TestJobFailedWithInterrupts(t *testing.T) {
 	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
 		Active: 10,
 		Ready:  pointer.Int32(0),
-	}, true)
+	})
 	t.Log("Finishing pods")
 	if err, _ := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 1); err != nil {
 		t.Fatalf("Could not fail a pod: %v", err)
@@ -1406,10 +1276,8 @@ func validateNoOrphanPodsWithFinalizers(ctx context.Context, t *testing.T, clien
 	}
 }

-func TestOrphanPodsFinalizersClearedWithFeatureDisabled(t *testing.T) {
-	// Step 0: job created while feature is enabled.
-	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
-
+func TestOrphanPodsFinalizersClearedOnRestart(t *testing.T) {
+	// Step 0: create job.
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
 	defer closeFn()
 	ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
@@ -1431,38 +1299,19 @@ func TestOrphanPodsFinalizersClearedWithFeatureDisabled(t *testing.T) {
 	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
 		Active: 1,
 		Ready:  pointer.Int32(0),
-	}, true)
+	})

-	// Step 2: Disable tracking with finalizers.
-	defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, false)()
+	// Step 2: Delete the Job while the controller is stopped.
 	cancel()

-	// Delete the Job while controller is stopped.
 	err = clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(context.Background(), jobObj.Name, metav1.DeleteOptions{})
 	if err != nil {
 		t.Fatalf("Failed to delete job: %v", err)
 	}

-	// Restart controller.
+	// Step 3: Restart controller.
 	ctx, cancel = startJobControllerAndWaitForCaches(restConfig)
-	if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (done bool, err error) {
-		pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
-		if err != nil {
-			t.Fatalf("Failed to list Job Pods: %v", err)
-		}
-		sawPods := false
-		for _, pod := range pods.Items {
-			if metav1.IsControlledBy(&pod, jobObj) {
-				if hasJobTrackingFinalizer(&pod) {
-					return false, nil
-				}
-				sawPods = true
-			}
-		}
-		return sawPods, nil
-	}); err != nil {
-		t.Errorf("Waiting for finalizers to be removed: %v", err)
-	}
 	validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj)
 }

 func TestSuspendJob(t *testing.T) {
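The inline poll deleted above is delegated to validateNoOrphanPodsWithFinalizers. For reference, a minimal standalone sketch of that wait-for-finalizer-removal pattern, reusing the names from the removed code (waitInterval, hasJobTrackingFinalizer) and assuming the usual client-go imports; treat it as illustrative rather than the exact upstream helper:

	// Assumed imports: metav1 "k8s.io/apimachinery/pkg/apis/meta/v1",
	// "k8s.io/apimachinery/pkg/util/wait", clientset "k8s.io/client-go/kubernetes".
	// waitForNoTrackingFinalizers polls until no pod owned by jobObj still
	// carries the Job tracking finalizer, failing the test on timeout.
	func waitForNoTrackingFinalizers(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
		t.Helper()
		if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
			pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
			if err != nil {
				return false, err
			}
			for _, pod := range pods.Items {
				if metav1.IsControlledBy(&pod, jobObj) && hasJobTrackingFinalizer(&pod) {
					// A tracking finalizer is still present; keep polling.
					return false, nil
				}
			}
			return true, nil
		}); err != nil {
			t.Errorf("Waiting for finalizers to be removed: %v", err)
		}
	}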
@@ -1518,7 +1367,7 @@ func TestSuspendJob(t *testing.T) {
 			validateJobPodsStatus(ctx, t, clientSet, job, podsByStatus{
 				Active: active,
 				Ready:  pointer.Int32(0),
-			}, feature.DefaultFeatureGate.Enabled(features.JobTrackingWithFinalizers))
+			})
 			job, err = clientSet.BatchV1().Jobs(ns.Name).Get(ctx, job.Name, metav1.GetOptions{})
 			if err != nil {
 				t.Fatalf("Failed to get Job after %s: %v", s, err)
@@ -1561,7 +1410,7 @@ func TestSuspendJobControllerRestart(t *testing.T) {
 	validateJobPodsStatus(ctx, t, clientSet, job, podsByStatus{
 		Active: 0,
 		Ready:  pointer.Int32(0),
-	}, true)
+	})
 }

 func TestNodeSelectorUpdate(t *testing.T) {
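For context on the suspend tests touched above: TestSuspendJob toggles spec.suspend and expects the active pod count to follow. A hedged sketch of that toggle, using the standard batch/v1 field and client call (the helper name is hypothetical):

	// suspendJob flips spec.suspend on an existing Job. With suspend=true the
	// controller deletes the active pods; with suspend=false it recreates them.
	func suspendJob(ctx context.Context, clientSet clientset.Interface, job *batchv1.Job, suspend bool) (*batchv1.Job, error) {
		job.Spec.Suspend = pointer.Bool(suspend)
		return clientSet.BatchV1().Jobs(job.Namespace).Update(ctx, job, metav1.UpdateOptions{})
	}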
@@ -1646,7 +1495,7 @@ type podsByStatus struct {
 	Succeeded int
 }

-func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus, wFinalizer bool) {
+func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
 	t.Helper()
 	var actualCounts podsByStatus
 	if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
@@ -1686,8 +1535,8 @@ func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientse
 		}
 	}
 	for _, p := range active {
-		if got := hasJobTrackingFinalizer(p); got != wFinalizer {
-			t.Errorf("Pod %s has tracking finalizer %t, want %t", p.Name, got, wFinalizer)
+		if !hasJobTrackingFinalizer(p) {
+			t.Errorf("Active pod %s doesn't have tracking finalizer", p.Name)
 		}
 	}
 }
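Taken together, the slimmed-down helper both compares the status counters and requires the tracking finalizer on every active pod, with no caller-supplied expectation. A representative call under the new signature, as the updated tests write it (the counts here are illustrative):

	validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
		Active:    2,
		Failed:    1,
		Succeeded: 1,
		Ready:     pointer.Int32(0),
	})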