Lower the constants for the rate limiter in Job controller

Michal Wozniak 2023-06-16 16:04:28 +02:00
parent c51a422d78
commit 74c5ff97f1
3 changed files with 27 additions and 23 deletions
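
Note (illustration added for context, not part of the commit): the Job controller builds its work queues with client-go's workqueue.NewItemExponentialFailureRateLimiter, so the two constants passed to it set the base and the cap of a per-key doubling backoff. The sketch below (the key name is made up) shows the delay sequence produced by the new, lower API backoff constants introduced here (1s base, 60s cap), replacing the old 10s/360s pair for API retries.

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// New API backoff constants from this commit: base 1s, cap 60s.
	limiter := workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 60*time.Second)

	// Repeated failures of the same key double the delay until the cap:
	// 1s 2s 4s 8s 16s 32s 1m0s 1m0s
	for i := 0; i < 8; i++ {
		fmt.Print(limiter.When("example-job-key"), " ")
	}
	fmt.Println()
}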

View File

@@ -69,10 +69,14 @@ const (
 var controllerKind = batch.SchemeGroupVersion.WithKind("Job")
 
 var (
-	// DefaultJobBackOff is the default backoff period. Exported for tests.
-	DefaultJobBackOff = 10 * time.Second
-	// MaxJobBackOff is the max backoff period. Exported for tests.
-	MaxJobBackOff = 360 * time.Second
+	// DefaultJobApiBackOff is the default backoff period. Exported for tests.
+	DefaultJobApiBackOff = 1 * time.Second
+	// MaxJobApiBackOff is the max backoff period. Exported for tests.
+	MaxJobApiBackOff = 60 * time.Second
+	// DefaultJobPodFailureBackOff is the default backoff period. Exported for tests.
+	DefaultJobPodFailureBackOff = 10 * time.Second
+	// MaxJobPodFailureBackOff is the max backoff period. Exported for tests.
+	MaxJobPodFailureBackOff = 360 * time.Second
 	// MaxUncountedPods is the maximum size the slices in
 	// .status.uncountedTerminatedPods should have to keep their representation
 	// roughly below 20 KB. Exported for tests
@@ -148,8 +152,8 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn
 		},
 		expectations:          controller.NewControllerExpectations(),
 		finalizerExpectations: newUIDTrackingExpectations(),
-		queue:                 workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "job"), workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff)),
-		orphanQueue:           workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "job_orphan_pod"), workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff)),
+		queue:                 workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "job"), workqueue.NewItemExponentialFailureRateLimiter(DefaultJobApiBackOff, MaxJobApiBackOff)),
+		orphanQueue:           workqueue.NewRateLimitingQueueWithDelayingInterface(workqueue.NewDelayingQueueWithCustomClock(clock, "job_orphan_pod"), workqueue.NewItemExponentialFailureRateLimiter(DefaultJobApiBackOff, MaxJobApiBackOff)),
 		broadcaster:           eventBroadcaster,
 		recorder:              eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
 		clock:                 clock,
@@ -1436,7 +1440,7 @@ func (jm *Controller) manageJob(ctx context.Context, job *batch.Job, activePods
 	}
 	if active < wantActive {
-		remainingTime := newBackoffRecord.getRemainingTime(jm.clock, DefaultJobBackOff, MaxJobBackOff)
+		remainingTime := newBackoffRecord.getRemainingTime(jm.clock, DefaultJobPodFailureBackOff, MaxJobPodFailureBackOff)
 		if remainingTime > 0 {
 			jm.enqueueSyncJobWithDelay(logger, job, remainingTime)
 			return 0, metrics.JobSyncActionPodsCreated, nil
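
Note (illustration, not part of the commit): the hunks above split the single DefaultJobBackOff/MaxJobBackOff pair into an API pair (feeding the work queues) and a pod failure pair (passed to getRemainingTime to delay pod re-creation). The sketch below is a rough approximation of the pod failure delay, under the assumption that the delay doubles with each consecutive pod failure, which is the behavior the integration test further down asserts; it is not the controller's actual getRemainingTime implementation.

package main

import (
	"fmt"
	"time"
)

// podFailureDelay approximates the per-Job pod re-creation backoff: it starts
// at DefaultJobPodFailureBackOff and doubles with each consecutive pod
// failure, capped at MaxJobPodFailureBackOff.
func podFailureDelay(consecutiveFailures int) time.Duration {
	const (
		defaultJobPodFailureBackOff = 10 * time.Second
		maxJobPodFailureBackOff     = 360 * time.Second
	)
	if consecutiveFailures <= 0 {
		return 0
	}
	delay := defaultJobPodFailureBackOff
	for i := 1; i < consecutiveFailures && delay < maxJobPodFailureBackOff; i++ {
		delay *= 2
	}
	if delay > maxJobPodFailureBackOff {
		delay = maxJobPodFailureBackOff
	}
	return delay
}

func main() {
	// 10s 20s 40s 1m20s 2m40s 5m20s 6m0s 6m0s
	for failures := 1; failures <= 8; failures++ {
		fmt.Print(podFailureDelay(failures), " ")
	}
	fmt.Println()
}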

View File

@@ -3106,8 +3106,8 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
 func TestSyncJobUpdateRequeue(t *testing.T) {
 	_, ctx := ktesting.NewTestContext(t)
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-	defer func() { DefaultJobBackOff = 10 * time.Second }()
-	DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
+	defer func() { DefaultJobApiBackOff = 1 * time.Second }()
+	DefaultJobApiBackOff = time.Duration(0) // overwrite the default value for testing
 	cases := map[string]struct {
 		updateErr               error
 		wantRequeuedImmediately bool
@@ -3136,7 +3136,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
 			sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
 			manager.queue.Add(testutil.GetKey(job, t))
 			manager.processNextWorkItem(context.TODO())
-			// With DefaultJobBackOff=0, the queueing is synchronous.
+			// With DefaultJobApiBackOff=0, the queueing is synchronous.
 			requeued := manager.queue.Len() > 0
 			if requeued != tc.wantRequeuedImmediately {
 				t.Errorf("Unexpected requeue, got %t, want %t", requeued, tc.wantRequeuedImmediately)
@@ -3934,8 +3934,8 @@ func TestJobBackoffReset(t *testing.T) {
 	for name, tc := range testCases {
 		clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-		defer func() { DefaultJobBackOff = 10 * time.Second }()
-		DefaultJobBackOff = time.Duration(0) // overwrite the default value for testing
+		defer func() { DefaultJobApiBackOff = 1 * time.Second }()
+		DefaultJobApiBackOff = time.Duration(0) // overwrite the default value for testing
 		manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
 		fakePodControl := controller.FakePodControl{}
 		manager.podControl = &fakePodControl
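
Note (illustration, not part of the commit): the two test hunks above zero out DefaultJobApiBackOff so requeueing is effectively synchronous. With a zero base delay the exponential rate limiter always returns a zero delay (0 * 2^n = 0), so a failed item is back on the queue before the assertion runs. A minimal sketch, assuming the same client-go rate limiter the controller's queues are built with:

package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Mirrors the test override DefaultJobApiBackOff = 0.
	limiter := workqueue.NewItemExponentialFailureRateLimiter(0, 60*time.Second)

	// Every requeue delay is 0s, regardless of how many times the key failed,
	// which is what lets the tests check manager.queue.Len() synchronously.
	for i := 0; i < 3; i++ {
		fmt.Println(limiter.When("example-job-key")) // 0s
	}
}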

View File

@@ -1379,9 +1379,9 @@ func TestFinalizersClearedWhenBackoffLimitExceeded(t *testing.T) {
 func TestJobPodsCreatedWithExponentialBackoff(t *testing.T) {
 	// overwrite the default value for faster testing
-	oldBackoff := jobcontroller.DefaultJobBackOff
-	defer func() { jobcontroller.DefaultJobBackOff = oldBackoff }()
-	jobcontroller.DefaultJobBackOff = 2 * time.Second
+	oldBackoff := jobcontroller.DefaultJobPodFailureBackOff
+	defer func() { jobcontroller.DefaultJobPodFailureBackOff = oldBackoff }()
+	jobcontroller.DefaultJobPodFailureBackOff = 2 * time.Second
 
 	closeFn, restConfig, clientSet, ns := setup(t, "simple")
 	defer closeFn()
@@ -1441,25 +1441,25 @@ func TestJobPodsCreatedWithExponentialBackoff(t *testing.T) {
 		return finishTime[i].Before(finishTime[j])
 	})
 
-	if creationTime[1].Sub(finishTime[0]).Seconds() < jobcontroller.DefaultJobBackOff.Seconds() {
-		t.Fatalf("Second pod should be created at least %v seconds after the first pod", jobcontroller.DefaultJobBackOff)
+	if creationTime[1].Sub(finishTime[0]).Seconds() < jobcontroller.DefaultJobPodFailureBackOff.Seconds() {
+		t.Fatalf("Second pod should be created at least %v seconds after the first pod", jobcontroller.DefaultJobPodFailureBackOff)
 	}
-	if creationTime[1].Sub(finishTime[0]).Seconds() >= 2*jobcontroller.DefaultJobBackOff.Seconds() {
-		t.Fatalf("Second pod should be created before %v seconds after the first pod", 2*jobcontroller.DefaultJobBackOff)
+	if creationTime[1].Sub(finishTime[0]).Seconds() >= 2*jobcontroller.DefaultJobPodFailureBackOff.Seconds() {
+		t.Fatalf("Second pod should be created before %v seconds after the first pod", 2*jobcontroller.DefaultJobPodFailureBackOff)
 	}
 
 	diff := creationTime[2].Sub(finishTime[1]).Seconds()
 
 	// The third pod should not be created before 4 seconds
-	if diff < 2*jobcontroller.DefaultJobBackOff.Seconds() {
-		t.Fatalf("Third pod should be created at least %v seconds after the second pod", 2*jobcontroller.DefaultJobBackOff)
+	if diff < 2*jobcontroller.DefaultJobPodFailureBackOff.Seconds() {
+		t.Fatalf("Third pod should be created at least %v seconds after the second pod", 2*jobcontroller.DefaultJobPodFailureBackOff)
 	}
 
 	// The third pod should be created within 8 seconds
 	// This check rules out double counting
-	if diff >= 4*jobcontroller.DefaultJobBackOff.Seconds() {
-		t.Fatalf("Third pod should be created before %v seconds after the second pod", 4*jobcontroller.DefaultJobBackOff)
+	if diff >= 4*jobcontroller.DefaultJobPodFailureBackOff.Seconds() {
+		t.Fatalf("Third pod should be created before %v seconds after the second pod", 4*jobcontroller.DefaultJobPodFailureBackOff)
 	}
 }
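
Note (illustration, not part of the commit): with the test override of 2s, the timing windows asserted above follow directly from the doubling backoff; the snippet below just spells out the arithmetic.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Test override from TestJobPodsCreatedWithExponentialBackoff.
	backoff := 2 * time.Second

	// Second pod: at least 1x and less than 2x the backoff after the first pod finished.
	fmt.Printf("second pod window: [%v, %v)\n", backoff, 2*backoff) // [2s, 4s)

	// Third pod: at least 2x and less than 4x the backoff after the second pod finished.
	fmt.Printf("third pod window:  [%v, %v)\n", 2*backoff, 4*backoff) // [4s, 8s)
}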