Merge pull request #131266 from karlkfi/karl-reflector-refactor

fix: Watcher deadlock from Stop not being called

Kubernetes-commit: d6d6aad0d89abea236996b12d0545642f96684fb
This commit is contained in:
Kubernetes Publisher
2025-04-23 17:10:13 -07:00
4 changed files with 275 additions and 113 deletions

2
go.mod
View File

@@ -25,7 +25,7 @@ require (
golang.org/x/time v0.9.0 golang.org/x/time v0.9.0
google.golang.org/protobuf v1.36.5 google.golang.org/protobuf v1.36.5
gopkg.in/evanphx/json-patch.v4 v4.12.0 gopkg.in/evanphx/json-patch.v4 v4.12.0
k8s.io/api v0.0.0-20250423231958-d18a46229505 k8s.io/api v0.0.0-20250424031748-4c4912b7f13d
k8s.io/apimachinery v0.0.0-20250423231524-954960919938 k8s.io/apimachinery v0.0.0-20250423231524-954960919938
k8s.io/klog/v2 v2.130.1 k8s.io/klog/v2 v2.130.1
k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff

4
go.sum
View File

@@ -146,8 +146,8 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
k8s.io/api v0.0.0-20250423231958-d18a46229505 h1:CzcGzeX7BpE8DYfaOipLutwMTAZHcii4Hf4OrE65sX8= k8s.io/api v0.0.0-20250424031748-4c4912b7f13d h1:pL9NMGsBXJatDP3x2gdS/+S98wrLRsZtcQFzyPILoU4=
k8s.io/api v0.0.0-20250423231958-d18a46229505/go.mod h1:PEZtCkQxQ/XohDih7vofdKAdIIiS9kFruj1lF/EUMLs= k8s.io/api v0.0.0-20250424031748-4c4912b7f13d/go.mod h1:PEZtCkQxQ/XohDih7vofdKAdIIiS9kFruj1lF/EUMLs=
k8s.io/apimachinery v0.0.0-20250423231524-954960919938 h1:yoIMbzO4of8M4auqFKjNsbFlHJG9jCuoD+4sUJUPdn4= k8s.io/apimachinery v0.0.0-20250423231524-954960919938 h1:yoIMbzO4of8M4auqFKjNsbFlHJG9jCuoD+4sUJUPdn4=
k8s.io/apimachinery v0.0.0-20250423231524-954960919938/go.mod h1:tJ77gZ1upNffdrQVxg+oIoEmvSIyTbz3RIPi9HKw+nw= k8s.io/apimachinery v0.0.0-20250423231524-954960919938/go.mod h1:tJ77gZ1upNffdrQVxg+oIoEmvSIyTbz3RIPi9HKw+nw=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=

View File

@@ -406,6 +406,12 @@ func (r *Reflector) ListAndWatchWithContext(ctx context.Context) error {
useWatchList := ptr.Deref(r.UseWatchList, false) useWatchList := ptr.Deref(r.UseWatchList, false)
fallbackToList := !useWatchList fallbackToList := !useWatchList
defer func() {
if w != nil {
w.Stop()
}
}()
if useWatchList { if useWatchList {
w, err = r.watchList(ctx) w, err = r.watchList(ctx)
if w == nil && err == nil { if w == nil && err == nil {
@@ -476,12 +482,21 @@ func (r *Reflector) watchWithResync(ctx context.Context, w watch.Interface) erro
return r.watch(ctx, w, resyncerrc) return r.watch(ctx, w, resyncerrc)
} }
// watch simply starts a watch request with the server. // watch starts a watch request with the server, consumes watch events, and
// restarts the watch until an exit scenario is reached.
//
// If a watch is provided, it will be used, otherwise another will be started.
// If the watcher has started, it will always be stopped before returning.
func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc chan error) error { func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc chan error) error {
stopCh := ctx.Done() stopCh := ctx.Done()
logger := klog.FromContext(ctx) logger := klog.FromContext(ctx)
var err error var err error
retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock) retry := NewRetryWithDeadline(r.MaxInternalErrorRetryDuration, time.Minute, apierrors.IsInternalError, r.clock)
defer func() {
if w != nil {
w.Stop()
}
}()
for { for {
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
@@ -489,9 +504,6 @@ func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc cha
case <-stopCh: case <-stopCh:
// we can only end up here when the stopCh // we can only end up here when the stopCh
// was closed after a successful watchlist or list request // was closed after a successful watchlist or list request
if w != nil {
w.Stop()
}
return nil return nil
default: default:
} }
@@ -529,8 +541,8 @@ func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc cha
err = handleWatch(ctx, start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, err = handleWatch(ctx, start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion,
r.clock, resyncerrc) r.clock, resyncerrc)
// Ensure that watch will not be reused across iterations. // handleWatch always stops the watcher. So we don't need to here.
w.Stop() // Just set it to nil to trigger a retry on the next loop.
w = nil w = nil
retry.After(err) retry.After(err)
if err != nil { if err != nil {
@@ -863,6 +875,12 @@ func handleAnyWatch(
logger := klog.FromContext(ctx) logger := klog.FromContext(ctx)
initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(logger, name, clock, start, exitOnWatchListBookmarkReceived) initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(logger, name, clock, start, exitOnWatchListBookmarkReceived)
defer initialEventsEndBookmarkWarningTicker.Stop() defer initialEventsEndBookmarkWarningTicker.Stop()
stopWatcher := true
defer func() {
if stopWatcher {
w.Stop()
}
}()
loop: loop:
for { for {
@@ -929,6 +947,7 @@ loop:
} }
eventCount++ eventCount++
if exitOnWatchListBookmarkReceived && watchListBookmarkReceived { if exitOnWatchListBookmarkReceived && watchListBookmarkReceived {
stopWatcher = false
watchDuration := clock.Since(start) watchDuration := clock.Since(start)
klog.FromContext(ctx).V(4).Info("Exiting watch because received the bookmark that marks the end of initial events stream", "reflector", name, "totalItems", eventCount, "duration", watchDuration) klog.FromContext(ctx).V(4).Info("Exiting watch because received the bookmark that marks the end of initial events stream", "reflector", name, "totalItems", eventCount, "duration", watchDuration)
return watchListBookmarkReceived, nil return watchListBookmarkReceived, nil

View File

@@ -31,6 +31,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@@ -48,6 +50,7 @@ import (
"k8s.io/klog/v2/ktesting" "k8s.io/klog/v2/ktesting"
"k8s.io/utils/clock" "k8s.io/utils/clock"
testingclock "k8s.io/utils/clock/testing" testingclock "k8s.io/utils/clock/testing"
"k8s.io/utils/ptr"
) )
var nevererrc chan error var nevererrc chan error
@@ -161,8 +164,8 @@ func TestReflectorWatchStoppedBefore(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
} }
// TestReflectorWatchStoppedAfter ensures that neither the watcher is stopped if // TestReflectorWatchStoppedAfter ensures that Reflector.watch always stops
// the stop channel is closed after Reflector.watch has started watching. // the watcher when the stop channel is closed.
func TestReflectorWatchStoppedAfter(t *testing.T) { func TestReflectorWatchStoppedAfter(t *testing.T) {
_, ctx := ktesting.NewTestContext(t) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx) ctx, cancel := context.WithCancelCause(ctx)
@@ -206,9 +209,9 @@ func BenchmarkReflectorResyncChanMany(b *testing.B) {
} }
} }
// TestReflectorHandleWatchStoppedBefore ensures that handleWatch stops when // TestReflectorHandleWatchStoppedBefore ensures that handleWatch returns when
// stopCh is already closed before handleWatch was called. It also ensures that // stopCh is already closed before handleWatch was called. It also ensures that
// ResultChan is only called once and that Stop is called after ResultChan. // ResultChan and Stop are both called once.
func TestReflectorHandleWatchStoppedBefore(t *testing.T) { func TestReflectorHandleWatchStoppedBefore(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0) g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0)
@@ -218,7 +221,7 @@ func TestReflectorHandleWatchStoppedBefore(t *testing.T) {
cancel(errors.New("don't run")) cancel(errors.New("don't run"))
var calls []string var calls []string
resultCh := make(chan watch.Event) resultCh := make(chan watch.Event)
fw := watch.MockWatcher{ fw := &watch.MockWatcher{
StopFunc: func() { StopFunc: func() {
calls = append(calls, "Stop") calls = append(calls, "Stop")
close(resultCh) close(resultCh)
@@ -229,18 +232,14 @@ func TestReflectorHandleWatchStoppedBefore(t *testing.T) {
}, },
} }
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
if err == nil { require.Equal(t, err, errorStopRequested)
t.Errorf("unexpected non-error") // Ensure handleWatch calls ResultChan and Stop
} assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
// Ensure the watcher methods are called exactly once in this exact order.
// TODO(karlkfi): Fix watchHandler to call Stop()
// assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
assert.Equal(t, []string{"ResultChan"}, calls)
} }
// TestReflectorHandleWatchStoppedAfter ensures that handleWatch stops when // TestReflectorHandleWatchStoppedAfter ensures that handleWatch returns when
// stopCh is closed after handleWatch was called. It also ensures that // stopCh is closed after handleWatch was called. It also ensures that
// ResultChan is only called once and that Stop is called after ResultChan. // ResultChan and Stop are both called once.
func TestReflectorHandleWatchStoppedAfter(t *testing.T) { func TestReflectorHandleWatchStoppedAfter(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0) g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0)
@@ -248,7 +247,7 @@ func TestReflectorHandleWatchStoppedAfter(t *testing.T) {
_, ctx := ktesting.NewTestContext(t) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx) ctx, cancel := context.WithCancelCause(ctx)
resultCh := make(chan watch.Event) resultCh := make(chan watch.Event)
fw := watch.MockWatcher{ fw := &watch.MockWatcher{
StopFunc: func() { StopFunc: func() {
calls = append(calls, "Stop") calls = append(calls, "Stop")
close(resultCh) close(resultCh)
@@ -266,24 +265,21 @@ func TestReflectorHandleWatchStoppedAfter(t *testing.T) {
}, },
} }
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
if err == nil { require.Equal(t, err, errorStopRequested)
t.Errorf("unexpected non-error") // Ensure handleWatch calls ResultChan and Stop
} assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
// Ensure the watcher methods are called exactly once in this exact order.
// TODO(karlkfi): Fix watchHandler to call Stop()
// assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
assert.Equal(t, []string{"ResultChan"}, calls)
} }
// TestReflectorHandleWatchResultChanClosedBefore ensures that handleWatch // TestReflectorHandleWatchResultChanClosedBefore ensures that handleWatch
// stops when the result channel is closed before handleWatch was called. // returns when the result channel is closed before handleWatch was called.
// It also ensures that ResultChan and Stop are both called once.
func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) { func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0) g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0)
_, ctx := ktesting.NewTestContext(t) _, ctx := ktesting.NewTestContext(t)
var calls []string var calls []string
resultCh := make(chan watch.Event) resultCh := make(chan watch.Event)
fw := watch.MockWatcher{ fw := &watch.MockWatcher{
StopFunc: func() { StopFunc: func() {
calls = append(calls, "Stop") calls = append(calls, "Stop")
}, },
@@ -295,24 +291,22 @@ func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) {
// Simulate the result channel being closed by the producer before handleWatch is called. // Simulate the result channel being closed by the producer before handleWatch is called.
close(resultCh) close(resultCh)
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
if err == nil { // TODO(karlkfi): Add exact error type for "very short watch"
t.Errorf("unexpected non-error") require.Error(t, err)
} // Ensure handleWatch calls ResultChan and Stop
// Ensure the watcher methods are called exactly once in this exact order. assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
// TODO(karlkfi): Fix watchHandler to call Stop()
// assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
assert.Equal(t, []string{"ResultChan"}, calls)
} }
// TestReflectorHandleWatchResultChanClosedAfter ensures that handleWatch // TestReflectorHandleWatchResultChanClosedAfter ensures that handleWatch
// stops when the result channel is closed after handleWatch has started watching. // returns when the result channel is closed after handleWatch has started
// watching. It also ensures that ResultChan and Stop are both called once.
func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) { func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0) g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0)
_, ctx := ktesting.NewTestContext(t) _, ctx := ktesting.NewTestContext(t)
var calls []string var calls []string
resultCh := make(chan watch.Event) resultCh := make(chan watch.Event)
fw := watch.MockWatcher{ fw := &watch.MockWatcher{
StopFunc: func() { StopFunc: func() {
calls = append(calls, "Stop") calls = append(calls, "Stop")
}, },
@@ -329,22 +323,17 @@ func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) {
}, },
} }
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
if err == nil { // TODO(karlkfi): Add exact error type for "very short watch"
t.Errorf("unexpected non-error") require.Error(t, err)
} // Ensure handleWatch calls ResultChan and Stop
// Ensure the watcher methods are called exactly once in this exact order. assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
// TODO(karlkfi): Fix watchHandler to call Stop()
// assert.Equal(t, []string{"ResultChan", "Stop"}, calls)
assert.Equal(t, []string{"ResultChan"}, calls)
} }
func TestReflectorWatchHandler(t *testing.T) { func TestReflectorWatchHandler(t *testing.T) {
s := NewStore(MetaNamespaceKeyFunc) s := NewStore(MetaNamespaceKeyFunc)
g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0) g := NewReflector(&ListWatch{}, &v1.Pod{}, s, 0)
// Wrap setLastSyncResourceVersion so we can tell the watchHandler to stop // Wrap setLastSyncResourceVersion so we can tell the watchHandler to stop
// watching after all the events have been consumed. This avoids race // watching after all the events have been consumed.
// conditions which can happen if the producer calls Stop(), instead of the
// consumer.
_, ctx := ktesting.NewTestContext(t) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancelCause(ctx) ctx, cancel := context.WithCancelCause(ctx)
setLastSyncResourceVersion := func(rv string) { setLastSyncResourceVersion := func(rv string) {
@@ -361,13 +350,11 @@ func TestReflectorWatchHandler(t *testing.T) {
fw.Delete(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) fw.Delete(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})
fw.Modify(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "55"}}) fw.Modify(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "55"}})
fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}}) fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "32"}})
fw.Stop() // Stop means that the consumer is done reading events.
// So let handleWatch call fw.Stop, after the Context is cancelled.
}() }()
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, setLastSyncResourceVersion, g.clock, nevererrc) err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, setLastSyncResourceVersion, g.clock, nevererrc)
// TODO(karlkfi): Fix FakeWatcher to avoid race condition between watcher.Stop() & close(stopCh) require.Equal(t, err, errorStopRequested)
if err != nil && !errors.Is(err, errorStopRequested) {
t.Errorf("unexpected error %v", err)
}
mkPod := func(id string, rv string) *v1.Pod { mkPod := func(id string, rv string) *v1.Pod {
return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: rv}} return &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: rv}}
@@ -410,70 +397,226 @@ func TestReflectorStopWatch(t *testing.T) {
ctx, cancel := context.WithCancelCause(ctx) ctx, cancel := context.WithCancelCause(ctx)
cancel(errors.New("don't run")) cancel(errors.New("don't run"))
err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc)
if err != errorStopRequested { require.Equal(t, err, errorStopRequested)
t.Errorf("expected stop error, got %q", err)
}
} }
func TestReflectorListAndWatch(t *testing.T) { func TestReflectorListAndWatch(t *testing.T) {
_, ctx := ktesting.NewTestContext(t) type listResult struct {
ctx, cancel := context.WithCancel(ctx) Object runtime.Object
defer cancel() Error error
createdFakes := make(chan *watch.FakeWatcher) }
table := []struct {
name string
useWatchList bool
listResults []listResult
watchEvents []watch.Event
expectedListOptions []metav1.ListOptions
expectedWatchOptions []metav1.ListOptions
expectedStore []metav1.Object
}{
{
name: "UseWatchList enabled",
useWatchList: true,
watchEvents: []watch.Event{
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"}},
},
{
Type: watch.Bookmark,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{
Name: "foo",
ResourceVersion: "1",
Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"},
}},
},
{
Type: watch.Modified,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "2"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "3"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "4"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "qux", ResourceVersion: "5"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "zoo", ResourceVersion: "6"}},
},
},
expectedWatchOptions: []metav1.ListOptions{
{
AllowWatchBookmarks: true,
ResourceVersion: "",
// ResourceVersionMatch defaults to "NotOlderThan" when
// ResourceVersion and Limit are empty.
ResourceVersionMatch: "NotOlderThan",
SendInitialEvents: ptr.To(true),
},
},
expectedStore: []metav1.Object{
// Pod "foo" with rv "1" is de-duped by rv 2
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "2"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "3"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "4"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "qux", ResourceVersion: "5"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "zoo", ResourceVersion: "6"}},
},
},
{
name: "UseWatchList disabled",
useWatchList: false,
listResults: []listResult{
{
Object: &v1.PodList{
ListMeta: metav1.ListMeta{ResourceVersion: "1"},
Items: []v1.Pod{
{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1"}},
},
},
},
},
watchEvents: []watch.Event{
{
Type: watch.Modified,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "2"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "3"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "4"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "qux", ResourceVersion: "5"}},
},
{
Type: watch.Added,
Object: &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "zoo", ResourceVersion: "6"}},
},
},
expectedListOptions: []metav1.ListOptions{
{
AllowWatchBookmarks: false,
ResourceVersion: "0",
// ResourceVersionMatch defaults to "NotOlderThan" when
// ResourceVersion is set and non-zero.
Limit: 500,
SendInitialEvents: nil,
},
},
expectedWatchOptions: []metav1.ListOptions{
{
AllowWatchBookmarks: true,
ResourceVersion: "1",
// ResourceVersionMatch is not used by Watch calls
SendInitialEvents: nil,
},
},
expectedStore: []metav1.Object{
// Pod "foo" with rv "1" is de-duped by rv 2
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "2"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "3"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "baz", ResourceVersion: "4"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "qux", ResourceVersion: "5"}},
&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "zoo", ResourceVersion: "6"}},
},
},
}
for _, tc := range table {
t.Run(tc.name, func(t *testing.T) {
watcherCh := make(chan *watch.FakeWatcher)
var listOpts, watchOpts []metav1.ListOptions
// The ListFunc says that it's at revision 1. Therefore, we expect our WatchFunc // The ListFunc will never be called. So we expect Watch to only be called
// to get called at the beginning of the watch with 1, and again with 3 when we // with options.ResourceVersion="" to start the WatchList.
// inject an error. lw := &ListWatch{
expectedRVs := []string{"1", "3"} WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
lw := &ListWatch{ watchOpts = append(watchOpts, options)
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { if len(watchOpts) > len(tc.expectedWatchOptions) {
rv := options.ResourceVersion return nil, fmt.Errorf("Expected ListerWatcher.Watch to only be called %d times",
fw := watch.NewFake() len(tc.expectedWatchOptions))
if e, a := expectedRVs[0], rv; e != a { }
t.Errorf("Expected rv %v, but got %v", e, a) w := watch.NewFake()
// Enqueue for event producer to use
go func() { watcherCh <- w }()
t.Log("Watcher Started")
return w, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
listOpts = append(listOpts, options)
if len(listOpts) > len(tc.listResults) {
return nil, fmt.Errorf("Expected ListerWatcher.List to only be called %d times",
len(tc.listResults))
}
listResult := tc.listResults[len(listOpts)-1]
return listResult.Object, listResult.Error
},
} }
expectedRVs = expectedRVs[1:] s := NewFIFO(MetaNamespaceKeyFunc)
// channel is not buffered because the for loop below needs to block. But r := NewReflector(lw, &v1.Pod{}, s, 0)
// we don't want to block here, so report the new fake via a go routine. r.UseWatchList = ptr.To(tc.useWatchList)
go func() { createdFakes <- fw }()
return fw, nil
},
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "1"}}, nil
},
}
s := NewFIFO(MetaNamespaceKeyFunc)
r := NewReflector(lw, &v1.Pod{}, s, 0)
go func() { assert.NoError(t, r.ListAndWatchWithContext(ctx)) }()
ids := []string{"foo", "bar", "baz", "qux", "zoo"} // Start ListAndWatch in the background.
var fw *watch.FakeWatcher // When it returns, it will send an error or nil on the error
for i, id := range ids { // channel and close the error channel.
if fw == nil { _, ctx := ktesting.NewTestContext(t)
fw = <-createdFakes ctx, cancel := context.WithCancel(ctx)
} defer cancel()
sendingRV := strconv.FormatUint(uint64(i+2), 10) errCh := make(chan error)
fw.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: id, ResourceVersion: sendingRV}}) go func() {
if sendingRV == "3" { defer close(errCh)
// Inject a failure. errCh <- r.ListAndWatchWithContext(ctx)
fw.Stop() }()
fw = nil // Stop ListAndWatch and wait for the error channel to close.
} // Validate it didn't error in Cleanup, not a defer.
} t.Cleanup(func() {
cancel()
for err := range errCh {
assert.NoError(t, err)
}
})
// Verify we received the right ids with the right resource versions. // Send watch events
for i, id := range ids { var fw *watch.FakeWatcher
pod := Pop(s).(*v1.Pod) for _, event := range tc.watchEvents {
if e, a := id, pod.Name; e != a { if fw == nil {
t.Errorf("%v: Expected %v, got %v", i, e, a) // Wait for ListerWatcher.Watch to be called
} fw = <-watcherCh
if e, a := strconv.FormatUint(uint64(i+2), 10), pod.ResourceVersion; e != a { }
t.Errorf("%v: Expected %v, got %v", i, e, a) obj := event.Object.(metav1.Object)
} t.Logf("Sending %s event: name=%s, resourceVersion=%s",
} event.Type, obj.GetName(), obj.GetResourceVersion())
fw.Action(event.Type, event.Object)
}
if len(expectedRVs) != 0 { // Verify we received the right objects with the right resource versions.
t.Error("called watchStarter an unexpected number of times") for _, expectedObj := range tc.expectedStore {
storeObj := Pop(s).(metav1.Object)
assert.Equal(t, expectedObj.GetName(), storeObj.GetName())
assert.Equal(t, expectedObj.GetResourceVersion(), storeObj.GetResourceVersion())
}
// Verify we received the right number of List & Watch calls,
// with the expected options.
diffOpts := cmpopts.IgnoreFields(metav1.ListOptions{}, "TimeoutSeconds")
if diff := cmp.Diff(tc.expectedListOptions, listOpts, diffOpts); diff != "" {
t.Errorf("Unexpected List calls by ListAndWatch:\n%s", diff)
}
if diff := cmp.Diff(tc.expectedWatchOptions, watchOpts, diffOpts); diff != "" {
t.Errorf("Unexpected Watch calls by ListAndWatch:\n%s", diff)
}
})
} }
} }