diff --git a/pkg/controller/job/indexed_job_utils.go b/pkg/controller/job/indexed_job_utils.go index e2a78e557b5..3a1744fe1a6 100644 --- a/pkg/controller/job/indexed_job_utils.go +++ b/pkg/controller/job/indexed_job_utils.go @@ -53,7 +53,7 @@ type orderedIntervals []interval // the indexes that succeeded since the last sync. func calculateSucceededIndexes(job *batch.Job, pods []*v1.Pod) (orderedIntervals, orderedIntervals) { prevIntervals := succeededIndexesFromString(job.Status.CompletedIndexes, int(*job.Spec.Completions)) - newSucceeded := sets.NewInt() + newSucceeded := sets.New[int]() for _, p := range pods { ix := getCompletionIndex(p.Annotations) // Succeeded Pod with valid index and, if tracking with finalizers, @@ -63,7 +63,7 @@ func calculateSucceededIndexes(job *batch.Job, pods []*v1.Pod) (orderedIntervals } } // List returns the items of the set in order. - result := prevIntervals.withOrderedIndexes(newSucceeded.List()) + result := prevIntervals.withOrderedIndexes(sets.List(newSucceeded)) return prevIntervals, result } @@ -194,7 +194,7 @@ func firstPendingIndexes(activePods []*v1.Pod, succeededIndexes orderedIntervals if count == 0 { return nil } - active := sets.NewInt() + active := sets.New[int]() for _, p := range activePods { ix := getCompletionIndex(p.Annotations) if ix != unknownCompletionIndex { @@ -202,7 +202,7 @@ func firstPendingIndexes(activePods []*v1.Pod, succeededIndexes orderedIntervals } } result := make([]int, 0, count) - nonPending := succeededIndexes.withOrderedIndexes(active.List()) + nonPending := succeededIndexes.withOrderedIndexes(sets.List(active)) // The following algorithm is bounded by len(nonPending) and count. candidate := 0 for _, sInterval := range nonPending { diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 505ecac1197..2c2bafcc256 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -777,7 +777,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) { if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) { if failureTargetCondition := findConditionByType(job.Status.Conditions, batch.JobFailureTarget); failureTargetCondition != nil { finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now()) - } else if failJobMessage := getFailJobMessage(&job, pods, uncounted.Failed()); failJobMessage != nil { + } else if failJobMessage := getFailJobMessage(&job, pods); failJobMessage != nil { // Prepare the interim FailureTarget condition to record the failure message before the finalizers (allowing removal of the pods) are removed. finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now()) } @@ -961,7 +961,7 @@ func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey // // It does this up to a limited number of Pods so that the size of .status // doesn't grow too much and this sync doesn't starve other Jobs. -func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, expectedRmFinalizers sets.String, finishedCond *batch.JobCondition, needsFlush bool, newBackoffRecord backoffRecord) error { +func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, expectedRmFinalizers sets.Set[string], finishedCond *batch.JobCondition, needsFlush bool, newBackoffRecord backoffRecord) error { isIndexed := isIndexedJob(job) var podsToRemoveFinalizer []*v1.Pod uncountedStatus := job.Status.UncountedTerminatedPods @@ -970,7 +970,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job // Sort to introduce completed Indexes in order. sort.Sort(byCompletionIndex(pods)) } - uidsWithFinalizer := make(sets.String, len(pods)) + uidsWithFinalizer := make(sets.Set[string], len(pods)) for _, p := range pods { uid := string(p.UID) if hasJobTrackingFinalizer(p) && !expectedRmFinalizers.Has(uid) { @@ -1103,7 +1103,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job // // Returns whether there are pending changes in the Job status that need to be // flushed in subsequent calls. -func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool, newBackoffRecord backoffRecord) (*batch.Job, bool, error) { +func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[string], oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool, newBackoffRecord backoffRecord) (*batch.Job, bool, error) { var err error if needsFlush { if job, err = jm.updateStatusHandler(ctx, job); err != nil { @@ -1157,7 +1157,7 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job // .status.uncountedTerminatedPods for which the finalizer was successfully // removed and increments the corresponding status counters. // Returns whether there was any status change. -func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinalizer sets.String) bool { +func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinalizer sets.Set[string]) bool { updated := false uncountedStatus := status.UncountedTerminatedPods newUncounted := filterInUncountedUIDs(uncountedStatus.Succeeded, uidsWithFinalizer) @@ -1254,7 +1254,7 @@ func (jm *Controller) recordJobFinished(job *batch.Job, finishedCond *batch.JobC return true } -func filterInUncountedUIDs(uncounted []types.UID, include sets.String) []types.UID { +func filterInUncountedUIDs(uncounted []types.UID, include sets.Set[string]) []types.UID { var newUncounted []types.UID for _, uid := range uncounted { if include.Has(string(uid)) { @@ -1320,7 +1320,7 @@ func newCondition(conditionType batch.JobConditionType, status v1.ConditionStatu } // getFailJobMessage returns a job failure message if the job should fail with the current counters -func getFailJobMessage(job *batch.Job, pods []*v1.Pod, uncounted sets.String) *string { +func getFailJobMessage(job *batch.Job, pods []*v1.Pod) *string { if !feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) || job.Spec.PodFailurePolicy == nil { return nil } @@ -1337,7 +1337,7 @@ func getFailJobMessage(job *batch.Job, pods []*v1.Pod, uncounted sets.String) *s // getNewFinishedPods returns the list of newly succeeded and failed pods that are not accounted // in the job status. The list of failed pods can be affected by the podFailurePolicy. -func getNewFinishedPods(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPods, expectedRmFinalizers sets.String) (succeededPods, failedPods []*v1.Pod) { +func getNewFinishedPods(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPods, expectedRmFinalizers sets.Set[string]) (succeededPods, failedPods []*v1.Pod) { succeededPods = getValidPodsWithFilter(job, pods, uncounted.Succeeded(), expectedRmFinalizers, func(p *v1.Pod) bool { return p.Status.Phase == v1.PodSucceeded }) @@ -1580,7 +1580,7 @@ func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Dur // getValidPodsWithFilter returns the valid pods that pass the filter. // Pods are valid if they have a finalizer or in uncounted set // and, for Indexed Jobs, a valid completion index. -func getValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.String, expectedRmFinalizers sets.String, filter func(*v1.Pod) bool) []*v1.Pod { +func getValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.Set[string], expectedRmFinalizers sets.Set[string], filter func(*v1.Pod) bool) []*v1.Pod { var result []*v1.Pod for _, p := range pods { uid := string(p.UID) @@ -1634,14 +1634,14 @@ func removeTrackingFinalizerPatch(pod *v1.Pod) []byte { } type uncountedTerminatedPods struct { - succeeded sets.String - failed sets.String + succeeded sets.Set[string] + failed sets.Set[string] } func newUncountedTerminatedPods(in batch.UncountedTerminatedPods) *uncountedTerminatedPods { obj := uncountedTerminatedPods{ - succeeded: make(sets.String, len(in.Succeeded)), - failed: make(sets.String, len(in.Failed)), + succeeded: make(sets.Set[string], len(in.Succeeded)), + failed: make(sets.Set[string], len(in.Failed)), } for _, v := range in.Succeeded { obj.succeeded.Insert(string(v)) @@ -1652,14 +1652,14 @@ func newUncountedTerminatedPods(in batch.UncountedTerminatedPods) *uncountedTerm return &obj } -func (u *uncountedTerminatedPods) Succeeded() sets.String { +func (u *uncountedTerminatedPods) Succeeded() sets.Set[string] { if u == nil { return nil } return u.succeeded } -func (u *uncountedTerminatedPods) Failed() sets.String { +func (u *uncountedTerminatedPods) Failed() sets.Set[string] { if u == nil { return nil } diff --git a/pkg/controller/job/job_controller_test.go b/pkg/controller/job/job_controller_test.go index b1f62ab5022..ec9115c3a4d 100644 --- a/pkg/controller/job/job_controller_test.go +++ b/pkg/controller/job/job_controller_test.go @@ -263,7 +263,7 @@ func TestControllerSyncJob(t *testing.T) { expectedCondition *batch.JobConditionType expectedConditionStatus v1.ConditionStatus expectedConditionReason string - expectedCreatedIndexes sets.Int + expectedCreatedIndexes sets.Set[int] expectedPodPatches int // features @@ -574,7 +574,7 @@ func TestControllerSyncJob(t *testing.T) { completionMode: batch.IndexedCompletion, expectedCreations: 2, expectedActive: 2, - expectedCreatedIndexes: sets.NewInt(0, 1), + expectedCreatedIndexes: sets.New(0, 1), }, "indexed job completed": { parallelism: 2, @@ -608,7 +608,7 @@ func TestControllerSyncJob(t *testing.T) { expectedActive: 1, expectedSucceeded: 2, expectedCompletedIdxs: "0,1", - expectedCreatedIndexes: sets.NewInt(2), + expectedCreatedIndexes: sets.New(2), expectedPodPatches: 3, }, "indexed job some running and completed pods": { @@ -630,7 +630,7 @@ func TestControllerSyncJob(t *testing.T) { expectedActive: 8, expectedSucceeded: 6, expectedCompletedIdxs: "2,4,5,7-9", - expectedCreatedIndexes: sets.NewInt(1, 6, 10, 11, 12, 13), + expectedCreatedIndexes: sets.New(1, 6, 10, 11, 12, 13), expectedPodPatches: 6, }, "indexed job some failed pods": { @@ -646,7 +646,7 @@ func TestControllerSyncJob(t *testing.T) { expectedCreations: 2, expectedActive: 3, expectedFailed: 2, - expectedCreatedIndexes: sets.NewInt(0, 2), + expectedCreatedIndexes: sets.New(0, 2), expectedPodPatches: 2, }, "indexed job some pods without index": { @@ -954,9 +954,9 @@ func TestControllerSyncJob(t *testing.T) { } } -func checkIndexedJobPods(t *testing.T, control *controller.FakePodControl, wantIndexes sets.Int, jobName string) { +func checkIndexedJobPods(t *testing.T, control *controller.FakePodControl, wantIndexes sets.Set[int], jobName string) { t.Helper() - gotIndexes := sets.NewInt() + gotIndexes := sets.New[int]() for _, p := range control.Templates { checkJobCompletionEnvVariable(t, &p.Spec) ix := getCompletionIndex(p.Annotations) @@ -974,7 +974,7 @@ func checkIndexedJobPods(t *testing.T, control *controller.FakePodControl, wantI t.Errorf("Got pod generate name %s, want %s", p.GenerateName, expectedName) } } - if diff := cmp.Diff(wantIndexes.List(), gotIndexes.List()); diff != "" { + if diff := cmp.Diff(sets.List(wantIndexes), sets.List(gotIndexes)); diff != "" { t.Errorf("Unexpected created completion indexes (-want,+got):\n%s", diff) } } @@ -983,7 +983,7 @@ func TestGetNewFinshedPods(t *testing.T) { cases := map[string]struct { job batch.Job pods []*v1.Pod - expectedRmFinalizers sets.String + expectedRmFinalizers sets.Set[string] wantSucceeded int32 wantFailed int32 }{ @@ -1040,7 +1040,7 @@ func TestGetNewFinshedPods(t *testing.T) { }, }, }, - expectedRmFinalizers: sets.NewString("b", "f"), + expectedRmFinalizers: sets.New("b", "f"), pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodSucceeded).Pod, buildPod().uid("b").phase(v1.PodSucceeded).trackingFinalizer().Pod, @@ -1098,7 +1098,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { job batch.Job pods []*v1.Pod finishedCond *batch.JobCondition - expectedRmFinalizers sets.String + expectedRmFinalizers sets.Set[string] needsFlush bool statusUpdateErr error podControlErr error @@ -1203,7 +1203,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) { }, }, }, - expectedRmFinalizers: sets.NewString("c", "d", "g", "h"), + expectedRmFinalizers: sets.New("c", "d", "g", "h"), pods: []*v1.Pod{ buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod, @@ -4358,7 +4358,7 @@ func TestFinalizersRemovedExpectations(t *testing.T) { pods := append(newPodList(2, v1.PodSucceeded, job), newPodList(2, v1.PodFailed, job)...) podInformer := sharedInformers.Core().V1().Pods().Informer() podIndexer := podInformer.GetIndexer() - uids := sets.NewString() + uids := sets.New[string]() for i := range pods { clientset.Tracker().Add(pods[i]) podIndexer.Add(pods[i]) @@ -4369,7 +4369,7 @@ func TestFinalizersRemovedExpectations(t *testing.T) { manager.syncJob(context.TODO(), jobKey) gotExpectedUIDs := manager.finalizerExpectations.getExpectedUIDs(jobKey) if len(gotExpectedUIDs) != 0 { - t.Errorf("Got unwanted expectations for removed finalizers after first syncJob with client failures:\n%s", gotExpectedUIDs.List()) + t.Errorf("Got unwanted expectations for removed finalizers after first syncJob with client failures:\n%s", sets.List(gotExpectedUIDs)) } // Remove failures and re-sync. @@ -4426,7 +4426,7 @@ func TestFinalizersRemovedExpectations(t *testing.T) { t.Errorf("Deleting pod that had finalizer: %v", err) } - uids = sets.NewString(string(pods[2].UID)) + uids = sets.New(string(pods[2].UID)) var diff string if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) { gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey) diff --git a/pkg/controller/job/tracking_utils.go b/pkg/controller/job/tracking_utils.go index b6be2df7b6a..87d4425115a 100644 --- a/pkg/controller/job/tracking_utils.go +++ b/pkg/controller/job/tracking_utils.go @@ -40,7 +40,7 @@ var uidSetKeyFunc = func(obj interface{}) (string, error) { // uidTrackingExpectations to remember which UID it has seen/still waiting for. type uidSet struct { sync.RWMutex - set sets.String + set sets.Set[string] key string } @@ -60,7 +60,7 @@ func (u *uidTrackingExpectations) getSet(controllerKey string) *uidSet { return nil } -func (u *uidTrackingExpectations) getExpectedUIDs(controllerKey string) sets.String { +func (u *uidTrackingExpectations) getExpectedUIDs(controllerKey string) sets.Set[string] { uids := u.getSet(controllerKey) if uids == nil { return nil @@ -81,7 +81,7 @@ func (u *uidTrackingExpectations) expectFinalizersRemoved(jobKey string, deleted if uids == nil { uids = &uidSet{ key: jobKey, - set: sets.NewString(), + set: sets.New[string](), } if err := u.store.Add(uids); err != nil { return err diff --git a/pkg/controller/job/tracking_utils_test.go b/pkg/controller/job/tracking_utils_test.go index f82f2663a0e..a64a0682a3a 100644 --- a/pkg/controller/job/tracking_utils_test.go +++ b/pkg/controller/job/tracking_utils_test.go @@ -25,6 +25,7 @@ import ( batch "k8s.io/api/batch/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/component-base/metrics/testutil" "k8s.io/kubernetes/pkg/controller/job/metrics" ) @@ -76,7 +77,7 @@ func TestUIDTrackingExpectations(t *testing.T) { uids := expectations.getSet(track.job) if uids == nil { t.Errorf("Set of UIDs is empty for job %s", track.job) - } else if diff := cmp.Diff(track.firstRound, uids.set.List()); diff != "" { + } else if diff := cmp.Diff(track.firstRound, sets.List(uids.set)); diff != "" { t.Errorf("Unexpected keys for job %s (-want,+got):\n%s", track.job, diff) } } @@ -110,7 +111,7 @@ func TestUIDTrackingExpectations(t *testing.T) { uids := expectations.getSet(track.job) if uids == nil { t.Errorf("Set of UIDs is empty for job %s", track.job) - } else if diff := cmp.Diff(track.secondRound, uids.set.List()); diff != "" { + } else if diff := cmp.Diff(track.secondRound, sets.List(uids.set)); diff != "" { t.Errorf("Unexpected keys for job %s (-want,+got):\n%s", track.job, diff) } }