Job: Use generic Set in controller

Signed-off-by: Yuki Iwai <yuki.iwai.tz@gmail.com>
This commit is contained in:
Yuki Iwai 2023-05-08 15:02:23 +09:00
parent 50ce134595
commit e4340f0d9b
5 changed files with 40 additions and 39 deletions

View File

@ -53,7 +53,7 @@ type orderedIntervals []interval
// the indexes that succeeded since the last sync.
func calculateSucceededIndexes(job *batch.Job, pods []*v1.Pod) (orderedIntervals, orderedIntervals) {
prevIntervals := succeededIndexesFromString(job.Status.CompletedIndexes, int(*job.Spec.Completions))
newSucceeded := sets.NewInt()
newSucceeded := sets.New[int]()
for _, p := range pods {
ix := getCompletionIndex(p.Annotations)
// Succeeded Pod with valid index and, if tracking with finalizers,
@ -63,7 +63,7 @@ func calculateSucceededIndexes(job *batch.Job, pods []*v1.Pod) (orderedIntervals
}
}
// List returns the items of the set in order.
result := prevIntervals.withOrderedIndexes(newSucceeded.List())
result := prevIntervals.withOrderedIndexes(sets.List(newSucceeded))
return prevIntervals, result
}
@ -194,7 +194,7 @@ func firstPendingIndexes(activePods []*v1.Pod, succeededIndexes orderedIntervals
if count == 0 {
return nil
}
active := sets.NewInt()
active := sets.New[int]()
for _, p := range activePods {
ix := getCompletionIndex(p.Annotations)
if ix != unknownCompletionIndex {
@ -202,7 +202,7 @@ func firstPendingIndexes(activePods []*v1.Pod, succeededIndexes orderedIntervals
}
}
result := make([]int, 0, count)
nonPending := succeededIndexes.withOrderedIndexes(active.List())
nonPending := succeededIndexes.withOrderedIndexes(sets.List(active))
// The following algorithm is bounded by len(nonPending) and count.
candidate := 0
for _, sInterval := range nonPending {

View File

@ -777,7 +777,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
if feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) {
if failureTargetCondition := findConditionByType(job.Status.Conditions, batch.JobFailureTarget); failureTargetCondition != nil {
finishedCondition = newFailedConditionForFailureTarget(failureTargetCondition, jm.clock.Now())
} else if failJobMessage := getFailJobMessage(&job, pods, uncounted.Failed()); failJobMessage != nil {
} else if failJobMessage := getFailJobMessage(&job, pods); failJobMessage != nil {
// Prepare the interim FailureTarget condition to record the failure message before the finalizers (allowing removal of the pods) are removed.
finishedCondition = newCondition(batch.JobFailureTarget, v1.ConditionTrue, jobConditionReasonPodFailurePolicy, *failJobMessage, jm.clock.Now())
}
@ -961,7 +961,7 @@ func (jm *Controller) deleteJobPods(ctx context.Context, job *batch.Job, jobKey
//
// It does this up to a limited number of Pods so that the size of .status
// doesn't grow too much and this sync doesn't starve other Jobs.
func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, expectedRmFinalizers sets.String, finishedCond *batch.JobCondition, needsFlush bool, newBackoffRecord backoffRecord) error {
func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, expectedRmFinalizers sets.Set[string], finishedCond *batch.JobCondition, needsFlush bool, newBackoffRecord backoffRecord) error {
isIndexed := isIndexedJob(job)
var podsToRemoveFinalizer []*v1.Pod
uncountedStatus := job.Status.UncountedTerminatedPods
@ -970,7 +970,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
// Sort to introduce completed Indexes in order.
sort.Sort(byCompletionIndex(pods))
}
uidsWithFinalizer := make(sets.String, len(pods))
uidsWithFinalizer := make(sets.Set[string], len(pods))
for _, p := range pods {
uid := string(p.UID)
if hasJobTrackingFinalizer(p) && !expectedRmFinalizers.Has(uid) {
@ -1103,7 +1103,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
//
// Returns whether there are pending changes in the Job status that need to be
// flushed in subsequent calls.
func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.String, oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool, newBackoffRecord backoffRecord) (*batch.Job, bool, error) {
func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job *batch.Job, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[string], oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool, newBackoffRecord backoffRecord) (*batch.Job, bool, error) {
var err error
if needsFlush {
if job, err = jm.updateStatusHandler(ctx, job); err != nil {
@ -1157,7 +1157,7 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job
// .status.uncountedTerminatedPods for which the finalizer was successfully
// removed and increments the corresponding status counters.
// Returns whether there was any status change.
func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinalizer sets.String) bool {
func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinalizer sets.Set[string]) bool {
updated := false
uncountedStatus := status.UncountedTerminatedPods
newUncounted := filterInUncountedUIDs(uncountedStatus.Succeeded, uidsWithFinalizer)
@ -1254,7 +1254,7 @@ func (jm *Controller) recordJobFinished(job *batch.Job, finishedCond *batch.JobC
return true
}
func filterInUncountedUIDs(uncounted []types.UID, include sets.String) []types.UID {
func filterInUncountedUIDs(uncounted []types.UID, include sets.Set[string]) []types.UID {
var newUncounted []types.UID
for _, uid := range uncounted {
if include.Has(string(uid)) {
@ -1320,7 +1320,7 @@ func newCondition(conditionType batch.JobConditionType, status v1.ConditionStatu
}
// getFailJobMessage returns a job failure message if the job should fail with the current counters
func getFailJobMessage(job *batch.Job, pods []*v1.Pod, uncounted sets.String) *string {
func getFailJobMessage(job *batch.Job, pods []*v1.Pod) *string {
if !feature.DefaultFeatureGate.Enabled(features.JobPodFailurePolicy) || job.Spec.PodFailurePolicy == nil {
return nil
}
@ -1337,7 +1337,7 @@ func getFailJobMessage(job *batch.Job, pods []*v1.Pod, uncounted sets.String) *s
// getNewFinishedPods returns the list of newly succeeded and failed pods that are not accounted
// in the job status. The list of failed pods can be affected by the podFailurePolicy.
func getNewFinishedPods(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPods, expectedRmFinalizers sets.String) (succeededPods, failedPods []*v1.Pod) {
func getNewFinishedPods(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPods, expectedRmFinalizers sets.Set[string]) (succeededPods, failedPods []*v1.Pod) {
succeededPods = getValidPodsWithFilter(job, pods, uncounted.Succeeded(), expectedRmFinalizers, func(p *v1.Pod) bool {
return p.Status.Phase == v1.PodSucceeded
})
@ -1580,7 +1580,7 @@ func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Dur
// getValidPodsWithFilter returns the valid pods that pass the filter.
// Pods are valid if they have a finalizer or in uncounted set
// and, for Indexed Jobs, a valid completion index.
func getValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.String, expectedRmFinalizers sets.String, filter func(*v1.Pod) bool) []*v1.Pod {
func getValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.Set[string], expectedRmFinalizers sets.Set[string], filter func(*v1.Pod) bool) []*v1.Pod {
var result []*v1.Pod
for _, p := range pods {
uid := string(p.UID)
@ -1634,14 +1634,14 @@ func removeTrackingFinalizerPatch(pod *v1.Pod) []byte {
}
type uncountedTerminatedPods struct {
succeeded sets.String
failed sets.String
succeeded sets.Set[string]
failed sets.Set[string]
}
func newUncountedTerminatedPods(in batch.UncountedTerminatedPods) *uncountedTerminatedPods {
obj := uncountedTerminatedPods{
succeeded: make(sets.String, len(in.Succeeded)),
failed: make(sets.String, len(in.Failed)),
succeeded: make(sets.Set[string], len(in.Succeeded)),
failed: make(sets.Set[string], len(in.Failed)),
}
for _, v := range in.Succeeded {
obj.succeeded.Insert(string(v))
@ -1652,14 +1652,14 @@ func newUncountedTerminatedPods(in batch.UncountedTerminatedPods) *uncountedTerm
return &obj
}
func (u *uncountedTerminatedPods) Succeeded() sets.String {
func (u *uncountedTerminatedPods) Succeeded() sets.Set[string] {
if u == nil {
return nil
}
return u.succeeded
}
func (u *uncountedTerminatedPods) Failed() sets.String {
func (u *uncountedTerminatedPods) Failed() sets.Set[string] {
if u == nil {
return nil
}

View File

@ -263,7 +263,7 @@ func TestControllerSyncJob(t *testing.T) {
expectedCondition *batch.JobConditionType
expectedConditionStatus v1.ConditionStatus
expectedConditionReason string
expectedCreatedIndexes sets.Int
expectedCreatedIndexes sets.Set[int]
expectedPodPatches int
// features
@ -574,7 +574,7 @@ func TestControllerSyncJob(t *testing.T) {
completionMode: batch.IndexedCompletion,
expectedCreations: 2,
expectedActive: 2,
expectedCreatedIndexes: sets.NewInt(0, 1),
expectedCreatedIndexes: sets.New(0, 1),
},
"indexed job completed": {
parallelism: 2,
@ -608,7 +608,7 @@ func TestControllerSyncJob(t *testing.T) {
expectedActive: 1,
expectedSucceeded: 2,
expectedCompletedIdxs: "0,1",
expectedCreatedIndexes: sets.NewInt(2),
expectedCreatedIndexes: sets.New(2),
expectedPodPatches: 3,
},
"indexed job some running and completed pods": {
@ -630,7 +630,7 @@ func TestControllerSyncJob(t *testing.T) {
expectedActive: 8,
expectedSucceeded: 6,
expectedCompletedIdxs: "2,4,5,7-9",
expectedCreatedIndexes: sets.NewInt(1, 6, 10, 11, 12, 13),
expectedCreatedIndexes: sets.New(1, 6, 10, 11, 12, 13),
expectedPodPatches: 6,
},
"indexed job some failed pods": {
@ -646,7 +646,7 @@ func TestControllerSyncJob(t *testing.T) {
expectedCreations: 2,
expectedActive: 3,
expectedFailed: 2,
expectedCreatedIndexes: sets.NewInt(0, 2),
expectedCreatedIndexes: sets.New(0, 2),
expectedPodPatches: 2,
},
"indexed job some pods without index": {
@ -954,9 +954,9 @@ func TestControllerSyncJob(t *testing.T) {
}
}
func checkIndexedJobPods(t *testing.T, control *controller.FakePodControl, wantIndexes sets.Int, jobName string) {
func checkIndexedJobPods(t *testing.T, control *controller.FakePodControl, wantIndexes sets.Set[int], jobName string) {
t.Helper()
gotIndexes := sets.NewInt()
gotIndexes := sets.New[int]()
for _, p := range control.Templates {
checkJobCompletionEnvVariable(t, &p.Spec)
ix := getCompletionIndex(p.Annotations)
@ -974,7 +974,7 @@ func checkIndexedJobPods(t *testing.T, control *controller.FakePodControl, wantI
t.Errorf("Got pod generate name %s, want %s", p.GenerateName, expectedName)
}
}
if diff := cmp.Diff(wantIndexes.List(), gotIndexes.List()); diff != "" {
if diff := cmp.Diff(sets.List(wantIndexes), sets.List(gotIndexes)); diff != "" {
t.Errorf("Unexpected created completion indexes (-want,+got):\n%s", diff)
}
}
@ -983,7 +983,7 @@ func TestGetNewFinshedPods(t *testing.T) {
cases := map[string]struct {
job batch.Job
pods []*v1.Pod
expectedRmFinalizers sets.String
expectedRmFinalizers sets.Set[string]
wantSucceeded int32
wantFailed int32
}{
@ -1040,7 +1040,7 @@ func TestGetNewFinshedPods(t *testing.T) {
},
},
},
expectedRmFinalizers: sets.NewString("b", "f"),
expectedRmFinalizers: sets.New("b", "f"),
pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodSucceeded).Pod,
buildPod().uid("b").phase(v1.PodSucceeded).trackingFinalizer().Pod,
@ -1098,7 +1098,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
job batch.Job
pods []*v1.Pod
finishedCond *batch.JobCondition
expectedRmFinalizers sets.String
expectedRmFinalizers sets.Set[string]
needsFlush bool
statusUpdateErr error
podControlErr error
@ -1203,7 +1203,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
},
},
},
expectedRmFinalizers: sets.NewString("c", "d", "g", "h"),
expectedRmFinalizers: sets.New("c", "d", "g", "h"),
pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod,
buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod,
@ -4358,7 +4358,7 @@ func TestFinalizersRemovedExpectations(t *testing.T) {
pods := append(newPodList(2, v1.PodSucceeded, job), newPodList(2, v1.PodFailed, job)...)
podInformer := sharedInformers.Core().V1().Pods().Informer()
podIndexer := podInformer.GetIndexer()
uids := sets.NewString()
uids := sets.New[string]()
for i := range pods {
clientset.Tracker().Add(pods[i])
podIndexer.Add(pods[i])
@ -4369,7 +4369,7 @@ func TestFinalizersRemovedExpectations(t *testing.T) {
manager.syncJob(context.TODO(), jobKey)
gotExpectedUIDs := manager.finalizerExpectations.getExpectedUIDs(jobKey)
if len(gotExpectedUIDs) != 0 {
t.Errorf("Got unwanted expectations for removed finalizers after first syncJob with client failures:\n%s", gotExpectedUIDs.List())
t.Errorf("Got unwanted expectations for removed finalizers after first syncJob with client failures:\n%s", sets.List(gotExpectedUIDs))
}
// Remove failures and re-sync.
@ -4426,7 +4426,7 @@ func TestFinalizersRemovedExpectations(t *testing.T) {
t.Errorf("Deleting pod that had finalizer: %v", err)
}
uids = sets.NewString(string(pods[2].UID))
uids = sets.New(string(pods[2].UID))
var diff string
if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey)

View File

@ -40,7 +40,7 @@ var uidSetKeyFunc = func(obj interface{}) (string, error) {
// uidTrackingExpectations to remember which UID it has seen/still waiting for.
type uidSet struct {
sync.RWMutex
set sets.String
set sets.Set[string]
key string
}
@ -60,7 +60,7 @@ func (u *uidTrackingExpectations) getSet(controllerKey string) *uidSet {
return nil
}
func (u *uidTrackingExpectations) getExpectedUIDs(controllerKey string) sets.String {
func (u *uidTrackingExpectations) getExpectedUIDs(controllerKey string) sets.Set[string] {
uids := u.getSet(controllerKey)
if uids == nil {
return nil
@ -81,7 +81,7 @@ func (u *uidTrackingExpectations) expectFinalizersRemoved(jobKey string, deleted
if uids == nil {
uids = &uidSet{
key: jobKey,
set: sets.NewString(),
set: sets.New[string](),
}
if err := u.store.Add(uids); err != nil {
return err

View File

@ -25,6 +25,7 @@ import (
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/component-base/metrics/testutil"
"k8s.io/kubernetes/pkg/controller/job/metrics"
)
@ -76,7 +77,7 @@ func TestUIDTrackingExpectations(t *testing.T) {
uids := expectations.getSet(track.job)
if uids == nil {
t.Errorf("Set of UIDs is empty for job %s", track.job)
} else if diff := cmp.Diff(track.firstRound, uids.set.List()); diff != "" {
} else if diff := cmp.Diff(track.firstRound, sets.List(uids.set)); diff != "" {
t.Errorf("Unexpected keys for job %s (-want,+got):\n%s", track.job, diff)
}
}
@ -110,7 +111,7 @@ func TestUIDTrackingExpectations(t *testing.T) {
uids := expectations.getSet(track.job)
if uids == nil {
t.Errorf("Set of UIDs is empty for job %s", track.job)
} else if diff := cmp.Diff(track.secondRound, uids.set.List()); diff != "" {
} else if diff := cmp.Diff(track.secondRound, sets.List(uids.set)); diff != "" {
t.Errorf("Unexpected keys for job %s (-want,+got):\n%s", track.job, diff)
}
}