fix: Watcher deadlock from Stop not being called

Change:
- refactor Reflector.ListAndWatch and Reflector.watch to always call
  watcher.Stop.
- refactor Reflector.handleAnyWatch to always call watcher.Stop,
  EXCEPT when exitOnWatchListBookmarkReceived &&
  watchListBookmarkReceived.
- Update unit tests with these new expectations.

Effect:
- ensures watcher.Stop is always called at least once
- avoids deadlocks in watcher implementations when watcher.Watch is
  called, but watcher.Stop is never called.

Note: It's impossible to guarantee that Stop will only be called once.
So watch.Interface implementations must tollerate Stop being called
multiple times.

Kubernetes-commit: 3e609ecf6e945bf4562bddfc563fde9a4c3d0d90
This commit is contained in:
Karl Isenberg 2025-04-11 14:30:29 -07:00 committed by Kubernetes Publisher
parent 45e758d19c
commit e9ce1dd558
2 changed files with 272 additions and 110 deletions

View File

@ -406,6 +406,12 @@ func (r *Reflector) ListAndWatchWithContext(ctx context.Context) error {
useWatchList := ptr.Deref(r.UseWatchList, false) useWatchList := ptr.Deref(r.UseWatchList, false)
fallbackToList := !useWatchList fallbackToList := !useWatchList
defer func() {
if w != nil {
w.Stop()
}
}()
if useWatchList { if useWatchList {
w, err = r.watchList(ctx) w, err = r.watchList(ctx)
if w == nil && err == nil { if w == nil && err == nil {
@ -476,12 +482,21 @@ func (r *Reflector) watchWithResync(ctx context.Context, w watch.Interface) erro
return r.watch(ctx, w, resyncerrc) return r.watch(ctx, w, resyncerrc)
} }
// watch simply starts a watch request with the server. // watch starts a watch request with the server, consumes watch events, and
// restarts the watch until an exit scenario is reached.
//
// If a watch is provided, it will be used, otherwise another will be started.
// If the watcher has started, it will always be stopped before returning.
func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc chan error) error { func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc chan error) error {
stopCh := ctx.Done() stopCh := ctx.Done()
logger := klog.FromContext(ctx) logger := klog.FromContext(ctx)
var err error var err error
retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock) retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
defer func() {
if w != nil {
w.Stop()
}
}()
for { for {
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
@ -489,9 +504,6 @@ func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc cha
case <-stopCh: case <-stopCh:
// we can only end up here when the stopCh // we can only end up here when the stopCh
// was closed after a successful watchlist or list request // was closed after a successful watchlist or list request
if w != nil {
w.Stop()
}
return nil return nil
default: default:
} }
@ -529,8 +541,8 @@ func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc cha
err = handleWatch(ctx, start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, err = handleWatch(ctx, start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion,
r.clock, resyncerrc) r.clock, resyncerrc)
// Ensure that watch will not be reused across iterations. // handleWatch always stops the watcher. So we don't need to here.
w.Stop() // Just set it to nil to trigger a retry on the next loop.
w = nil w = nil
retry.After(err) retry.After(err)
if err != nil { if err != nil {
@ -863,6 +875,12 @@ func handleAnyWatch(
logger := klog.FromContext(ctx) logger := klog.FromContext(ctx)
initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(logger, name, clock, start, exitOnWatchListBookmarkReceived) initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(logger, name, clock, start, exitOnWatchListBookmarkReceived)
defer initialEventsEndBookmarkWarningTicker.Stop() defer initialEventsEndBookmarkWarningTicker.Stop()
stopWatcher := true
defer func() {
if stopWatcher {
w.Stop()
}
}()
loop: loop:
for { for {
@ -929,6 +947,7 @@ loop:
} }
eventCount++ eventCount++
if exitOnWatchListBookmarkReceived && watchListBookmarkReceived { if exitOnWatchListBookmarkReceived && watchListBookmarkReceived {
stopWatcher = false
watchDuration := clock.Since(start) watchDuration := clock.Since(start)
klog.FromContext(ctx).V(4).Info("Exiting watch because received the bookmark that marks the end of initial events stream", "reflector", name, "totalItems", eventCount, "duration", watchDuration) klog.FromContext(ctx).V(4).Info("Exiting watch because received the bookmark that marks the end of initial events stream", "reflector", name, "totalItems", eventCount, "duration", watchDuration)
return watchListBookmarkReceived, nil return watchListBookmarkReceived, nil

View File

@ -31,6 +31,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -48,6 +50,7 @@ import (
"k8s.io/klog/v2/ktesting" "k8s.io/klog/v2/ktesting"
"k8s.io/utils/clock" "k8s.io/utils/clock"
testingclock "k8s.io/utils/clock/testing" testingclock "k8s.io/utils/clock/testing"
"k8s.io/utils/ptr"
) )
var nevererrc chan error var nevererrc chan error
@ -161,8 +164,8 @@ func TestReflectorWatchStoppedBefore(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
// TestReflectorWatchStoppedAfter ensures that neither the watcher is stopped if // TestReflectorWatchStoppedAfter ensures that Reflector.watch always stops
// the stop channel is closed after Reflector.watch has started watching. // the watcher when the stop channel is closed.
func TestReflectorWatchStoppedAfter(t *testing.T) { func TestReflectorWatchStoppedAfter(t *testing.T) {
_, ctx := ktesting.NewTestContext(t) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx) ctx, cancel := context.WithCancelCause(ctx)
@ -206,9 +209,9 @@ func BenchmarkReflectorResyncChanMany(b *testing.B) {
} }
} }
// TestReflectorHandleWatchStoppedBefore ensures that handleWatch stops when // TestReflectorHandleWatchStoppedBefore ensures that handleWatch returns when
// stopCh is already closed before handleWatch was called. It also ensures that // stopCh is already closed before handleWatch was called. It also ensures that
// ResultChan is only called once and that Stop is called after ResultChan. // ResultChan and Stop are both called once.
func TestReflectorHandleWatchStoppedBefore(t *testing.T) { func TestReflectorHandleWatchStoppedBefore(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0) g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0)
@ -218,7 +221,7 @@ func TestReflectorHandleWatchStoppedBefore(t *testing.T) {
cancel(errors.New("don't run")) cancel(errors.New("don't run"))
var calls []string var calls []string
resultCh := make(chan watch.Event) resultCh := make(chan watch.Event)
fw := watch.MockWatcher{ fw := &watch.MockWatcher{
StopFunc: func() { StopFunc: func() {
calls = append(calls, "Stop") calls = append(calls, "Stop")
close(resultCh) close(resultCh)
@ -229,18 +232,14 @@ func TestReflectorHandleWatchStoppedBefore(t *testing.T) {
}, },
} }
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
if err == nil { require.Equal(t, err, errorStopRequested)
t.Errorf("unexpected non-error") // Ensure handleWatch calls ResultChan and Stop
} assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
// Ensure the watcher methods are called exactly once in this exact order.
// TODO(karlkfi): Fix watchHandler to call Stop()
// assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
assert.Equal(t, []string{"ResultChan"}, calls)
} }
// TestReflectorHandleWatchStoppedAfter ensures that handleWatch stops when // TestReflectorHandleWatchStoppedAfter ensures that handleWatch returns when
// stopCh is closed after handleWatch was called. It also ensures that // stopCh is closed after handleWatch was called. It also ensures that
// ResultChan is only called once and that Stop is called after ResultChan. // ResultChan and Stop are both called once.
func TestReflectorHandleWatchStoppedAfter(t *testing.T) { func TestReflectorHandleWatchStoppedAfter(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0) g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0)
@ -248,7 +247,7 @@ func TestReflectorHandleWatchStoppedAfter(t *testing.T) {
_, ctx := ktesting.NewTestContext(t) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx) ctx, cancel := context.WithCancelCause(ctx)
resultCh := make(chan watch.Event) resultCh := make(chan watch.Event)
fw := watch.MockWatcher{ fw := &watch.MockWatcher{
StopFunc: func() { StopFunc: func() {
calls = append(calls, "Stop") calls = append(calls, "Stop")
close(resultCh) close(resultCh)
@ -266,24 +265,21 @@ func TestReflectorHandleWatchStoppedAfter(t *testing.T) {
}, },
} }
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
if err == nil { require.Equal(t, err, errorStopRequested)
t.Errorf("unexpected non-error") // Ensure handleWatch calls ResultChan and Stop
} assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
// Ensure the watcher methods are called exactly once in this exact order.
// TODO(karlkfi): Fix watchHandler to call Stop()
// assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
assert.Equal(t, []string{"ResultChan"}, calls)
} }
// TestReflectorHandleWatchResultChanClosedBefore ensures that handleWatch // TestReflectorHandleWatchResultChanClosedBefore ensures that handleWatch
// stops when the result channel is closed before handleWatch was called. // returns when the result channel is closed before handleWatch was called.
// It also ensures that ResultChan and Stop are both called once.
func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) { func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0) g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0)
_, ctx := ktesting.NewTestContext(t) _, ctx := ktesting.NewTestContext(t)
var calls []string var calls []string
resultCh := make(chan watch.Event) resultCh := make(chan watch.Event)
fw := watch.MockWatcher{ fw := &watch.MockWatcher{
StopFunc: func() { StopFunc: func() {
calls = append(calls, "Stop") calls = append(calls, "Stop")
}, },
@ -295,24 +291,22 @@ func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) {
// Simulate the result channel being closed by the producer before handleWatch is called. // Simulate the result channel being closed by the producer before handleWatch is called.
close(resultCh) close(resultCh)
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
if err == nil { // TODO(karlkfi): Add exact error type for "very short watch"
t.Errorf("unexpected non-error") require.Error(t, err)
} // Ensure handleWatch calls ResultChan and Stop
// Ensure the watcher methods are called exactly once in this exact order. assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
// TODO(karlkfi): Fix watchHandler to call Stop()
// assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
assert.Equal(t, []string{"ResultChan"}, calls)
} }
// TestReflectorHandleWatchResultChanClosedAfter ensures that handleWatch // TestReflectorHandleWatchResultChanClosedAfter ensures that handleWatch
// stops when the result channel is closed after handleWatch has started watching. // returns when the result channel is closed after handleWatch has started
// watching. It also ensures that ResultChan and Stop are both called once.
func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) { func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0) g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0)
_, ctx := ktesting.NewTestContext(t) _, ctx := ktesting.NewTestContext(t)
var calls []string var calls []string
resultCh := make(chan watch.Event) resultCh := make(chan watch.Event)
fw := watch.MockWatcher{ fw := &watch.MockWatcher{
StopFunc: func() { StopFunc: func() {
calls = append(calls, "Stop") calls = append(calls, "Stop")
}, },
@ -329,22 +323,17 @@ func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) {
}, },
} }
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
if err == nil { // TODO(karlkfi): Add exact error type for "very short watch"
t.Errorf("unexpected non-error") require.Error(t, err)
} // Ensure handleWatch calls ResultChan and Stop
// Ensure the watcher methods are called exactly once in this exact order. assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
// TODO(karlkfi): Fix watchHandler to call Stop()
// assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
assert.Equal(t, []string{"ResultChan"}, calls)
} }
func TestReflectorWatchHandler(t *testing.T) { func TestReflectorWatchHandler(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0) g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0)
// Wrap setLastSyncResourceVersion so we can tell the watchHandler to stop // Wrap setLastSyncResourceVersion so we can tell the watchHandler to stop
// watching after all the events have been consumed. This avoids race // watching after all the events have been consumed.
// conditions which can happen if the producer calls Stop(), instead of the
// consumer.
_, ctx := ktesting.NewTestContext(t) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx) ctx, cancel := context.WithCancelCause(ctx)
setLastSyncResourceVersion := func(rv string) { setLastSyncResourceVersion := func(rv string) {
@ -361,13 +350,11 @@ func TestReflectorWatchHandler(t *testing.T) {
fw.Delete(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) fw.Delete(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
fw.Modify(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "55"}}) fw.Modify(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "55"}})
fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}}) fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}})
fw.Stop() // Stop means that the consumer is done reading events.
// So let handleWatch call fw.Stop, after the Context is cancelled.
}() }()
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, setLastSyncResourceVersion, g.clock, nevererrc) err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, setLastSyncResourceVersion, g.clock, nevererrc)
// TODO(karlkfi): Fix FakeWatcher to avoid race condition between watcher.Stop() & close(stopCh) require.Equal(t, err, errorStopRequested)
if err != nil && !errors.Is(err, errorStopRequested) {
t.Errorf("unexpected error %v", err)
}
mkPod := func(id string, rv string) *v1.Pod { mkPod := func(id string, rv string) *v1.Pod {
return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: rv}} return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: rv}}
@ -410,70 +397,226 @@ func TestReflectorStopWatch(t *testing.T) {
ctx, cancel := context.WithCancelCause(ctx) ctx, cancel := context.WithCancelCause(ctx)
cancel(errors.New("don't run")) cancel(errors.New("don't run"))
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
if err != errorStopRequested { require.Equal(t, err, errorStopRequested)
t.Errorf("expected stop error, got %q", err)
}
} }
func TestReflectorListAndWatch(t *testing.T) { func TestReflectorListAndWatch(t *testing.T) {
_, ctx := ktesting.NewTestContext(t) type listResult struct {
ctx, cancel := context.WithCancel(ctx) Object runtime.Object
defer cancel() Error error
createdFakes := make(chan *watch.FakeWatcher) }
table := []struct {
name string
useWatchList bool
listResults []listResult
watchEvents []watch.Event
expectedListOptions []metav1.ListOptions
expectedWatchOptions []metav1.ListOptions
expectedStore []metav1.Object
}{
{
name: "UseWatchList enabled",
useWatchList: true,
watchEvents: []watch.Event{
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"}},
},
{
Type: watch.Bookmark,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "foo",
ResourceVersion: "1",
Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"},
}},
},
{
Type: watch.Modified,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "2"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "3"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "4"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "qux", ResourceVersion: "5"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "zoo", ResourceVersion: "6"}},
},
},
expectedWatchOptions: []metav1.ListOptions{
{
AllowWatchBookmarks: true,
ResourceVersion: "",
// ResourceVersionMatch defaults to "NotOlderThan" when
// ResourceVersion and Limit are empty.
ResourceVersionMatch: "NotOlderThan",
SendInitialEvents: ptr.To(true),
},
},
expectedStore: []metav1.Object{
// Pod "foo" with rv "1" is de-duped by rv 2
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "2"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "3"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "4"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "qux", ResourceVersion: "5"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "zoo", ResourceVersion: "6"}},
},
},
{
name: "UseWatchList disabled",
useWatchList: false,
listResults: []listResult{
{
Object: &v1.PodList{
ListMeta: metav1.ListMeta{ResourceVersion: "1"},
Items: []v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"}},
},
},
},
},
watchEvents: []watch.Event{
{
Type: watch.Modified,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "2"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "3"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "4"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "qux", ResourceVersion: "5"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "zoo", ResourceVersion: "6"}},
},
},
expectedListOptions: []metav1.ListOptions{
{
AllowWatchBookmarks: false,
ResourceVersion: "0",
// ResourceVersionMatch defaults to "NotOlderThan" when
// ResourceVersion is set and non-zero.
Limit: 500,
SendInitialEvents: nil,
},
},
expectedWatchOptions: []metav1.ListOptions{
{
AllowWatchBookmarks: true,
ResourceVersion: "1",
// ResourceVersionMatch is not used by Watch calls
SendInitialEvents: nil,
},
},
expectedStore: []metav1.Object{
// Pod "foo" with rv "1" is de-duped by rv 2
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "2"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "3"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "4"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "qux", ResourceVersion: "5"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "zoo", ResourceVersion: "6"}},
},
},
}
for _, tc := range table {
t.Run(tc.name, func(t *testing.T) {
watcherCh := make(chan *watch.FakeWatcher)
var listOpts, watchOpts []metav1.ListOptions
// The ListFunc says that it's at revision 1. Therefore, we expect our WatchFunc // The ListFunc will never be called. So we expect Watch to only be called
// to get called at the beginning of the watch with 1, and again with 3 when we // with options.ResourceVersion="" to start the WatchList.
// inject an error. lw := &ListWatch{
expectedRVs := []string{"1", "3"} WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
lw := &ListWatch{ watchOpts = append(watchOpts, options)
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { if len(watchOpts) > len(tc.expectedWatchOptions) {
rv := options.ResourceVersion return nil, fmt.Errorf("Expected ListerWatcher.Watch to only be called %d times",
fw := watch.NewFake() len(tc.expectedWatchOptions))
if e, a := expectedRVs[0], rv; e != a { }
t.Errorf("Expected rv %v, but got %v", e, a) w := watch.NewFake()
// Enqueue for event producer to use
go func() { watcherCh <- w }()
t.Log("Watcher Started")
return w, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
listOpts = append(listOpts, options)
if len(listOpts) > len(tc.listResults) {
return nil, fmt.Errorf("Expected ListerWatcher.List to only be called %d times",
len(tc.listResults))
}
listResult := tc.listResults[len(listOpts)-1]
return listResult.Object, listResult.Error
},
} }
expectedRVs = expectedRVs[1:] s := NewFIFO(MetaNamespaceKeyFunc)
// channel is not buffered because the for loop below needs to block. But r := NewReflector(lw, &v1.Pod{}, s, 0)
// we don't want to block here, so report the new fake via a go routine. r.UseWatchList = ptr.To(tc.useWatchList)
go func() { createdFakes <- fw }()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
},
}
s := NewFIFO(MetaNamespaceKeyFunc)
r := NewReflector(lw, &v1.Pod{}, s, 0)
go func() { assert.NoError(t, r.ListAndWatchWithContext(ctx)) }()
ids := []string{"foo", "bar", "baz", "qux", "zoo"} // Start ListAndWatch in the background.
var fw *watch.FakeWatcher // When it returns, it will send an error or nil on the error
for i, id := range ids { // channel and close the error channel.
if fw == nil { _, ctx := ktesting.NewTestContext(t)
fw = <-createdFakes ctx, cancel := context.WithCancel(ctx)
} defer cancel()
sendingRV := strconv.FormatUint(uint64(i+2), 10) errCh := make(chan error)
fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: sendingRV}}) go func() {
if sendingRV == "3" { defer close(errCh)
// Inject a failure. errCh <- r.ListAndWatchWithContext(ctx)
fw.Stop() }()
fw = nil // Stop ListAndWatch and wait for the error channel to close.
} // Validate it didn't error in Cleanup, not a defer.
} t.Cleanup(func() {
cancel()
for err := range errCh {
assert.NoError(t, err)
}
})
// Verify we received the right ids with the right resource versions. // Send watch events
for i, id := range ids { var fw *watch.FakeWatcher
pod := Pop(s).(*v1.Pod) for _, event := range tc.watchEvents {
if e, a := id, pod.Name; e != a { if fw == nil {
t.Errorf("%v: Expected %v, got %v", i, e, a) // Wait for ListerWatcher.Watch to be called
} fw = <-watcherCh
if e, a := strconv.FormatUint(uint64(i+2), 10), pod.ResourceVersion; e != a { }
t.Errorf("%v: Expected %v, got %v", i, e, a) obj := event.Object.(metav1.Object)
} t.Logf("Sending %s event: name=%s, resourceVersion=%s",
} event.Type, obj.GetName(), obj.GetResourceVersion())
fw.Action(event.Type, event.Object)
}
if len(expectedRVs) != 0 { // Verify we received the right objects with the right resource versions.
t.Error("called watchStarter an unexpected number of times") for _, expectedObj := range tc.expectedStore {
storeObj := Pop(s).(metav1.Object)
assert.Equal(t, expectedObj.GetName(), storeObj.GetName())
assert.Equal(t, expectedObj.GetResourceVersion(), storeObj.GetResourceVersion())
}
// Verify we received the right number of List & Watch calls,
// with the expected options.
diffOpts := cmpopts.IgnoreFields(metav1.ListOptions{}, "TimeoutSeconds")
if diff := cmp.Diff(tc.expectedListOptions, listOpts, diffOpts); diff != "" {
t.Errorf("Unexpected List calls by ListAndWatch:\n%s", diff)
}
if diff := cmp.Diff(tc.expectedWatchOptions, watchOpts, diffOpts); diff != "" {
t.Errorf("Unexpected Watch calls by ListAndWatch:\n%s", diff)
}
})
} }
} }