From a23e7a42d3a6fb8f85397c007a4ffcd9bb386a04 Mon Sep 17 00:00:00 2001 From: Yuki Iwai Date: Tue, 29 Oct 2024 13:12:35 +0900 Subject: [PATCH] Job: Refactor uncountedTerminatedPods to avoid casting everywhere Signed-off-by: Yuki Iwai --- pkg/controller/job/indexed_job_utils_test.go | 5 +- pkg/controller/job/job_controller.go | 66 +++++++++----------- pkg/controller/job/job_controller_test.go | 14 ++--- pkg/controller/job/tracking_utils.go | 11 ++-- pkg/controller/job/tracking_utils_test.go | 17 ++--- 5 files changed, 53 insertions(+), 60 deletions(-) diff --git a/pkg/controller/job/indexed_job_utils_test.go b/pkg/controller/job/indexed_job_utils_test.go index bece27e9bce..dc6574df560 100644 --- a/pkg/controller/job/indexed_job_utils_test.go +++ b/pkg/controller/job/indexed_job_utils_test.go @@ -25,6 +25,7 @@ import ( "github.com/google/go-cmp/cmp" batch "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/util/feature" featuregatetesting "k8s.io/component-base/featuregate/testing" @@ -405,7 +406,7 @@ func TestGetPodsWithDelayedDeletionPerIndex(t *testing.T) { cases := map[string]struct { job batch.Job pods []*v1.Pod - expectedRmFinalizers sets.Set[string] + expectedRmFinalizers sets.Set[types.UID] wantPodsWithDelayedDeletionPerIndex []string }{ "failed pods are kept corresponding to non-failed indexes are kept": { @@ -444,7 +445,7 @@ func TestGetPodsWithDelayedDeletionPerIndex(t *testing.T) { pods: []*v1.Pod{ buildPod().uid("a").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod, }, - expectedRmFinalizers: sets.New("a"), + expectedRmFinalizers: sets.New[types.UID]("a"), wantPodsWithDelayedDeletionPerIndex: []string{}, }, "failed pod with index outside of completions; the pod's deletion is not delayed": { diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 3650dc53570..04b1c9520c8 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -136,7 +136,7 @@ type syncJobCtx struct { succeededIndexes orderedIntervals failedIndexes *orderedIntervals newBackoffRecord backoffRecord - expectedRmFinalizers sets.Set[string] + expectedRmFinalizers sets.Set[types.UID] uncounted *uncountedTerminatedPods podsWithDelayedDeletionPerIndex map[int]*v1.Pod terminating *int32 @@ -370,7 +370,7 @@ func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{}) { if finalizerRemoved { key, err := controller.KeyFunc(job) if err == nil { - jm.finalizerExpectations.finalizerRemovalObserved(logger, key, string(curPod.UID)) + jm.finalizerExpectations.finalizerRemovalObserved(logger, key, curPod.UID) } } jm.enqueueSyncJobBatched(logger, job) @@ -386,7 +386,7 @@ func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{}) { if finalizerRemoved { key, err := controller.KeyFunc(job) if err == nil { - jm.finalizerExpectations.finalizerRemovalObserved(logger, key, string(curPod.UID)) + jm.finalizerExpectations.finalizerRemovalObserved(logger, key, curPod.UID) } } jm.enqueueSyncJobBatched(logger, job) @@ -460,7 +460,7 @@ func (jm *Controller) deletePod(logger klog.Logger, obj interface{}, final bool) // Consider the finalizer removed if this is the final delete. Otherwise, // it's an update for the deletion timestamp, then check finalizer. if final || !hasFinalizer { - jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, string(pod.UID)) + jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, pod.UID) } jm.enqueueSyncJobBatched(logger, job) @@ -1167,11 +1167,10 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job // Sort to introduce completed Indexes in order. sort.Sort(byCompletionIndex(jobCtx.pods)) } - uidsWithFinalizer := make(sets.Set[string], len(jobCtx.pods)) + uidsWithFinalizer := make(sets.Set[types.UID], len(jobCtx.pods)) for _, p := range jobCtx.pods { - uid := string(p.UID) - if hasJobTrackingFinalizer(p) && !jobCtx.expectedRmFinalizers.Has(uid) { - uidsWithFinalizer.Insert(uid) + if hasJobTrackingFinalizer(p) && !jobCtx.expectedRmFinalizers.Has(p.UID) { + uidsWithFinalizer.Insert(p.UID) } } @@ -1183,7 +1182,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job podFailureCountByPolicyAction := map[string]int{} reachedMaxUncountedPods := false for _, pod := range jobCtx.pods { - if !hasJobTrackingFinalizer(pod) || jobCtx.expectedRmFinalizers.Has(string(pod.UID)) { + if !hasJobTrackingFinalizer(pod) || jobCtx.expectedRmFinalizers.Has(pod.UID) { // This pod was processed in a previous sync. continue } @@ -1192,7 +1191,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job continue } podsToRemoveFinalizer = append(podsToRemoveFinalizer, pod) - if pod.Status.Phase == v1.PodSucceeded && !jobCtx.uncounted.failed.Has(string(pod.UID)) { + if pod.Status.Phase == v1.PodSucceeded && !jobCtx.uncounted.failed.Has(pod.UID) { if isIndexed { // The completion index is enough to avoid recounting succeeded pods. // No need to track UIDs. @@ -1201,14 +1200,14 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job newSucceededIndexes = append(newSucceededIndexes, ix) needsFlush = true } - } else if !jobCtx.uncounted.succeeded.Has(string(pod.UID)) { + } else if !jobCtx.uncounted.succeeded.Has(pod.UID) { needsFlush = true uncountedStatus.Succeeded = append(uncountedStatus.Succeeded, pod.UID) } } else if considerPodFailed || (jobCtx.finishedCondition != nil && !isSuccessCriteriaMetCondition(jobCtx.finishedCondition)) { // When the job is considered finished, every non-terminated pod is considered failed. ix := getCompletionIndex(pod.Annotations) - if !jobCtx.uncounted.failed.Has(string(pod.UID)) && (!isIndexed || (ix != unknownCompletionIndex && ix < int(*jobCtx.job.Spec.Completions))) { + if !jobCtx.uncounted.failed.Has(pod.UID) && (!isIndexed || (ix != unknownCompletionIndex && ix < int(*jobCtx.job.Spec.Completions))) { if jobCtx.job.Spec.PodFailurePolicy != nil { _, countFailed, action := matchPodFailurePolicy(jobCtx.job.Spec.PodFailurePolicy, pod) if action != nil { @@ -1333,7 +1332,7 @@ func canRemoveFinalizer(logger klog.Logger, jobCtx *syncJobCtx, pod *v1.Pod, con // // Returns whether there are pending changes in the Job status that need to be // flushed in subsequent calls. -func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, jobCtx *syncJobCtx, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[string], oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool) (*batch.Job, bool, error) { +func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, jobCtx *syncJobCtx, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[types.UID], oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool) (*batch.Job, bool, error) { logger := klog.FromContext(ctx) var err error if needsFlush { @@ -1367,7 +1366,7 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job rmSucceded, rmErr = jm.removeTrackingFinalizerFromPods(ctx, jobKey, podsToRemoveFinalizer) for i, p := range podsToRemoveFinalizer { if rmSucceded[i] { - uidsWithFinalizer.Delete(string(p.UID)) + uidsWithFinalizer.Delete(p.UID) } } } @@ -1388,7 +1387,7 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job // .status.uncountedTerminatedPods for which the finalizer was successfully // removed and increments the corresponding status counters. // Returns whether there was any status change. -func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinalizer sets.Set[string]) bool { +func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinalizer sets.Set[types.UID]) bool { updated := false uncountedStatus := status.UncountedTerminatedPods newUncounted := filterInUncountedUIDs(uncountedStatus.Succeeded, uidsWithFinalizer) @@ -1414,9 +1413,9 @@ func (jm *Controller) removeTrackingFinalizerFromPods(ctx context.Context, jobKe logger := klog.FromContext(ctx) errCh := make(chan error, len(pods)) succeeded := make([]bool, len(pods)) - uids := make([]string, len(pods)) + uids := make([]types.UID, len(pods)) for i, p := range pods { - uids[i] = string(p.UID) + uids[i] = p.UID } if jobKey != "" { err := jm.finalizerExpectations.expectFinalizersRemoved(logger, jobKey, uids) @@ -1435,7 +1434,7 @@ func (jm *Controller) removeTrackingFinalizerFromPods(ctx context.Context, jobKe // In case of any failure, we don't expect a Pod update for the // finalizer removed. Clear expectation now. if jobKey != "" { - jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, string(pod.UID)) + jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, pod.UID) } if !apierrors.IsNotFound(err) { errCh <- err @@ -1495,10 +1494,10 @@ func (jm *Controller) recordJobFinished(job *batch.Job, finishedCond *batch.JobC return true } -func filterInUncountedUIDs(uncounted []types.UID, include sets.Set[string]) []types.UID { +func filterInUncountedUIDs(uncounted []types.UID, include sets.Set[types.UID]) []types.UID { var newUncounted []types.UID for _, uid := range uncounted { - if include.Has(string(uid)) { + if include.Has(uid) { newUncounted = append(newUncounted, uid) } } @@ -1852,14 +1851,12 @@ func (jm *Controller) patchJob(ctx context.Context, job *batch.Job, data []byte) // getValidPodsWithFilter returns the valid pods that pass the filter. // Pods are valid if they have a finalizer or in uncounted set // and, for Indexed Jobs, a valid completion index. -func getValidPodsWithFilter(jobCtx *syncJobCtx, uncounted sets.Set[string], filter func(*v1.Pod) bool) []*v1.Pod { +func getValidPodsWithFilter(jobCtx *syncJobCtx, uncounted sets.Set[types.UID], filter func(*v1.Pod) bool) []*v1.Pod { var result []*v1.Pod for _, p := range jobCtx.pods { - uid := string(p.UID) - // Pods that don't have a completion finalizer are in the uncounted set or // have already been accounted for in the Job status. - if !hasJobTrackingFinalizer(p) || uncounted.Has(uid) || jobCtx.expectedRmFinalizers.Has(uid) { + if !hasJobTrackingFinalizer(p) || uncounted.Has(p.UID) || jobCtx.expectedRmFinalizers.Has(p.UID) { continue } if isIndexedJob(jobCtx.job) { @@ -1906,32 +1903,25 @@ func removeTrackingFinalizerPatch(pod *v1.Pod) []byte { } type uncountedTerminatedPods struct { - succeeded sets.Set[string] - failed sets.Set[string] + succeeded sets.Set[types.UID] + failed sets.Set[types.UID] } func newUncountedTerminatedPods(in batch.UncountedTerminatedPods) *uncountedTerminatedPods { - obj := uncountedTerminatedPods{ - succeeded: make(sets.Set[string], len(in.Succeeded)), - failed: make(sets.Set[string], len(in.Failed)), + return &uncountedTerminatedPods{ + succeeded: sets.New(in.Succeeded...), + failed: sets.New(in.Failed...), } - for _, v := range in.Succeeded { - obj.succeeded.Insert(string(v)) - } - for _, v := range in.Failed { - obj.failed.Insert(string(v)) - } - return &obj } -func (u *uncountedTerminatedPods) Succeeded() sets.Set[string] { +func (u *uncountedTerminatedPods) Succeeded() sets.Set[types.UID] { if u == nil { return nil } return u.succeeded } -func (u *uncountedTerminatedPods) Failed() sets.Set[string] { +func (u *uncountedTerminatedPods) Failed() sets.Set[types.UID] { if u == nil { return nil } diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index 7e7fba1f95d..6f853957ddd 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -1453,7 +1453,7 @@ func TestGetNewFinshedPods(t *testing.T) { cases := map[string]struct { job batch.Job pods []*v1.Pod - expectedRmFinalizers sets.Set[string] + expectedRmFinalizers sets.Set[types.UID] wantSucceeded int32 wantFailed int32 }{ @@ -1510,7 +1510,7 @@ func TestGetNewFinshedPods(t *testing.T) { }, }, }, - expectedRmFinalizers: sets.New("b", "f"), + expectedRmFinalizers: sets.New[types.UID]("b", "f"), pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodSucceeded).Pod, buildPod().uid("b").phase(v1.PodSucceeded).trackingFinalizer().Pod, @@ -1575,7 +1575,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { job batch.Job pods []*v1.Pod finishedCond *batch.JobCondition - expectedRmFinalizers sets.Set[string] + expectedRmFinalizers sets.Set[types.UID] needsFlush bool statusUpdateErr error podControlErr error @@ -1686,7 +1686,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { }, }, }, - expectedRmFinalizers: sets.New("c", "d", "g", "h"), + expectedRmFinalizers: sets.New[types.UID]("c", "d", "g", "h"), pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod, @@ -7655,11 +7655,11 @@ func TestFinalizersRemovedExpectations(t *testing.T) { pods := append(newPodList(2, v1.PodSucceeded, job), newPodList(2, v1.PodFailed, job)...) podInformer := sharedInformers.Core().V1().Pods().Informer() podIndexer := podInformer.GetIndexer() - uids := sets.New[string]() + uids := sets.New[types.UID]() for i := range pods { clientset.Tracker().Add(pods[i]) podIndexer.Add(pods[i]) - uids.Insert(string(pods[i].UID)) + uids.Insert(pods[i].UID) } jobKey := testutil.GetKey(job, t) @@ -7725,7 +7725,7 @@ func TestFinalizersRemovedExpectations(t *testing.T) { t.Errorf("Deleting pod that had finalizer: %v", err) } - uids = sets.New(string(pods[2].UID)) + uids = sets.New(pods[2].UID) var diff string if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey) diff --git a/pkg/controller/job/tracking_utils.go b/pkg/controller/job/tracking_utils.go index cca3bc2bb19..7e5147eb76f 100644 --- a/pkg/controller/job/tracking_utils.go +++ b/pkg/controller/job/tracking_utils.go @@ -22,6 +22,7 @@ import ( batch "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" @@ -40,7 +41,7 @@ var uidSetKeyFunc = func(obj interface{}) (string, error) { // uidTrackingExpectations to remember which UID it has seen/still waiting for. type uidSet struct { sync.RWMutex - set sets.Set[string] + set sets.Set[types.UID] key string } @@ -60,7 +61,7 @@ func (u *uidTrackingExpectations) getSet(controllerKey string) *uidSet { return nil } -func (u *uidTrackingExpectations) getExpectedUIDs(controllerKey string) sets.Set[string] { +func (u *uidTrackingExpectations) getExpectedUIDs(controllerKey string) sets.Set[types.UID] { uids := u.getSet(controllerKey) if uids == nil { return nil @@ -74,14 +75,14 @@ func (u *uidTrackingExpectations) getExpectedUIDs(controllerKey string) sets.Set // ExpectDeletions records expectations for the given deleteKeys, against the // given job-key. // This is thread-safe across different job keys. -func (u *uidTrackingExpectations) expectFinalizersRemoved(logger klog.Logger, jobKey string, deletedKeys []string) error { +func (u *uidTrackingExpectations) expectFinalizersRemoved(logger klog.Logger, jobKey string, deletedKeys []types.UID) error { logger.V(4).Info("Expecting tracking finalizers removed", "key", jobKey, "podUIDs", deletedKeys) uids := u.getSet(jobKey) if uids == nil { uids = &uidSet{ key: jobKey, - set: sets.New[string](), + set: sets.New[types.UID](), } if err := u.store.Add(uids); err != nil { return err @@ -94,7 +95,7 @@ func (u *uidTrackingExpectations) expectFinalizersRemoved(logger klog.Logger, jo } // FinalizerRemovalObserved records the given deleteKey as a deletion, for the given job. -func (u *uidTrackingExpectations) finalizerRemovalObserved(logger klog.Logger, jobKey, deleteKey string) { +func (u *uidTrackingExpectations) finalizerRemovalObserved(logger klog.Logger, jobKey string, deleteKey types.UID) { uids := u.getSet(jobKey) if uids != nil { uids.Lock() diff --git a/pkg/controller/job/tracking_utils_test.go b/pkg/controller/job/tracking_utils_test.go index 9f380fe2b18..dd83e3d146d 100644 --- a/pkg/controller/job/tracking_utils_test.go +++ b/pkg/controller/job/tracking_utils_test.go @@ -25,6 +25,7 @@ import ( batch "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/component-base/metrics/testutil" "k8s.io/klog/v2/ktesting" @@ -35,23 +36,23 @@ func TestUIDTrackingExpectations(t *testing.T) { logger, _ := ktesting.NewTestContext(t) tracks := []struct { job string - firstRound []string - secondRound []string + firstRound []types.UID + secondRound []types.UID }{ { job: "foo", - firstRound: []string{"a", "b", "c", "d"}, - secondRound: []string{"e", "f"}, + firstRound: []types.UID{"a", "b", "c", "d"}, + secondRound: []types.UID{"e", "f"}, }, { job: "bar", - firstRound: []string{"x", "y", "z"}, - secondRound: []string{"u", "v", "w"}, + firstRound: []types.UID{"x", "y", "z"}, + secondRound: []types.UID{"u", "v", "w"}, }, { job: "baz", - firstRound: []string{"w"}, - secondRound: []string{"a"}, + firstRound: []types.UID{"w"}, + secondRound: []types.UID{"a"}, }, } expectations := newUIDTrackingExpectations()