diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 694a77fce..4d8d73d3b 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -520,6 +520,8 @@ func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc cha // start the clock before sending the request, since some proxies won't flush headers until after the first watch event is sent start := r.clock.Now() + // if w is already initialized, it must be past any synthetic non-rv-ordered added events + propagateRVFromStart := true if w == nil { timeoutSeconds := int64(r.minWatchTimeout.Seconds() * (rand.Float64() + 1.0)) options := metav1.ListOptions{ @@ -532,6 +534,11 @@ func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc cha // watch bookmarks, it will ignore this field). AllowWatchBookmarks: true, } + if options.ResourceVersion == "" || options.ResourceVersion == "0" { + // if we're starting the watch at a resource version that will get synthetic ADDED events in non-rv order, + // wait until we're through that set of events before propagating the RV + propagateRVFromStart = false + } w, err = r.listerWatcher.WatchWithContext(ctx, options) if err != nil { @@ -548,7 +555,25 @@ func (r *Reflector) watch(ctx context.Context, w watch.Interface, resyncerrc cha } } - err = handleWatch(ctx, start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, r.setLastSyncResourceVersion, + err = handleWatch(ctx, start, w, r.store, r.expectedType, r.expectedGVK, r.name, r.typeDescription, + func(rv string, eventReceivedBesidesAdded bool) { + // We update the resource version in the store only if we have received at least one event that is + // not an added event, or if the resource version has been set previously. This is because we can + // encounter 2 scenarios: + // 1. The watch is started from a resource version specified by the LastSyncResourceVersion field. 
+ // In this case, we can update the resource version in the store without worrying about it being + // out of order since we will not receive any synthetic added events for resources that may be + // out of order. + // 2. The watch is started when the LastSyncResourceVersion field is empty. In this case, we may not + // update the LastSyncResourceVersion until we receive at least one event that is not an added + // event, since that is the only way to ensure that the watch has exited the initial list phase. + if propagateRVFromStart || eventReceivedBesidesAdded { + r.setLastSyncResourceVersion(rv) + if rvu, ok := r.store.(ResourceVersionUpdater); ok { + rvu.UpdateResourceVersion(rv) + } + } + }, r.clock, resyncerrc) // handleWatch always stops the watcher. So we don't need to here. // Just set it to nil to trigger a retry on the next loop. @@ -775,7 +800,11 @@ func (r *Reflector) watchList(ctx context.Context) (watch.Interface, error) { return nil, err } watchListBookmarkReceived, err := handleListWatch(ctx, start, w, temporaryStore, r.expectedType, r.expectedGVK, r.name, r.typeDescription, - func(rv string) { resourceVersion = rv }, + func(rv string, eventReceivedBesidesAdded bool) { + if eventReceivedBesidesAdded { + resourceVersion = rv + } + }, r.clock, make(chan error)) if err != nil { w.Stop() // stop and retry with clean state @@ -832,7 +861,7 @@ func handleListWatch( expectedGVK *schema.GroupVersionKind, name string, expectedTypeName string, - setLastSyncResourceVersion func(string), + setLastSyncResourceVersion func(string, bool), clock clock.Clock, errCh chan error, ) (bool, error) { @@ -853,7 +882,7 @@ func handleWatch( expectedGVK *schema.GroupVersionKind, name string, expectedTypeName string, - setLastSyncResourceVersion func(string), + setLastSyncResourceVersion func(string, bool), clock clock.Clock, errCh chan error, ) error { @@ -881,12 +910,13 @@ func handleAnyWatch( expectedGVK *schema.GroupVersionKind, name string, expectedTypeName string, - 
setLastSyncResourceVersion func(string), + setLastSyncResourceVersion func(string, bool), exitOnWatchListBookmarkReceived bool, clock clock.Clock, errCh chan error, ) (bool, error) { watchListBookmarkReceived := false + eventReceivedBesidesAdded := false eventCount := 0 logger := klog.FromContext(ctx) initialEventsEndBookmarkWarningTicker := newInitialEventsEndBookmarkTicker(logger, name, clock, start, exitOnWatchListBookmarkReceived) @@ -946,6 +976,7 @@ loop: utilruntime.HandleErrorWithContext(ctx, err, "Unable to add watch event object to store", "reflector", name, "object", event.Object) } case watch.Modified: + eventReceivedBesidesAdded = true err := store.Update(event.Object) if err != nil { utilruntime.HandleErrorWithContext(ctx, err, "Unable to update watch event object to store", "reflector", name, "object", event.Object) @@ -954,22 +985,22 @@ loop: // TODO: Will any consumers need access to the "last known // state", which is passed in event.Object? If so, may need // to change this. 
+ eventReceivedBesidesAdded = true err := store.Delete(event.Object) if err != nil { utilruntime.HandleErrorWithContext(ctx, err, "Unable to delete watch event object from store", "reflector", name, "object", event.Object) } case watch.Bookmark: // A `Bookmark` means watch has synced here, just update the resourceVersion + eventReceivedBesidesAdded = true if meta.GetAnnotations()[metav1.InitialEventsAnnotationKey] == "true" { watchListBookmarkReceived = true } default: utilruntime.HandleErrorWithContext(ctx, err, "Unknown watch event", "reflector", name, "event", event) } - setLastSyncResourceVersion(resourceVersion) - if rvu, ok := store.(ResourceVersionUpdater); ok { - rvu.UpdateResourceVersion(resourceVersion) - } + // when eventReceivedBesidesAdded is true, that indicates we are definitely past any initial synthetic Added events + setLastSyncResourceVersion(resourceVersion, eventReceivedBesidesAdded) eventCount++ if exitOnWatchListBookmarkReceived && watchListBookmarkReceived { stopWatcher = false diff --git a/tools/cache/reflector_test.go b/tools/cache/reflector_test.go index 5554b16f2..1d1400feb 100644 --- a/tools/cache/reflector_test.go +++ b/tools/cache/reflector_test.go @@ -235,7 +235,7 @@ func TestReflectorHandleWatchStoppedBefore(t *testing.T) { return resultCh }, } - err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) + err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, func(rv string, _ bool) { g.setLastSyncResourceVersion(rv) }, g.clock, nevererrc) require.Equal(t, err, errorStopRequested) // Ensure handleWatch calls ResultChan and Stop assert.Equal(t, []string{"ResultChan", "Stop"}, calls) @@ -268,7 +268,7 @@ func TestReflectorHandleWatchStoppedAfter(t *testing.T) { return resultCh }, } - err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, 
g.setLastSyncResourceVersion, g.clock, nevererrc) + err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, func(rv string, _ bool) { g.setLastSyncResourceVersion(rv) }, g.clock, nevererrc) require.Equal(t, err, errorStopRequested) // Ensure handleWatch calls ResultChan and Stop assert.Equal(t, []string{"ResultChan", "Stop"}, calls) @@ -294,7 +294,7 @@ func TestReflectorHandleWatchResultChanClosedBefore(t *testing.T) { } // Simulate the result channel being closed by the producer before handleWatch is called. close(resultCh) - err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) + err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, func(rv string, _ bool) { g.setLastSyncResourceVersion(rv) }, g.clock, nevererrc) require.Equal(t, &VeryShortWatchError{Name: g.name}, err) // Ensure handleWatch calls ResultChan and Stop assert.Equal(t, []string{"ResultChan", "Stop"}, calls) @@ -325,7 +325,7 @@ func TestReflectorHandleWatchResultChanClosedAfter(t *testing.T) { return resultCh }, } - err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) + err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, func(rv string, _ bool) { g.setLastSyncResourceVersion(rv) }, g.clock, nevererrc) require.Equal(t, &VeryShortWatchError{Name: g.name}, err) // Ensure handleWatch calls ResultChan and Stop assert.Equal(t, []string{"ResultChan", "Stop"}, calls) @@ -338,7 +338,7 @@ func TestReflectorWatchHandler(t *testing.T) { // watching after all the events have been consumed. 
_, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancelCause(ctx) - setLastSyncResourceVersion := func(rv string) { + setLastSyncResourceVersion := func(rv string, _ bool) { g.setLastSyncResourceVersion(rv) if rv == "32" { cancel(errors.New("LastSyncResourceVersion is 32")) @@ -398,7 +398,7 @@ func TestReflectorStopWatch(t *testing.T) { _, ctx := ktesting.NewTestContext(t) ctx, cancel := context.WithCancelCause(ctx) cancel(errors.New("don't run")) - err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, g.setLastSyncResourceVersion, g.clock, nevererrc) + err := handleWatch(ctx, time.Now(), fw, s, g.expectedType, g.expectedGVK, g.name, g.typeDescription, func(rv string, _ bool) { g.setLastSyncResourceVersion(rv) }, g.clock, nevererrc) require.Equal(t, err, errorStopRequested) } diff --git a/tools/cache/reflector_watchlist_test.go b/tools/cache/reflector_watchlist_test.go index 50ebe3e62..ef86ca6de 100644 --- a/tools/cache/reflector_watchlist_test.go +++ b/tools/cache/reflector_watchlist_test.go @@ -168,6 +168,68 @@ func TestInitialEventsEndBookmarkTicker(t *testing.T) { }) } +func TestWatchListResourceVersion(t *testing.T) { + scenarios := []struct { + name string + watchEvents []watch.Event + expectedResourceVersion string + }{ + { + name: "empty resource version", + watchEvents: []watch.Event{}, + expectedResourceVersion: "", + }, + { + name: "empty resource version without bookmark event", + watchEvents: []watch.Event{{Type: watch.Added, Object: makePod("p1", "1")}}, + expectedResourceVersion: "", + }, + { + name: "non empty resource version with bookmark event and initial events annotation", + watchEvents: []watch.Event{ + {Type: watch.Added, Object: makePod("p1", "1")}, + {Type: watch.Bookmark, Object: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "2", + Annotations: map[string]string{metav1.InitialEventsAnnotationKey: "true"}, + }, + }}, + }, + expectedResourceVersion: "2", + }, + } + 
for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + scenario := scenario + _, ctx := ktesting.NewTestContext(t) + clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true) + + lw, s, r, ctx, cancel := testData(ctx) + + go func() { + for _, e := range scenario.watchEvents { + lw.fakeWatcher.Action(e.Type, e.Object) + } + cancel(errors.New("done")) + }() + + curRV := "" + trackRV := func(rv string, eventReceivedBesidesAdded bool) { + if eventReceivedBesidesAdded { + curRV = rv + } + } + + _, err := handleListWatch(ctx, time.Now(), lw, s, r.expectedType, r.expectedGVK, r.name, r.typeDescription, trackRV, r.clock, nevererrc) + if err != nil && !errors.Is(err, errorStopRequested) { + t.Errorf("expected errorStopRequested, got %v", err) + } + + require.Equal(t, scenario.expectedResourceVersion, curRV) + }) + } +} + func TestWatchList(t *testing.T) { scenarios := []struct { name string @@ -632,7 +694,7 @@ func testData(ctx context.Context) (*fakeListWatcher, Store, *Reflector, context s := NewStore(MetaNamespaceKeyFunc) lw := &fakeListWatcher{ fakeWatcher: watch.NewFake(), - stop: func() { + stopFunc: func() { cancel(errors.New("time to stop")) }, } @@ -648,7 +710,7 @@ type fakeListWatcher struct { watchCounter int closeAfterWatchRequests int closeAfterListRequests int - stop func() + stopFunc func() requestOptions []metav1.ListOptions @@ -660,7 +722,7 @@ func (lw *fakeListWatcher) List(options metav1.ListOptions) (runtime.Object, err lw.listCounter++ lw.requestOptions = append(lw.requestOptions, options) if lw.listCounter == lw.closeAfterListRequests { - lw.stop() + lw.stopFunc() } if lw.customListResponse != nil { return lw.customListResponse, nil @@ -672,7 +734,7 @@ func (lw *fakeListWatcher) Watch(options metav1.ListOptions) (watch.Interface, e lw.watchCounter++ lw.requestOptions = append(lw.requestOptions, options) if lw.watchCounter == lw.closeAfterWatchRequests { - lw.stop() + lw.stopFunc() } if 
lw.watchOptionsPredicate != nil {
 		if err := lw.watchOptionsPredicate(options); err != nil {
@@ -690,3 +752,15 @@ func (lw *fakeListWatcher) StopAndRecreateWatch() {
 	lw.fakeWatcher.Stop()
 	lw.fakeWatcher = watch.NewFake()
 }
+// Stop takes lw.lock and stops the current lw.fakeWatcher (which StopAndRecreateWatch may have replaced).
+func (lw *fakeListWatcher) Stop() {
+	lw.lock.Lock()
+	defer lw.lock.Unlock()
+	lw.fakeWatcher.Stop()
+}
+// ResultChan takes lw.lock and returns the event channel of whichever lw.fakeWatcher is current at call time.
+func (lw *fakeListWatcher) ResultChan() <-chan watch.Event {
+	lw.lock.Lock()
+	defer lw.lock.Unlock()
+	return lw.fakeWatcher.ResultChan()
+}