From d0ea06d5972fd7b32a1756ce08c78c6d196914d1 Mon Sep 17 00:00:00 2001 From: Lukasz Szaszkiewicz Date: Tue, 19 Sep 2023 12:59:23 +0200 Subject: [PATCH] cache/reflector: check the value of the initial-events-end annotation Kubernetes-commit: 04668c00432cbd552b08eb94f829643facbd2061 --- tools/cache/reflector.go | 2 +- tools/cache/reflector_watchlist_test.go | 21 +++++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/tools/cache/reflector.go b/tools/cache/reflector.go index 119f80ac..099eeabb 100644 --- a/tools/cache/reflector.go +++ b/tools/cache/reflector.go @@ -764,7 +764,7 @@ loop: } case watch.Bookmark: // A `Bookmark` means watch has synced here, just update the resourceVersion - if _, ok := meta.GetAnnotations()["k8s.io/initial-events-end"]; ok { + if meta.GetAnnotations()["k8s.io/initial-events-end"] == "true" { if exitOnInitialEventsEndBookmark != nil { *exitOnInitialEventsEndBookmark = true } diff --git a/tools/cache/reflector_watchlist_test.go b/tools/cache/reflector_watchlist_test.go index ae1750c7..4b2d0f87 100644 --- a/tools/cache/reflector_watchlist_test.go +++ b/tools/cache/reflector_watchlist_test.go @@ -350,6 +350,27 @@ func TestWatchList(t *testing.T) { expectedStoreContent: []v1.Pod{*makePod("p1", "1"), *makePod("p3", "3")}, expectedError: apierrors.NewResourceExpired("rv already expired"), }, + { + name: "prove that the reflector is checking the value of the initialEventsEnd annotation", + closeAfterWatchEvents: 3, + watchEvents: []watch.Event{ + {Type: watch.Added, Object: makePod("p1", "1")}, + {Type: watch.Added, Object: makePod("p2", "2")}, + {Type: watch.Bookmark, Object: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + ResourceVersion: "2", + Annotations: map[string]string{"k8s.io/initial-events-end": "false"}, + }, + }}, + }, + expectedWatchRequests: 1, + expectedRequestOptions: []metav1.ListOptions{{ + SendInitialEvents: pointer.Bool(true), + AllowWatchBookmarks: true, + ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan, + TimeoutSeconds: pointer.Int64(1), + }}, + }, } for _, s := range scenarios { t.Run(s.name, func(t *testing.T) {