Improve Reflector unit tests

- Add tests to confirm that Stop is always called.
- Add TODOs to show were Stop is not currently being called
  (to fix in a future PR)

Kubernetes-commit: ab5aa4762fd5206d0dbd8412d7c6f3b76533a122
This commit is contained in:
Karl Isenberg 2024-06-03 12:15:38 -07:00 committed by Kubernetes Publisher
parent 52e5651101
commit 8c9cb8838f

View File

@ -28,6 +28,7 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
@ -97,19 +98,35 @@ func TestRunUntil(t *testing.T) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
},
}
go r.Run(stopCh)
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
r.Run(stopCh)
}()
// Synchronously add a dummy pod into the watch channel so we
// know the RunUntil go routine is in the watch handler.
fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}})
close(stopCh)
select {
case _, ok := <-fw.ResultChan():
if ok {
t.Errorf("Watch channel left open after stopping the watch")
resultCh := fw.ResultChan()
for {
select {
case <-doneCh:
if resultCh == nil {
return // both closed
}
doneCh = nil
case _, ok := <-resultCh:
if ok {
t.Fatalf("Watch channel left open after stopping the watch")
}
if doneCh == nil {
return // both closed
}
resultCh = nil
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("the cancellation is at least %s late", wait.ForeverTestTimeout.String())
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("the cancellation is at least %s late", wait.ForeverTestTimeout.String())
break
}
}
@ -126,26 +143,61 @@ func TestReflectorResyncChan(t *testing.T) {
}
}
// TestEstablishedWatchStoppedAfterStopCh ensures that
// an established watch will be closed right after
// the StopCh was also closed.
func TestEstablishedWatchStoppedAfterStopCh(t *testing.T) {
ctx, ctxCancel := context.WithCancel(context.TODO())
ctxCancel()
w := watch.NewFake()
require.False(t, w.IsStopped())
// TestReflectorWatchStoppedBefore ensures that neither List nor Watch are
// called if the stop channel is closed before Reflector.watch is called.
func TestReflectorWatchStoppedBefore(t *testing.T) {
stopCh := make(chan struct{})
close(stopCh)
// w is stopped when the stopCh is closed
target := NewReflector(nil, &v1.Pod{}, nil, 0)
err := target.watch(w, ctx.Done(), nil)
require.NoError(t, err)
require.True(t, w.IsStopped())
lw := &ListWatch{
ListFunc: func(_ metav1.ListOptions) (runtime.Object, error) {
t.Fatal("ListFunc called unexpectedly")
return nil, nil
},
WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) {
// If WatchFunc is never called, the watcher it returns doesn't need to be stopped.
t.Fatal("WatchFunc called unexpectedly")
return nil, nil
},
}
target := NewReflector(lw, &v1.Pod{}, nil, 0)
// noop when the w is nil and the ctx is closed
err = target.watch(nil, ctx.Done(), nil)
err := target.watch(nil, stopCh, nil)
require.NoError(t, err)
}
// TestReflectorWatchStoppedAfter ensures that neither the watcher is stopped if
// the stop channel is closed after Reflector.watch has started watching.
func TestReflectorWatchStoppedAfter(t *testing.T) {
stopCh := make(chan struct{})
var watchers []*watch.FakeWatcher
lw := &ListWatch{
ListFunc: func(_ metav1.ListOptions) (runtime.Object, error) {
t.Fatal("ListFunc called unexpectedly")
return nil, nil
},
WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) {
// Simulate the stop channel being closed after watching has started
go func() {
time.Sleep(10 * time.Millisecond)
close(stopCh)
}()
// Use a fake watcher that never sends events
w := watch.NewFake()
watchers = append(watchers, w)
return w, nil
},
}
target := NewReflector(lw, &v1.Pod{}, nil, 0)
err := target.watch(nil, stopCh, nil)
require.NoError(t, err)
require.Equal(t, 1, len(watchers))
require.True(t, watchers[0].IsStopped())
}
func BenchmarkReflectorResyncChanMany(b *testing.B) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, 25*time.Millisecond)
@ -158,22 +210,148 @@ func BenchmarkReflectorResyncChanMany(b *testing.B) {
}
}
func TestReflectorWatchHandlerError(t *testing.T) {
// TestReflectorHandleWatchStoppedBefore ensures that handleWatch stops when
// stopCh is already closed before handleWatch was called. It also ensures that
// ResultChan is only called once and that Stop is called after ResultChan.
func TestReflectorHandleWatchStoppedBefore(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
fw := watch.NewFake()
go func() {
fw.Stop()
}()
stopCh := make(chan struct{})
// Simulate the watch channel being closed before the watchHandler is called
close(stopCh)
var calls []string
resultCh := make(chan watch.Event)
fw := watch.MockWatcher{
StopFunc: func() {
calls = append(calls, "Stop")
close(resultCh)
},
ResultChanFunc: func() <-chan watch.Event {
calls = append(calls, "ResultChan")
return resultCh
},
}
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopCh)
if err == nil {
t.Errorf("unexpected non-error")
}
// Ensure the watcher methods are called exactly once in this exact order.
// TODO(karlkfi): Fix watchHandler to call Stop()
// assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
assert.Equal(t, []string{"ResultChan"}, calls)
}
// TestReflectorHandleWatchStoppedAfter ensures that handleWatch stops when
// stopCh is closed after handleWatch was called. It also ensures that
// ResultChan is only called once and that Stop is called after ResultChan.
func TestReflectorHandleWatchStoppedAfter(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
var calls []string
stopCh := make(chan struct{})
resultCh := make(chan watch.Event)
fw := watch.MockWatcher{
StopFunc: func() {
calls = append(calls, "Stop")
close(resultCh)
},
ResultChanFunc: func() <-chan watch.Event {
calls = append(calls, "ResultChan")
resultCh = make(chan watch.Event)
// Simulate the watch handler being stopped asynchronously by the
// caller, after watching has started.
go func() {
time.Sleep(10 * time.Millisecond)
close(stopCh)
}()
return resultCh
},
}
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopCh)
if err == nil {
t.Errorf("unexpected non-error")
}
// Ensure the watcher methods are called exactly once in this exact order.
// TODO(karlkfi): Fix watchHandler to call Stop()
// assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
assert.Equal(t, []string{"ResultChan"}, calls)
}
// TestReflectorHandleWatchResultChanClosedBefore ensures that handleWatch
// stops when the result channel is closed before handleWatch was called.
func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
var calls []string
resultCh := make(chan watch.Event)
fw := watch.MockWatcher{
StopFunc: func() {
calls = append(calls, "Stop")
},
ResultChanFunc: func() <-chan watch.Event {
calls = append(calls, "ResultChan")
return resultCh
},
}
// Simulate the result channel being closed by the producer before handleWatch is called.
close(resultCh)
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop)
if err == nil {
t.Errorf("unexpected non-error")
}
// Ensure the watcher methods are called exactly once in this exact order.
// TODO(karlkfi): Fix watchHandler to call Stop()
// assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
assert.Equal(t, []string{"ResultChan"}, calls)
}
// TestReflectorHandleWatchResultChanClosedAfter ensures that handleWatch
// stops when the result channel is closed after handleWatch has started watching.
func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
var calls []string
resultCh := make(chan watch.Event)
fw := watch.MockWatcher{
StopFunc: func() {
calls = append(calls, "Stop")
},
ResultChanFunc: func() <-chan watch.Event {
calls = append(calls, "ResultChan")
resultCh = make(chan watch.Event)
// Simulate the result channel being closed by the producer, after
// watching has started.
go func() {
time.Sleep(10 * time.Millisecond)
close(resultCh)
}()
return resultCh
},
}
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop)
if err == nil {
t.Errorf("unexpected non-error")
}
// Ensure the watcher methods are called exactly once in this exact order.
// TODO(karlkfi): Fix watchHandler to call Stop()
// assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
assert.Equal(t, []string{"ResultChan"}, calls)
}
func TestReflectorWatchHandler(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
// Wrap setLastSyncResourceVersion so we can tell the watchHandler to stop
// watching after all the events have been consumed. This avoids race
// conditions which can happen if the producer calls Stop(), instead of the
// consumer.
stopCh := make(chan struct{})
setLastSyncResourceVersion := func(rv string) {
g.setLastSyncResourceVersion(rv)
if rv == "32" {
close(stopCh)
}
}
fw := watch.NewFake()
s.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
s.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}})
@ -184,8 +362,8 @@ func TestReflectorWatchHandler(t *testing.T) {
fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}})
fw.Stop()
}()
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, wait.NeverStop)
if err != nil {
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, setLastSyncResourceVersion, nil, g.clock, nevererrc, stopCh)
if !errors.Is(err, errorStopRequested) {
t.Errorf("unexpected error %v", err)
}
@ -193,6 +371,7 @@ func TestReflectorWatchHandler(t *testing.T) {
return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: rv}}
}
// Validate that the Store was updated by the events
table := []struct {
Pod *v1.Pod
exists bool
@ -215,12 +394,7 @@ func TestReflectorWatchHandler(t *testing.T) {
}
}
// RV should send the last version we see.
if e, a := "32", g.LastSyncResourceVersion(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
// last sync resource version should be the last version synced with store
// Validate that setLastSyncResourceVersion was called with the RV from the last event.
if e, a := "32", g.LastSyncResourceVersion(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
@ -230,8 +404,8 @@ func TestReflectorStopWatch(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&testLW{}, &v1.Pod{}, s, 0)
fw := watch.NewFake()
stopWatch := make(chan struct{}, 1)
stopWatch <- struct{}{}
stopWatch := make(chan struct{})
close(stopWatch)
err := watchHandler(time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, nil, g.clock, nevererrc, stopWatch)
if err != errorStopRequested {
t.Errorf("expected stop error, got %q", err)
@ -361,6 +535,7 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
}
}
watchRet, watchErr := item.events, item.watchErr
stopCh := make(chan struct{})
lw := &testLW{
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if watchErr != nil {
@ -372,7 +547,13 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
for _, e := range watchRet {
fw.Action(e.Type, e.Object)
}
fw.Stop()
// Because FakeWatcher doesn't buffer events, it's safe to
// close the stop channel immediately without missing events.
// But usually, the event producer would instead close the
// result channel, and wait for the consumer to stop the
// watcher, to avoid race conditions.
// TODO: Fix the FakeWatcher to separate watcher.Stop from close(resultCh)
close(stopCh)
}()
return fw, nil
},
@ -381,7 +562,16 @@ func TestReflectorListAndWatchWithErrors(t *testing.T) {
},
}
r := NewReflector(lw, &v1.Pod{}, s, 0)
r.ListAndWatch(wait.NeverStop)
err := r.ListAndWatch(stopCh)
if item.listErr != nil && !errors.Is(err, item.listErr) {
t.Errorf("unexpected ListAndWatch error: %v", err)
}
if item.watchErr != nil && !errors.Is(err, item.watchErr) {
t.Errorf("unexpected ListAndWatch error: %v", err)
}
if item.listErr == nil && item.watchErr == nil {
assert.NoError(t, err)
}
}
}