From 8e6a8669c09753531fb29355793267b72c5a4e9c Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Fri, 31 Jul 2020 17:35:48 -0400 Subject: [PATCH] Deflake TestExpectationsOnRecreate --- .../daemon/daemon_controller_test.go | 82 +++++++++++-------- 1 file changed, 49 insertions(+), 33 deletions(-) diff --git a/pkg/controller/daemon/daemon_controller_test.go b/pkg/controller/daemon/daemon_controller_test.go index 040b05bdebd..f86942debb0 100644 --- a/pkg/controller/daemon/daemon_controller_test.go +++ b/pkg/controller/daemon/daemon_controller_test.go @@ -469,6 +469,39 @@ func TestExpectationsOnRecreate(t *testing.T) { t.Fatal(err) } + expectStableQueueLength := func(expected int) { + t.Helper() + for i := 0; i < 5; i++ { + if actual := dsc.queue.Len(); actual != expected { + t.Fatalf("expected queue len to remain at %d, got %d", expected, actual) + } + time.Sleep(10 * time.Millisecond) + } + } + waitForQueueLength := func(expected int, msg string) { + t.Helper() + i := 0 + err = wait.PollImmediate(100*time.Millisecond, informerSyncTimeout, func() (bool, error) { + current := dsc.queue.Len() + switch { + case current == expected: + return true, nil + case current > expected: + return false, fmt.Errorf("queue length %d exceeded expected length %d", current, expected) + default: + i++ + if i > 1 { + t.Logf("Waiting for queue to have %d item, currently has: %d", expected, current) + } + return false, nil + } + }) + if err != nil { + t.Fatalf("%s: %v", msg, err) + } + expectStableQueueLength(expected) + } + fakeRecorder := record.NewFakeRecorder(100) dsc.eventRecorder = fakeRecorder @@ -499,9 +532,7 @@ func TestExpectationsOnRecreate(t *testing.T) { t.Fatal("caches failed to sync") } - if dsc.queue.Len() != 0 { - t.Fatal("Unexpected item in the queue") - } + expectStableQueueLength(0) oldDS := newDaemonSet("test") oldDS, err = client.AppsV1().DaemonSets(oldDS.Namespace).Create(context.Background(), oldDS, metav1.CreateOptions{}) @@ -509,14 +540,8 @@ func TestExpectationsOnRecreate(t *testing.T) { t.Fatal(err) } - err = wait.PollImmediate(100*time.Millisecond, informerSyncTimeout, func() (bool, error) { - klog.V(8).Infof("Waiting for queue to have 1 item, currently has: %d", dsc.queue.Len()) - return dsc.queue.Len() == 1, nil - }) - if err != nil { - t.Fatalf("initial DS didn't result in new item in the queue: %v", err) - } - + // create of DS adds to queue, processes + waitForQueueLength(1, "created DS") ok = dsc.processNextWorkItem() if !ok { t.Fatal("queue is shutting down") @@ -538,28 +563,28 @@ func TestExpectationsOnRecreate(t *testing.T) { t.Fatal(err) } if !exists { - t.Errorf("No expectations found for DaemonSet %q", oldDSKey) + t.Fatalf("No expectations found for DaemonSet %q", oldDSKey) } if dsExp.Fulfilled() { t.Errorf("There should be unfulfiled expectation for creating new pods for DaemonSet %q", oldDSKey) } - if dsc.queue.Len() != 0 { - t.Fatal("Unexpected item in the queue") + // process updates DS, update adds to queue + waitForQueueLength(1, "updated DS") + ok = dsc.processNextWorkItem() + if !ok { + t.Fatal("queue is shutting down") } + // process does not re-update the DS + expectStableQueueLength(0) + err = client.AppsV1().DaemonSets(oldDS.Namespace).Delete(context.Background(), oldDS.Name, metav1.DeleteOptions{}) if err != nil { t.Fatal(err) } - err = wait.PollImmediate(100*time.Millisecond, informerSyncTimeout, func() (bool, error) { - klog.V(8).Infof("Waiting for queue to have 1 item, currently has: %d", dsc.queue.Len()) - return dsc.queue.Len() == 1, nil - }) - if err != nil { - t.Fatalf("Deleting DS didn't result in new item in the queue: %v", err) - } + waitForQueueLength(1, "deleted DS") _, exists, err = dsc.expectations.GetExpectations(oldDSKey) if err != nil { @@ -579,9 +604,7 @@ func TestExpectationsOnRecreate(t *testing.T) { t.Fatal("Keys should be equal!") } - if dsc.queue.Len() != 0 { - t.Fatal("Unexpected item in the queue") - } + expectStableQueueLength(0) newDS := oldDS.DeepCopy() newDS.UID = uuid.NewUUID() @@ -595,14 +618,7 @@ func TestExpectationsOnRecreate(t *testing.T) { t.Fatal("New DS has the same UID as the old one!") } - err = wait.PollImmediate(100*time.Millisecond, informerSyncTimeout, func() (bool, error) { - klog.V(8).Infof("Waiting for queue to have 1 item, currently has: %d", dsc.queue.Len()) - return dsc.queue.Len() == 1, nil - }) - if err != nil { - t.Fatalf("Re-creating DS didn't result in new item in the queue: %v", err) - } - + waitForQueueLength(1, "recreated DS") ok = dsc.processNextWorkItem() if !ok { t.Fatal("Queue is shutting down!") @@ -617,7 +633,7 @@ func TestExpectationsOnRecreate(t *testing.T) { t.Fatal(err) } if !exists { - t.Errorf("No expectations found for DaemonSet %q", oldDSKey) + t.Fatalf("No expectations found for DaemonSet %q", oldDSKey) } if dsExp.Fulfilled() { t.Errorf("There should be unfulfiled expectation for creating new pods for DaemonSet %q", oldDSKey)