From 01f27cd93e77f7f40a917af3aab7f380def78381 Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Thu, 23 Sep 2021 12:56:19 -0400 Subject: [PATCH 1/5] Fix log line for target number of running pods --- pkg/controller/job/job_controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 31d4727d897..06a7ca0d1a1 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -1231,7 +1231,7 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded } if len(podsToDelete) > 0 { jm.expectations.ExpectDeletions(jobKey, len(podsToDelete)) - klog.V(4).InfoS("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", parallelism) + klog.V(4).InfoS("Too many pods running for job", "job", klog.KObj(job), "deleted", len(podsToDelete), "target", wantActive) removed, err := jm.deleteJobPods(job, jobKey, podsToDelete) active -= removed // While it is possible for a Job to require both pod creations and From 47a957d163213edb4716a56ca27a756900a4eea6 Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Wed, 22 Sep 2021 11:15:50 -0400 Subject: [PATCH 2/5] Revert "Revert "Limit number of Pods counted in a single Job sync"" This reverts commit 8bcb780808865f93e4a8fc20be5fe2de28baa854. --- pkg/controller/job/job_controller.go | 24 +++++++++++------------ pkg/controller/job/job_controller_test.go | 17 +++------------- test/integration/job/job_test.go | 20 +++++++++++-------- 3 files changed, 26 insertions(+), 35 deletions(-) diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 06a7ca0d1a1..8cfad1781b1 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -883,8 +883,8 @@ func (jm *Controller) removeTrackingFinalizersFromAllPods(pods []*v1.Pod) error // or the job was removed. // 3. Increment job counters for pods that no longer have a finalizer. // 4. Add Complete condition if satisfied with current counters. -// It does this in a controlled way such that the size of .status doesn't grow -// too much. +// It does this up to a limited number of Pods so that the size of .status +// doesn't grow too much and this sync doesn't starve other Jobs. func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, finishedCond *batch.JobCondition, needsFlush bool) error { isIndexed := isIndexedJob(job) var podsToRemoveFinalizer []*v1.Pod @@ -937,17 +937,15 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* } } if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods { - if len(newSucceededIndexes) > 0 { - succeededIndexes = succeededIndexes.withOrderedIndexes(newSucceededIndexes) - job.Status.Succeeded = int32(succeededIndexes.total()) - job.Status.CompletedIndexes = succeededIndexes.String() - } - var err error - if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil { - return err - } - podsToRemoveFinalizer = nil - newSucceededIndexes = nil + // The controller added enough Pods already to .status.uncountedTerminatedPods + // We stop counting pods and removing finalizers here to: + // 1. Ensure that the UIDs representation are under 20 KB. + // 2. Cap the number of finalizer removals so that syncing of big Jobs + // doesn't starve smaller ones. + // + // The job will be synced again because the Job status and Pod updates + // will put the Job back to the work queue. + break } } if len(newSucceededIndexes) > 0 { diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index de9da1548b3..fb113e9b2bf 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -1462,7 +1462,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { pods = append(pods, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod) return pods }(), - wantRmFinalizers: 501, + wantRmFinalizers: 499, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ @@ -1479,17 +1479,11 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { }, { UncountedTerminatedPods: &batch.UncountedTerminatedPods{ - Succeeded: []types.UID{"499"}, - Failed: []types.UID{"b"}, + Failed: []types.UID{"b"}, }, Succeeded: 499, Failed: 1, }, - { - UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, - Succeeded: 500, - Failed: 2, - }, }, }, "too many indexed finished": { @@ -1506,18 +1500,13 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { } return pods }(), - wantRmFinalizers: 501, + wantRmFinalizers: 500, wantStatusUpdates: []batch.JobStatus{ { UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, CompletedIndexes: "0-499", Succeeded: 500, }, - { - CompletedIndexes: "0-500", - UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, - Succeeded: 501, - }, }, }, } diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 4cddc6c7669..5b2e4866b3b 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -230,8 +230,8 @@ func TestParallelJobWithCompletions(t *testing.T) { jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ Spec: batchv1.JobSpec{ - Parallelism: pointer.Int32Ptr(4), - Completions: pointer.Int32Ptr(6), + Parallelism: pointer.Int32Ptr(504), + Completions: pointer.Int32Ptr(506), }, }) if err != nil { @@ -241,23 +241,23 @@ func TestParallelJobWithCompletions(t *testing.T) { t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers) } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ - Active: 4, + Active: 504, }, wFinalizers) // Failed Pods are replaced. if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil { t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err) } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ - Active: 4, + Active: 504, Failed: 2, }, wFinalizers) // Pods are created until the number of succeeded Pods equals completions. - if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil { + if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 503); err != nil { t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err) } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ Failed: 2, - Succeeded: 3, + Succeeded: 503, Active: 3, }, wFinalizers) // No more Pods are created after the Job completes. @@ -267,7 +267,7 @@ func TestParallelJobWithCompletions(t *testing.T) { validateJobSucceeded(ctx, t, clientSet, jobObj) validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ Failed: 2, - Succeeded: 6, + Succeeded: 506, }, false) validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) }) @@ -860,7 +860,11 @@ func setup(t *testing.T, nsBaseName string) (framework.CloseFunc, *restclient.Co controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig() _, server, apiServerCloseFn := framework.RunAnAPIServer(controlPlaneConfig) - config := restclient.Config{Host: server.URL} + config := restclient.Config{ + Host: server.URL, + QPS: 200.0, + Burst: 200, + } clientSet, err := clientset.NewForConfig(&config) if err != nil { t.Fatalf("Error creating clientset: %v", err) From a438f16741fbf064281e4c57e04e17ff4d41bc02 Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Wed, 22 Sep 2021 11:15:53 -0400 Subject: [PATCH 3/5] Revert "Revert "Add metric job_pod_finished"" This reverts commit 7868fbbe64330bf62f8e463b19e1fe18ec184c91. --- pkg/controller/job/indexed_job_utils.go | 7 +++++ pkg/controller/job/job_controller.go | 18 ++++++++++-- pkg/controller/job/job_controller_test.go | 34 +++++++++++++++++++---- pkg/controller/job/metrics/metrics.go | 24 +++++++++++++++- 4 files changed, 74 insertions(+), 9 deletions(-) diff --git a/pkg/controller/job/indexed_job_utils.go b/pkg/controller/job/indexed_job_utils.go index 7fa9bdc22b4..a6d953fa1c4 100644 --- a/pkg/controller/job/indexed_job_utils.go +++ b/pkg/controller/job/indexed_job_utils.go @@ -338,3 +338,10 @@ func (bci byCompletionIndex) Swap(i, j int) { func (bci byCompletionIndex) Len() int { return len(bci) } + +func completionModeStr(job *batch.Job) string { + if job.Spec.CompletionMode != nil { + return string(*job.Spec.CompletionMode) + } + return string(batch.NonIndexedCompletion) +} diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 8cfad1781b1..0aa340e20ef 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -900,6 +900,8 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* uidsWithFinalizer.Insert(string(p.UID)) } } + // Shallow copy, as it will only be used to detect changes in the counters. + oldCounters := job.Status if cleanUncountedPodsWithoutFinalizers(&job.Status, uidsWithFinalizer) { needsFlush = true } @@ -954,7 +956,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* job.Status.CompletedIndexes = succeededIndexes.String() } var err error - if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil { + if job, needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, &oldCounters, needsFlush); err != nil { return err } if jm.enactJobFinished(job, finishedCond) { @@ -964,6 +966,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* if _, err := jm.updateStatusHandler(job); err != nil { return fmt.Errorf("removing uncounted pods from status: %w", err) } + recordJobPodFinished(job, oldCounters) } return nil } @@ -977,12 +980,15 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* // 4. (if not all removals succeeded) flush Job status again. // Returns whether there are pending changes in the Job status that need to be // flushed in subsequent calls. -func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, needsFlush bool) (*batch.Job, bool, error) { +func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, oldCounters *batch.JobStatus, needsFlush bool) (*batch.Job, bool, error) { var err error if needsFlush { if job, err = jm.updateStatusHandler(job); err != nil { return job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err) } + recordJobPodFinished(job, *oldCounters) + // Shallow copy. + *oldCounters = job.Status needsFlush = false } var rmErr error @@ -1536,3 +1542,11 @@ func ensureJobConditionStatus(list []batch.JobCondition, cType batch.JobConditio } return list, false } + +func recordJobPodFinished(job *batch.Job, oldCounters batch.JobStatus) { + completionMode := completionModeStr(job) + diff := job.Status.Succeeded - oldCounters.Succeeded + metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Succeeded).Add(float64(diff)) + diff = job.Status.Failed - oldCounters.Failed + metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed).Add(float64(diff)) +} diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index fb113e9b2bf..1ee664aab81 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -47,8 +47,10 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" featuregatetesting "k8s.io/component-base/featuregate/testing" + metricstestutil "k8s.io/component-base/metrics/testutil" _ "k8s.io/kubernetes/pkg/apis/core/install" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/job/metrics" "k8s.io/kubernetes/pkg/controller/testutil" "k8s.io/kubernetes/pkg/features" "k8s.io/utils/pointer" @@ -1515,19 +1517,20 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { clientSet := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}) manager, _ := newControllerFromClient(clientSet, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{Err: tc.podControlErr} + metrics.JobPodsFinished.Reset() manager.podControl = &fakePodControl var statusUpdates []batch.JobStatus manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { statusUpdates = append(statusUpdates, *job.Status.DeepCopy()) return job, tc.statusUpdateErr } - - if tc.job.Status.UncountedTerminatedPods == nil { - tc.job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{} + job := tc.job.DeepCopy() + if job.Status.UncountedTerminatedPods == nil { + job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{} } - uncounted := newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods) - succeededIndexes := succeededIndexesFromJob(&tc.job) - err := manager.trackJobStatusAndRemoveFinalizers(&tc.job, tc.pods, succeededIndexes, *uncounted, tc.finishedCond, tc.needsFlush) + uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods) + succeededIndexes := succeededIndexesFromJob(job) + err := manager.trackJobStatusAndRemoveFinalizers(job, tc.pods, succeededIndexes, *uncounted, tc.finishedCond, tc.needsFlush) if !errors.Is(err, tc.wantErr) { t.Errorf("Got error %v, want %v", err, tc.wantErr) } @@ -1538,6 +1541,25 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { if rmFinalizers != tc.wantRmFinalizers { t.Errorf("Removed %d finalizers, want %d", rmFinalizers, tc.wantRmFinalizers) } + if tc.wantErr == nil { + completionMode := completionModeStr(job) + v, err := metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Succeeded)) + if err != nil { + t.Fatalf("Obtaining succeeded job_pods_finished_total: %v", err) + } + newSucceeded := job.Status.Succeeded - tc.job.Status.Succeeded + if float64(newSucceeded) != v { + t.Errorf("Metric reports %.0f succeeded pods, want %d", v, newSucceeded) + } + v, err = metricstestutil.GetCounterMetricValue(metrics.JobPodsFinished.WithLabelValues(completionMode, metrics.Failed)) + if err != nil { + t.Fatalf("Obtaining failed job_pods_finished_total: %v", err) + } + newFailed := job.Status.Failed - tc.job.Status.Failed + if float64(newFailed) != v { + t.Errorf("Metric reports %.0f failed pods, want %d", v, newFailed) + } + } }) } } diff --git a/pkg/controller/job/metrics/metrics.go b/pkg/controller/job/metrics/metrics.go index b2517a09f42..b9271426661 100644 --- a/pkg/controller/job/metrics/metrics.go +++ b/pkg/controller/job/metrics/metrics.go @@ -68,10 +68,26 @@ var ( }, []string{"completion_mode", "result"}, ) + + // JobPodsFinished records the number of finished Pods that the job controller + // finished tracking. + // It only applies to Jobs that were created while the feature gate + // JobTrackingWithFinalizers was enabled. + // Possible label values: + // completion_mode: Indexed, NonIndexed + // result: failed, succeeded + JobPodsFinished = metrics.NewCounterVec( + &metrics.CounterOpts{ + Subsystem: JobControllerSubsystem, + Name: "job_pods_finished_total", + Help: "The number of finished Pods that are fully tracked", + }, + []string{"completion_mode", "result"}) ) -// Possible values for the "action" label in the above metrics. const ( + // Possible values for the "action" label in the above metrics. + // JobSyncActionReconciling when the Job's pod creation/deletion expectations // are unsatisfied and the controller is waiting for issued Pod // creation/deletions to complete. @@ -88,6 +104,11 @@ const ( // if a Job is suspended or if the number of active Pods is more than // parallelism. JobSyncActionPodsDeleted = "pods_deleted" + + // Possible values for "result" label in the above metrics. + + Succeeded = "succeeded" + Failed = "failed" ) var registerMetrics sync.Once @@ -98,5 +119,6 @@ func Register() { legacyregistry.MustRegister(JobSyncDurationSeconds) legacyregistry.MustRegister(JobSyncNum) legacyregistry.MustRegister(JobFinishedNum) + legacyregistry.MustRegister(JobPodsFinished) }) } From 95c2a8024ccba5e2e08636c4359b640887e37ed0 Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Wed, 22 Sep 2021 13:39:43 -0400 Subject: [PATCH 4/5] Parallelize pod updates in job test To potentially reduce the number of job controller syncs. Also reduce the maximum number of pods to sync in tests. --- pkg/controller/job/job_controller.go | 29 +++++++------ test/integration/job/job_test.go | 63 +++++++++++++++++++++------- 2 files changed, 63 insertions(+), 29 deletions(-) diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 0aa340e20ef..11fbe6d94a9 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -55,22 +55,21 @@ import ( "k8s.io/utils/integer" ) -const ( - // maxUncountedPods is the maximum size the slices in - // .status.uncountedTerminatedPods should have to keep their representation - // roughly below 20 KB. - maxUncountedPods = 500 - maxPodCreateDeletePerSync = 500 -) - // controllerKind contains the schema.GroupVersionKind for this controller type. var controllerKind = batch.SchemeGroupVersion.WithKind("Job") var ( - // DefaultJobBackOff is the default backoff period, exported for the e2e test + // DefaultJobBackOff is the default backoff period. Exported for tests. DefaultJobBackOff = 10 * time.Second - // MaxJobBackOff is the max backoff period, exported for the e2e test + // MaxJobBackOff is the max backoff period. Exported for tests. MaxJobBackOff = 360 * time.Second + // MaxUncountedPods is the maximum size the slices in + // .status.uncountedTerminatedPods should have to keep their representation + // roughly below 20 KB. Exported for tests + MaxUncountedPods = 500 + // MaxPodCreateDeletePerSync is the maximum number of pods that can be + // created or deleted in a single sync call. Exported for tests. + MaxPodCreateDeletePerSync = 500 ) // Controller ensures that all Job objects have corresponding pods to @@ -938,7 +937,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* uncountedStatus.Failed = append(uncountedStatus.Failed, pod.UID) } } - if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods { + if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= MaxUncountedPods { // The controller added enough Pods already to .status.uncountedTerminatedPods // We stop counting pods and removing finalizers here to: // 1. Ensure that the UIDs representation are under 20 KB. @@ -1230,8 +1229,8 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded rmAtLeast = 0 } podsToDelete := activePodsForRemoval(job, activePods, int(rmAtLeast)) - if len(podsToDelete) > maxPodCreateDeletePerSync { - podsToDelete = podsToDelete[:maxPodCreateDeletePerSync] + if len(podsToDelete) > MaxPodCreateDeletePerSync { + podsToDelete = podsToDelete[:MaxPodCreateDeletePerSync] } if len(podsToDelete) > 0 { jm.expectations.ExpectDeletions(jobKey, len(podsToDelete)) @@ -1247,8 +1246,8 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded if active < wantActive { diff := wantActive - active - if diff > int32(maxPodCreateDeletePerSync) { - diff = int32(maxPodCreateDeletePerSync) + if diff > int32(MaxPodCreateDeletePerSync) { + diff = int32(MaxPodCreateDeletePerSync) } jm.expectations.ExpectCreations(jobKey, int(diff)) diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 5b2e4866b3b..0a599fb4043 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -21,6 +21,7 @@ import ( "errors" "fmt" "strconv" + "sync" "testing" "time" @@ -220,6 +221,10 @@ func TestParallelJobParallelism(t *testing.T) { } func TestParallelJobWithCompletions(t *testing.T) { + // Lower limits for a job sync so that we can test partial updates with a low + // number of pods. + t.Cleanup(setDuringTest(&jobcontroller.MaxUncountedPods, 10)) + t.Cleanup(setDuringTest(&jobcontroller.MaxPodCreateDeletePerSync, 10)) for _, wFinalizers := range []bool{false, true} { t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) { defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)() @@ -230,8 +235,8 @@ func TestParallelJobWithCompletions(t *testing.T) { jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ Spec: batchv1.JobSpec{ - Parallelism: pointer.Int32Ptr(504), - Completions: pointer.Int32Ptr(506), + Parallelism: pointer.Int32Ptr(54), + Completions: pointer.Int32Ptr(56), }, }) if err != nil { @@ -241,23 +246,23 @@ func TestParallelJobWithCompletions(t *testing.T) { t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers) } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ - Active: 504, + Active: 54, }, wFinalizers) // Failed Pods are replaced. if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil { t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err) } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ - Active: 504, + Active: 54, Failed: 2, }, wFinalizers) // Pods are created until the number of succeeded Pods equals completions. - if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 503); err != nil { + if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 53); err != nil { t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err) } validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ Failed: 2, - Succeeded: 503, + Succeeded: 53, Active: 3, }, wFinalizers) // No more Pods are created after the Job completes. @@ -267,7 +272,7 @@ func TestParallelJobWithCompletions(t *testing.T) { validateJobSucceeded(ctx, t, clientSet, jobObj) validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{ Failed: 2, - Succeeded: 506, + Succeeded: 56, }, false) validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) }) @@ -781,22 +786,44 @@ func setJobPodsPhase(ctx context.Context, clientSet clientset.Interface, jobObj if err != nil { return fmt.Errorf("listing Job Pods: %w", err) } + updates := make([]v1.Pod, 0, cnt) for _, pod := range pods.Items { - if cnt == 0 { + if len(updates) == cnt { break } if p := pod.Status.Phase; isPodOwnedByJob(&pod, jobObj) && p != v1.PodFailed && p != v1.PodSucceeded { pod.Status.Phase = phase - _, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, &pod, metav1.UpdateOptions{}) - if err != nil { - return fmt.Errorf("updating Pod status: %w", err) - } - cnt-- + updates = append(updates, pod) } } - if cnt != 0 { + if len(updates) != cnt { return fmt.Errorf("couldn't set phase on %d Job Pods", cnt) } + return updatePodStatuses(ctx, clientSet, updates) +} + +func updatePodStatuses(ctx context.Context, clientSet clientset.Interface, updates []v1.Pod) error { + wg := sync.WaitGroup{} + wg.Add(len(updates)) + errCh := make(chan error, len(updates)) + + for _, pod := range updates { + pod := pod + go func() { + _, err := clientSet.CoreV1().Pods(pod.Namespace).UpdateStatus(ctx, &pod, metav1.UpdateOptions{}) + if err != nil { + errCh <- err + } + wg.Done() + }() + } + wg.Wait() + + select { + case err := <-errCh: + return fmt.Errorf("updating Pod status: %w", err) + default: + } return nil } @@ -903,3 +930,11 @@ func hasJobTrackingAnnotation(job *batchv1.Job) bool { _, ok := job.Annotations[batchv1.JobTrackingFinalizer] return ok } + +func setDuringTest(val *int, newVal int) func() { + origVal := *val + *val = newVal + return func() { + *val = origVal + } +} From 5929ccd391e2cdcda9b2e141f1cd67c08243b206 Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Tue, 28 Sep 2021 16:33:51 -0400 Subject: [PATCH 5/5] Track expected removals of Pod finalizers Add the UIDs of Pods for which we are removing finalizers to an in-memory cache. The controller removes UIDs from the cache as Pod updates or deletes come in. This avoids double counting finished Pods when Pod updates arrive after Job status updates. https://github.com/kubernetes/kubernetes/issues/105200 --- pkg/controller/job/job_controller.go | 113 +++++++++--- pkg/controller/job/job_controller_test.go | 201 ++++++++++++++++++++-- pkg/controller/job/tracking_utils.go | 117 +++++++++++++ pkg/controller/job/tracking_utils_test.go | 111 ++++++++++++ 4 files changed, 497 insertions(+), 45 deletions(-) create mode 100644 pkg/controller/job/tracking_utils.go create mode 100644 pkg/controller/job/tracking_utils_test.go diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 11fbe6d94a9..9a54f45b1e9 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -93,6 +93,10 @@ type Controller struct { // A TTLCache of pod creates/deletes each rc expects to see expectations controller.ControllerExpectationsInterface + // finalizerExpectations tracks the Pod UIDs for which the controller + // expects to observe the tracking finalizer removed. + finalizerExpectations *uidTrackingExpectations + // A store of jobs jobLister batchv1listers.JobLister @@ -125,10 +129,11 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor KubeClient: kubeClient, Recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}), }, - expectations: controller.NewControllerExpectations(), - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"), - orphanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job_orphan_pod"), - recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}), + expectations: controller.NewControllerExpectations(), + finalizerExpectations: newUIDTrackingExpectations(), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"), + orphanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job_orphan_pod"), + recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}), } jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -146,7 +151,9 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: jm.addPod, UpdateFunc: jm.updatePod, - DeleteFunc: jm.deletePod, + DeleteFunc: func(obj interface{}) { + jm.deletePod(obj, true) + }, }) jm.podStore = podInformer.Lister() jm.podStoreSynced = podInformer.Informer().HasSynced @@ -227,7 +234,7 @@ func (jm *Controller) addPod(obj interface{}) { if pod.DeletionTimestamp != nil { // on a restart of the controller, it's possible a new pod shows up in a state that // is already pending deletion. Prevent the pod from being a creation observation. - jm.deletePod(pod) + jm.deletePod(pod, false) return } @@ -271,19 +278,31 @@ func (jm *Controller) updatePod(old, cur interface{}) { // and after such time has passed, the kubelet actually deletes it from the store. We receive an update // for modification of the deletion timestamp and expect an job to create more pods asap, not wait // until the kubelet actually deletes the pod. - jm.deletePod(curPod) + jm.deletePod(curPod, false) return } // the only time we want the backoff to kick-in, is when the pod failed immediate := curPod.Status.Phase != v1.PodFailed + // Don't check if oldPod has the finalizer, as during ownership transfer + // finalizers might be re-added and removed again in behalf of the new owner. + // If all those Pod updates collapse into a single event, the finalizer + // might be removed in oldPod and curPod. We want to record the latest + // state. + finalizerRemoved := !hasJobTrackingFinalizer(curPod) curControllerRef := metav1.GetControllerOf(curPod) oldControllerRef := metav1.GetControllerOf(oldPod) controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef) if controllerRefChanged && oldControllerRef != nil { // The ControllerRef was changed. Sync the old controller, if any. if job := jm.resolveControllerRef(oldPod.Namespace, oldControllerRef); job != nil { + if finalizerRemoved { + key, err := controller.KeyFunc(job) + if err == nil { + jm.finalizerExpectations.finalizerRemovalObserved(key, string(curPod.UID)) + } + } jm.enqueueController(job, immediate) } } @@ -294,6 +313,12 @@ func (jm *Controller) updatePod(old, cur interface{}) { if job == nil { return } + if finalizerRemoved { + key, err := controller.KeyFunc(job) + if err == nil { + jm.finalizerExpectations.finalizerRemovalObserved(key, string(curPod.UID)) + } + } jm.enqueueController(job, immediate) return } @@ -310,7 +335,7 @@ func (jm *Controller) updatePod(old, cur interface{}) { // When a pod is deleted, enqueue the job that manages the pod and update its expectations. // obj could be an *v1.Pod, or a DeleteFinalStateUnknown marker item. -func (jm *Controller) deletePod(obj interface{}) { +func (jm *Controller) deletePod(obj interface{}, final bool) { pod, ok := obj.(*v1.Pod) // When a delete is dropped, the relist will notice a pod in the store not @@ -347,6 +372,13 @@ func (jm *Controller) deletePod(obj interface{}) { return } jm.expectations.DeletionObserved(jobKey) + + // Consider the finalizer removed if this is the final delete. Otherwise, + // it's an update for the deletion timestamp, then check finalizer. + if final || !hasJobTrackingFinalizer(pod) { + jm.finalizerExpectations.finalizerRemovalObserved(jobKey, string(pod.UID)) + } + jm.enqueueController(job, true) } @@ -571,6 +603,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { if apierrors.IsNotFound(err) { klog.V(4).Infof("Job has been deleted: %v", key) jm.expectations.DeleteExpectations(key) + jm.finalizerExpectations.deleteExpectations(key) return true, nil } return false, err @@ -609,6 +642,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { metrics.JobSyncNum.WithLabelValues(completionMode, result, action).Inc() }() + var expectedRmFinalizers sets.String var uncounted *uncountedTerminatedPods if trackingUncountedPods(&job) { klog.V(4).InfoS("Tracking uncounted Pods with pod finalizers", "job", klog.KObj(&job)) @@ -616,6 +650,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{} } uncounted = newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods) + expectedRmFinalizers = jm.finalizerExpectations.getExpectedUIDs(key) } else if patch := removeTrackingAnnotationPatch(&job); patch != nil { if err := jm.patchJobHandler(&job, patch); err != nil { return false, fmt.Errorf("removing tracking finalizer from job %s: %w", key, err) @@ -634,7 +669,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { activePods := controller.FilterActivePods(pods) active := int32(len(activePods)) - succeeded, failed := getStatus(&job, pods, uncounted) + succeeded, failed := getStatus(&job, pods, uncounted, expectedRmFinalizers) // Job first start. Set StartTime and start the ActiveDeadlineSeconds timer // only if the job is not in the suspended state. if job.Status.StartTime == nil && !jobSuspended(&job) { @@ -754,7 +789,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) { if uncounted != nil { needsStatusUpdate := suspendCondChanged || active != job.Status.Active job.Status.Active = active - err = jm.trackJobStatusAndRemoveFinalizers(&job, pods, prevSucceededIndexes, *uncounted, finishedCondition, needsStatusUpdate) + err = jm.trackJobStatusAndRemoveFinalizers(&job, pods, prevSucceededIndexes, *uncounted, expectedRmFinalizers, finishedCondition, needsStatusUpdate) if err != nil { return false, fmt.Errorf("tracking status: %w", err) } @@ -872,7 +907,8 @@ func (jm *Controller) removeTrackingFinalizersFromAllPods(pods []*v1.Pod) error if len(podsWithFinalizer) == 0 { return nil } - _, err := jm.removeTrackingFinalizerFromPods(podsWithFinalizer) + // Tracking with finalizers is disabled, no need to set expectations. + _, err := jm.removeTrackingFinalizerFromPods("", podsWithFinalizer) return err } @@ -884,7 +920,7 @@ func (jm *Controller) removeTrackingFinalizersFromAllPods(pods []*v1.Pod) error // 4. Add Complete condition if satisfied with current counters. // It does this up to a limited number of Pods so that the size of .status // doesn't grow too much and this sync doesn't starve other Jobs. -func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, finishedCond *batch.JobCondition, needsFlush bool) error { +func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, expectedRmFinalizers sets.String, finishedCond *batch.JobCondition, needsFlush bool) error { isIndexed := isIndexedJob(job) var podsToRemoveFinalizer []*v1.Pod uncountedStatus := job.Status.UncountedTerminatedPods @@ -895,8 +931,9 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* } uidsWithFinalizer := make(sets.String, len(pods)) for _, p := range pods { - if hasJobTrackingFinalizer(p) { - uidsWithFinalizer.Insert(string(p.UID)) + uid := string(p.UID) + if hasJobTrackingFinalizer(p) && !expectedRmFinalizers.Has(uid) { + uidsWithFinalizer.Insert(uid) } } // Shallow copy, as it will only be used to detect changes in the counters. @@ -905,7 +942,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []* needsFlush = true } for _, pod := range pods { - if !hasJobTrackingFinalizer(pod) { + if !hasJobTrackingFinalizer(pod) || expectedRmFinalizers.Has(string(pod.UID)) { continue } podFinished := pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed @@ -986,14 +1023,18 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRe return job, needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err) } recordJobPodFinished(job, *oldCounters) - // Shallow copy. + // Shallow copy, as it will only be used to detect changes in the counters. *oldCounters = job.Status needsFlush = false } + jobKey, err := controller.KeyFunc(job) + if err != nil { + return job, needsFlush, fmt.Errorf("getting job key: %w", err) + } var rmErr error if len(podsToRemoveFinalizer) > 0 { var rmSucceded []bool - rmSucceded, rmErr = jm.removeTrackingFinalizerFromPods(podsToRemoveFinalizer) + rmSucceded, rmErr = jm.removeTrackingFinalizerFromPods(jobKey, podsToRemoveFinalizer) for i, p := range podsToRemoveFinalizer { if rmSucceded[i] { uidsWithFinalizer.Delete(string(p.UID)) @@ -1039,9 +1080,19 @@ func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinali // returns an array of booleans where the i-th value is true if the finalizer // of the i-th Pod was successfully removed (if the pod was deleted when this // function was called, it's considered as the finalizer was removed successfully). -func (jm *Controller) removeTrackingFinalizerFromPods(pods []*v1.Pod) ([]bool, error) { +func (jm *Controller) removeTrackingFinalizerFromPods(jobKey string, pods []*v1.Pod) ([]bool, error) { errCh := make(chan error, len(pods)) succeeded := make([]bool, len(pods)) + uids := make([]string, len(pods)) + for i, p := range pods { + uids[i] = string(p.UID) + } + if jobKey != "" { + err := jm.finalizerExpectations.expectFinalizersRemoved(jobKey, uids) + if err != nil { + return succeeded, fmt.Errorf("setting expected removed finalizers: %w", err) + } + } wg := sync.WaitGroup{} wg.Add(len(pods)) for i := range pods { @@ -1049,10 +1100,17 @@ func (jm *Controller) removeTrackingFinalizerFromPods(pods []*v1.Pod) ([]bool, e pod := pods[i] defer wg.Done() if patch := removeTrackingFinalizerPatch(pod); patch != nil { - if err := jm.podControl.PatchPod(pod.Namespace, pod.Name, patch); err != nil && !apierrors.IsNotFound(err) { - errCh <- err - utilruntime.HandleError(err) - return + if err := jm.podControl.PatchPod(pod.Namespace, pod.Name, patch); err != nil { + // In case of any failure, we don't expect a Pod update for the + // finalizer removed. Clear expectation now. + if jobKey != "" { + jm.finalizerExpectations.finalizerRemovalObserved(jobKey, string(pod.UID)) + } + if !apierrors.IsNotFound(err) { + errCh <- err + utilruntime.HandleError(err) + return + } } succeeded[i] = true } @@ -1155,15 +1213,15 @@ func newCondition(conditionType batch.JobConditionType, status v1.ConditionStatu } // getStatus returns number of succeeded and failed pods running a job -func getStatus(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPods) (succeeded, failed int32) { +func getStatus(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPods, expectedRmFinalizers sets.String) (succeeded, failed int32) { if uncounted != nil { succeeded = job.Status.Succeeded failed = job.Status.Failed } - succeeded += int32(countValidPodsWithFilter(job, pods, uncounted.Succeeded(), func(p *v1.Pod) bool { + succeeded += int32(countValidPodsWithFilter(job, pods, uncounted.Succeeded(), expectedRmFinalizers, func(p *v1.Pod) bool { return p.Status.Phase == v1.PodSucceeded })) - failed += int32(countValidPodsWithFilter(job, pods, uncounted.Failed(), func(p *v1.Pod) bool { + failed += int32(countValidPodsWithFilter(job, pods, uncounted.Failed(), expectedRmFinalizers, func(p *v1.Pod) bool { if p.Status.Phase == v1.PodFailed { return true } @@ -1395,12 +1453,13 @@ func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Dur // countValidPodsWithFilter returns number of valid pods that pass the filter. // Pods are valid if they have a finalizer and, for Indexed Jobs, a valid // completion index. -func countValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.String, filter func(*v1.Pod) bool) int { +func countValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.String, expectedRmFinalizers sets.String, filter func(*v1.Pod) bool) int { result := len(uncounted) for _, p := range pods { + uid := string(p.UID) // Pods that don't have a completion finalizer are in the uncounted set or // have already been accounted for in the Job status. - if uncounted != nil && (!hasJobTrackingFinalizer(p) || uncounted.Has(string(p.UID))) { + if uncounted != nil && (!hasJobTrackingFinalizer(p) || uncounted.Has(uid) || expectedRmFinalizers.Has(uid)) { continue } if isIndexedJob(job) { diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 1ee664aab81..3631c1dae3b 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -116,6 +116,7 @@ func newPod(name string, job *batch.Job) *v1.Pod { return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: name, + UID: types.UID(name), Labels: job.Spec.Selector.MatchLabels, Namespace: job.Namespace, OwnerReferences: []metav1.OwnerReference{*metav1.NewControllerRef(job, controllerKind)}, @@ -1003,10 +1004,11 @@ func TestSyncJobLegacyTracking(t *testing.T) { func TestGetStatus(t *testing.T) { cases := map[string]struct { - job batch.Job - pods []*v1.Pod - wantSucceeded int32 - wantFailed int32 + job batch.Job + pods []*v1.Pod + expectedRmFinalizers sets.String + wantSucceeded int32 + wantFailed int32 }{ "without finalizers": { job: batch.Job{ @@ -1068,6 +1070,30 @@ func TestGetStatus(t *testing.T) { wantSucceeded: 4, wantFailed: 4, }, + "with expected removed finalizers": { + job: batch.Job{ + Status: batch.JobStatus{ + Succeeded: 2, + Failed: 2, + UncountedTerminatedPods: &batch.UncountedTerminatedPods{ + Succeeded: []types.UID{"a"}, + Failed: []types.UID{"d"}, + }, + }, + }, + expectedRmFinalizers: sets.NewString("b", "f"), + pods: []*v1.Pod{ + buildPod().uid("a").phase(v1.PodSucceeded).Pod, + buildPod().uid("b").phase(v1.PodSucceeded).trackingFinalizer().Pod, + buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod, + buildPod().uid("d").phase(v1.PodFailed).Pod, + buildPod().uid("e").phase(v1.PodFailed).trackingFinalizer().Pod, + buildPod().uid("f").phase(v1.PodFailed).trackingFinalizer().Pod, + buildPod().uid("g").phase(v1.PodFailed).trackingFinalizer().Pod, + }, + wantSucceeded: 4, + wantFailed: 5, + }, "deleted pods": { pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodSucceeded).deletionTimestamp().Pod, @@ -1104,7 +1130,7 @@ func TestGetStatus(t *testing.T) { if tc.job.Status.UncountedTerminatedPods != nil { uncounted = newUncountedTerminatedPods(*tc.job.Status.UncountedTerminatedPods) } - succeeded, failed := getStatus(&tc.job, tc.pods, uncounted) + succeeded, failed := getStatus(&tc.job, tc.pods, uncounted, tc.expectedRmFinalizers) if succeeded != tc.wantSucceeded { t.Errorf("getStatus reports %d succeeded pods, want %d", succeeded, tc.wantSucceeded) } @@ -1121,15 +1147,16 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { indexedCompletion := batch.IndexedCompletion mockErr := errors.New("mock error") cases := map[string]struct { - job batch.Job - pods []*v1.Pod - finishedCond *batch.JobCondition - needsFlush bool - statusUpdateErr error - podControlErr error - wantErr error - wantRmFinalizers int - wantStatusUpdates []batch.JobStatus + job batch.Job + pods []*v1.Pod + finishedCond *batch.JobCondition + expectedRmFinalizers sets.String + needsFlush bool + statusUpdateErr error + podControlErr error + wantErr error + wantRmFinalizers int + wantStatusUpdates []batch.JobStatus }{ "no updates": {}, "new active": { @@ -1211,6 +1238,45 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { }, }, }, + "expecting removed finalizers": { + job: batch.Job{ + Status: batch.JobStatus{ + Succeeded: 2, + Failed: 3, + UncountedTerminatedPods: &batch.UncountedTerminatedPods{ + Succeeded: []types.UID{"a", "g"}, + Failed: []types.UID{"b", "h"}, + }, + }, + }, + expectedRmFinalizers: sets.NewString("c", "d", "g", "h"), + pods: []*v1.Pod{ + buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, + buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod, + buildPod().uid("c").phase(v1.PodSucceeded).trackingFinalizer().Pod, + buildPod().uid("d").phase(v1.PodFailed).trackingFinalizer().Pod, + buildPod().uid("e").phase(v1.PodSucceeded).trackingFinalizer().Pod, + buildPod().uid("f").phase(v1.PodFailed).trackingFinalizer().Pod, + buildPod().uid("g").phase(v1.PodSucceeded).trackingFinalizer().Pod, + buildPod().uid("h").phase(v1.PodFailed).trackingFinalizer().Pod, + }, + wantRmFinalizers: 4, + wantStatusUpdates: []batch.JobStatus{ + { + UncountedTerminatedPods: &batch.UncountedTerminatedPods{ + Succeeded: []types.UID{"a", "e"}, + Failed: []types.UID{"b", "f"}, + }, + Succeeded: 3, + Failed: 4, + }, + { + UncountedTerminatedPods: &batch.UncountedTerminatedPods{}, + Succeeded: 5, + Failed: 6, + }, + }, + }, "succeeding job": { pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, @@ -1530,7 +1596,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { } uncounted := newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods) succeededIndexes := succeededIndexesFromJob(job) - err := manager.trackJobStatusAndRemoveFinalizers(job, tc.pods, succeededIndexes, *uncounted, tc.finishedCond, tc.needsFlush) + err := manager.trackJobStatusAndRemoveFinalizers(job, tc.pods, succeededIndexes, *uncounted, tc.expectedRmFinalizers, tc.finishedCond, tc.needsFlush) if !errors.Is(err, tc.wantErr) { t.Errorf("Got error %v, want %v", err, tc.wantErr) } @@ -2285,7 +2351,7 @@ func TestDeletePod(t *testing.T) { informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2) - jm.deletePod(pod1) + jm.deletePod(pod1, true) if got, want := jm.queue.Len(), 1; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } @@ -2298,7 +2364,7 @@ func TestDeletePod(t *testing.T) { t.Errorf("queue.Get() = %v, want %v", got, want) } - jm.deletePod(pod2) + jm.deletePod(pod2, true) if got, want := jm.queue.Len(), 1; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } @@ -2333,7 +2399,7 @@ func TestDeletePodOrphan(t *testing.T) { pod1.OwnerReferences = nil informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1) - jm.deletePod(pod1) + jm.deletePod(pod1, true) if got, want := jm.queue.Len(), 0; got != want { t.Fatalf("queue.Len() = %v, want %v", got, want) } @@ -2977,6 +3043,105 @@ func TestEnsureJobConditions(t *testing.T) { } } +func TestFinalizersRemovedExpectations(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)() + clientset := fake.NewSimpleClientset() + sharedInformers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) + manager := NewController(sharedInformers.Core().V1().Pods(), sharedInformers.Batch().V1().Jobs(), clientset) + manager.podStoreSynced = alwaysReady + manager.jobStoreSynced = alwaysReady + manager.podControl = &controller.FakePodControl{Err: errors.New("fake pod controller error")} + manager.updateStatusHandler = func(job *batch.Job) (*batch.Job, error) { + return job, nil + } + + job := newJob(2, 2, 6, batch.NonIndexedCompletion) + job.Annotations = map[string]string{ + batch.JobTrackingFinalizer: "", + } + sharedInformers.Batch().V1().Jobs().Informer().GetIndexer().Add(job) + pods := append(newPodList(2, v1.PodSucceeded, job), newPodList(2, v1.PodFailed, job)...) + podInformer := sharedInformers.Core().V1().Pods().Informer() + podIndexer := podInformer.GetIndexer() + uids := sets.NewString() + for i := range pods { + clientset.Tracker().Add(&pods[i]) + podIndexer.Add(&pods[i]) + uids.Insert(string(pods[i].UID)) + } + jobKey := testutil.GetKey(job, t) + + manager.syncJob(jobKey) + gotExpectedUIDs := manager.finalizerExpectations.getExpectedUIDs(jobKey) + if len(gotExpectedUIDs) != 0 { + t.Errorf("Got unwanted expectations for removed finalizers after first syncJob with client failures:\n%s", gotExpectedUIDs.List()) + } + + // Remove failures and re-sync. + manager.podControl.(*controller.FakePodControl).Err = nil + manager.syncJob(jobKey) + gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey) + if diff := cmp.Diff(uids, gotExpectedUIDs); diff != "" { + t.Errorf("Different expectations for removed finalizers after syncJob (-want,+got):\n%s", diff) + } + + stopCh := make(chan struct{}) + defer close(stopCh) + go sharedInformers.Core().V1().Pods().Informer().Run(stopCh) + cache.WaitForCacheSync(stopCh, podInformer.HasSynced) + + // Make sure the first syncJob sets the expectations, even after the caches synced. + gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey) + if diff := cmp.Diff(uids, gotExpectedUIDs); diff != "" { + t.Errorf("Different expectations for removed finalizers after syncJob and cacheSync (-want,+got):\n%s", diff) + } + + // Change pods in different ways. + + podsResource := schema.GroupVersionResource{Version: "v1", Resource: "pods"} + + update := pods[0].DeepCopy() + update.Finalizers = nil + update.ResourceVersion = "1" + err := clientset.Tracker().Update(podsResource, update, update.Namespace) + if err != nil { + t.Errorf("Removing finalizer: %v", err) + } + + update = pods[1].DeepCopy() + update.Finalizers = nil + update.DeletionTimestamp = &metav1.Time{Time: time.Now()} + update.ResourceVersion = "1" + err = clientset.Tracker().Update(podsResource, update, update.Namespace) + if err != nil { + t.Errorf("Removing finalizer and setting deletion timestamp: %v", err) + } + + // Preserve the finalizer. + update = pods[2].DeepCopy() + update.DeletionTimestamp = &metav1.Time{Time: time.Now()} + update.ResourceVersion = "1" + err = clientset.Tracker().Update(podsResource, update, update.Namespace) + if err != nil { + t.Errorf("Setting deletion timestamp: %v", err) + } + + err = clientset.Tracker().Delete(podsResource, pods[3].Namespace, pods[3].Name) + if err != nil { + t.Errorf("Deleting pod that had finalizer: %v", err) + } + + uids = sets.NewString(string(pods[2].UID)) + var diff string + if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { + gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey) + diff = cmp.Diff(uids, gotExpectedUIDs) + return diff == "", nil + }); err != nil { + t.Errorf("Timeout waiting for expectations (-want, +got):\n%s", diff) + } +} + func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec) { t.Helper() want := []v1.EnvVar{ diff --git a/pkg/controller/job/tracking_utils.go b/pkg/controller/job/tracking_utils.go new file mode 100644 index 00000000000..55996a2a2b8 --- /dev/null +++ b/pkg/controller/job/tracking_utils.go @@ -0,0 +1,117 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package job + +import ( + "fmt" + "sync" + + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" +) + +// uidSetKeyFunc to parse out the key from a uidSet. +var uidSetKeyFunc = func(obj interface{}) (string, error) { + if u, ok := obj.(*uidSet); ok { + return u.key, nil + } + return "", fmt.Errorf("could not find key for obj %#v", obj) +} + +// uidSet holds a key and a set of UIDs. Used by the +// uidTrackingExpectations to remember which UID it has seen/still waiting for. +type uidSet struct { + sync.RWMutex + set sets.String + key string +} + +// uidTrackingExpectations tracks the UIDs of Pods the controller is waiting to +// observe tracking finalizer deletions. +type uidTrackingExpectations struct { + store cache.Store +} + +// GetUIDs is a convenience method to avoid exposing the set of expected uids. +// The returned set is not thread safe, all modifications must be made holding +// the uidStoreLock. +func (u *uidTrackingExpectations) getSet(controllerKey string) *uidSet { + if obj, exists, err := u.store.GetByKey(controllerKey); err == nil && exists { + return obj.(*uidSet) + } + return nil +} + +func (u *uidTrackingExpectations) getExpectedUIDs(controllerKey string) sets.String { + uids := u.getSet(controllerKey) + if uids == nil { + return nil + } + uids.RLock() + set := sets.NewString(uids.set.UnsortedList()...) + uids.RUnlock() + return set +} + +// ExpectDeletions records expectations for the given deleteKeys, against the +// given job-key. +// This is thread-safe across different job keys. +func (u *uidTrackingExpectations) expectFinalizersRemoved(jobKey string, deletedKeys []string) error { + klog.V(4).InfoS("Expecting tracking finalizers removed", "job", jobKey, "podUIDs", deletedKeys) + + uids := u.getSet(jobKey) + if uids == nil { + uids = &uidSet{ + key: jobKey, + set: sets.NewString(), + } + if err := u.store.Add(uids); err != nil { + return err + } + } + uids.Lock() + uids.set.Insert(deletedKeys...) + uids.Unlock() + return nil +} + +// FinalizerRemovalObserved records the given deleteKey as a deletion, for the given job. +func (u *uidTrackingExpectations) finalizerRemovalObserved(jobKey, deleteKey string) { + uids := u.getSet(jobKey) + if uids != nil { + uids.Lock() + if uids.set.Has(deleteKey) { + klog.V(4).InfoS("Observed tracking finalizer removed", "job", jobKey, "podUID", deleteKey) + uids.set.Delete(deleteKey) + } + uids.Unlock() + } +} + +// DeleteExpectations deletes the UID set. +func (u *uidTrackingExpectations) deleteExpectations(jobKey string) { + if err := u.store.Delete(jobKey); err != nil { + klog.ErrorS(err, "deleting tracking annotation UID expectations", "job", jobKey) + } +} + +// NewUIDTrackingControllerExpectations returns a wrapper around +// ControllerExpectations that is aware of deleteKeys. +func newUIDTrackingExpectations() *uidTrackingExpectations { + return &uidTrackingExpectations{store: cache.NewStore(uidSetKeyFunc)} +} diff --git a/pkg/controller/job/tracking_utils_test.go b/pkg/controller/job/tracking_utils_test.go new file mode 100644 index 00000000000..1fc595271d0 --- /dev/null +++ b/pkg/controller/job/tracking_utils_test.go @@ -0,0 +1,111 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package job + +import ( + "sync" + "testing" + + "github.com/google/go-cmp/cmp" +) + +func TestUIDTrackingExpectations(t *testing.T) { + tracks := []struct { + job string + firstRound []string + secondRound []string + }{ + { + job: "foo", + firstRound: []string{"a", "b", "c", "d"}, + secondRound: []string{"e", "f"}, + }, + { + job: "bar", + firstRound: []string{"x", "y", "z"}, + secondRound: []string{"u", "v", "w"}, + }, + { + job: "baz", + firstRound: []string{"w"}, + secondRound: []string{"a"}, + }, + } + expectations := newUIDTrackingExpectations() + + // Insert first round of keys in parallel. + + var wg sync.WaitGroup + wg.Add(len(tracks)) + errs := make([]error, len(tracks)) + for i := range tracks { + track := tracks[i] + go func(errID int) { + errs[errID] = expectations.expectFinalizersRemoved(track.job, track.firstRound) + wg.Done() + }(i) + } + wg.Wait() + for i, err := range errs { + if err != nil { + t.Errorf("Failed adding first round of UIDs for job %s: %v", tracks[i].job, err) + } + } + + for _, track := range tracks { + uids := expectations.getSet(track.job) + if uids == nil { + t.Errorf("Set of UIDs is empty for job %s", track.job) + } else if diff := cmp.Diff(track.firstRound, uids.set.List()); diff != "" { + t.Errorf("Unexpected keys for job %s (-want,+got):\n%s", track.job, diff) + } + } + + // Delete the first round of keys and add the second round in parallel. + + for i, track := range tracks { + wg.Add(len(track.firstRound) + 1) + track := track + for _, uid := range track.firstRound { + uid := uid + go func() { + expectations.finalizerRemovalObserved(track.job, uid) + wg.Done() + }() + } + go func(errID int) { + errs[errID] = expectations.expectFinalizersRemoved(track.job, track.secondRound) + wg.Done() + }(i) + } + wg.Wait() + + for i, err := range errs { + if err != nil { + t.Errorf("Failed adding second round of UIDs for job %s: %v", tracks[i].job, err) + } + } + + for _, track := range tracks { + uids := expectations.getSet(track.job) + if uids == nil { + t.Errorf("Set of UIDs is empty for job %s", track.job) + } else if diff := cmp.Diff(track.secondRound, uids.set.List()); diff != "" { + t.Errorf("Unexpected keys for job %s (-want,+got):\n%s", track.job, diff) + } + } +}