mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-19 09:52:49 +00:00
Revert "Limit number of Pods counted in a single Job sync"
This reverts commit 7d9cb88fed
.
This commit is contained in:
parent
6a84310f8b
commit
8bcb780808
@ -880,8 +880,8 @@ func (jm *Controller) removeTrackingFinalizersFromAllPods(pods []*v1.Pod) error
|
|||||||
// or the job was removed.
|
// or the job was removed.
|
||||||
// 3. Increment job counters for pods that no longer have a finalizer.
|
// 3. Increment job counters for pods that no longer have a finalizer.
|
||||||
// 4. Add Complete condition if satisfied with current counters.
|
// 4. Add Complete condition if satisfied with current counters.
|
||||||
// It does this up to a limited number of Pods so that the size of .status
|
// It does this in a controlled way such that the size of .status doesn't grow
|
||||||
// doesn't grow too much and this sync doesn't starve other Jobs.
|
// too much.
|
||||||
func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, finishedCond *batch.JobCondition, needsFlush bool) error {
|
func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, finishedCond *batch.JobCondition, needsFlush bool) error {
|
||||||
isIndexed := isIndexedJob(job)
|
isIndexed := isIndexedJob(job)
|
||||||
var podsToRemoveFinalizer []*v1.Pod
|
var podsToRemoveFinalizer []*v1.Pod
|
||||||
@ -936,15 +936,17 @@ func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods {
|
if len(newSucceededIndexes)+len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods {
|
||||||
// The controller added enough Pods already to .status.uncountedTerminatedPods
|
if len(newSucceededIndexes) > 0 {
|
||||||
// We stop counting pods and removing finalizers here to:
|
succeededIndexes = succeededIndexes.withOrderedIndexes(newSucceededIndexes)
|
||||||
// 1. Ensure that the UIDs representation are under 20 KB.
|
job.Status.Succeeded = int32(succeededIndexes.total())
|
||||||
// 2. Cap the number of finalizer removals so that syncing of big Jobs
|
job.Status.CompletedIndexes = succeededIndexes.String()
|
||||||
// doesn't starve smaller ones.
|
}
|
||||||
//
|
var err error
|
||||||
// The job will be synced again because the Job status and Pod updates
|
if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, uidsWithFinalizer, needsFlush); err != nil {
|
||||||
// will put the Job back to the work queue.
|
return err
|
||||||
break
|
}
|
||||||
|
podsToRemoveFinalizer = nil
|
||||||
|
newSucceededIndexes = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(newSucceededIndexes) > 0 {
|
if len(newSucceededIndexes) > 0 {
|
||||||
|
@ -1463,7 +1463,7 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
|
|||||||
pods = append(pods, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod)
|
pods = append(pods, buildPod().uid("b").phase(v1.PodFailed).trackingFinalizer().Pod)
|
||||||
return pods
|
return pods
|
||||||
}(),
|
}(),
|
||||||
wantRmFinalizers: 499,
|
wantRmFinalizers: 501,
|
||||||
wantStatusUpdates: []batch.JobStatus{
|
wantStatusUpdates: []batch.JobStatus{
|
||||||
{
|
{
|
||||||
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
|
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
|
||||||
@ -1480,11 +1480,17 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
|
UncountedTerminatedPods: &batch.UncountedTerminatedPods{
|
||||||
Failed: []types.UID{"b"},
|
Succeeded: []types.UID{"499"},
|
||||||
|
Failed: []types.UID{"b"},
|
||||||
},
|
},
|
||||||
Succeeded: 499,
|
Succeeded: 499,
|
||||||
Failed: 1,
|
Failed: 1,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
|
||||||
|
Succeeded: 500,
|
||||||
|
Failed: 2,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"too many indexed finished": {
|
"too many indexed finished": {
|
||||||
@ -1501,13 +1507,18 @@ func TestTrackJobStatusAndRemoveFinalizers(t *testing.T) {
|
|||||||
}
|
}
|
||||||
return pods
|
return pods
|
||||||
}(),
|
}(),
|
||||||
wantRmFinalizers: 500,
|
wantRmFinalizers: 501,
|
||||||
wantStatusUpdates: []batch.JobStatus{
|
wantStatusUpdates: []batch.JobStatus{
|
||||||
{
|
{
|
||||||
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
|
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
|
||||||
CompletedIndexes: "0-499",
|
CompletedIndexes: "0-499",
|
||||||
Succeeded: 500,
|
Succeeded: 500,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
CompletedIndexes: "0-500",
|
||||||
|
UncountedTerminatedPods: &batch.UncountedTerminatedPods{},
|
||||||
|
Succeeded: 501,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
@ -230,8 +230,8 @@ func TestParallelJobWithCompletions(t *testing.T) {
|
|||||||
|
|
||||||
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
|
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
|
||||||
Spec: batchv1.JobSpec{
|
Spec: batchv1.JobSpec{
|
||||||
Parallelism: pointer.Int32Ptr(504),
|
Parallelism: pointer.Int32Ptr(4),
|
||||||
Completions: pointer.Int32Ptr(506),
|
Completions: pointer.Int32Ptr(6),
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -241,23 +241,23 @@ func TestParallelJobWithCompletions(t *testing.T) {
|
|||||||
t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers)
|
t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers)
|
||||||
}
|
}
|
||||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||||
Active: 504,
|
Active: 4,
|
||||||
}, wFinalizers)
|
}, wFinalizers)
|
||||||
// Failed Pods are replaced.
|
// Failed Pods are replaced.
|
||||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
|
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
|
||||||
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
|
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
|
||||||
}
|
}
|
||||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||||
Active: 504,
|
Active: 4,
|
||||||
Failed: 2,
|
Failed: 2,
|
||||||
}, wFinalizers)
|
}, wFinalizers)
|
||||||
// Pods are created until the number of succeeded Pods equals completions.
|
// Pods are created until the number of succeeded Pods equals completions.
|
||||||
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 503); err != nil {
|
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
|
||||||
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
|
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
|
||||||
}
|
}
|
||||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||||
Failed: 2,
|
Failed: 2,
|
||||||
Succeeded: 503,
|
Succeeded: 3,
|
||||||
Active: 3,
|
Active: 3,
|
||||||
}, wFinalizers)
|
}, wFinalizers)
|
||||||
// No more Pods are created after the Job completes.
|
// No more Pods are created after the Job completes.
|
||||||
@ -267,7 +267,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
|
|||||||
validateJobSucceeded(ctx, t, clientSet, jobObj)
|
validateJobSucceeded(ctx, t, clientSet, jobObj)
|
||||||
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
|
||||||
Failed: 2,
|
Failed: 2,
|
||||||
Succeeded: 506,
|
Succeeded: 6,
|
||||||
}, false)
|
}, false)
|
||||||
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
|
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
|
||||||
})
|
})
|
||||||
@ -860,11 +860,7 @@ func setup(t *testing.T, nsBaseName string) (framework.CloseFunc, *restclient.Co
|
|||||||
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
|
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
|
||||||
_, server, apiServerCloseFn := framework.RunAnAPIServer(controlPlaneConfig)
|
_, server, apiServerCloseFn := framework.RunAnAPIServer(controlPlaneConfig)
|
||||||
|
|
||||||
config := restclient.Config{
|
config := restclient.Config{Host: server.URL}
|
||||||
Host: server.URL,
|
|
||||||
QPS: 200.0,
|
|
||||||
Burst: 200,
|
|
||||||
}
|
|
||||||
clientSet, err := clientset.NewForConfig(&config)
|
clientSet, err := clientset.NewForConfig(&config)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error creating clientset: %v", err)
|
t.Fatalf("Error creating clientset: %v", err)
|
||||||
|
Loading…
Reference in New Issue
Block a user