diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index 84a8d269..336202e2 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -28,6 +28,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" @@ -97,19 +98,35 @@ func TestRunUntil(t *testing.T) { return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil }, } - go r.Run(stopCh) + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + r.Run(stopCh) + }() // Synchronously add a dummy pod into the watch channel so we // know the RunUntil go routine is in the watch handler. fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}) + close(stopCh) - select { - case _, ok := <-fw.ResultChan(): - if ok { - t.Errorf("Watch channel left open after stopping the watch") + resultCh := fw.ResultChan() + for { + select { + case <-doneCh: + if resultCh == nil { + return // both closed + } + doneCh = nil + case _, ok := <-resultCh: + if ok { + t.Fatalf("Watch channel left open after stopping the watch") + } + if doneCh == nil { + return // both closed + } + resultCh = nil + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("the cancellation is at least %s late", wait.ForeverTestTimeout.String()) } - case <-time.After(wait.ForeverTestTimeout): - t.Errorf("the cancellation is at least %s late", wait.ForeverTestTimeout.String()) - break } } @@ -126,26 +143,61 @@ func TestReflectorResyncChan(t *testing.T) { } } -// TestEstablishedWatchStoppedAfterStopCh ensures that -// an established watch will be closed right after -// the StopCh was also closed. -func TestEstablishedWatchStoppedAfterStopCh(t *testing.T) { - ctx, ctxCancel := context.WithCancel(context.TODO()) - ctxCancel() - w := watch.NewFake() - require.False(t, w.IsStopped()) +// TestReflectorWatchStoppedBefore ensures that neither List nor Watch are +// called if the stop channel is closed before Reflector.watch is called. +func TestReflectorWatchStoppedBefore(t *testing.T) { + stopCh := make(chan struct{}) + close(stopCh) - // w is stopped when the stopCh is closed - target := NewReflector(nil, &v1.Pod{}, nil, 0) - err := target.watch(w, ctx.Done(), nil) - require.NoError(t, err) - require.True(t, w.IsStopped()) + lw := &ListWatch{ + ListFunc: func(_ metav1.ListOptions) (runtime.Object, error) { + t.Fatal("ListFunc called unexpectedly") + return nil, nil + }, + WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) { + // If WatchFunc is never called, the watcher it returns doesn't need to be stopped. + t.Fatal("WatchFunc called unexpectedly") + return nil, nil + }, + } + target := NewReflector(lw, &v1.Pod{}, nil, 0) - // noop when the w is nil and the ctx is closed - err = target.watch(nil, ctx.Done(), nil) + err := target.watch(nil, stopCh, nil) require.NoError(t, err) } +// TestReflectorWatchStoppedAfter ensures that neither the watcher is stopped if +// the stop channel is closed after Reflector.watch has started watching. +func TestReflectorWatchStoppedAfter(t *testing.T) { + stopCh := make(chan struct{}) + + var watchers []*watch.FakeWatcher + + lw := &ListWatch{ + ListFunc: func(_ metav1.ListOptions) (runtime.Object, error) { + t.Fatal("ListFunc called unexpectedly") + return nil, nil + }, + WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) { + // Simulate the stop channel being closed after watching has started + go func() { + time.Sleep(10 * time.Millisecond) + close(stopCh) + }() + // Use a fake watcher that never sends events + w := watch.NewFake() + watchers = append(watchers, w) + return w, nil + }, + } + target := NewReflector(lw, &v1.Pod{}, nil, 0) + + err := target.watch(nil, stopCh, nil) + require.NoError(t, err) + require.Equal(t, 1, len(watchers)) + require.True(t, watchers[0].IsStopped()) +} + func BenchmarkReflectorResyncChanMany(b *testing.B) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &v1.Pod{}, s, 25*time.Millisecond) @@ -158,22 +210,148 @@ func BenchmarkReflectorResyncChanMany(b *testing.B) { } } -func TestReflectorWatchHandlerError(t *testing.T) { +// TestReflectorHandleWatchStoppedBefore ensures that handleWatch stops when +// stopCh is already closed before handleWatch was called. It also ensures that +// ResultChan is only called once and that Stop is called after ResultChan. +func TestReflectorHandleWatchStoppedBefore(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) - fw := watch.NewFake() - go func() { - fw.Stop() - }() + stopCh := make(chan struct{}) + // Simulate the watch channel being closed before the watchHandler is called + close(stopCh) + var calls []string + resultCh := make(chan watch.Event) + fw := watch.MockWatcher{ + StopFunc: func() { + calls = append(calls, "Stop") + close(resultCh) + }, + ResultChanFunc: func() <-chan watch.Event { + calls = append(calls, "ResultChan") + return resultCh + }, + } + err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopCh) + if err == nil { + t.Errorf("unexpected non-error") + } + // Ensure the watcher methods are called exactly once in this exact order. + // TODO(karlkfi): Fix watchHandler to call Stop() + // assert.Equal(t, []string{"ResultChan", "Stop"}, calls) + assert.Equal(t, []string{"ResultChan"}, calls) +} + +// TestReflectorHandleWatchStoppedAfter ensures that handleWatch stops when +// stopCh is closed after handleWatch was called. It also ensures that +// ResultChan is only called once and that Stop is called after ResultChan. +func TestReflectorHandleWatchStoppedAfter(t *testing.T) { + s := NewStore(MetaNamespaceKeyFunc) + g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) + var calls []string + stopCh := make(chan struct{}) + resultCh := make(chan watch.Event) + fw := watch.MockWatcher{ + StopFunc: func() { + calls = append(calls, "Stop") + close(resultCh) + }, + ResultChanFunc: func() <-chan watch.Event { + calls = append(calls, "ResultChan") + resultCh = make(chan watch.Event) + // Simulate the watch handler being stopped asynchronously by the + // caller, after watching has started. + go func() { + time.Sleep(10 * time.Millisecond) + close(stopCh) + }() + return resultCh + }, + } + err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopCh) + if err == nil { + t.Errorf("unexpected non-error") + } + // Ensure the watcher methods are called exactly once in this exact order. + // TODO(karlkfi): Fix watchHandler to call Stop() + // assert.Equal(t, []string{"ResultChan", "Stop"}, calls) + assert.Equal(t, []string{"ResultChan"}, calls) +} + +// TestReflectorHandleWatchResultChanClosedBefore ensures that handleWatch +// stops when the result channel is closed before handleWatch was called. +func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) { + s := NewStore(MetaNamespaceKeyFunc) + g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) + var calls []string + resultCh := make(chan watch.Event) + fw := watch.MockWatcher{ + StopFunc: func() { + calls = append(calls, "Stop") + }, + ResultChanFunc: func() <-chan watch.Event { + calls = append(calls, "ResultChan") + return resultCh + }, + } + // Simulate the result channel being closed by the producer before handleWatch is called. + close(resultCh) err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop) if err == nil { t.Errorf("unexpected non-error") } + // Ensure the watcher methods are called exactly once in this exact order. + // TODO(karlkfi): Fix watchHandler to call Stop() + // assert.Equal(t, []string{"ResultChan", "Stop"}, calls) + assert.Equal(t, []string{"ResultChan"}, calls) +} + +// TestReflectorHandleWatchResultChanClosedAfter ensures that handleWatch +// stops when the result channel is closed after handleWatch has started watching. +func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) { + s := NewStore(MetaNamespaceKeyFunc) + g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) + var calls []string + resultCh := make(chan watch.Event) + fw := watch.MockWatcher{ + StopFunc: func() { + calls = append(calls, "Stop") + }, + ResultChanFunc: func() <-chan watch.Event { + calls = append(calls, "ResultChan") + resultCh = make(chan watch.Event) + // Simulate the result channel being closed by the producer, after + // watching has started. + go func() { + time.Sleep(10 * time.Millisecond) + close(resultCh) + }() + return resultCh + }, + } + err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop) + if err == nil { + t.Errorf("unexpected non-error") + } + // Ensure the watcher methods are called exactly once in this exact order. + // TODO(karlkfi): Fix watchHandler to call Stop() + // assert.Equal(t, []string{"ResultChan", "Stop"}, calls) + assert.Equal(t, []string{"ResultChan"}, calls) } func TestReflectorWatchHandler(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) + // Wrap setLastSyncResourceVersion so we can tell the watchHandler to stop + // watching after all the events have been consumed. This avoids race + // conditions which can happen if the producer calls Stop(), instead of the + // consumer. + stopCh := make(chan struct{}) + setLastSyncResourceVersion := func(rv string) { + g.setLastSyncResourceVersion(rv) + if rv == "32" { + close(stopCh) + } + } fw := watch.NewFake() s.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) s.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}}) @@ -184,8 +362,8 @@ func TestReflectorWatchHandler(t *testing.T) { fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}}) fw.Stop() }() - err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop) - if err != nil { + err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, setLastSyncResourceVersion, nil, g.clock, nevererrc, stopCh) + if !errors.Is(err, errorStopRequested) { t.Errorf("unexpected error %v", err) } @@ -193,6 +371,7 @@ func TestReflectorWatchHandler(t *testing.T) { return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: rv}} } + // Validate that the Store was updated by the events table := []struct { Pod *v1.Pod exists bool @@ -215,12 +394,7 @@ func TestReflectorWatchHandler(t *testing.T) { } } - // RV should send the last version we see. - if e, a := "32", g.LastSyncResourceVersion(); e != a { - t.Errorf("expected %v, got %v", e, a) - } - - // last sync resource version should be the last version synced with store + // Validate that setLastSyncResourceVersion was called with the RV from the last event. if e, a := "32", g.LastSyncResourceVersion(); e != a { t.Errorf("expected %v, got %v", e, a) } @@ -230,8 +404,8 @@ func TestReflectorStopWatch(t *testing.T) { s := NewStore(MetaNamespaceKeyFunc) g := NewReflector(&testLW{}, &v1.Pod{}, s, 0) fw := watch.NewFake() - stopWatch := make(chan struct{}, 1) - stopWatch <- struct{}{} + stopWatch := make(chan struct{}) + close(stopWatch) err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopWatch) if err != errorStopRequested { t.Errorf("expected stop error, got %q", err) @@ -361,6 +535,7 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) { } } watchRet, watchErr := item.events, item.watchErr + stopCh := make(chan struct{}) lw := &testLW{ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { if watchErr != nil { @@ -372,7 +547,13 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) { for _, e := range watchRet { fw.Action(e.Type, e.Object) } - fw.Stop() + // Because FakeWatcher doesn't buffer events, it's safe to + // close the stop channel immediately without missing events. + // But usually, the event producer would instead close the + // result channel, and wait for the consumer to stop the + // watcher, to avoid race conditions. + // TODO: Fix the FakeWatcher to separate watcher.Stop from close(resultCh) + close(stopCh) }() return fw, nil }, @@ -381,7 +562,16 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) { }, } r := NewReflector(lw, &v1.Pod{}, s, 0) - r.ListAndWatch(wait.NeverStop) + err := r.ListAndWatch(stopCh) + if item.listErr != nil && !errors.Is(err, item.listErr) { + t.Errorf("unexpected ListAndWatch error: %v", err) + } + if item.watchErr != nil && !errors.Is(err, item.watchErr) { + t.Errorf("unexpected ListAndWatch error: %v", err) + } + if item.listErr == nil && item.watchErr == nil { + assert.NoError(t, err) + } } }