From 7c5e9038dd9c177c8f3566e97a845e345c66221e Mon Sep 17 00:00:00 2001 From: Karl Isenberg Date: Mon, 3 Jun 2024 17:31:59 -0700 Subject: [PATCH] Update TestNewInformerWatcher for WatchListClient - Switch to using the ProxyWatcher to validate the dance between closing the stop channel and closing the result channel. - Use the new clientfeaturestesting.SetFeatureDuringTest to test with the WatchListClient enabled and disabled. These should result in almost the exact same output events from the informer (list ordering not garenteed), but with different input events recieved from the apiserver. Kubernetes-commit: 28e3a728e5e6fe651d7a17839d33ce42204c0b4e --- tools/watch/informerwatcher_test.go | 265 +++++++++++++++++++--------- 1 file changed, 182 insertions(+), 83 deletions(-) diff --git a/tools/watch/informerwatcher_test.go b/tools/watch/informerwatcher_test.go index cd5eb440..1f8e16c2 100644 --- a/tools/watch/informerwatcher_test.go +++ b/tools/watch/informerwatcher_test.go @@ -32,7 +32,10 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/dump" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" + clientfeatures "k8s.io/client-go/features" + clientfeaturestesting "k8s.io/client-go/features/testing" fakeclientset "k8s.io/client-go/kubernetes/fake" testcore "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" @@ -134,71 +137,151 @@ func (a byEventTypeAndName) Less(i, j int) bool { return a[i].Object.(*corev1.Secret).Name < a[j].Object.(*corev1.Secret).Name } +func newTestSecret(name, key, value string) *corev1.Secret { + return &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + StringData: map[string]string{ + key: value, + }, + } +} + func TestNewInformerWatcher(t *testing.T) { // Make sure there are no 2 same types of events on a secret with the same name or that might be flaky. tt := []struct { - name string - objects []runtime.Object - events []watch.Event + name string + watchListFeatureEnabled bool + objects []runtime.Object + inputEvents []watch.Event + outputEvents []watch.Event }{ { - name: "basic test", + name: "WatchListClient feature disabled", + watchListFeatureEnabled: false, objects: []runtime.Object{ - &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-1", - }, - StringData: map[string]string{ - "foo-1": "initial", - }, + newTestSecret("pod-1", "foo-1", "initial"), + newTestSecret("pod-2", "foo-2", "initial"), + newTestSecret("pod-3", "foo-3", "initial"), + }, + inputEvents: []watch.Event{ + { + Type: watch.Added, + Object: newTestSecret("pod-4", "foo-4", "initial"), }, - &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-2", - }, - StringData: map[string]string{ - "foo-2": "initial", - }, + { + Type: watch.Modified, + Object: newTestSecret("pod-2", "foo-2", "new"), }, - &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-3", - }, - StringData: map[string]string{ - "foo-3": "initial", - }, + { + Type: watch.Deleted, + Object: newTestSecret("pod-3", "foo-3", "initial"), }, }, - events: []watch.Event{ + outputEvents: []watch.Event{ + // When WatchListClient is disabled, ListAndWatch creates fake + // ADDED events for each object listed. { - Type: watch.Added, + Type: watch.Added, + Object: newTestSecret("pod-1", "foo-1", "initial"), + }, + { + Type: watch.Added, + Object: newTestSecret("pod-2", "foo-2", "initial"), + }, + { + Type: watch.Added, + Object: newTestSecret("pod-3", "foo-3", "initial"), + }, + // Normal events follow. + { + Type: watch.Added, + Object: newTestSecret("pod-4", "foo-4", "initial"), + }, + { + Type: watch.Modified, + Object: newTestSecret("pod-2", "foo-2", "new"), + }, + { + Type: watch.Deleted, + Object: newTestSecret("pod-3", "foo-3", "initial"), + }, + }, + }, + { + name: "WatchListClient feature enabled", + watchListFeatureEnabled: true, + objects: []runtime.Object{ + newTestSecret("pod-1", "foo-1", "initial"), + newTestSecret("pod-2", "foo-2", "initial"), + newTestSecret("pod-3", "foo-3", "initial"), + }, + inputEvents: []watch.Event{ + { + Type: watch.Added, + Object: newTestSecret("pod-1", "foo-1", "initial"), + }, + { + Type: watch.Added, + Object: newTestSecret("pod-2", "foo-2", "initial"), + }, + { + Type: watch.Added, + Object: newTestSecret("pod-3", "foo-3", "initial"), + }, + // ListWatch bookmark indicates that initial listing is done + { + Type: watch.Bookmark, Object: &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ - Name: "pod-4", - }, - StringData: map[string]string{ - "foo-4": "initial", + Annotations: map[string]string{ + metav1.InitialEventsAnnotationKey: "true", + }, }, }, }, { - Type: watch.Modified, - Object: &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-2", - }, - StringData: map[string]string{ - "foo-2": "new", - }, - }, + Type: watch.Added, + Object: newTestSecret("pod-4", "foo-4", "initial"), }, { - Type: watch.Deleted, - Object: &corev1.Secret{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod-3", - }, - }, + Type: watch.Modified, + Object: newTestSecret("pod-2", "foo-2", "new"), + }, + { + Type: watch.Deleted, + Object: newTestSecret("pod-3", "foo-3", "initial"), + }, + }, + outputEvents: []watch.Event{ + // When WatchListClient is enabled, WatchList receives + // ADDED events from the server for each existing object. + { + Type: watch.Added, + Object: newTestSecret("pod-1", "foo-1", "initial"), + }, + { + Type: watch.Added, + Object: newTestSecret("pod-2", "foo-2", "initial"), + }, + { + Type: watch.Added, + Object: newTestSecret("pod-3", "foo-3", "initial"), + }, + // Bookmark event at the end of listing is not sent to the client. + // Normal events follow. + { + Type: watch.Added, + Object: newTestSecret("pod-4", "foo-4", "initial"), + }, + { + Type: watch.Modified, + Object: newTestSecret("pod-2", "foo-2", "new"), + }, + { + Type: watch.Deleted, + Object: newTestSecret("pod-3", "foo-3", "initial"), }, }, }, @@ -206,24 +289,28 @@ func TestNewInformerWatcher(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { - var expected []watch.Event - for _, o := range tc.objects { - expected = append(expected, watch.Event{ - Type: watch.Added, - Object: o.DeepCopyObject(), - }) - } - for _, e := range tc.events { - expected = append(expected, *e.DeepCopy()) - } + clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, tc.watchListFeatureEnabled) fake := fakeclientset.NewSimpleClientset(tc.objects...) - fakeWatch := watch.NewFakeWithChanSize(len(tc.events), false) - fake.PrependWatchReactor("secrets", testcore.DefaultWatchReactor(fakeWatch, nil)) - - for _, e := range tc.events { - fakeWatch.Action(e.Type, e.Object) - } + inputCh := make(chan watch.Event) + inputWatcher := watch.NewProxyWatcher(inputCh) + // Indexer should stop the input watcher when the output watcher is stopped. + // But stop it at the end of the test, just in case. + defer inputWatcher.Stop() + inputStopCh := inputWatcher.StopChan() + fake.PrependWatchReactor("secrets", testcore.DefaultWatchReactor(inputWatcher, nil)) + // Send events and then close the done channel + inputDoneCh := make(chan struct{}) + go func() { + defer close(inputDoneCh) + for _, e := range tc.inputEvents { + select { + case inputCh <- e: + case <-inputStopCh: + return + } + } + }() lw := &cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { @@ -233,46 +320,58 @@ func TestNewInformerWatcher(t *testing.T) { return fake.CoreV1().Secrets("").Watch(context.TODO(), options) }, } - _, _, w, done := NewIndexerInformerWatcher(lw, &corev1.Secret{}) - + _, _, outputWatcher, informerDoneCh := NewIndexerInformerWatcher(lw, &corev1.Secret{}) + outputCh := outputWatcher.ResultChan() + timeoutCh := time.After(wait.ForeverTestTimeout) var result []watch.Event loop: for { - var event watch.Event - var ok bool select { - case event, ok = <-w.ResultChan(): + case event, ok := <-outputCh: if !ok { - t.Errorf("Failed to read event: channel is already closed!") - return + t.Errorf("Output result channel closed prematurely") + break loop } - result = append(result, *event.DeepCopy()) - case <-time.After(time.Second * 1): - // All the events are buffered -> this means we are done - // Also the one sec will make sure that we would detect RetryWatcher's incorrect behaviour after last event + if len(result) >= len(tc.outputEvents) { + break loop + } + case <-timeoutCh: + t.Error("Timed out waiting for events") break loop } } - // Informers don't guarantee event order so we need to sort these arrays to compare them - sort.Sort(byEventTypeAndName(expected)) + // Informers don't guarantee event order so we need to sort these arrays to compare them. + sort.Sort(byEventTypeAndName(tc.outputEvents)) sort.Sort(byEventTypeAndName(result)) - if !reflect.DeepEqual(expected, result) { - t.Errorf("\nexpected: %s,\ngot: %s,\ndiff: %s", dump.Pretty(expected), dump.Pretty(result), cmp.Diff(expected, result)) + if !reflect.DeepEqual(tc.outputEvents, result) { + t.Errorf("\nexpected: %s,\ngot: %s,\ndiff: %s", dump.Pretty(tc.outputEvents), dump.Pretty(result), cmp.Diff(tc.outputEvents, result)) return } - // Fill in some data to test watch closing while there are some events to be read - for _, e := range tc.events { - fakeWatch.Action(e.Type, e.Object) - } + // Send some more events, but don't read them. + // Stop producing events when the consumer stops the watcher. + go func() { + defer close(inputCh) + for _, e := range tc.inputEvents { + select { + case inputCh <- e: + case <-inputStopCh: + return + } + } + }() // Stop before reading all the data to make sure the informer can deal with closed channel - w.Stop() + outputWatcher.Stop() - <-done + select { + case <-informerDoneCh: + case <-time.After(wait.ForeverTestTimeout): + t.Error("Timed out waiting for informer to cleanup") + } }) }