Deflake TestExpectationsOnRecreate

This commit is contained in:
Jordan Liggitt 2020-07-31 17:35:48 -04:00
parent 6324c137ea
commit 8e6a8669c0

View File

@ -469,6 +469,39 @@ func TestExpectationsOnRecreate(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
expectStableQueueLength := func(expected int) {
t.Helper()
for i := 0; i < 5; i++ {
if actual := dsc.queue.Len(); actual != expected {
t.Fatalf("expected queue len to remain at %d, got %d", expected, actual)
}
time.Sleep(10 * time.Millisecond)
}
}
waitForQueueLength := func(expected int, msg string) {
t.Helper()
i := 0
err = wait.PollImmediate(100*time.Millisecond, informerSyncTimeout, func() (bool, error) {
current := dsc.queue.Len()
switch {
case current == expected:
return true, nil
case current > expected:
return false, fmt.Errorf("queue length %d exceeded expected length %d", current, expected)
default:
i++
if i > 1 {
t.Logf("Waiting for queue to have %d item, currently has: %d", expected, current)
}
return false, nil
}
})
if err != nil {
t.Fatalf("%s: %v", msg, err)
}
expectStableQueueLength(expected)
}
fakeRecorder := record.NewFakeRecorder(100) fakeRecorder := record.NewFakeRecorder(100)
dsc.eventRecorder = fakeRecorder dsc.eventRecorder = fakeRecorder
@ -499,9 +532,7 @@ func TestExpectationsOnRecreate(t *testing.T) {
t.Fatal("caches failed to sync") t.Fatal("caches failed to sync")
} }
if dsc.queue.Len() != 0 { expectStableQueueLength(0)
t.Fatal("Unexpected item in the queue")
}
oldDS := newDaemonSet("test") oldDS := newDaemonSet("test")
oldDS, err = client.AppsV1().DaemonSets(oldDS.Namespace).Create(context.Background(), oldDS, metav1.CreateOptions{}) oldDS, err = client.AppsV1().DaemonSets(oldDS.Namespace).Create(context.Background(), oldDS, metav1.CreateOptions{})
@ -509,14 +540,8 @@ func TestExpectationsOnRecreate(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
err = wait.PollImmediate(100*time.Millisecond, informerSyncTimeout, func() (bool, error) { // create of DS adds to queue, processes
klog.V(8).Infof("Waiting for queue to have 1 item, currently has: %d", dsc.queue.Len()) waitForQueueLength(1, "created DS")
return dsc.queue.Len() == 1, nil
})
if err != nil {
t.Fatalf("initial DS didn't result in new item in the queue: %v", err)
}
ok = dsc.processNextWorkItem() ok = dsc.processNextWorkItem()
if !ok { if !ok {
t.Fatal("queue is shutting down") t.Fatal("queue is shutting down")
@ -538,28 +563,28 @@ func TestExpectationsOnRecreate(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if !exists { if !exists {
t.Errorf("No expectations found for DaemonSet %q", oldDSKey) t.Fatalf("No expectations found for DaemonSet %q", oldDSKey)
} }
if dsExp.Fulfilled() { if dsExp.Fulfilled() {
t.Errorf("There should be unfulfiled expectation for creating new pods for DaemonSet %q", oldDSKey) t.Errorf("There should be unfulfiled expectation for creating new pods for DaemonSet %q", oldDSKey)
} }
if dsc.queue.Len() != 0 { // process updates DS, update adds to queue
t.Fatal("Unexpected item in the queue") waitForQueueLength(1, "updated DS")
ok = dsc.processNextWorkItem()
if !ok {
t.Fatal("queue is shutting down")
} }
// process does not re-update the DS
expectStableQueueLength(0)
err = client.AppsV1().DaemonSets(oldDS.Namespace).Delete(context.Background(), oldDS.Name, metav1.DeleteOptions{}) err = client.AppsV1().DaemonSets(oldDS.Namespace).Delete(context.Background(), oldDS.Name, metav1.DeleteOptions{})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
err = wait.PollImmediate(100*time.Millisecond, informerSyncTimeout, func() (bool, error) { waitForQueueLength(1, "deleted DS")
klog.V(8).Infof("Waiting for queue to have 1 item, currently has: %d", dsc.queue.Len())
return dsc.queue.Len() == 1, nil
})
if err != nil {
t.Fatalf("Deleting DS didn't result in new item in the queue: %v", err)
}
_, exists, err = dsc.expectations.GetExpectations(oldDSKey) _, exists, err = dsc.expectations.GetExpectations(oldDSKey)
if err != nil { if err != nil {
@ -579,9 +604,7 @@ func TestExpectationsOnRecreate(t *testing.T) {
t.Fatal("Keys should be equal!") t.Fatal("Keys should be equal!")
} }
if dsc.queue.Len() != 0 { expectStableQueueLength(0)
t.Fatal("Unexpected item in the queue")
}
newDS := oldDS.DeepCopy() newDS := oldDS.DeepCopy()
newDS.UID = uuid.NewUUID() newDS.UID = uuid.NewUUID()
@ -595,14 +618,7 @@ func TestExpectationsOnRecreate(t *testing.T) {
t.Fatal("New DS has the same UID as the old one!") t.Fatal("New DS has the same UID as the old one!")
} }
err = wait.PollImmediate(100*time.Millisecond, informerSyncTimeout, func() (bool, error) { waitForQueueLength(1, "recreated DS")
klog.V(8).Infof("Waiting for queue to have 1 item, currently has: %d", dsc.queue.Len())
return dsc.queue.Len() == 1, nil
})
if err != nil {
t.Fatalf("Re-creating DS didn't result in new item in the queue: %v", err)
}
ok = dsc.processNextWorkItem() ok = dsc.processNextWorkItem()
if !ok { if !ok {
t.Fatal("Queue is shutting down!") t.Fatal("Queue is shutting down!")
@ -617,7 +633,7 @@ func TestExpectationsOnRecreate(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
if !exists { if !exists {
t.Errorf("No expectations found for DaemonSet %q", oldDSKey) t.Fatalf("No expectations found for DaemonSet %q", oldDSKey)
} }
if dsExp.Fulfilled() { if dsExp.Fulfilled() {
t.Errorf("There should be unfulfiled expectation for creating new pods for DaemonSet %q", oldDSKey) t.Errorf("There should be unfulfiled expectation for creating new pods for DaemonSet %q", oldDSKey)