Do not set jm.syncJobBatchPeriod=0 if not needed
This commit is contained in:
  parent 9e0569f2ed
  commit 8ed23558b4
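As the hunks below show, this commit turns syncJobBatchPeriod and the Job API / pod-failure backoff constants into package-level variables, removes the syncJobBatchPeriod field from Controller, and reworks the unit tests to override those variables per test (together with a fake clock and polling helpers) instead of forcing jm.syncJobBatchPeriod = 0.

A minimal, standalone sketch of the override-and-restore pattern the tests adopt; the package and variable names here are illustrative, and only setDurationDuringTest mirrors the helper added at the end of the diff:

package example

import (
    "testing"
    "time"
)

// syncPeriod stands in for a package-level tuning knob such as
// syncJobBatchPeriod or DefaultJobApiBackOff in the hunks below.
var syncPeriod = time.Second

// setDurationDuringTest overrides a package-level duration and returns a
// function that restores the original value; it matches the shape of the
// helper this commit adds to the job controller tests.
func setDurationDuringTest(val *time.Duration, newVal time.Duration) func() {
    origVal := *val
    *val = newVal
    return func() {
        *val = origVal
    }
}

func TestWithFastSyncPeriod(t *testing.T) {
    // Speed the period up for this test only; t.Cleanup restores it afterwards.
    t.Cleanup(setDurationDuringTest(&syncPeriod, 10*time.Millisecond))
    if syncPeriod != 10*time.Millisecond {
        t.Fatalf("syncPeriod = %v, want %v", syncPeriod, 10*time.Millisecond)
    }
}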
@@ -56,9 +56,6 @@ import (
     "k8s.io/utils/pointer"
 )
 
-// syncJobBatchPeriod is the batch period for controller sync invocations for a Job.
-const syncJobBatchPeriod = time.Second
-
 const (
     // PodFailurePolicy reason indicates a job failure condition is added due to
     // a failed pod matching a pod failure policy rule
@@ -69,14 +66,16 @@ const (
 var controllerKind = batch.SchemeGroupVersion.WithKind("Job")
 
 var (
-    // DefaultJobApiBackOff is the default backoff period. Exported for tests.
-    DefaultJobApiBackOff = 1 * time.Second
-    // MaxJobApiBackOff is the max backoff period. Exported for tests.
-    MaxJobApiBackOff = 60 * time.Second
-    // DefaultJobPodFailureBackOff is the default backoff period. Exported for tests.
+    // syncJobBatchPeriod is the batch period for controller sync invocations for a Job.
+    syncJobBatchPeriod = time.Second
+    // DefaultJobApiBackOff is the default API backoff period. Exported for tests.
+    DefaultJobApiBackOff = time.Second
+    // MaxJobApiBackOff is the max API backoff period. Exported for tests.
+    MaxJobApiBackOff = time.Minute
+    // DefaultJobPodFailureBackOff is the default pod failure backoff period. Exported for tests.
     DefaultJobPodFailureBackOff = 10 * time.Second
-    // MaxJobPodFailureBackOff is the max backoff period. Exported for tests.
-    MaxJobPodFailureBackOff = 360 * time.Second
+    // MaxJobPodFailureBackOff is the max pod failure backoff period. Exported for tests.
+    MaxJobPodFailureBackOff = 10 * time.Minute
     // MaxUncountedPods is the maximum size the slices in
     // .status.uncountedTerminatedPods should have to keep their representation
     // roughly below 20 KB. Exported for tests
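Assembled from the added lines of the hunk above, the declaration block now reads roughly as follows (surrounding entries such as controllerKind and MaxUncountedPods are unchanged context and omitted; the snippet assumes the file's existing "time" import):

var (
    // syncJobBatchPeriod is the batch period for controller sync invocations for a Job.
    syncJobBatchPeriod = time.Second
    // DefaultJobApiBackOff is the default API backoff period. Exported for tests.
    DefaultJobApiBackOff = time.Second
    // MaxJobApiBackOff is the max API backoff period. Exported for tests.
    MaxJobApiBackOff = time.Minute
    // DefaultJobPodFailureBackOff is the default pod failure backoff period. Exported for tests.
    DefaultJobPodFailureBackOff = 10 * time.Second
    // MaxJobPodFailureBackOff is the max pod failure backoff period. Exported for tests.
    MaxJobPodFailureBackOff = 10 * time.Minute
)

Making these variables rather than constants is what lets the tests below shrink them to a few milliseconds per test and restore them afterwards.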
@@ -125,8 +124,6 @@ type Controller struct {
     broadcaster record.EventBroadcaster
     recorder record.EventRecorder
 
-    syncJobBatchPeriod time.Duration
-
     clock clock.WithTicker
 
     // Store with information to compute the expotential backoff delay for pod
@@ -159,7 +156,6 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn
         clock: clock,
         podBackoffStore: newBackoffStore(),
     }
-    jm.syncJobBatchPeriod = syncJobBatchPeriod
 
     jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
         AddFunc: func(obj interface{}) {
@@ -521,7 +517,7 @@ func (jm *Controller) enqueueSyncJobImmediately(logger klog.Logger, obj interfac
 // - Job status update
 // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
 func (jm *Controller) enqueueSyncJobBatched(logger klog.Logger, obj interface{}) {
-    jm.enqueueSyncJobInternal(logger, obj, jm.syncJobBatchPeriod)
+    jm.enqueueSyncJobInternal(logger, obj, syncJobBatchPeriod)
 }
 
 // enqueueSyncJobWithDelay tells the controller to invoke syncJob with a
@@ -529,8 +525,8 @@ func (jm *Controller) enqueueSyncJobBatched(logger klog.Logger, obj interface{})
 // It is used when pod recreations are delayed due to pod failures.
 // obj could be an *batch.Job, or a DeletionFinalStateUnknown marker item.
 func (jm *Controller) enqueueSyncJobWithDelay(logger klog.Logger, obj interface{}, delay time.Duration) {
-    if delay < jm.syncJobBatchPeriod {
-        delay = jm.syncJobBatchPeriod
+    if delay < syncJobBatchPeriod {
+        delay = syncJobBatchPeriod
     }
     jm.enqueueSyncJobInternal(logger, obj, delay)
 }
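For readability, the new side of the two enqueue hunks above assembles into roughly the following; the controller now reads the package-level syncJobBatchPeriod instead of a per-instance field:

// enqueueSyncJobBatched delays the sync by the package-level batch period.
func (jm *Controller) enqueueSyncJobBatched(logger klog.Logger, obj interface{}) {
    jm.enqueueSyncJobInternal(logger, obj, syncJobBatchPeriod)
}

// enqueueSyncJobWithDelay clamps the requested delay to at least the batch period.
func (jm *Controller) enqueueSyncJobWithDelay(logger klog.Logger, obj interface{}, delay time.Duration) {
    if delay < syncJobBatchPeriod {
        delay = syncJobBatchPeriod
    }
    jm.enqueueSyncJobInternal(logger, obj, delay)
}

The remaining hunks in this commit are against the job controller's unit tests.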
@@ -64,6 +64,10 @@ import (
 var realClock = &clock.RealClock{}
 var alwaysReady = func() bool { return true }
 
+const fastSyncJobBatchPeriod = 10 * time.Millisecond
+const fastJobApiBackoff = 10 * time.Millisecond
+const fastRequeue = 10 * time.Millisecond
+
 // testFinishedAt represents time one second later than unix epoch
 // this will be used in various test cases where we don't want back-off to kick in
 var testFinishedAt = metav1.NewTime((time.Time{}).Add(time.Second))
@@ -3096,25 +3100,27 @@ func TestSyncJobWithJobPodFailurePolicy(t *testing.T) {
 func TestSyncJobUpdateRequeue(t *testing.T) {
     _, ctx := ktesting.NewTestContext(t)
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-    defer func() { DefaultJobApiBackOff = 1 * time.Second }()
-    DefaultJobApiBackOff = time.Duration(0) // overwrite the default value for testing
     cases := map[string]struct {
         updateErr error
-        wantRequeuedImmediately bool
+        wantRequeued bool
     }{
-        "no error": {},
+        "no error": {
+            wantRequeued: false,
+        },
         "generic error": {
             updateErr: fmt.Errorf("update error"),
-            wantRequeuedImmediately: true,
+            wantRequeued: true,
         },
         "conflict error": {
             updateErr: apierrors.NewConflict(schema.GroupResource{}, "", nil),
-            wantRequeuedImmediately: true,
+            wantRequeued: true,
         },
     }
     for name, tc := range cases {
         t.Run(name, func(t *testing.T) {
-            manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+            t.Cleanup(setDurationDuringTest(&DefaultJobApiBackOff, fastJobApiBackoff))
+            fakeClient := clocktesting.NewFakeClock(time.Now())
+            manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClient)
             fakePodControl := controller.FakePodControl{}
             manager.podControl = &fakePodControl
             manager.podStoreSynced = alwaysReady
@@ -3126,17 +3132,16 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
             sharedInformerFactory.Batch().V1().Jobs().Informer().GetIndexer().Add(job)
             manager.queue.Add(testutil.GetKey(job, t))
             manager.processNextWorkItem(context.TODO())
-            // With DefaultJobApiBackOff=0, the queueing is synchronous.
-            requeued := manager.queue.Len() > 0
-            if requeued != tc.wantRequeuedImmediately {
-                t.Errorf("Unexpected requeue, got %t, want %t", requeued, tc.wantRequeuedImmediately)
-            }
-            if requeued {
-                key, _ := manager.queue.Get()
-                expectedKey := testutil.GetKey(job, t)
-                if key != expectedKey {
-                    t.Errorf("Expected requeue of job with key %s got %s", expectedKey, key)
-                }
+            if tc.wantRequeued {
+                verifyEmptyQueueAndAwaitForQueueLen(ctx, t, manager, 1)
+            } else {
+                // We advance the clock to make sure there are not items awaiting
+                // to be added into the queue. We also sleep a little to give the
+                // delaying queue time to move the potential items from pre-queue
+                // into the queue asynchronously.
+                manager.clock.Sleep(fastJobApiBackoff)
+                time.Sleep(time.Millisecond)
+                verifyEmptyQueue(ctx, t, manager)
             }
         })
     }
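The rewritten test above injects a fake clock (clocktesting.NewFakeClock) and advances it with manager.clock.Sleep(fastJobApiBackoff) instead of relying on a zero backoff. A small, self-contained illustration of that FakeClock behaviour from k8s.io/utils/clock/testing (the test name is made up for this sketch):

package example

import (
    "testing"
    "time"

    clocktesting "k8s.io/utils/clock/testing"
)

// TestFakeClockSleep shows why the updated tests call manager.clock.Sleep:
// on a FakeClock, Sleep advances the clock's notion of "now" immediately,
// so work that is delayed against that clock can become due without any
// real waiting.
func TestFakeClockSleep(t *testing.T) {
    fakeClock := clocktesting.NewFakeClock(time.Now())
    before := fakeClock.Now()

    // Advances the fake clock by 10ms; no real time passes.
    fakeClock.Sleep(10 * time.Millisecond)

    if got := fakeClock.Now().Sub(before); got != 10*time.Millisecond {
        t.Fatalf("fake clock advanced by %v, want %v", got, 10*time.Millisecond)
    }
}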
@@ -3371,15 +3376,15 @@ func TestGetPodsForJob(t *testing.T) {
 }
 
 func TestAddPod(t *testing.T) {
+    t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
     _, ctx := ktesting.NewTestContext(t)
     logger := klog.FromContext(ctx)
 
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-    jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+    fakeClock := clocktesting.NewFakeClock(time.Now())
+    jm, informer := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock)
     jm.podStoreSynced = alwaysReady
     jm.jobStoreSynced = alwaysReady
-    // Disable batching of pod updates.
-    jm.syncJobBatchPeriod = 0
 
     job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
     job1.Name = "job1"
@@ -3394,9 +3399,7 @@ func TestAddPod(t *testing.T) {
     informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
 
     jm.addPod(logger, pod1)
-    if got, want := jm.queue.Len(), 1; got != want {
-        t.Fatalf("queue.Len() = %v, want %v", got, want)
-    }
+    verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1)
     key, done := jm.queue.Get()
     if key == nil || done {
         t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
@@ -3407,9 +3410,7 @@ func TestAddPod(t *testing.T) {
     }
 
     jm.addPod(logger, pod2)
-    if got, want := jm.queue.Len(), 1; got != want {
-        t.Fatalf("queue.Len() = %v, want %v", got, want)
-    }
+    verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1)
     key, done = jm.queue.Get()
     if key == nil || done {
         t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
@@ -3421,13 +3422,13 @@ func TestAddPod(t *testing.T) {
 }
 
 func TestAddPodOrphan(t *testing.T) {
+    t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
     logger, ctx := ktesting.NewTestContext(t)
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-    jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+    fakeClock := clocktesting.NewFakeClock(time.Now())
+    jm, informer := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock)
     jm.podStoreSynced = alwaysReady
     jm.jobStoreSynced = alwaysReady
-    // Disable batching of pod updates.
-    jm.syncJobBatchPeriod = 0
 
     job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
     job1.Name = "job1"
@@ -3446,20 +3447,18 @@ func TestAddPodOrphan(t *testing.T) {
     informer.Core().V1().Pods().Informer().GetIndexer().Add(pod1)
 
     jm.addPod(logger, pod1)
-    if got, want := jm.queue.Len(), 2; got != want {
-        t.Fatalf("queue.Len() = %v, want %v", got, want)
-    }
+    verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 2)
 }
 
 func TestUpdatePod(t *testing.T) {
+    t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
     _, ctx := ktesting.NewTestContext(t)
     logger := klog.FromContext(ctx)
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-    jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+    fakeClock := clocktesting.NewFakeClock(time.Now())
+    jm, informer := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock)
     jm.podStoreSynced = alwaysReady
     jm.jobStoreSynced = alwaysReady
-    // Disable batching of pod updates.
-    jm.syncJobBatchPeriod = 0
 
     job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
     job1.Name = "job1"
@@ -3476,9 +3475,7 @@ func TestUpdatePod(t *testing.T) {
     prev := *pod1
     bumpResourceVersion(pod1)
     jm.updatePod(logger, &prev, pod1)
-    if got, want := jm.queue.Len(), 1; got != want {
-        t.Fatalf("queue.Len() = %v, want %v", got, want)
-    }
+    verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1)
     key, done := jm.queue.Get()
     if key == nil || done {
         t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
@@ -3491,9 +3488,7 @@ func TestUpdatePod(t *testing.T) {
     prev = *pod2
     bumpResourceVersion(pod2)
     jm.updatePod(logger, &prev, pod2)
-    if got, want := jm.queue.Len(), 1; got != want {
-        t.Fatalf("queue.Len() = %v, want %v", got, want)
-    }
+    verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1)
     key, done = jm.queue.Get()
     if key == nil || done {
         t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
@@ -3505,13 +3500,13 @@ func TestUpdatePod(t *testing.T) {
 }
 
 func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
+    t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
     logger, ctx := ktesting.NewTestContext(t)
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-    jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+    fakeClock := clocktesting.NewFakeClock(time.Now())
+    jm, informer := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock)
     jm.podStoreSynced = alwaysReady
     jm.jobStoreSynced = alwaysReady
-    // Disable batching of pod updates.
-    jm.syncJobBatchPeriod = 0
 
     job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
     job1.Name = "job1"
@@ -3529,20 +3524,18 @@ func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
     prev.Labels = map[string]string{"foo2": "bar2"}
     bumpResourceVersion(pod1)
     jm.updatePod(logger, &prev, pod1)
-    if got, want := jm.queue.Len(), 2; got != want {
-        t.Fatalf("queue.Len() = %v, want %v", got, want)
-    }
+    verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 2)
 }
 
 func TestUpdatePodChangeControllerRef(t *testing.T) {
+    t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
     _, ctx := ktesting.NewTestContext(t)
     logger := klog.FromContext(ctx)
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-    jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+    fakeClock := clocktesting.NewFakeClock(time.Now())
+    jm, informer := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock)
     jm.podStoreSynced = alwaysReady
     jm.jobStoreSynced = alwaysReady
-    // Disable batching of pod updates.
-    jm.syncJobBatchPeriod = 0
 
     job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
     job1.Name = "job1"
@@ -3559,20 +3552,18 @@ func TestUpdatePodChangeControllerRef(t *testing.T) {
     prev.OwnerReferences = []metav1.OwnerReference{*metav1.NewControllerRef(job2, controllerKind)}
     bumpResourceVersion(pod1)
     jm.updatePod(logger, &prev, pod1)
-    if got, want := jm.queue.Len(), 2; got != want {
-        t.Fatalf("queue.Len() = %v, want %v", got, want)
-    }
+    verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 2)
 }
 
 func TestUpdatePodRelease(t *testing.T) {
+    t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
     _, ctx := ktesting.NewTestContext(t)
     logger := klog.FromContext(ctx)
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-    jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+    fakeClock := clocktesting.NewFakeClock(time.Now())
+    jm, informer := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock)
     jm.podStoreSynced = alwaysReady
     jm.jobStoreSynced = alwaysReady
-    // Disable batching of pod updates.
-    jm.syncJobBatchPeriod = 0
 
     job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
     job1.Name = "job1"
@@ -3589,19 +3580,17 @@ func TestUpdatePodRelease(t *testing.T) {
     pod1.OwnerReferences = nil
     bumpResourceVersion(pod1)
    jm.updatePod(logger, &prev, pod1)
-    if got, want := jm.queue.Len(), 2; got != want {
-        t.Fatalf("queue.Len() = %v, want %v", got, want)
-    }
+    verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 2)
 }
 
 func TestDeletePod(t *testing.T) {
+    t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, fastSyncJobBatchPeriod))
     logger, ctx := ktesting.NewTestContext(t)
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-    jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+    fakeClock := clocktesting.NewFakeClock(time.Now())
+    jm, informer := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock)
     jm.podStoreSynced = alwaysReady
     jm.jobStoreSynced = alwaysReady
-    // Disable batching of pod updates.
-    jm.syncJobBatchPeriod = 0
 
     job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
     job1.Name = "job1"
@@ -3616,9 +3605,7 @@ func TestDeletePod(t *testing.T) {
     informer.Core().V1().Pods().Informer().GetIndexer().Add(pod2)
 
     jm.deletePod(logger, pod1, true)
-    if got, want := jm.queue.Len(), 1; got != want {
-        t.Fatalf("queue.Len() = %v, want %v", got, want)
-    }
+    verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1)
     key, done := jm.queue.Get()
     if key == nil || done {
         t.Fatalf("failed to enqueue controller for pod %v", pod1.Name)
@@ -3629,9 +3616,7 @@ func TestDeletePod(t *testing.T) {
     }
 
     jm.deletePod(logger, pod2, true)
-    if got, want := jm.queue.Len(), 1; got != want {
-        t.Fatalf("queue.Len() = %v, want %v", got, want)
-    }
+    verifyEmptyQueueAndAwaitForQueueLen(ctx, t, jm, 1)
     key, done = jm.queue.Get()
     if key == nil || done {
         t.Fatalf("failed to enqueue controller for pod %v", pod2.Name)
@@ -3643,13 +3628,13 @@ func TestDeletePod(t *testing.T) {
 }
 
 func TestDeletePodOrphan(t *testing.T) {
+    // Disable batching of pod updates to show it does not get requeued at all
+    t.Cleanup(setDurationDuringTest(&syncJobBatchPeriod, 0))
     logger, ctx := ktesting.NewTestContext(t)
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
     jm, informer := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
     jm.podStoreSynced = alwaysReady
     jm.jobStoreSynced = alwaysReady
-    // Disable batching of pod updates.
-    jm.syncJobBatchPeriod = 0
 
     job1 := newJob(1, 1, 6, batch.NonIndexedCompletion)
     job1.Name = "job1"
@@ -3889,10 +3874,12 @@ func bumpResourceVersion(obj metav1.Object) {
 }
 
 func TestJobApiBackoffReset(t *testing.T) {
+    t.Cleanup(setDurationDuringTest(&DefaultJobApiBackOff, fastJobApiBackoff))
     _, ctx := ktesting.NewTestContext(t)
 
     clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}})
-    manager, sharedInformerFactory := newControllerFromClient(ctx, clientset, controller.NoResyncPeriodFunc)
+    fakeClock := clocktesting.NewFakeClock(time.Now())
+    manager, sharedInformerFactory := newControllerFromClientWithClock(ctx, clientset, controller.NoResyncPeriodFunc, fakeClock)
     fakePodControl := controller.FakePodControl{}
     manager.podControl = &fakePodControl
     manager.podStoreSynced = alwaysReady
@@ -3916,11 +3903,9 @@ func TestJobApiBackoffReset(t *testing.T) {
 
     // the queue is emptied on success
     fakePodControl.Err = nil
+    manager.clock.Sleep(fastJobApiBackoff)
     manager.processNextWorkItem(context.TODO())
-    retries = manager.queue.NumRequeues(key)
-    if retries != 0 {
-        t.Fatalf("%s: expected exactly 0 retries, got %d", job.Name, retries)
-    }
+    verifyEmptyQueue(ctx, t, manager)
 }
 
 var _ workqueue.RateLimitingInterface = &fakeRateLimitingQueue{}
@@ -4434,6 +4419,33 @@ func checkJobCompletionEnvVariable(t *testing.T, spec *v1.PodSpec) {
     }
 }
 
+func verifyEmptyQueueAndAwaitForQueueLen(ctx context.Context, t *testing.T, jm *Controller, wantQueueLen int) {
+    t.Helper()
+    verifyEmptyQueue(ctx, t, jm)
+    awaitForQueueLen(ctx, t, jm, wantQueueLen)
+}
+
+func awaitForQueueLen(ctx context.Context, t *testing.T, jm *Controller, wantQueueLen int) {
+    t.Helper()
+    verifyEmptyQueue(ctx, t, jm)
+    if err := wait.PollUntilContextTimeout(ctx, fastRequeue, time.Second, true, func(ctx context.Context) (bool, error) {
+        if requeued := jm.queue.Len() == wantQueueLen; requeued {
+            return true, nil
+        }
+        jm.clock.Sleep(fastRequeue)
+        return false, nil
+    }); err != nil {
+        t.Errorf("Failed to await for expected queue.Len(). want %v, got: %v", wantQueueLen, jm.queue.Len())
+    }
+}
+
+func verifyEmptyQueue(ctx context.Context, t *testing.T, jm *Controller) {
+    t.Helper()
+    if jm.queue.Len() > 0 {
+        t.Errorf("Unexpected queue.Len(). Want: %d, got: %d", 0, jm.queue.Len())
+    }
+}
+
 type podBuilder struct {
     *v1.Pod
 }
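awaitForQueueLen above couples a real-time poll (wait.PollUntilContextTimeout) with fake-clock advancement (jm.clock.Sleep(fastRequeue)) so that items parked behind a clock-based delay can become due without real waiting. A stripped-down, self-contained sketch of the same loop structure, with the queue condition replaced by a simple fake-clock deadline (all names here are illustrative):

package example

import (
    "context"
    "testing"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
    clocktesting "k8s.io/utils/clock/testing"
)

// TestPollWhileAdvancingFakeClock polls for a condition with a real-time
// timeout while repeatedly stepping a fake clock, mirroring the structure
// of awaitForQueueLen above.
func TestPollWhileAdvancingFakeClock(t *testing.T) {
    fakeClock := clocktesting.NewFakeClock(time.Now())
    deadline := fakeClock.Now().Add(50 * time.Millisecond)

    err := wait.PollUntilContextTimeout(context.Background(), 10*time.Millisecond, time.Second, true,
        func(ctx context.Context) (bool, error) {
            if fakeClock.Now().After(deadline) {
                return true, nil // the fake-clock-driven condition became true
            }
            fakeClock.Sleep(10 * time.Millisecond) // advance the fake clock and retry
            return false, nil
        })
    if err != nil {
        t.Fatalf("condition did not become true: %v", err)
    }
}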
@@ -4515,3 +4527,11 @@ func (pb podBuilder) deletionTimestamp() podBuilder {
     pb.DeletionTimestamp = &metav1.Time{}
     return pb
 }
+
+func setDurationDuringTest(val *time.Duration, newVal time.Duration) func() {
+    origVal := *val
+    *val = newVal
+    return func() {
+        *val = origVal
+    }
+}