mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 02:09:56 +00:00
Merge pull request #127228 from hakuna-matatah/master
Improve Job Controller Performance
This commit is contained in:
commit
60cbbdf4b3
@ -479,12 +479,6 @@ func (jm *Controller) updateJob(logger klog.Logger, old, cur interface{}) {
|
|||||||
jm.enqueueSyncJobImmediately(logger, curJob)
|
jm.enqueueSyncJobImmediately(logger, curJob)
|
||||||
}
|
}
|
||||||
|
|
||||||
// The job shouldn't be marked as finished until all pod finalizers are removed.
|
|
||||||
// This is a backup operation in this case.
|
|
||||||
if util.IsJobFinished(curJob) {
|
|
||||||
jm.cleanupPodFinalizers(curJob)
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if need to add a new resync for ActiveDeadlineSeconds
|
// check if need to add a new resync for ActiveDeadlineSeconds
|
||||||
if curJob.Status.StartTime != nil {
|
if curJob.Status.StartTime != nil {
|
||||||
curADS := curJob.Spec.ActiveDeadlineSeconds
|
curADS := curJob.Spec.ActiveDeadlineSeconds
|
||||||
@ -772,6 +766,9 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
|
|||||||
|
|
||||||
// if job was finished previously, we don't want to redo the termination
|
// if job was finished previously, we don't want to redo the termination
|
||||||
if util.IsJobFinished(&job) {
|
if util.IsJobFinished(&job) {
|
||||||
|
// The job shouldn't be marked as finished until all pod finalizers are removed.
|
||||||
|
// Cleaning pod finalizers one more time just in case.
|
||||||
|
jm.cleanupPodFinalizers(&job)
|
||||||
err := jm.podBackoffStore.removeBackoffRecord(key)
|
err := jm.podBackoffStore.removeBackoffRecord(key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// re-syncing here as the record has to be removed for finished/deleted jobs
|
// re-syncing here as the record has to be removed for finished/deleted jobs
|
||||||
|
@ -7624,9 +7624,9 @@ func TestFinalizerCleanup(t *testing.T) {
|
|||||||
// Start the Pod and Job informers.
|
// Start the Pod and Job informers.
|
||||||
sharedInformers.Start(ctx.Done())
|
sharedInformers.Start(ctx.Done())
|
||||||
sharedInformers.WaitForCacheSync(ctx.Done())
|
sharedInformers.WaitForCacheSync(ctx.Done())
|
||||||
// Initialize the controller with 0 workers to make sure the
|
// Initialize the controller with 1 worker to make sure the
|
||||||
// pod finalizers are not removed by the "syncJob" function.
|
// pod finalizers are removed by the "syncJob" function.
|
||||||
go manager.Run(ctx, 0)
|
go manager.Run(ctx, 1)
|
||||||
// Make sure the pod finalizers are removed by the "orphanWorker" function.
|
// Make sure the pod finalizers are removed by the "orphanWorker" function.
|
||||||
go wait.UntilWithContext(ctx, manager.orphanWorker, time.Second)
|
go wait.UntilWithContext(ctx, manager.orphanWorker, time.Second)
|
||||||
|
|
||||||
@ -7645,22 +7645,24 @@ func TestFinalizerCleanup(t *testing.T) {
|
|||||||
t.Fatalf("Waiting for Job object to appear in jobLister: %v", err)
|
t.Fatalf("Waiting for Job object to appear in jobLister: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a Pod with the job tracking finalizer
|
selector, err := metav1.LabelSelectorAsSelector(job.Spec.Selector)
|
||||||
pod := newPod("test-pod", job)
|
|
||||||
pod.Finalizers = append(pod.Finalizers, batch.JobTrackingFinalizer)
|
|
||||||
pod, err = clientset.CoreV1().Pods(pod.GetNamespace()).Create(ctx, pod, metav1.CreateOptions{})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Creating pod: %v", err)
|
t.Fatalf("Error parsing job selector: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Await for the Pod to appear in the podStore to ensure that the pod exists when cleaning up the Job.
|
var pods []*v1.Pod
|
||||||
// In a production environment, there wouldn't be these guarantees, but the Pod would be cleaned up
|
// Wait for the Pod to be created by the Job controller
|
||||||
// by the orphan pod worker, when the Pod finishes.
|
|
||||||
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
|
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
|
||||||
pod, _ := manager.podStore.Pods(pod.GetNamespace()).Get(pod.Name)
|
pods, err = manager.podStore.Pods(job.Namespace).List(selector)
|
||||||
return pod != nil, nil
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if len(pods) > 0 {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
t.Fatalf("Waiting for Pod to appear in podLister: %v", err)
|
t.Fatalf("Waiting for Pod to be created by Job: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mark Job as complete.
|
// Mark Job as complete.
|
||||||
@ -7673,10 +7675,9 @@ func TestFinalizerCleanup(t *testing.T) {
|
|||||||
t.Fatalf("Updating job status: %v", err)
|
t.Fatalf("Updating job status: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Verify the pod finalizer is removed for a finished Job,
|
// Verify the pod finalizer is removed for a finished Job
|
||||||
// even if the jobs pods are not tracked by the main reconciliation loop.
|
|
||||||
if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
|
if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (bool, error) {
|
||||||
p, err := clientset.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
|
p, err := clientset.CoreV1().Pods(pods[0].Namespace).Get(ctx, pods[0].Name, metav1.GetOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user