Merge pull request #128400 from tenzen-y/use-uid-typed-instead-of-string

Job: Refactor uncountedTerminatedPods to avoid casting everywhere
Authored by Kubernetes Prow Robot on 2024-10-29 06:32:56 +00:00; committed by GitHub.
Commit 66b3dc1a38
GPG Key ID: B5690EEEBB952194 (no known key found for this signature in database)
5 changed files with 53 additions and 60 deletions
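
The change replaces the string-keyed sets used to track pod UIDs in the Job controller with sets keyed by types.UID, removing the string(pod.UID) conversions that previously appeared at every call site. Below is a minimal sketch of the before/after pattern, assuming only the generic sets package and the core v1 Pod type; the helper names are illustrative and not part of the patch.

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
)

// Before: a string-keyed set forces a cast on every lookup.
func hasPendingRemovalOld(expected sets.Set[string], pod *v1.Pod) bool {
	return expected.Has(string(pod.UID))
}

// After: a types.UID-keyed set accepts the pod's UID directly.
func hasPendingRemovalNew(expected sets.Set[types.UID], pod *v1.Pod) bool {
	return expected.Has(pod.UID)
}

func main() {
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: types.UID("a")}}
	oldSet := sets.New("a")            // inferred as sets.Set[string]
	newSet := sets.New[types.UID]("a") // explicit element type, as in the patch
	fmt.Println(hasPendingRemovalOld(oldSet, pod), hasPendingRemovalNew(newSet, pod))
}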

View File

@ -25,6 +25,7 @@ import (
"github.com/google/go-cmp/cmp"
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
@ -405,7 +406,7 @@ func TestGetPodsWithDelayedDeletionPerIndex(t *testing.T) {
cases := map[string]struct {
job batch.Job
pods []*v1.Pod
-expectedRmFinalizers sets.Set[string]
+expectedRmFinalizers sets.Set[types.UID]
wantPodsWithDelayedDeletionPerIndex []string
}{
"failed pods are kept corresponding to non-failed indexes are kept": {
@ -444,7 +445,7 @@ func TestGetPodsWithDelayedDeletionPerIndex(t *testing.T) {
pods: []*v1.Pod{
buildPod().uid("a").indexFailureCount("0").phase(v1.PodFailed).index("0").trackingFinalizer().Pod,
},
-expectedRmFinalizers: sets.New("a"),
+expectedRmFinalizers: sets.New[types.UID]("a"),
wantPodsWithDelayedDeletionPerIndex: []string{},
},
"failed pod with index outside of completions; the pod's deletion is not delayed": {

View File

@ -136,7 +136,7 @@ type syncJobCtx struct {
succeededIndexes orderedIntervals
failedIndexes *orderedIntervals
newBackoffRecord backoffRecord
-expectedRmFinalizers sets.Set[string]
+expectedRmFinalizers sets.Set[types.UID]
uncounted *uncountedTerminatedPods
podsWithDelayedDeletionPerIndex map[int]*v1.Pod
terminating *int32
@ -370,7 +370,7 @@ func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{}) {
if finalizerRemoved {
key, err := controller.KeyFunc(job)
if err == nil {
-jm.finalizerExpectations.finalizerRemovalObserved(logger, key, string(curPod.UID))
+jm.finalizerExpectations.finalizerRemovalObserved(logger, key, curPod.UID)
}
}
jm.enqueueSyncJobBatched(logger, job)
@ -386,7 +386,7 @@ func (jm *Controller) updatePod(logger klog.Logger, old, cur interface{}) {
if finalizerRemoved {
key, err := controller.KeyFunc(job)
if err == nil {
-jm.finalizerExpectations.finalizerRemovalObserved(logger, key, string(curPod.UID))
+jm.finalizerExpectations.finalizerRemovalObserved(logger, key, curPod.UID)
}
}
jm.enqueueSyncJobBatched(logger, job)
@ -460,7 +460,7 @@ func (jm *Controller) deletePod(logger klog.Logger, obj interface{}, final bool)
// Consider the finalizer removed if this is the final delete. Otherwise,
// it's an update for the deletion timestamp, then check finalizer.
if final || !hasFinalizer {
-jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, string(pod.UID))
+jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, pod.UID)
}
jm.enqueueSyncJobBatched(logger, job)
@ -1167,11 +1167,10 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
// Sort to introduce completed Indexes in order.
sort.Sort(byCompletionIndex(jobCtx.pods))
}
-uidsWithFinalizer := make(sets.Set[string], len(jobCtx.pods))
+uidsWithFinalizer := make(sets.Set[types.UID], len(jobCtx.pods))
for _, p := range jobCtx.pods {
-uid := string(p.UID)
-if hasJobTrackingFinalizer(p) && !jobCtx.expectedRmFinalizers.Has(uid) {
-uidsWithFinalizer.Insert(uid)
+if hasJobTrackingFinalizer(p) && !jobCtx.expectedRmFinalizers.Has(p.UID) {
+uidsWithFinalizer.Insert(p.UID)
}
}
@ -1183,7 +1182,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
podFailureCountByPolicyAction := map[string]int{}
reachedMaxUncountedPods := false
for _, pod := range jobCtx.pods {
-if !hasJobTrackingFinalizer(pod) || jobCtx.expectedRmFinalizers.Has(string(pod.UID)) {
+if !hasJobTrackingFinalizer(pod) || jobCtx.expectedRmFinalizers.Has(pod.UID) {
// This pod was processed in a previous sync.
continue
}
@ -1192,7 +1191,7 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
continue
}
podsToRemoveFinalizer = append(podsToRemoveFinalizer, pod)
-if pod.Status.Phase == v1.PodSucceeded && !jobCtx.uncounted.failed.Has(string(pod.UID)) {
+if pod.Status.Phase == v1.PodSucceeded && !jobCtx.uncounted.failed.Has(pod.UID) {
if isIndexed {
// The completion index is enough to avoid recounting succeeded pods.
// No need to track UIDs.
@ -1201,14 +1200,14 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(ctx context.Context, job
newSucceededIndexes = append(newSucceededIndexes, ix)
needsFlush = true
}
-} else if !jobCtx.uncounted.succeeded.Has(string(pod.UID)) {
+} else if !jobCtx.uncounted.succeeded.Has(pod.UID) {
needsFlush = true
uncountedStatus.Succeeded = append(uncountedStatus.Succeeded, pod.UID)
}
} else if considerPodFailed || (jobCtx.finishedCondition != nil && !isSuccessCriteriaMetCondition(jobCtx.finishedCondition)) {
// When the job is considered finished, every non-terminated pod is considered failed.
ix := getCompletionIndex(pod.Annotations)
-if !jobCtx.uncounted.failed.Has(string(pod.UID)) && (!isIndexed || (ix != unknownCompletionIndex && ix < int(*jobCtx.job.Spec.Completions))) {
+if !jobCtx.uncounted.failed.Has(pod.UID) && (!isIndexed || (ix != unknownCompletionIndex && ix < int(*jobCtx.job.Spec.Completions))) {
if jobCtx.job.Spec.PodFailurePolicy != nil {
_, countFailed, action := matchPodFailurePolicy(jobCtx.job.Spec.PodFailurePolicy, pod)
if action != nil {
@ -1333,7 +1332,7 @@ func canRemoveFinalizer(logger klog.Logger, jobCtx *syncJobCtx, pod *v1.Pod, con
//
// Returns whether there are pending changes in the Job status that need to be
// flushed in subsequent calls.
-func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, jobCtx *syncJobCtx, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[string], oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool) (*batch.Job, bool, error) {
+func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, jobCtx *syncJobCtx, podsToRemoveFinalizer []*v1.Pod, uidsWithFinalizer sets.Set[types.UID], oldCounters *batch.JobStatus, podFailureCountByPolicyAction map[string]int, needsFlush bool) (*batch.Job, bool, error) {
logger := klog.FromContext(ctx)
var err error
if needsFlush {
@ -1367,7 +1366,7 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job
rmSucceded, rmErr = jm.removeTrackingFinalizerFromPods(ctx, jobKey, podsToRemoveFinalizer)
for i, p := range podsToRemoveFinalizer {
if rmSucceded[i] {
-uidsWithFinalizer.Delete(string(p.UID))
+uidsWithFinalizer.Delete(p.UID)
}
}
}
@ -1388,7 +1387,7 @@ func (jm *Controller) flushUncountedAndRemoveFinalizers(ctx context.Context, job
// .status.uncountedTerminatedPods for which the finalizer was successfully
// removed and increments the corresponding status counters.
// Returns whether there was any status change.
-func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinalizer sets.Set[string]) bool {
+func cleanUncountedPodsWithoutFinalizers(status *batch.JobStatus, uidsWithFinalizer sets.Set[types.UID]) bool {
updated := false
uncountedStatus := status.UncountedTerminatedPods
newUncounted := filterInUncountedUIDs(uncountedStatus.Succeeded, uidsWithFinalizer)
@ -1414,9 +1413,9 @@ func (jm *Controller) removeTrackingFinalizerFromPods(ctx context.Context, jobKe
logger := klog.FromContext(ctx)
errCh := make(chan error, len(pods))
succeeded := make([]bool, len(pods))
uids := make([]string, len(pods))
uids := make([]types.UID, len(pods))
for i, p := range pods {
-uids[i] = string(p.UID)
+uids[i] = p.UID
}
if jobKey != "" {
err := jm.finalizerExpectations.expectFinalizersRemoved(logger, jobKey, uids)
@ -1435,7 +1434,7 @@ func (jm *Controller) removeTrackingFinalizerFromPods(ctx context.Context, jobKe
// In case of any failure, we don't expect a Pod update for the
// finalizer removed. Clear expectation now.
if jobKey != "" {
-jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, string(pod.UID))
+jm.finalizerExpectations.finalizerRemovalObserved(logger, jobKey, pod.UID)
}
if !apierrors.IsNotFound(err) {
errCh <- err
@ -1495,10 +1494,10 @@ func (jm *Controller) recordJobFinished(job *batch.Job, finishedCond *batch.JobC
return true
}
-func filterInUncountedUIDs(uncounted []types.UID, include sets.Set[string]) []types.UID {
+func filterInUncountedUIDs(uncounted []types.UID, include sets.Set[types.UID]) []types.UID {
var newUncounted []types.UID
for _, uid := range uncounted {
-if include.Has(string(uid)) {
+if include.Has(uid) {
newUncounted = append(newUncounted, uid)
}
}
@ -1852,14 +1851,12 @@ func (jm *Controller) patchJob(ctx context.Context, job *batch.Job, data []byte)
// getValidPodsWithFilter returns the valid pods that pass the filter.
// Pods are valid if they have a finalizer or in uncounted set
// and, for Indexed Jobs, a valid completion index.
-func getValidPodsWithFilter(jobCtx *syncJobCtx, uncounted sets.Set[string], filter func(*v1.Pod) bool) []*v1.Pod {
+func getValidPodsWithFilter(jobCtx *syncJobCtx, uncounted sets.Set[types.UID], filter func(*v1.Pod) bool) []*v1.Pod {
var result []*v1.Pod
for _, p := range jobCtx.pods {
-uid := string(p.UID)
// Pods that don't have a completion finalizer are in the uncounted set or
// have already been accounted for in the Job status.
-if !hasJobTrackingFinalizer(p) || uncounted.Has(uid) || jobCtx.expectedRmFinalizers.Has(uid) {
+if !hasJobTrackingFinalizer(p) || uncounted.Has(p.UID) || jobCtx.expectedRmFinalizers.Has(p.UID) {
continue
}
if isIndexedJob(jobCtx.job) {
@ -1906,32 +1903,25 @@ func removeTrackingFinalizerPatch(pod *v1.Pod) []byte {
}
type uncountedTerminatedPods struct {
-succeeded sets.Set[string]
-failed sets.Set[string]
+succeeded sets.Set[types.UID]
+failed sets.Set[types.UID]
}
func newUncountedTerminatedPods(in batch.UncountedTerminatedPods) *uncountedTerminatedPods {
-obj := uncountedTerminatedPods{
-succeeded: make(sets.Set[string], len(in.Succeeded)),
-failed: make(sets.Set[string], len(in.Failed)),
+return &uncountedTerminatedPods{
+succeeded: sets.New(in.Succeeded...),
+failed: sets.New(in.Failed...),
}
-for _, v := range in.Succeeded {
-obj.succeeded.Insert(string(v))
-}
-for _, v := range in.Failed {
-obj.failed.Insert(string(v))
-}
-return &obj
}
-func (u *uncountedTerminatedPods) Succeeded() sets.Set[string] {
+func (u *uncountedTerminatedPods) Succeeded() sets.Set[types.UID] {
if u == nil {
return nil
}
return u.succeeded
}
-func (u *uncountedTerminatedPods) Failed() sets.Set[string] {
+func (u *uncountedTerminatedPods) Failed() sets.Set[types.UID] {
if u == nil {
return nil
}

View File

@ -1453,7 +1453,7 @@ func TestGetNewFinshedPods(t *testing.T) {
cases := map[string]struct {
job batch.Job
pods []*v1.Pod
-expectedRmFinalizers sets.Set[string]
+expectedRmFinalizers sets.Set[types.UID]
wantSucceeded int32
wantFailed int32
}{
@ -1510,7 +1510,7 @@ func TestGetNewFinshedPods(t *testing.T) {
},
},
},
-expectedRmFinalizers: sets.New("b", "f"),
+expectedRmFinalizers: sets.New[types.UID]("b", "f"),
pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodSucceeded).Pod,
buildPod().uid("b").phase(v1.PodSucceeded).trackingFinalizer().Pod,
@ -1575,7 +1575,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
job batch.Job
pods []*v1.Pod
finishedCond *batch.JobCondition
-expectedRmFinalizers sets.Set[string]
+expectedRmFinalizers sets.Set[types.UID]
needsFlush bool
statusUpdateErr error
podControlErr error
@ -1686,7 +1686,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
},
},
},
-expectedRmFinalizers: sets.New("c", "d", "g", "h"),
+expectedRmFinalizers: sets.New[types.UID]("c", "d", "g", "h"),
pods: []*v1.Pod{
buildPod().uid("a").phase(v1.PodSucceeded).trackingFinalizer().Pod,
buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod,
@ -7655,11 +7655,11 @@ func TestFinalizersRemovedExpectations(t *testing.T) {
pods := append(newPodList(2, v1.PodSucceeded, job), newPodList(2, v1.PodFailed, job)...)
podInformer := sharedInformers.Core().V1().Pods().Informer()
podIndexer := podInformer.GetIndexer()
-uids := sets.New[string]()
+uids := sets.New[types.UID]()
for i := range pods {
clientset.Tracker().Add(pods[i])
podIndexer.Add(pods[i])
-uids.Insert(string(pods[i].UID))
+uids.Insert(pods[i].UID)
}
jobKey := testutil.GetKey(job, t)
@ -7725,7 +7725,7 @@ func TestFinalizersRemovedExpectations(t *testing.T) {
t.Errorf("Deleting pod that had finalizer: %v", err)
}
-uids = sets.New(string(pods[2].UID))
+uids = sets.New(pods[2].UID)
var diff string
if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
gotExpectedUIDs = manager.finalizerExpectations.getExpectedUIDs(jobKey)

View File

@ -22,6 +22,7 @@ import (
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
@ -40,7 +41,7 @@ var uidSetKeyFunc = func(obj interface{}) (string, error) {
// uidTrackingExpectations to remember which UID it has seen/still waiting for.
type uidSet struct {
sync.RWMutex
-set sets.Set[string]
+set sets.Set[types.UID]
key string
}
@ -60,7 +61,7 @@ func (u *uidTrackingExpectations) getSet(controllerKey string) *uidSet {
return nil
}
-func (u *uidTrackingExpectations) getExpectedUIDs(controllerKey string) sets.Set[string] {
+func (u *uidTrackingExpectations) getExpectedUIDs(controllerKey string) sets.Set[types.UID] {
uids := u.getSet(controllerKey)
if uids == nil {
return nil
@ -74,14 +75,14 @@ func (u *uidTrackingExpectations) getExpectedUIDs(controllerKey string) sets.Set
// ExpectDeletions records expectations for the given deleteKeys, against the
// given job-key.
// This is thread-safe across different job keys.
-func (u *uidTrackingExpectations) expectFinalizersRemoved(logger klog.Logger, jobKey string, deletedKeys []string) error {
+func (u *uidTrackingExpectations) expectFinalizersRemoved(logger klog.Logger, jobKey string, deletedKeys []types.UID) error {
logger.V(4).Info("Expecting tracking finalizers removed", "key", jobKey, "podUIDs", deletedKeys)
uids := u.getSet(jobKey)
if uids == nil {
uids = &uidSet{
key: jobKey,
-set: sets.New[string](),
+set: sets.New[types.UID](),
}
if err := u.store.Add(uids); err != nil {
return err
@ -94,7 +95,7 @@ func (u *uidTrackingExpectations) expectFinalizersRemoved(logger klog.Logger, jo
}
// FinalizerRemovalObserved records the given deleteKey as a deletion, for the given job.
-func (u *uidTrackingExpectations) finalizerRemovalObserved(logger klog.Logger, jobKey, deleteKey string) {
+func (u *uidTrackingExpectations) finalizerRemovalObserved(logger klog.Logger, jobKey string, deleteKey types.UID) {
uids := u.getSet(jobKey)
if uids != nil {
uids.Lock()

View File

@ -25,6 +25,7 @@ import (
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/component-base/metrics/testutil"
"k8s.io/klog/v2/ktesting"
@ -35,23 +36,23 @@ func TestUIDTrackingExpectations(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
tracks := []struct {
job string
-firstRound []string
-secondRound []string
+firstRound []types.UID
+secondRound []types.UID
}{
{
job: "foo",
-firstRound: []string{"a", "b", "c", "d"},
-secondRound: []string{"e", "f"},
+firstRound: []types.UID{"a", "b", "c", "d"},
+secondRound: []types.UID{"e", "f"},
},
{
job: "bar",
-firstRound: []string{"x", "y", "z"},
-secondRound: []string{"u", "v", "w"},
+firstRound: []types.UID{"x", "y", "z"},
+secondRound: []types.UID{"u", "v", "w"},
},
{
job: "baz",
-firstRound: []string{"w"},
-secondRound: []string{"a"},
+firstRound: []types.UID{"w"},
+secondRound: []types.UID{"a"},
},
}
expectations := newUIDTrackingExpectations()