diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector.go b/staging/src/k8s.io/client-go/tools/cache/reflector.go index 119f80ac45f..099eeabbdda 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector.go @@ -764,7 +764,7 @@ loop: } case watch.Bookmark: // A `Bookmark` means watch has synced here, just update the resourceVersion - if _, ok := meta.GetAnnotations()["k8s.io/initial-events-end"]; ok { + if meta.GetAnnotations()["k8s.io/initial-events-end"] == "true" { if exitOnInitialEventsEndBookmark != nil { *exitOnInitialEventsEndBookmark = true } diff --git a/staging/src/k8s.io/client-go/tools/cache/reflector_watchlist_test.go b/staging/src/k8s.io/client-go/tools/cache/reflector_watchlist_test.go index ae1750c7bb7..4b2d0f87e06 100644 --- a/staging/src/k8s.io/client-go/tools/cache/reflector_watchlist_test.go +++ b/staging/src/k8s.io/client-go/tools/cache/reflector_watchlist_test.go @@ -350,6 +350,27 @@ func TestWatchList(t *testing.T) { expectedStoreContent: []v1.Pod{*makePod("p1", "1"), *makePod("p3", "3")}, expectedError: apierrors.NewResourceExpired("rv already expired"), }, + { + name: "prove that the reflector is checking the value of the initialEventsEnd annotation", + closeAfterWatchEvents: 3, + watchEvents: []watch.Event{ + {Type: watch.Added, Object: makePod("p1", "1")}, + {Type: watch.Added, Object: makePod("p2", "2")}, + {Type: watch.Bookmark, Object: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "2", + Annotations: map[string]string{"k8s.io/initial-events-end": "false"}, + }, + }}, + }, + expectedWatchRequests: 1, + expectedRequestOptions: []metav1.ListOptions{{ + SendInitialEvents: pointer.Bool(true), + AllowWatchBookmarks: true, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + TimeoutSeconds: pointer.Int64(1), + }}, + }, } for _, s := range scenarios { t.Run(s.name, func(t *testing.T) {