diff --git a/tools/watch/informerwatcher_test.go b/tools/watch/informerwatcher_test.go index cd5eb440..1f8e16c2 100644 --- a/tools/watch/informerwatcher_test.go +++ b/tools/watch/informerwatcher_test.go @@ -32,7 +32,10 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/dump" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + clientfeatures "k8s.io/client-go/features" + clientfeaturestesting "k8s.io/client-go/features/testing" fakeclientset "k8s.io/client-go/kubernetes/fake" testcore "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" @@ -134,71 +137,151 @@ func (a byEventTypeAndName) Less(i, j int) bool { return a[i].Object.(*corev1.Secret).Name < a[j].Object.(*corev1.Secret).Name } +func newTestSecret(name, key, value string) *corev1.Secret { + return &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + StringData: map[string]string{ + key: value, + }, + } +} + func TestNewInformerWatcher(t *testing.T) { // Make sure there are no 2 same types of events on a secret with the same name or that might be flaky. tt := []struct { - name string - objects []runtime.Object - events []watch.Event + name string + watchListFeatureEnabled bool + objects []runtime.Object + inputEvents []watch.Event + outputEvents []watch.Event }{ { - name: "basic test", + name: "WatchListClient feature disabled", + watchListFeatureEnabled: false, objects: []runtime.Object{ - &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-1", - }, - StringData: map[string]string{ - "foo-1": "initial", - }, + newTestSecret("pod-1", "foo-1", "initial"), + newTestSecret("pod-2", "foo-2", "initial"), + newTestSecret("pod-3", "foo-3", "initial"), + }, + inputEvents: []watch.Event{ + { + Type: watch.Added, + Object: newTestSecret("pod-4", "foo-4", "initial"), }, - &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-2", - }, - StringData: map[string]string{ - "foo-2": "initial", - }, + { + Type: watch.Modified, + Object: newTestSecret("pod-2", "foo-2", "new"), }, - &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-3", - }, - StringData: map[string]string{ - "foo-3": "initial", - }, + { + Type: watch.Deleted, + Object: newTestSecret("pod-3", "foo-3", "initial"), }, }, - events: []watch.Event{ + outputEvents: []watch.Event{ + // When WatchListClient is disabled, ListAndWatch creates fake + // ADDED events for each object listed. { - Type: watch.Added, + Type: watch.Added, + Object: newTestSecret("pod-1", "foo-1", "initial"), + }, + { + Type: watch.Added, + Object: newTestSecret("pod-2", "foo-2", "initial"), + }, + { + Type: watch.Added, + Object: newTestSecret("pod-3", "foo-3", "initial"), + }, + // Normal events follow. + { + Type: watch.Added, + Object: newTestSecret("pod-4", "foo-4", "initial"), + }, + { + Type: watch.Modified, + Object: newTestSecret("pod-2", "foo-2", "new"), + }, + { + Type: watch.Deleted, + Object: newTestSecret("pod-3", "foo-3", "initial"), + }, + }, + }, + { + name: "WatchListClient feature enabled", + watchListFeatureEnabled: true, + objects: []runtime.Object{ + newTestSecret("pod-1", "foo-1", "initial"), + newTestSecret("pod-2", "foo-2", "initial"), + newTestSecret("pod-3", "foo-3", "initial"), + }, + inputEvents: []watch.Event{ + { + Type: watch.Added, + Object: newTestSecret("pod-1", "foo-1", "initial"), + }, + { + Type: watch.Added, + Object: newTestSecret("pod-2", "foo-2", "initial"), + }, + { + Type: watch.Added, + Object: newTestSecret("pod-3", "foo-3", "initial"), + }, + // ListWatch bookmark indicates that initial listing is done + { + Type: watch.Bookmark, Object: &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ - Name: "pod-4", - }, - StringData: map[string]string{ - "foo-4": "initial", + Annotations: map[string]string{ + metav1.InitialEventsAnnotationKey: "true", + }, }, }, }, { - Type: watch.Modified, - Object: &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-2", - }, - StringData: map[string]string{ - "foo-2": "new", - }, - }, + Type: watch.Added, + Object: newTestSecret("pod-4", "foo-4", "initial"), }, { - Type: watch.Deleted, - Object: &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-3", - }, - }, + Type: watch.Modified, + Object: newTestSecret("pod-2", "foo-2", "new"), + }, + { + Type: watch.Deleted, + Object: newTestSecret("pod-3", "foo-3", "initial"), + }, + }, + outputEvents: []watch.Event{ + // When WatchListClient is enabled, WatchList receives + // ADDED events from the server for each existing object. + { + Type: watch.Added, + Object: newTestSecret("pod-1", "foo-1", "initial"), + }, + { + Type: watch.Added, + Object: newTestSecret("pod-2", "foo-2", "initial"), + }, + { + Type: watch.Added, + Object: newTestSecret("pod-3", "foo-3", "initial"), + }, + // Bookmark event at the end of listing is not sent to the client. + // Normal events follow. + { + Type: watch.Added, + Object: newTestSecret("pod-4", "foo-4", "initial"), + }, + { + Type: watch.Modified, + Object: newTestSecret("pod-2", "foo-2", "new"), + }, + { + Type: watch.Deleted, + Object: newTestSecret("pod-3", "foo-3", "initial"), }, }, }, @@ -206,24 +289,28 @@ func TestNewInformerWatcher(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { - var expected []watch.Event - for _, o := range tc.objects { - expected = append(expected, watch.Event{ - Type: watch.Added, - Object: o.DeepCopyObject(), - }) - } - for _, e := range tc.events { - expected = append(expected, *e.DeepCopy()) - } + clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, tc.watchListFeatureEnabled) fake := fakeclientset.NewSimpleClientset(tc.objects...) - fakeWatch := watch.NewFakeWithChanSize(len(tc.events), false) - fake.PrependWatchReactor("secrets", testcore.DefaultWatchReactor(fakeWatch, nil)) - - for _, e := range tc.events { - fakeWatch.Action(e.Type, e.Object) - } + inputCh := make(chan watch.Event) + inputWatcher := watch.NewProxyWatcher(inputCh) + // Indexer should stop the input watcher when the output watcher is stopped. + // But stop it at the end of the test, just in case. + defer inputWatcher.Stop() + inputStopCh := inputWatcher.StopChan() + fake.PrependWatchReactor("secrets", testcore.DefaultWatchReactor(inputWatcher, nil)) + // Send events and then close the done channel + inputDoneCh := make(chan struct{}) + go func() { + defer close(inputDoneCh) + for _, e := range tc.inputEvents { + select { + case inputCh <- e: + case <-inputStopCh: + return + } + } + }() lw := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { @@ -233,46 +320,58 @@ func TestNewInformerWatcher(t *testing.T) { return fake.CoreV1().Secrets("").Watch(context.TODO(), options) }, } - _, _, w, done := NewIndexerInformerWatcher(lw, &corev1.Secret{}) - + _, _, outputWatcher, informerDoneCh := NewIndexerInformerWatcher(lw, &corev1.Secret{}) + outputCh := outputWatcher.ResultChan() + timeoutCh := time.After(wait.ForeverTestTimeout) var result []watch.Event loop: for { - var event watch.Event - var ok bool select { - case event, ok = <-w.ResultChan(): + case event, ok := <-outputCh: if !ok { - t.Errorf("Failed to read event: channel is already closed!") - return + t.Errorf("Output result channel closed prematurely") + break loop } - result = append(result, *event.DeepCopy()) - case <-time.After(time.Second * 1): - // All the events are buffered -> this means we are done - // Also the one sec will make sure that we would detect RetryWatcher's incorrect behaviour after last event + if len(result) >= len(tc.outputEvents) { + break loop + } + case <-timeoutCh: + t.Error("Timed out waiting for events") break loop } } - // Informers don't guarantee event order so we need to sort these arrays to compare them - sort.Sort(byEventTypeAndName(expected)) + // Informers don't guarantee event order so we need to sort these arrays to compare them. + sort.Sort(byEventTypeAndName(tc.outputEvents)) sort.Sort(byEventTypeAndName(result)) - if !reflect.DeepEqual(expected, result) { - t.Errorf("\nexpected: %s,\ngot: %s,\ndiff: %s", dump.Pretty(expected), dump.Pretty(result), cmp.Diff(expected, result)) + if !reflect.DeepEqual(tc.outputEvents, result) { + t.Errorf("\nexpected: %s,\ngot: %s,\ndiff: %s", dump.Pretty(tc.outputEvents), dump.Pretty(result), cmp.Diff(tc.outputEvents, result)) return } - // Fill in some data to test watch closing while there are some events to be read - for _, e := range tc.events { - fakeWatch.Action(e.Type, e.Object) - } + // Send some more events, but don't read them. + // Stop producing events when the consumer stops the watcher. + go func() { + defer close(inputCh) + for _, e := range tc.inputEvents { + select { + case inputCh <- e: + case <-inputStopCh: + return + } + } + }() // Stop before reading all the data to make sure the informer can deal with closed channel - w.Stop() + outputWatcher.Stop() - <-done + select { + case <-informerDoneCh: + case <-time.After(wait.ForeverTestTimeout): + t.Error("Timed out waiting for informer to cleanup") + } }) }